Merge branch 'debug' into main

This commit is contained in:
SengokuCola
2025-03-12 00:20:54 +08:00
committed by GitHub
64 changed files with 6255 additions and 2846 deletions

View File

@@ -1,9 +1,11 @@
import asyncio
import base64
import hashlib
import os
import random
import time
import traceback
from typing import Optional
from typing import Optional, Tuple
from loguru import logger
from nonebot import get_driver
@@ -11,34 +13,37 @@ from nonebot import get_driver
from ...common.database import Database
from ..chat.config import global_config
from ..chat.utils import get_embedding
from ..chat.utils_image import image_path_to_base64
from ..chat.utils_image import ImageManager, image_path_to_base64
from ..models.utils_model import LLM_request
driver = get_driver()
config = driver.config
image_manager = ImageManager()
class EmojiManager:
_instance = None
EMOJI_DIR = "data/emoji" # 表情包存储目录
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.db = None
cls._instance._initialized = False
return cls._instance
def __init__(self):
self.db = Database.get_instance()
self._scan_task = None
self.vlm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=1000)
self.llm_emotion_judge = LLM_request(model=global_config.llm_normal_minor, max_tokens=60,temperature=0.8) #更高的温度更少的token后续可以根据情绪来调整温度
self.llm_emotion_judge = LLM_request(model=global_config.llm_emotion_judge, max_tokens=60,
temperature=0.8) # 更高的温度更少的token后续可以根据情绪来调整温度
def _ensure_emoji_dir(self):
"""确保表情存储目录存在"""
os.makedirs(self.EMOJI_DIR, exist_ok=True)
def initialize(self):
"""初始化数据库连接和表情目录"""
if not self._initialized:
@@ -49,16 +54,16 @@ class EmojiManager:
self._initialized = True
# 启动时执行一次完整性检查
self.check_emoji_file_integrity()
except Exception as e:
logger.error(f"初始化表情管理器失败: {str(e)}")
except Exception:
logger.exception("初始化表情管理器失败")
def _ensure_db(self):
"""确保数据库已初始化"""
if not self._initialized:
self.initialize()
if not self._initialized:
raise RuntimeError("EmojiManager not initialized")
def _ensure_emoji_collection(self):
"""确保emoji集合存在并创建索引
@@ -74,9 +79,8 @@ class EmojiManager:
if 'emoji' not in self.db.db.list_collection_names():
self.db.db.create_collection('emoji')
self.db.db.emoji.create_index([('embedding', '2dsphere')])
self.db.db.emoji.create_index([('tags', 1)])
self.db.db.emoji.create_index([('filename', 1)], unique=True)
def record_usage(self, emoji_id: str):
"""记录表情使用次数"""
try:
@@ -88,7 +92,7 @@ class EmojiManager:
except Exception as e:
logger.error(f"记录表情使用失败: {str(e)}")
async def get_emoji_for_text(self, text: str) -> Optional[str]:
async def get_emoji_for_text(self, text: str) -> Optional[Tuple[str,str]]:
"""根据文本内容获取相关表情包
Args:
text: 输入文本
@@ -102,9 +106,9 @@ class EmojiManager:
"""
try:
self._ensure_db()
# 获取文本的embedding
text_for_search= await self._get_kimoji_for_text(text)
text_for_search = await self._get_kimoji_for_text(text)
if not text_for_search:
logger.error("无法获取文本的情绪")
return None
@@ -112,15 +116,15 @@ class EmojiManager:
if not text_embedding:
logger.error("无法获取文本的embedding")
return None
try:
# 获取所有表情包
all_emojis = list(self.db.db.emoji.find({}, {'_id': 1, 'path': 1, 'embedding': 1, 'description': 1}))
if not all_emojis:
logger.warning("数据库中没有任何表情包")
return None
# 计算余弦相似度并排序
def cosine_similarity(v1, v2):
if not v1 or not v2:
@@ -131,25 +135,25 @@ class EmojiManager:
if norm_v1 == 0 or norm_v2 == 0:
return 0
return dot_product / (norm_v1 * norm_v2)
# 计算所有表情包与输入文本的相似度
emoji_similarities = [
(emoji, cosine_similarity(text_embedding, emoji.get('embedding', [])))
for emoji in all_emojis
]
# 按相似度降序排序
emoji_similarities.sort(key=lambda x: x[1], reverse=True)
# 获取前3个最相似的表情包
top_3_emojis = emoji_similarities[:3]
top_10_emojis = emoji_similarities[:10 if len(emoji_similarities) > 10 else len(emoji_similarities)]
if not top_3_emojis:
if not top_10_emojis:
logger.warning("未找到匹配的表情包")
return None
# 从前3个中随机选择一个
selected_emoji, similarity = random.choice(top_3_emojis)
selected_emoji, similarity = random.choice(top_10_emojis)
if selected_emoji and 'path' in selected_emoji:
# 更新使用次数
@@ -157,57 +161,61 @@ class EmojiManager:
{'_id': selected_emoji['_id']},
{'$inc': {'usage_count': 1}}
)
logger.success(f"找到匹配的表情包: {selected_emoji.get('description', '无描述')} (相似度: {similarity:.4f})")
logger.success(
f"找到匹配的表情包: {selected_emoji.get('description', '无描述')} (相似度: {similarity:.4f})")
# 稍微改一下文本描述,不然容易产生幻觉,描述已经包含 表情包 了
return selected_emoji['path'],"[ %s ]" % selected_emoji.get('description', '无描述')
return selected_emoji['path'], "[ %s ]" % selected_emoji.get('description', '无描述')
except Exception as search_error:
logger.error(f"搜索表情包失败: {str(search_error)}")
return None
return None
except Exception as e:
logger.error(f"获取表情包失败: {str(e)}")
return None
async def _get_emoji_description(self, image_base64: str) -> str:
"""获取表情包的标签"""
async def _get_emoji_discription(self, image_base64: str) -> str:
"""获取表情包的标签使用image_manager的描述生成功能"""
try:
prompt = '这是一个表情包,使用中文简洁的描述一下表情包的内容和表情包所表达的情感'
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64)
logger.debug(f"输出描述: {content}")
return content
# 使用image_manager获取描述去掉前后的方括号和"表情包:"前缀
description = await image_manager.get_emoji_description(image_base64)
# 去掉[表情包xxx]的格式,只保留描述内容
description = description.strip('[]').replace('表情包:', '')
return description
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def _check_emoji(self, image_base64: str) -> str:
try:
prompt = f'这是一个表情包,请回答这个表情包是否满足\"{global_config.EMOJI_CHECK_PROMPT}\"的要求,是则回答是,否则回答否,不要出现任何其他内容'
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64)
logger.debug(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def _get_kimoji_for_text(self, text:str):
async def _get_kimoji_for_text(self, text: str):
try:
prompt = f'这是{global_config.BOT_NICKNAME}将要发送的消息内容:\n{text}\n若要为其配上表情包,请你输出这个表情包应该表达怎样的情感,应该给人什么样的感觉,不要太简洁也不要太长,注意不要输出任何对消息内容的分析内容,只输出\"一种什么样的感觉\"中间的形容词部分。'
content, _ = await self.llm_emotion_judge.generate_response_async(prompt)
content, _ = await self.llm_emotion_judge.generate_response_async(prompt,temperature=1.5)
logger.info(f"输出描述: {content}")
return content
except Exception as e:
logger.error(f"获取标签失败: {str(e)}")
return None
async def scan_new_emojis(self):
"""扫描新的表情包"""
try:
@@ -215,62 +223,122 @@ class EmojiManager:
os.makedirs(emoji_dir, exist_ok=True)
# 获取所有支持的图片文件
files_to_process = [f for f in os.listdir(emoji_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))]
files_to_process = [f for f in os.listdir(emoji_dir) if
f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif'))]
for filename in files_to_process:
image_path = os.path.join(emoji_dir, filename)
# 检查是否已经注册过
existing_emoji = self.db.db['emoji'].find_one({'filename': filename})
if existing_emoji:
continue
# 压缩图片并获取base64编码
# 获取图片的base64编码和哈希值
image_base64 = image_path_to_base64(image_path)
if image_base64 is None:
os.remove(image_path)
continue
# 获取表情包的描述
description = await self._get_emoji_description(image_base64)
image_bytes = base64.b64decode(image_base64)
image_hash = hashlib.md5(image_bytes).hexdigest()
# 检查是否已经注册过
existing_emoji = self.db.db['emoji'].find_one({'filename': filename})
description = None
if existing_emoji:
# 即使表情包已存在也检查是否需要同步到images集合
description = existing_emoji.get('discription')
# 检查是否在images集合中存在
existing_image = image_manager.db.db.images.find_one({'hash': image_hash})
if not existing_image:
# 同步到images集合
image_doc = {
'hash': image_hash,
'path': image_path,
'type': 'emoji',
'description': description,
'timestamp': int(time.time())
}
image_manager.db.db.images.update_one(
{'hash': image_hash},
{'$set': image_doc},
upsert=True
)
# 保存描述到image_descriptions集合
image_manager._save_description_to_db(image_hash, description, 'emoji')
logger.success(f"同步已存在的表情包到images集合: {filename}")
continue
# 检查是否在images集合中已有描述
existing_description = image_manager._get_description_from_db(image_hash, 'emoji')
if existing_description:
description = existing_description
else:
# 获取表情包的描述
description = await self._get_emoji_discription(image_base64)
if global_config.EMOJI_CHECK:
check = await self._check_emoji(image_base64)
if '' not in check:
os.remove(image_path)
logger.info(f"描述: {description}")
logger.info(f"描述: {description}")
logger.info(f"其不满足过滤规则,被剔除 {check}")
continue
logger.info(f"check通过 {check}")
embedding = await get_embedding(description)
if description is not None:
embedding = await get_embedding(description)
if description is not None:
embedding = await get_embedding(description)
# 准备数据库记录
emoji_record = {
'filename': filename,
'path': image_path,
'embedding':embedding,
'description': description,
'embedding': embedding,
'discription': description,
'hash': image_hash,
'timestamp': int(time.time())
}
# 保存到数据库
# 保存到emoji数据库
self.db.db['emoji'].insert_one(emoji_record)
logger.success(f"注册新表情包: {filename}")
logger.info(f"描述: {description}")
# 保存到images数据库
image_doc = {
'hash': image_hash,
'path': image_path,
'type': 'emoji',
'description': description,
'timestamp': int(time.time())
}
image_manager.db.db.images.update_one(
{'hash': image_hash},
{'$set': image_doc},
upsert=True
)
# 保存描述到image_descriptions集合
image_manager._save_description_to_db(image_hash, description, 'emoji')
logger.success(f"同步保存到images集合: {filename}")
else:
logger.warning(f"跳过表情包: {filename}")
except Exception as e:
logger.error(f"扫描表情包失败: {str(e)}")
logger.error(traceback.format_exc())
except Exception:
logger.exception("扫描表情包失败")
async def _periodic_scan(self, interval_MINS: int = 10):
"""定期扫描新表情包"""
while True:
print("\033[1;36m[表情包]\033[0m 开始扫描新表情包...")
logger.info("开始扫描新表情包...")
await self.scan_new_emojis()
await asyncio.sleep(interval_MINS * 60) # 每600秒扫描一次
def check_emoji_file_integrity(self):
"""检查表情包文件完整性
如果文件已被删除,则从数据库中移除对应记录
@@ -281,7 +349,7 @@ class EmojiManager:
all_emojis = list(self.db.db.emoji.find())
removed_count = 0
total_count = len(all_emojis)
for emoji in all_emojis:
try:
if 'path' not in emoji:
@@ -289,27 +357,27 @@ class EmojiManager:
self.db.db.emoji.delete_one({'_id': emoji['_id']})
removed_count += 1
continue
if 'embedding' not in emoji:
logger.warning(f"发现过时记录缺少embedding字段ID: {emoji.get('_id', 'unknown')}")
self.db.db.emoji.delete_one({'_id': emoji['_id']})
removed_count += 1
continue
# 检查文件是否存在
if not os.path.exists(emoji['path']):
logger.warning(f"表情包文件已被删除: {emoji['path']}")
# 从数据库中删除记录
result = self.db.db.emoji.delete_one({'_id': emoji['_id']})
if result.deleted_count > 0:
logger.success(f"成功删除数据库记录: {emoji['_id']}")
logger.debug(f"成功删除数据库记录: {emoji['_id']}")
removed_count += 1
else:
logger.error(f"删除数据库记录失败: {emoji['_id']}")
except Exception as item_error:
logger.error(f"处理表情包记录时出错: {str(item_error)}")
continue
# 验证清理结果
remaining_count = self.db.db.emoji.count_documents({})
if removed_count > 0:
@@ -317,7 +385,7 @@ class EmojiManager:
logger.info(f"清理前总数: {total_count} | 清理后总数: {remaining_count}")
else:
logger.info(f"已检查 {total_count} 个表情包记录")
except Exception as e:
logger.error(f"检查表情包完整性失败: {str(e)}")
logger.error(traceback.format_exc())
@@ -328,6 +396,8 @@ class EmojiManager:
await asyncio.sleep(interval_MINS * 60)
# 创建全局单例
emoji_manager = EmojiManager()
emoji_manager = EmojiManager()