428 lines
19 KiB
Python
428 lines
19 KiB
Python
import asyncio
|
||
import base64
|
||
import hashlib
|
||
import os
|
||
import random
|
||
import time
|
||
import traceback
|
||
from typing import Optional, Tuple
|
||
from PIL import Image
|
||
import io
|
||
|
||
from ...common.database import db
|
||
from ..config.config import global_config
|
||
from ..chat.utils import get_embedding
|
||
from ..chat.utils_image import ImageManager, image_path_to_base64
|
||
from ..models.utils_model import LLM_request
|
||
from src.common.logger import get_module_logger
|
||
|
||
logger = get_module_logger("emoji")
|
||
|
||
|
||
image_manager = ImageManager()
|
||
|
||
|
||
class EmojiManager:
|
||
_instance = None
|
||
EMOJI_DIR = os.path.join("data", "emoji") # 表情包存储目录
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super().__new__(cls)
|
||
cls._instance._initialized = False
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
self._scan_task = None
|
||
self.vlm = LLM_request(model=global_config.vlm, temperature=0.3, max_tokens=1000, request_type="emoji")
|
||
self.llm_emotion_judge = LLM_request(
|
||
model=global_config.llm_emotion_judge, max_tokens=600, temperature=0.8, request_type="emoji"
|
||
) # 更高的温度,更少的token(后续可以根据情绪来调整温度)
|
||
|
||
def _ensure_emoji_dir(self):
|
||
"""确保表情存储目录存在"""
|
||
os.makedirs(self.EMOJI_DIR, exist_ok=True)
|
||
|
||
def initialize(self):
|
||
"""初始化数据库连接和表情目录"""
|
||
if not self._initialized:
|
||
try:
|
||
self._ensure_emoji_collection()
|
||
self._ensure_emoji_dir()
|
||
self._initialized = True
|
||
# 启动时执行一次完整性检查
|
||
self.check_emoji_file_integrity()
|
||
except Exception:
|
||
logger.exception("初始化表情管理器失败")
|
||
|
||
def _ensure_db(self):
|
||
"""确保数据库已初始化"""
|
||
if not self._initialized:
|
||
self.initialize()
|
||
if not self._initialized:
|
||
raise RuntimeError("EmojiManager not initialized")
|
||
|
||
def _ensure_emoji_collection(self):
|
||
"""确保emoji集合存在并创建索引
|
||
|
||
这个函数用于确保MongoDB数据库中存在emoji集合,并创建必要的索引。
|
||
|
||
索引的作用是加快数据库查询速度:
|
||
- embedding字段的2dsphere索引: 用于加速向量相似度搜索,帮助快速找到相似的表情包
|
||
- tags字段的普通索引: 加快按标签搜索表情包的速度
|
||
- filename字段的唯一索引: 确保文件名不重复,同时加快按文件名查找的速度
|
||
|
||
没有索引的话,数据库每次查询都需要扫描全部数据,建立索引后可以大大提高查询效率。
|
||
"""
|
||
if "emoji" not in db.list_collection_names():
|
||
db.create_collection("emoji")
|
||
db.emoji.create_index([("embedding", "2dsphere")])
|
||
db.emoji.create_index([("filename", 1)], unique=True)
|
||
|
||
def record_usage(self, emoji_id: str):
|
||
"""记录表情使用次数"""
|
||
try:
|
||
self._ensure_db()
|
||
db.emoji.update_one({"_id": emoji_id}, {"$inc": {"usage_count": 1}})
|
||
except Exception as e:
|
||
logger.error(f"记录表情使用失败: {str(e)}")
|
||
|
||
async def get_emoji_for_text(self, text: str) -> Optional[Tuple[str, str]]:
|
||
"""根据文本内容获取相关表情包
|
||
Args:
|
||
text: 输入文本
|
||
Returns:
|
||
Optional[str]: 表情包文件路径,如果没有找到则返回None
|
||
|
||
|
||
可不可以通过 配置文件中的指令 来自定义使用表情包的逻辑?
|
||
我觉得可行
|
||
|
||
"""
|
||
try:
|
||
self._ensure_db()
|
||
|
||
# 获取文本的embedding
|
||
text_for_search = await self._get_kimoji_for_text(text)
|
||
if not text_for_search:
|
||
logger.error("无法获取文本的情绪")
|
||
return None
|
||
text_embedding = await get_embedding(text_for_search, request_type="emoji")
|
||
if not text_embedding:
|
||
logger.error("无法获取文本的embedding")
|
||
return None
|
||
|
||
try:
|
||
# 获取所有表情包
|
||
all_emojis = [
|
||
e
|
||
for e in db.emoji.find({}, {"_id": 1, "path": 1, "embedding": 1, "description": 1, "blacklist": 1})
|
||
if "blacklist" not in e
|
||
]
|
||
|
||
if not all_emojis:
|
||
logger.warning("数据库中没有任何表情包")
|
||
return None
|
||
|
||
# 计算余弦相似度并排序
|
||
def cosine_similarity(v1, v2):
|
||
if not v1 or not v2:
|
||
return 0
|
||
dot_product = sum(a * b for a, b in zip(v1, v2))
|
||
norm_v1 = sum(a * a for a in v1) ** 0.5
|
||
norm_v2 = sum(b * b for b in v2) ** 0.5
|
||
if norm_v1 == 0 or norm_v2 == 0:
|
||
return 0
|
||
return dot_product / (norm_v1 * norm_v2)
|
||
|
||
# 计算所有表情包与输入文本的相似度
|
||
emoji_similarities = [
|
||
(emoji, cosine_similarity(text_embedding, emoji.get("embedding", []))) for emoji in all_emojis
|
||
]
|
||
|
||
# 按相似度降序排序
|
||
emoji_similarities.sort(key=lambda x: x[1], reverse=True)
|
||
|
||
# 获取前3个最相似的表情包
|
||
top_10_emojis = emoji_similarities[: 10 if len(emoji_similarities) > 10 else len(emoji_similarities)]
|
||
|
||
if not top_10_emojis:
|
||
logger.warning("未找到匹配的表情包")
|
||
return None
|
||
|
||
# 从前3个中随机选择一个
|
||
selected_emoji, similarity = random.choice(top_10_emojis)
|
||
|
||
if selected_emoji and "path" in selected_emoji:
|
||
# 更新使用次数
|
||
db.emoji.update_one({"_id": selected_emoji["_id"]}, {"$inc": {"usage_count": 1}})
|
||
|
||
logger.info(
|
||
f"[匹配] 找到表情包: {selected_emoji.get('description', '无描述')} (相似度: {similarity:.4f})"
|
||
)
|
||
# 稍微改一下文本描述,不然容易产生幻觉,描述已经包含 表情包 了
|
||
return selected_emoji["path"], "[ %s ]" % selected_emoji.get("description", "无描述")
|
||
|
||
except Exception as search_error:
|
||
logger.error(f"[错误] 搜索表情包失败: {str(search_error)}")
|
||
return None
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"[错误] 获取表情包失败: {str(e)}")
|
||
return None
|
||
|
||
async def _get_emoji_description(self, image_base64: str) -> str:
|
||
"""获取表情包的标签,使用image_manager的描述生成功能"""
|
||
|
||
try:
|
||
# 使用image_manager获取描述,去掉前后的方括号和"表情包:"前缀
|
||
description = await image_manager.get_emoji_description(image_base64)
|
||
# 去掉[表情包:xxx]的格式,只保留描述内容
|
||
description = description.strip("[]").replace("表情包:", "")
|
||
return description
|
||
|
||
except Exception as e:
|
||
logger.error(f"[错误] 获取表情包描述失败: {str(e)}")
|
||
return None
|
||
|
||
async def _check_emoji(self, image_base64: str, image_format: str) -> str:
|
||
try:
|
||
prompt = (
|
||
f'这是一个表情包,请回答这个表情包是否满足"{global_config.EMOJI_CHECK_PROMPT}"的要求,是则回答是,'
|
||
f"否则回答否,不要出现任何其他内容"
|
||
)
|
||
|
||
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format)
|
||
logger.debug(f"[检查] 表情包检查结果: {content}")
|
||
return content
|
||
|
||
except Exception as e:
|
||
logger.error(f"[错误] 表情包检查失败: {str(e)}")
|
||
return None
|
||
|
||
async def _get_kimoji_for_text(self, text: str):
|
||
try:
|
||
prompt = (
|
||
f"这是{global_config.BOT_NICKNAME}将要发送的消息内容:\n{text}\n若要为其配上表情包,"
|
||
f"请你输出这个表情包应该表达怎样的情感,应该给人什么样的感觉,不要太简洁也不要太长,"
|
||
f'注意不要输出任何对消息内容的分析内容,只输出"一种什么样的感觉"中间的形容词部分。'
|
||
)
|
||
|
||
content, _ = await self.llm_emotion_judge.generate_response_async(prompt, temperature=1.5)
|
||
logger.info(f"[情感] 表情包情感描述: {content}")
|
||
return content
|
||
|
||
except Exception as e:
|
||
logger.error(f"[错误] 获取表情包情感失败: {str(e)}")
|
||
return None
|
||
|
||
async def scan_new_emojis(self):
|
||
"""扫描新的表情包"""
|
||
try:
|
||
emoji_dir = self.EMOJI_DIR
|
||
os.makedirs(emoji_dir, exist_ok=True)
|
||
|
||
# 获取所有支持的图片文件
|
||
files_to_process = [
|
||
f for f in os.listdir(emoji_dir) if f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
|
||
]
|
||
|
||
for filename in files_to_process:
|
||
image_path = os.path.join(emoji_dir, filename)
|
||
|
||
# 获取图片的base64编码和哈希值
|
||
image_base64 = image_path_to_base64(image_path)
|
||
if image_base64 is None:
|
||
os.remove(image_path)
|
||
continue
|
||
|
||
image_bytes = base64.b64decode(image_base64)
|
||
image_hash = hashlib.md5(image_bytes).hexdigest()
|
||
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||
# 检查是否已经注册过
|
||
existing_emoji_by_path = db["emoji"].find_one({"filename": filename})
|
||
existing_emoji_by_hash = db["emoji"].find_one({"hash": image_hash})
|
||
if existing_emoji_by_path and existing_emoji_by_hash:
|
||
if existing_emoji_by_path["_id"] != existing_emoji_by_hash["_id"]:
|
||
logger.error(f"[错误] 表情包已存在但记录不一致: {filename}")
|
||
db.emoji.delete_one({"_id": existing_emoji_by_path["_id"]})
|
||
db.emoji.delete_one({"_id": existing_emoji_by_hash["_id"]})
|
||
existing_emoji = None
|
||
else:
|
||
existing_emoji = existing_emoji_by_hash
|
||
elif existing_emoji_by_hash:
|
||
logger.error(f"[错误] 表情包hash已存在但path不存在: {filename}")
|
||
db.emoji.delete_one({"_id": existing_emoji_by_hash["_id"]})
|
||
existing_emoji = None
|
||
elif existing_emoji_by_path:
|
||
logger.error(f"[错误] 表情包path已存在但hash不存在: {filename}")
|
||
db.emoji.delete_one({"_id": existing_emoji_by_path["_id"]})
|
||
existing_emoji = None
|
||
else:
|
||
existing_emoji = None
|
||
|
||
description = None
|
||
|
||
if existing_emoji:
|
||
# 即使表情包已存在,也检查是否需要同步到images集合
|
||
description = existing_emoji.get("description")
|
||
# 检查是否在images集合中存在
|
||
existing_image = db.images.find_one({"hash": image_hash})
|
||
if not existing_image:
|
||
# 同步到images集合
|
||
image_doc = {
|
||
"hash": image_hash,
|
||
"path": image_path,
|
||
"type": "emoji",
|
||
"description": description,
|
||
"timestamp": int(time.time()),
|
||
}
|
||
db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
|
||
# 保存描述到image_descriptions集合
|
||
image_manager._save_description_to_db(image_hash, description, "emoji")
|
||
logger.success(f"[同步] 已同步表情包到images集合: {filename}")
|
||
continue
|
||
|
||
# 检查是否在images集合中已有描述
|
||
existing_description = image_manager._get_description_from_db(image_hash, "emoji")
|
||
|
||
if existing_description:
|
||
description = existing_description
|
||
else:
|
||
# 获取表情包的描述
|
||
description = await self._get_emoji_description(image_base64)
|
||
|
||
if global_config.EMOJI_CHECK:
|
||
check = await self._check_emoji(image_base64, image_format)
|
||
if "是" not in check:
|
||
os.remove(image_path)
|
||
logger.info(f"[过滤] 表情包描述: {description}")
|
||
logger.info(f"[过滤] 表情包不满足规则,已移除: {check}")
|
||
continue
|
||
logger.info(f"[检查] 表情包检查通过: {check}")
|
||
|
||
if description is not None:
|
||
embedding = await get_embedding(description, request_type="emoji")
|
||
# 准备数据库记录
|
||
emoji_record = {
|
||
"filename": filename,
|
||
"path": image_path,
|
||
"embedding": embedding,
|
||
"description": description,
|
||
"hash": image_hash,
|
||
"timestamp": int(time.time()),
|
||
}
|
||
|
||
# 保存到emoji数据库
|
||
db["emoji"].insert_one(emoji_record)
|
||
logger.success(f"[注册] 新表情包: {filename}")
|
||
logger.info(f"[描述] {description}")
|
||
|
||
# 保存到images数据库
|
||
image_doc = {
|
||
"hash": image_hash,
|
||
"path": image_path,
|
||
"type": "emoji",
|
||
"description": description,
|
||
"timestamp": int(time.time()),
|
||
}
|
||
db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
|
||
# 保存描述到image_descriptions集合
|
||
image_manager._save_description_to_db(image_hash, description, "emoji")
|
||
logger.success(f"[同步] 已保存到images集合: {filename}")
|
||
else:
|
||
logger.warning(f"[跳过] 表情包: {filename}")
|
||
|
||
except Exception:
|
||
logger.exception("[错误] 扫描表情包失败")
|
||
|
||
async def _periodic_scan(self):
|
||
"""定期扫描新表情包"""
|
||
while True:
|
||
logger.info("[扫描] 开始扫描新表情包...")
|
||
await self.scan_new_emojis()
|
||
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
|
||
|
||
def check_emoji_file_integrity(self):
|
||
"""检查表情包文件完整性
|
||
如果文件已被删除,则从数据库中移除对应记录
|
||
"""
|
||
try:
|
||
self._ensure_db()
|
||
# 获取所有表情包记录
|
||
all_emojis = list(db.emoji.find())
|
||
removed_count = 0
|
||
total_count = len(all_emojis)
|
||
|
||
for emoji in all_emojis:
|
||
try:
|
||
if "path" not in emoji:
|
||
logger.warning(f"[检查] 发现无效记录(缺少path字段),ID: {emoji.get('_id', 'unknown')}")
|
||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||
removed_count += 1
|
||
continue
|
||
|
||
if "embedding" not in emoji:
|
||
logger.warning(f"[检查] 发现过时记录(缺少embedding字段),ID: {emoji.get('_id', 'unknown')}")
|
||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||
removed_count += 1
|
||
continue
|
||
|
||
# 检查文件是否存在
|
||
if not os.path.exists(emoji["path"]):
|
||
logger.warning(f"[检查] 表情包文件已被删除: {emoji['path']}")
|
||
# 从数据库中删除记录
|
||
result = db.emoji.delete_one({"_id": emoji["_id"]})
|
||
if result.deleted_count > 0:
|
||
logger.debug(f"[清理] 成功删除数据库记录: {emoji['_id']}")
|
||
removed_count += 1
|
||
else:
|
||
logger.error(f"[错误] 删除数据库记录失败: {emoji['_id']}")
|
||
continue
|
||
|
||
if "hash" not in emoji:
|
||
logger.warning(f"[检查] 发现缺失记录(缺少hash字段),ID: {emoji.get('_id', 'unknown')}")
|
||
hash = hashlib.md5(open(emoji["path"], "rb").read()).hexdigest()
|
||
db.emoji.update_one({"_id": emoji["_id"]}, {"$set": {"hash": hash}})
|
||
else:
|
||
file_hash = hashlib.md5(open(emoji["path"], "rb").read()).hexdigest()
|
||
if emoji["hash"] != file_hash:
|
||
logger.warning(f"[检查] 表情包文件hash不匹配,ID: {emoji.get('_id', 'unknown')}")
|
||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||
removed_count += 1
|
||
|
||
# 修复拼写错误
|
||
if "discription" in emoji:
|
||
desc = emoji["discription"]
|
||
db.emoji.update_one(
|
||
{"_id": emoji["_id"]}, {"$unset": {"discription": ""}, "$set": {"description": desc}}
|
||
)
|
||
|
||
except Exception as item_error:
|
||
logger.error(f"[错误] 处理表情包记录时出错: {str(item_error)}")
|
||
continue
|
||
|
||
# 验证清理结果
|
||
remaining_count = db.emoji.count_documents({})
|
||
if removed_count > 0:
|
||
logger.success(f"[清理] 已清理 {removed_count} 个失效的表情包记录")
|
||
logger.info(f"[统计] 清理前: {total_count} | 清理后: {remaining_count}")
|
||
else:
|
||
logger.info(f"[检查] 已检查 {total_count} 个表情包记录")
|
||
|
||
except Exception as e:
|
||
logger.error(f"[错误] 检查表情包完整性失败: {str(e)}")
|
||
logger.error(traceback.format_exc())
|
||
|
||
async def start_periodic_check(self):
|
||
while True:
|
||
self.check_emoji_file_integrity()
|
||
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
|
||
|
||
|
||
# 创建全局单例
|
||
|
||
emoji_manager = EmojiManager()
|