Merge branch 'debug' of https://github.com/SengokuCola/MaiMBot into refactor/unified_request

# Conflicts:
#	src/plugins/models/utils_model.py
This commit is contained in:
KawaiiYusora
2025-03-06 23:52:39 +08:00
4 changed files with 144 additions and 146 deletions

View File

@@ -171,7 +171,7 @@ class ChatBot:
await relationship_manager.update_relationship_value(message.user_id, relationship_value=valuedict[emotion[0]])
if random() < global_config.emoji_chance:
emoji_path = await emoji_manager.get_emoji_for_emotion(emotion)
emoji_path = await emoji_manager.get_emoji_for_text(response)
if emoji_path:
emoji_cq = CQCode.create_emoji_cq(emoji_path)

View File

@@ -30,6 +30,7 @@ class BotConfig:
forget_memory_interval: int = 300 # 记忆遗忘间隔(秒)
EMOJI_CHECK_INTERVAL: int = 120 # 表情包检查间隔(分钟)
EMOJI_REGISTER_INTERVAL: int = 10 # 表情包注册间隔(分钟)
EMOJI_CHECK_PROMPT: str = "不要包含违反公序良俗的内容" # 表情包过滤要求
ban_words = set()
@@ -96,6 +97,7 @@ class BotConfig:
emoji_config = toml_dict["emoji"]
config.EMOJI_CHECK_INTERVAL = emoji_config.get("check_interval", config.EMOJI_CHECK_INTERVAL)
config.EMOJI_REGISTER_INTERVAL = emoji_config.get("register_interval", config.EMOJI_REGISTER_INTERVAL)
config.EMOJI_CHECK_PROMPT = emoji_config.get('check_prompt',config.EMOJI_CHECK_PROMPT)
if "cq_code" in toml_dict:
cq_code_config = toml_dict["cq_code"]

View File

@@ -14,10 +14,13 @@ import asyncio
import time
from PIL import Image
import io
from loguru import logger
import traceback
from nonebot import get_driver
from ..chat.config import global_config
from ..models.utils_model import LLM_request
from ..chat.utils import get_embedding
driver = get_driver()
config = driver.config
@@ -26,7 +29,7 @@ config = driver.config
class EmojiManager:
_instance = None
EMOJI_DIR = "data/emoji" # 表情包存储目录
EMOTION_KEYWORDS = {
'happy': ['开心', '快乐', '高兴', '欢喜', '', '喜悦', '兴奋', '愉快', '', ''],
'angry': ['生气', '愤怒', '恼火', '不爽', '火大', '', '气愤', '恼怒', '发火', '不满'],
@@ -47,7 +50,8 @@ class EmojiManager:
def __init__(self):
self.db = Database.get_instance()
self._scan_task = None
self.llm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=50)
self.llm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=1000)
self.lm = LLM_request(model=global_config.llm_reasoning_minor, max_tokens=1000)
def _ensure_emoji_dir(self):
"""确保表情存储目录存在"""
@@ -64,7 +68,7 @@ class EmojiManager:
# 启动时执行一次完整性检查
self.check_emoji_file_integrity()
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 初始化表情管理器失败: {str(e)}")
logger.error(f"初始化表情管理器失败: {str(e)}")
def _ensure_db(self):
"""确保数据库已初始化"""
@@ -77,6 +81,7 @@ class EmojiManager:
"""确保emoji集合存在并创建索引"""
if 'emoji' not in self.db.db.list_collection_names():
self.db.db.create_collection('emoji')
self.db.db.emoji.create_index([('embedding', '2dsphere')])
self.db.db.emoji.create_index([('tags', 1)])
self.db.db.emoji.create_index([('filename', 1)], unique=True)
@@ -89,79 +94,8 @@ class EmojiManager:
{'$inc': {'usage_count': 1}}
)
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 记录表情使用失败: {str(e)}")
logger.error(f"记录表情使用失败: {str(e)}")
async def _get_emotion_from_text(self, text: str) -> List[str]:
"""从文本中识别情感关键词
Args:
text: 输入文本
Returns:
List[str]: 匹配到的情感标签列表
"""
try:
prompt = f'分析这段文本:"{text}",从"happy,angry,sad,surprised,disgusted,fearful,neutral"中选出最匹配的1个情感标签。只需要返回标签不要输出其他任何内容。'
content, _ = await self.llm.generate_response(prompt)
emotion = content.strip().lower()
if emotion in self.EMOTION_KEYWORDS:
print(f"\033[1;32m[成功]\033[0m 识别到的情感: {emotion}")
return [emotion]
return ['neutral']
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 情感分析失败: {str(e)}")
return ['neutral']
async def get_emoji_for_emotion(self, emotion_tag: str) -> Optional[str]:
try:
self._ensure_db()
# 构建查询条件:标签匹配任一情感
query = {'tags': {'$in': emotion_tag}}
# print(f"\033[1;34m[调试]\033[0m 表情查询条件: {query}")
try:
# 随机获取一个匹配的表情
emoji = self.db.db.emoji.aggregate([
{'$match': query},
{'$sample': {'size': 1}}
]).next()
print(f"\033[1;32m[成功]\033[0m 找到匹配的表情")
if emoji and 'path' in emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
# 如果没有匹配的表情,从所有表情中随机选择一个
print(f"\033[1;33m[提示]\033[0m 未找到匹配的表情,随机选择一个")
try:
emoji = self.db.db.emoji.aggregate([
{'$sample': {'size': 1}}
]).next()
if emoji and 'path' in emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
print(f"\033[1;31m[错误]\033[0m 数据库中没有任何表情")
return None
return None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取表情包失败: {str(e)}")
return None
async def get_emoji_for_text(self, text: str) -> Optional[str]:
"""根据文本内容获取相关表情包
Args:
@@ -171,54 +105,69 @@ class EmojiManager:
"""
try:
self._ensure_db()
# 获取情感标签
emotions = await self._get_emotion_from_text(text)
print(""+ str(text) + " 获取到的情感标签为:" + str(emotions))
if not emotions:
return None
# 构建查询条件:标签匹配任一情感
query = {'tags': {'$in': emotions}}
print(f"\033[1;34m[调试]\033[0m 表情查询条件: {query}")
print(f"\033[1;34m[调试]\033[0m 匹配到的情感: {emotions}")
# 获取文本的embedding
text_for_search= await self._get_kimoji_for_text(text)
text_embedding = get_embedding(text_for_search)
if not text_embedding:
logger.error("无法获取文本的embedding")
return None
try:
# 随机获取一个匹配的表情
emoji = self.db.db.emoji.aggregate([
{'$match': query},
{'$sample': {'size': 1}}
]).next()
print(f"\033[1;32m[成功]\033[0m 找到匹配的表情")
if emoji and 'path' in emoji:
# 获取所有表情
all_emojis = list(self.db.db.emoji.find({}, {'_id': 1, 'path': 1, 'embedding': 1, 'discription': 1}))
if not all_emojis:
logger.warning("数据库中没有任何表情包")
return None
# 计算余弦相似度并排序
def cosine_similarity(v1, v2):
if not v1 or not v2:
return 0
dot_product = sum(a * b for a, b in zip(v1, v2))
norm_v1 = sum(a * a for a in v1) ** 0.5
norm_v2 = sum(b * b for b in v2) ** 0.5
if norm_v1 == 0 or norm_v2 == 0:
return 0
return dot_product / (norm_v1 * norm_v2)
# 计算所有表情包与输入文本的相似度
emoji_similarities = [
(emoji, cosine_similarity(text_embedding, emoji.get('embedding', [])))
for emoji in all_emojis
]
# 按相似度降序排序
emoji_similarities.sort(key=lambda x: x[1], reverse=True)
# 获取前3个最相似的表情包
top_3_emojis = emoji_similarities[:3]
if not top_3_emojis:
logger.warning("未找到匹配的表情包")
return None
# 从前3个中随机选择一个
selected_emoji, similarity = random.choice(top_3_emojis)
if selected_emoji and 'path' in selected_emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'_id': selected_emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
# 如果没有匹配的表情,从所有表情中随机选择一个
print(f"\033[1;33m[提示]\033[0m 未找到匹配的表情,随机选择一个")
try:
emoji = self.db.db.emoji.aggregate([
{'$sample': {'size': 1}}
]).next()
if emoji and 'path' in emoji:
# 更新使用次数
self.db.db.emoji.update_one(
{'_id': emoji['_id']},
{'$inc': {'usage_count': 1}}
)
return emoji['path']
except StopIteration:
print(f"\033[1;31m[错误]\033[0m 数据库中没有任何表情")
return None
logger.success(f"找到匹配的表情包: {selected_emoji.get('discription', '无描述')} (相似度: {similarity:.4f})")
return selected_emoji['path']
except Exception as search_error:
logger.error(f"搜索表情包失败: {str(search_error)}")
return None
return None
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取表情包失败: {str(e)}")
logger.error(f"获取表情包失败: {str(e)}")
return None
async def _get_emoji_tag(self, image_base64: str) -> str:
@@ -237,11 +186,48 @@ class EmojiManager:
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 获取标签失败: {str(e)}")
return "skip"
return "neutral"
print(f"\033[1;32m[调试信息]\033[0m 使用默认标签: neutral")
return "skip" # 默认标签
return "neutral" # 默认标签
async def _get_emoji_discription(self, image_base64: str) -> str:
"""获取表情包的标签"""
try:
prompt = '这是一个表情包,使用中文简洁的描述一下表情包的内容和表情包所表达的情感'
content, _ = await self.llm.generate_response_for_image(prompt, image_base64)
logger.debug(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def _check_emoji(self, image_base64: str) -> str:
try:
prompt = f'这是一个表情包,请回答这个表情包是否满足\"{global_config.EMOJI_CHECK_PROMPT}\"的要求,是则回答是,否则回答否,不要出现任何其他内容'
content, _ = await self.llm.generate_response_for_image(prompt, image_base64)
logger.debug(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def _get_kimoji_for_text(self, text:str):
try:
prompt = f'这是{global_config.BOT_NICKNAME}将要发送的消息内容:\n{text}\n若要为其配上表情包,请你输出这个表情包应该表达怎样的情感,应该给人什么样的感觉,不要太简洁也不要太长,注意不要输出任何对内容的分析内容,只输出\"一种什么样的感觉\"中间的形容词部分。'
content, _ = await self.lm.generate_response_async(prompt)
logger.info(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def _compress_image(self, image_path: str, target_size: int = 0.8 * 1024 * 1024) -> Optional[str]:
"""压缩图片并返回base64编码
Args:
@@ -303,12 +289,12 @@ class EmojiManager:
# 获取压缩后的数据并转换为base64
compressed_data = output_buffer.getvalue()
print(f"\033[1;32m[成功]\033[0m 压缩图片: {os.path.basename(image_path)} ({original_width}x{original_height} -> {new_width}x{new_height})")
logger.success(f"压缩图片: {os.path.basename(image_path)} ({original_width}x{original_height} -> {new_width}x{new_height})")
return base64.b64encode(compressed_data).decode('utf-8')
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 压缩图片失败: {os.path.basename(image_path)}, 错误: {str(e)}")
logger.error(f"压缩图片失败: {os.path.basename(image_path)}, 错误: {str(e)}")
return None
async def scan_new_emojis(self):
@@ -334,29 +320,39 @@ class EmojiManager:
os.remove(image_path)
continue
# 获取表情包的情感标签
# 获取表情包的描述
discription = await self._get_emoji_discription(image_base64)
check = await self._check_emoji(image_base64)
if '' not in check:
os.remove(image_path)
logger.info(f"描述: {discription}")
logger.info(f"其不满足过滤规则,被剔除 {check}")
continue
logger.info(f"check通过 {check}")
tag = await self._get_emoji_tag(image_base64)
if not tag == "skip":
embedding = get_embedding(discription)
if discription is not None:
# 准备数据库记录
emoji_record = {
'filename': filename,
'path': image_path,
'tags': [tag],
'embedding':embedding,
'discription': discription,
'tag':tag,
'timestamp': int(time.time())
}
# 保存到数据库
self.db.db['emoji'].insert_one(emoji_record)
print(f"\033[1;32m[成功]\033[0m 注册新表情包: {filename}")
print(f"标签: {tag}")
logger.success(f"注册新表情包: {filename}")
logger.info(f"描述: {discription}")
else:
print(f"\033[1;33m[警告]\033[0m 跳过表情包: {filename}")
logger.warning(f"跳过表情包: {filename}")
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 扫描表情包失败: {str(e)}")
import traceback
print(traceback.format_exc())
logger.error(f"扫描表情包失败: {str(e)}")
logger.error(traceback.format_exc())
async def _periodic_scan(self, interval_MINS: int = 10):
"""定期扫描新表情包"""
while True:
@@ -364,6 +360,7 @@ class EmojiManager:
await self.scan_new_emojis()
await asyncio.sleep(interval_MINS * 60) # 每600秒扫描一次
def check_emoji_file_integrity(self):
"""检查表情包文件完整性
如果文件已被删除,则从数据库中移除对应记录
@@ -378,44 +375,42 @@ class EmojiManager:
for emoji in all_emojis:
try:
if 'path' not in emoji:
print(f"\033[1;33m[提示]\033[0m 发现无效记录缺少path字段ID: {emoji.get('_id', 'unknown')}")
logger.warning(f"发现无效记录缺少path字段ID: {emoji.get('_id', 'unknown')}")
self.db.db.emoji.delete_one({'_id': emoji['_id']})
removed_count += 1
continue
if 'embedding' not in emoji:
logger.warning(f"发现过时记录缺少embedding字段ID: {emoji.get('_id', 'unknown')}")
self.db.db.emoji.delete_one({'_id': emoji['_id']})
removed_count += 1
continue
# 检查文件是否存在
if not os.path.exists(emoji['path']):
print(f"\033[1;33m[提示]\033[0m 表情包文件已被删除: {emoji['path']}")
logger.warning(f"表情包文件已被删除: {emoji['path']}")
# 从数据库中删除记录
result = self.db.db.emoji.delete_one({'_id': emoji['_id']})
if result.deleted_count > 0:
print(f"\033[1;32m[成功]\033[0m 成功删除数据库记录: {emoji['_id']}")
logger.success(f"成功删除数据库记录: {emoji['_id']}")
removed_count += 1
else:
print(f"\033[1;31m[错误]\033[0m 删除数据库记录失败: {emoji['_id']}")
logger.error(f"删除数据库记录失败: {emoji['_id']}")
except Exception as item_error:
print(f"\033[1;31m[错误]\033[0m 处理表情包记录时出错: {str(item_error)}")
logger.error(f"处理表情包记录时出错: {str(item_error)}")
continue
# 验证清理结果
remaining_count = self.db.db.emoji.count_documents({})
if removed_count > 0:
print(f"\033[1;32m[成功]\033[0m 已清理 {removed_count} 个失效的表情包记录")
print(f"\033[1;34m[统计]\033[0m 清理前总数: {total_count} | 清理后总数: {remaining_count}")
# print(f"\033[1;34m[统计]\033[0m 应删除数量: {removed_count} | 实际删除数量: {total_count - remaining_count}")
# 执行数据库压缩
try:
self.db.db.command({"compact": "emoji"})
print(f"\033[1;32m[成功]\033[0m 数据库集合压缩完成")
except Exception as compact_error:
print(f"\033[1;31m[错误]\033[0m 数据库压缩失败: {str(compact_error)}")
logger.success(f"已清理 {removed_count} 个失效的表情包记录")
logger.info(f"清理前总数: {total_count} | 清理后总数: {remaining_count}")
else:
print(f"\033[1;36m[表情包]\033[0m 已检查 {total_count} 个表情包记录")
logger.info(f"已检查 {total_count} 个表情包记录")
except Exception as e:
print(f"\033[1;31m[错误]\033[0m 检查表情包完整性失败: {str(e)}")
import traceback
print(f"\033[1;31m[错误追踪]\033[0m\n{traceback.format_exc()}")
logger.error(f"检查表情包完整性失败: {str(e)}")
logger.error(traceback.format_exc())
async def start_periodic_check(self, interval_MINS: int = 120):
while True:

View File

@@ -24,6 +24,7 @@ class ResponseGenerator:
self.model_r1 = LLM_request(model=global_config.llm_reasoning, temperature=0.7,max_tokens=1000)
self.model_v3 = LLM_request(model=global_config.llm_normal, temperature=0.7,max_tokens=1000)
self.model_r1_distill = LLM_request(model=global_config.llm_reasoning_minor, temperature=0.7,max_tokens=1000)
self.model_v25 = LLM_request(model=global_config.llm_normal_minor, temperature=0.7,max_tokens=1000)
self.db = Database.get_instance()
self.current_model_type = 'r1' # 默认使用 R1
@@ -139,7 +140,7 @@ class ResponseGenerator:
内容:{content}
输出:
'''
content, _ = await self.model_v3.generate_response(prompt)
content, _ = await self.model_v25.generate_response(prompt)
content=content.strip()
if content in ['happy','angry','sad','surprised','disgusted','fearful','neutral']:
return [content]