Files
Mofox-Core/src/plugins/chat/utils.py
2025-03-07 01:32:15 +08:00

406 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import random
from typing import List
from .message import Message
import requests
import numpy as np
from .config import global_config
import re
from typing import Dict
from collections import Counter
import math
from nonebot import get_driver
from ..models.utils_model import LLM_request
import aiohttp
import jieba
from ..utils.typo_generator import ChineseTypoGenerator
driver = get_driver()
config = driver.config
def combine_messages(messages: List[Message]) -> str:
"""将消息列表组合成格式化的字符串
Args:
messages: Message对象列表
Returns:
str: 格式化后的消息字符串
"""
result = ""
for message in messages:
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message.time))
name = message.user_nickname or f"用户{message.user_id}"
content = message.processed_plain_text or message.plain_text
result += f"[{time_str}] {name}: {content}\n"
return result
def db_message_to_str(message_dict: Dict) -> str:
print(f"message_dict: {message_dict}")
time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(message_dict["time"]))
try:
name = "[(%s)%s]%s" % (
message_dict['user_id'], message_dict.get("user_nickname", ""), message_dict.get("user_cardname", ""))
except:
name = message_dict.get("user_nickname", "") or f"用户{message_dict['user_id']}"
content = message_dict.get("processed_plain_text", "")
result = f"[{time_str}] {name}: {content}\n"
print(f"result: {result}")
return result
def is_mentioned_bot_in_message(message: Message) -> bool:
"""检查消息是否提到了机器人"""
keywords = [global_config.BOT_NICKNAME]
for keyword in keywords:
if keyword in message.processed_plain_text:
return True
return False
def is_mentioned_bot_in_txt(message: str) -> bool:
"""检查消息是否提到了机器人"""
keywords = [global_config.BOT_NICKNAME]
for keyword in keywords:
if keyword in message:
return True
return False
async def get_embedding(text):
"""获取文本的embedding向量"""
llm = LLM_request(model=global_config.embedding)
# return llm.get_embedding_sync(text)
return await llm.get_embedding(text)
def cosine_similarity(v1, v2):
dot_product = np.dot(v1, v2)
norm1 = np.linalg.norm(v1)
norm2 = np.linalg.norm(v2)
return dot_product / (norm1 * norm2)
def calculate_information_content(text):
"""计算文本的信息量(熵)"""
char_count = Counter(text)
total_chars = len(text)
entropy = 0
for count in char_count.values():
probability = count / total_chars
entropy -= probability * math.log2(probability)
return entropy
def get_cloest_chat_from_db(db, length: int, timestamp: str):
"""从数据库中获取最接近指定时间戳的聊天记录,并记录读取次数"""
chat_text = ''
closest_record = db.db.messages.find_one({"time": {"$lte": timestamp}}, sort=[('time', -1)])
if closest_record and closest_record.get('memorized', 0) < 4:
closest_time = closest_record['time']
group_id = closest_record['group_id'] # 获取groupid
# 获取该时间戳之后的length条消息且groupid相同
chat_records = list(db.db.messages.find(
{"time": {"$gt": closest_time}, "group_id": group_id}
).sort('time', 1).limit(length))
# 更新每条消息的memorized属性
for record in chat_records:
# 检查当前记录的memorized值
current_memorized = record.get('memorized', 0)
if current_memorized > 3:
# print(f"消息已读取3次跳过")
return ''
# 更新memorized值
db.db.messages.update_one(
{"_id": record["_id"]},
{"$set": {"memorized": current_memorized + 1}}
)
chat_text += record["detailed_plain_text"]
return chat_text
# print(f"消息已读取3次跳过")
return ''
async def get_recent_group_messages(db, group_id: int, limit: int = 12) -> list:
"""从数据库获取群组最近的消息记录
Args:
db: Database实例
group_id: 群组ID
limit: 获取消息数量默认12条
Returns:
list: Message对象列表按时间正序排列
"""
# 从数据库获取最近消息
recent_messages = list(db.db.messages.find(
{"group_id": group_id},
# {
# "time": 1,
# "user_id": 1,
# "user_nickname": 1,
# "message_id": 1,
# "raw_message": 1,
# "processed_text": 1
# }
).sort("time", -1).limit(limit))
if not recent_messages:
return []
# 转换为 Message对象列表
from .message import Message
message_objects = []
for msg_data in recent_messages:
try:
msg = Message(
time=msg_data["time"],
user_id=msg_data["user_id"],
user_nickname=msg_data.get("user_nickname", ""),
message_id=msg_data["message_id"],
raw_message=msg_data["raw_message"],
processed_plain_text=msg_data.get("processed_text", ""),
group_id=group_id
)
await msg.initialize()
message_objects.append(msg)
except KeyError:
print("[WARNING] 数据库中存在无效的消息")
continue
# 按时间正序排列
message_objects.reverse()
return message_objects
def get_recent_group_detailed_plain_text(db, group_id: int, limit: int = 12, combine=False):
recent_messages = list(db.db.messages.find(
{"group_id": group_id},
{
"time": 1, # 返回时间字段
"user_id": 1, # 返回用户ID字段
"user_nickname": 1, # 返回用户昵称字段
"message_id": 1, # 返回消息ID字段
"detailed_plain_text": 1 # 返回处理后的文本字段
}
).sort("time", -1).limit(limit))
if not recent_messages:
return []
message_detailed_plain_text = ''
message_detailed_plain_text_list = []
# 反转消息列表,使最新的消息在最后
recent_messages.reverse()
if combine:
for msg_db_data in recent_messages:
message_detailed_plain_text += str(msg_db_data["detailed_plain_text"])
return message_detailed_plain_text
else:
for msg_db_data in recent_messages:
message_detailed_plain_text_list.append(msg_db_data["detailed_plain_text"])
return message_detailed_plain_text_list
def split_into_sentences_w_remove_punctuation(text: str) -> List[str]:
"""将文本分割成句子,但保持书名号中的内容完整
Args:
text: 要分割的文本字符串
Returns:
List[str]: 分割后的句子列表
"""
len_text = len(text)
if len_text < 5:
if random.random() < 0.01:
return list(text) # 如果文本很短且触发随机条件,直接按字符分割
else:
return [text]
if len_text < 12:
split_strength = 0.3
elif len_text < 32:
split_strength = 0.7
else:
split_strength = 0.9
# 先移除换行符
# print(f"split_strength: {split_strength}")
# print(f"处理前的文本: {text}")
# 统一将英文逗号转换为中文逗号
text = text.replace(',', '')
text = text.replace('\n', ' ')
# print(f"处理前的文本: {text}")
text_no_1 = ''
for letter in text:
# print(f"当前字符: {letter}")
if letter in ['!', '', '?', '']:
# print(f"当前字符: {letter}, 随机数: {random.random()}")
if random.random() < split_strength:
letter = ''
if letter in ['', '']:
# print(f"当前字符: {letter}, 随机数: {random.random()}")
if random.random() < 1 - split_strength:
letter = ''
text_no_1 += letter
# 对每个逗号单独判断是否分割
sentences = [text_no_1]
new_sentences = []
for sentence in sentences:
parts = sentence.split('')
current_sentence = parts[0]
for part in parts[1:]:
if random.random() < split_strength:
new_sentences.append(current_sentence.strip())
current_sentence = part
else:
current_sentence += '' + part
# 处理空格分割
space_parts = current_sentence.split(' ')
current_sentence = space_parts[0]
for part in space_parts[1:]:
if random.random() < split_strength:
new_sentences.append(current_sentence.strip())
current_sentence = part
else:
current_sentence += ' ' + part
new_sentences.append(current_sentence.strip())
sentences = [s for s in new_sentences if s] # 移除空字符串
# print(f"分割后的句子: {sentences}")
sentences_done = []
for sentence in sentences:
sentence = sentence.rstrip(',')
if random.random() < split_strength * 0.5:
sentence = sentence.replace('', '').replace(',', '')
elif random.random() < split_strength:
sentence = sentence.replace('', ' ').replace(',', ' ')
sentences_done.append(sentence)
print(f"处理后的句子: {sentences_done}")
return sentences_done
def random_remove_punctuation(text: str) -> str:
"""随机处理标点符号,模拟人类打字习惯
Args:
text: 要处理的文本
Returns:
str: 处理后的文本
"""
result = ''
text_len = len(text)
for i, char in enumerate(text):
if char == '' and i == text_len - 1: # 结尾的句号
if random.random() > 0.4: # 80%概率删除结尾句号
continue
elif char == '':
rand = random.random()
if rand < 0.25: # 5%概率删除逗号
continue
elif rand < 0.25: # 20%概率把逗号变成空格
result += ' '
continue
result += char
return result
def process_llm_response(text: str) -> List[str]:
# processed_response = process_text_with_typos(content)
if len(text) > 300:
print(f"回复过长 ({len(text)} 字符),返回默认回复")
return ['懒得说']
# 处理长消息
typo_generator = ChineseTypoGenerator(
error_rate=0.03,
min_freq=7,
tone_error_rate=0.2,
word_replace_rate=0.02
)
typoed_text = typo_generator.create_typo_sentence(text)[0]
sentences = split_into_sentences_w_remove_punctuation(typoed_text)
# 检查分割后的消息数量是否过多超过3条
if len(sentences) > 4:
print(f"分割后消息数量过多 ({len(sentences)} 条),返回默认回复")
return [f'{global_config.BOT_NICKNAME}不知道哦']
return sentences
def calculate_typing_time(input_string: str, chinese_time: float = 0.2, english_time: float = 0.1) -> float:
"""
计算输入字符串所需的时间,中文和英文字符有不同的输入时间
input_string (str): 输入的字符串
chinese_time (float): 中文字符的输入时间默认为0.3秒
english_time (float): 英文字符的输入时间默认为0.15秒
"""
total_time = 0.0
for char in input_string:
if '\u4e00' <= char <= '\u9fff': # 判断是否为中文字符
total_time += chinese_time
else: # 其他字符(如英文)
total_time += english_time
return total_time
def cosine_similarity(v1, v2):
"""计算余弦相似度"""
dot_product = np.dot(v1, v2)
norm1 = np.linalg.norm(v1)
norm2 = np.linalg.norm(v2)
if norm1 == 0 or norm2 == 0:
return 0
return dot_product / (norm1 * norm2)
def text_to_vector(text):
"""将文本转换为词频向量"""
# 分词
words = jieba.lcut(text)
# 统计词频
word_freq = Counter(words)
return word_freq
def find_similar_topics_simple(text: str, topics: list, top_k: int = 5) -> list:
"""使用简单的余弦相似度计算文本相似度"""
# 将输入文本转换为词频向量
text_vector = text_to_vector(text)
# 计算每个主题的相似度
similarities = []
for topic in topics:
topic_vector = text_to_vector(topic)
# 获取所有唯一词
all_words = set(text_vector.keys()) | set(topic_vector.keys())
# 构建向量
v1 = [text_vector.get(word, 0) for word in all_words]
v2 = [topic_vector.get(word, 0) for word in all_words]
# 计算相似度
similarity = cosine_similarity(v1, v2)
similarities.append((topic, similarity))
# 按相似度降序排序并返回前k个
return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]