This commit is contained in:
SengokuCola
2025-04-24 23:45:49 +08:00
parent af08ef9b04
commit 3ab3979047
6 changed files with 117 additions and 130 deletions

View File

@@ -12,7 +12,6 @@ import re
from ...common.database import db
from ...config.config import global_config
from ..chat.utils import get_embedding
from ..chat.utils_image import image_path_to_base64, image_manager
from ..models.utils_model import LLMRequest
from src.common.logger import get_module_logger, LogConfig, EMOJI_STYLE_CONFIG
@@ -31,6 +30,7 @@ EMOJI_REGISTED_DIR = os.path.join("data", "emoji_registed") # 已注册的表
class MaiEmoji:
"""定义一个表情包"""
def __init__(self, filename: str, path: str):
self.path = path # 存储目录路径
self.filename = filename
@@ -43,13 +43,13 @@ class MaiEmoji:
self.register_time = time.time()
self.is_deleted = False # 标记是否已被删除
self.format = ""
async def initialize_hash_format(self):
"""从文件创建表情包实例
参数:
file_path: 文件的完整路径
返回:
MaiEmoji: 创建的表情包实例如果失败则返回None
"""
@@ -58,26 +58,24 @@ class MaiEmoji:
if not os.path.exists(file_path):
logger.error(f"[错误] 表情包文件不存在: {file_path}")
return None
image_base64 = image_path_to_base64(file_path)
if image_base64 is None:
logger.error(f"[错误] 无法读取图片: {file_path}")
return None
# 计算哈希值
image_bytes = base64.b64decode(image_base64)
self.hash = hashlib.md5(image_bytes).hexdigest()
# 获取图片格式
self.format = Image.open(io.BytesIO(image_bytes)).format.lower()
except Exception as e:
logger.error(f"[错误] 初始化表情包失败: {str(e)}")
logger.error(traceback.format_exc())
return None
async def register_to_db(self):
"""
注册表情包
@@ -110,30 +108,26 @@ class MaiEmoji:
self.path = EMOJI_REGISTED_DIR
except Exception as move_error:
logger.error(f"[错误] 移动文件失败: {str(move_error)}")
return False # 文件移动失败,不继续
return False # 文件移动失败,不继续
# --- 数据库操作 ---
try:
# 准备数据库记录 for emoji collection
emoji_record = {
"filename": self.filename,
"path": os.path.join(self.path, self.filename), # 使用更新后的路径
"path": os.path.join(self.path, self.filename), # 使用更新后的路径
"embedding": self.embedding,
"description": self.description,
"emotion": self.emotion, # 添加情感标签字段
"hash": self.hash,
"format": self.format,
"timestamp": int(self.register_time), # 使用实例的注册时间
"timestamp": int(self.register_time), # 使用实例的注册时间
"usage_count": self.usage_count,
"last_used_time": self.last_used_time
"last_used_time": self.last_used_time,
}
# 使用upsert确保记录存在或被更新
db["emoji"].update_one(
{"hash": self.hash},
{"$set": emoji_record},
upsert=True
)
db["emoji"].update_one({"hash": self.hash}, {"$set": emoji_record}, upsert=True)
logger.success(f"[注册] 表情包信息保存到数据库: {self.description}")
return True
@@ -147,12 +141,12 @@ class MaiEmoji:
logger.error(f"[错误] 注册表情包失败: {str(e)}")
logger.error(traceback.format_exc())
return False
async def delete(self):
"""删除表情包
删除表情包的文件和数据库记录
返回:
bool: 是否成功删除
"""
@@ -165,21 +159,21 @@ class MaiEmoji:
except Exception as e:
logger.error(f"[错误] 删除文件失败 {os.path.join(self.path, self.filename)}: {str(e)}")
# 继续执行,即使文件删除失败也尝试删除数据库记录
# 2. 删除数据库记录
result = db.emoji.delete_one({"hash": self.hash})
deleted_in_db = result.deleted_count > 0
if deleted_in_db:
logger.success(f"[删除] 成功删除表情包记录: {self.description}")
# 3. 标记对象已被删除
self.is_deleted = True
return True
else:
logger.error(f"[错误] 删除表情包记录失败: {self.hash}")
return False
except Exception as e:
logger.error(f"[错误] 删除表情包失败: {str(e)}")
return False
@@ -188,7 +182,6 @@ class MaiEmoji:
class EmojiManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
@@ -213,7 +206,6 @@ class EmojiManager:
"""确保表情存储目录存在"""
os.makedirs(EMOJI_DIR, exist_ok=True)
def initialize(self):
"""初始化数据库连接和表情目录"""
if not self._initialized:
@@ -286,7 +278,7 @@ class EmojiManager:
emotions = emoji.emotion
if not emotions:
continue
# 计算与每个emotion标签的相似度取最大值
max_similarity = 0
for emotion in emotions:
@@ -295,7 +287,7 @@ class EmojiManager:
max_len = max(len(text_emotion), len(emotion))
similarity = 1 - (distance / max_len if max_len > 0 else 0)
max_similarity = max(max_similarity, similarity)
emoji_similarities.append((emoji, max_similarity))
# 按相似度降序排序
@@ -314,10 +306,8 @@ class EmojiManager:
# 更新使用次数
db.emoji.update_one({"hash": selected_emoji.hash}, {"$inc": {"usage_count": 1}})
logger.info(
f"[匹配] 找到表情包: {selected_emoji.description} (相似度: {similarity:.4f})"
)
logger.info(f"[匹配] 找到表情包: {selected_emoji.description} (相似度: {similarity:.4f})")
time_end = time.time()
logger.info(f"[匹配] 搜索表情包用时: {time_end - time_start:.2f}")
return os.path.join(selected_emoji.path, selected_emoji.filename), f"[ {selected_emoji.description} ]"
@@ -328,11 +318,11 @@ class EmojiManager:
def _levenshtein_distance(self, s1: str, s2: str) -> int:
"""计算两个字符串的编辑距离
Args:
s1: 第一个字符串
s2: 第二个字符串
Returns:
int: 编辑距离
"""
@@ -363,7 +353,7 @@ class EmojiManager:
if not self.emoji_objects:
logger.warning("[检查] emoji_objects为空跳过完整性检查")
return
total_count = len(self.emoji_objects)
removed_count = 0
# 使用列表复制进行遍历,因为我们会在遍历过程中修改列表
@@ -403,7 +393,7 @@ class EmojiManager:
logger.info("[扫描] 开始检查表情包完整性...")
self.check_emoji_file_integrity()
logger.info("[扫描] 开始扫描新表情包...")
# 检查表情包目录是否存在
if not os.path.exists(EMOJI_DIR):
logger.warning(f"[警告] 表情包目录不存在: {EMOJI_DIR}")
@@ -411,24 +401,27 @@ class EmojiManager:
logger.info(f"[创建] 已创建表情包目录: {EMOJI_DIR}")
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
continue
# 检查目录是否为空
files = os.listdir(EMOJI_DIR)
if not files:
logger.warning(f"[警告] 表情包目录为空: {EMOJI_DIR}")
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
continue
# 检查是否需要处理表情包(数量超过最大值或不足)
if (self.emoji_num > self.emoji_num_max and global_config.max_reach_deletion) or (self.emoji_num < self.emoji_num_max):
if (self.emoji_num > self.emoji_num_max and global_config.max_reach_deletion) or (
self.emoji_num < self.emoji_num_max
):
try:
# 获取目录下所有图片文件
files_to_process = [
f for f in files
if os.path.isfile(os.path.join(EMOJI_DIR, f))
f
for f in files
if os.path.isfile(os.path.join(EMOJI_DIR, f))
and f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
]
# 处理每个符合条件的文件
for filename in files_to_process:
# 尝试注册表情包
@@ -443,24 +436,24 @@ class EmojiManager:
logger.warning(f"[清理] 删除注册失败的表情包文件: {filename}")
except Exception as e:
logger.error(f"[错误] 扫描表情包目录失败: {str(e)}")
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
async def get_all_emoji_from_db(self):
"""获取所有表情包并初始化为MaiEmoji类对象
参数:
hash: 可选,如果提供则只返回指定哈希值的表情包
返回:
list[MaiEmoji]: 表情包对象列表
"""
try:
self._ensure_db()
# 获取所有表情包
all_emoji_data = list(db.emoji.find())
# 将数据库记录转换为MaiEmoji对象
emoji_objects = []
for emoji_data in all_emoji_data:
@@ -468,7 +461,7 @@ class EmojiManager:
filename=emoji_data.get("filename", ""),
path=emoji_data.get("path", ""),
)
# 设置额外属性
emoji.usage_count = emoji_data.get("usage_count", 0)
emoji.last_used_time = emoji_data.get("last_used_time", emoji_data.get("timestamp", time.time()))
@@ -476,33 +469,33 @@ class EmojiManager:
emoji.description = emoji_data.get("description", "")
emoji.emotion = emoji_data.get("emotion", []) # 添加情感标签的加载
emoji_objects.append(emoji)
# 存储到EmojiManager中
self.emoji_objects = emoji_objects
except Exception as e:
logger.error(f"[错误] 获取所有表情包对象失败: {str(e)}")
async def get_emoji_from_db(self, hash=None):
"""获取所有表情包并初始化为MaiEmoji类对象
参数:
hash: 可选,如果提供则只返回指定哈希值的表情包
返回:
list[MaiEmoji]: 表情包对象列表
"""
try:
self._ensure_db()
# 准备查询条件
query = {}
if hash:
query = {"hash": hash}
# 获取所有表情包
all_emoji_data = list(db.emoji.find(query))
# 将数据库记录转换为MaiEmoji对象
emoji_objects = []
for emoji_data in all_emoji_data:
@@ -510,28 +503,28 @@ class EmojiManager:
filename=emoji_data.get("filename", ""),
path=emoji_data.get("path", ""),
)
# 设置额外属性
emoji.usage_count = emoji_data.get("usage_count", 0)
emoji.last_used_time = emoji_data.get("last_used_time", emoji_data.get("timestamp", time.time()))
emoji.register_time = emoji_data.get("timestamp", time.time())
emoji.description = emoji_data.get("description", "")
emoji.emotion = emoji_data.get("emotion", []) # 添加情感标签的加载
emoji_objects.append(emoji)
# 存储到EmojiManager中
self.emoji_objects = emoji_objects
return emoji_objects
except Exception as e:
logger.error(f"[错误] 获取所有表情包对象失败: {str(e)}")
return []
async def get_emoji_from_manager(self, hash) -> MaiEmoji:
"""从EmojiManager中获取表情包
参数:
hash:如果提供则只返回指定哈希值的表情包
"""
@@ -539,43 +532,41 @@ class EmojiManager:
if emoji.hash == hash:
return emoji
return None
async def delete_emoji(self, emoji_hash: str) -> bool:
"""根据哈希值删除表情包
Args:
emoji_hash: 表情包的哈希值
Returns:
bool: 是否成功删除
"""
try:
self._ensure_db()
# 从emoji_objects中查找表情包对象
emoji = await self.get_emoji_from_manager(emoji_hash)
if not emoji:
logger.warning(f"[警告] 未找到哈希值为 {emoji_hash} 的表情包")
return False
# 使用MaiEmoji对象的delete方法删除表情包
success = await emoji.delete()
if success:
# 从emoji_objects列表中移除该对象
self.emoji_objects = [e for e in self.emoji_objects if e.hash != emoji_hash]
# 更新计数
self.emoji_num -= 1
logger.info(f"[统计] 当前表情包数量: {self.emoji_num}")
return True
else:
logger.error(f"[错误] 删除表情包失败: {emoji_hash}")
return False
except Exception as e:
logger.error(f"[错误] 删除表情包失败: {str(e)}")
logger.error(traceback.format_exc())
@@ -583,10 +574,10 @@ class EmojiManager:
def _emoji_objects_to_readable_list(self, emoji_objects):
"""将表情包对象列表转换为可读的字符串列表
参数:
emoji_objects: MaiEmoji对象列表
返回:
list[str]: 可读的表情包信息字符串列表
"""
@@ -596,32 +587,29 @@ class EmojiManager:
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(emoji.register_time))
# 构建每个表情包的信息字符串
emoji_info = (
f"编号: {i+1}\n"
f"描述: {emoji.description}\n"
f"使用次数: {emoji.usage_count}\n"
f"添加时间: {time_str}\n"
f"编号: {i + 1}\n描述: {emoji.description}\n使用次数: {emoji.usage_count}\n添加时间: {time_str}\n"
)
emoji_info_list.append(emoji_info)
return emoji_info_list
async def replace_a_emoji(self, new_emoji: MaiEmoji):
"""替换一个表情包
Args:
new_emoji: 新表情包对象
Returns:
bool: 是否成功替换表情包
"""
try:
self._ensure_db()
# 获取所有表情包对象
all_emojis = self.emoji_objects
# 将表情包信息转换为可读的字符串
emoji_info_list = self._emoji_objects_to_readable_list(all_emojis)
# 构建提示词
prompt = (
f"{global_config.BOT_NICKNAME}的表情包存储已满({self.emoji_num}/{self.emoji_num_max})"
@@ -629,34 +617,34 @@ class EmojiManager:
f"新表情包信息:\n"
f"描述: {new_emoji.description}\n\n"
f"现有表情包列表:\n" + "\n".join(emoji_info_list) + "\n\n"
f"请决定:\n"
f"1. 是否要删除某个现有表情包来为新表情包腾出空间?\n"
f"2. 如果要删除,应该删除哪一个(给出编号)\n"
f"请只回答:'不删除''删除编号X'(X为表情包编号)。"
"请决定:\n"
"1. 是否要删除某个现有表情包来为新表情包腾出空间?\n"
"2. 如果要删除,应该删除哪一个(给出编号)\n"
"请只回答:'不删除''删除编号X'(X为表情包编号)。"
)
# 调用大模型进行决策
decision, _ = await self.llm_emotion_judge.generate_response_async(prompt, temperature=0.8)
logger.info(f"[决策] 大模型决策结果: {decision}")
# 解析决策结果
if "不删除" in decision:
logger.info("[决策] 决定不删除任何表情包")
return False
# 尝试从决策中提取表情包编号
match = re.search(r'删除编号(\d+)', decision)
match = re.search(r"删除编号(\d+)", decision)
if match:
emoji_index = int(match.group(1)) - 1 # 转换为0-based索引
# 检查索引是否有效
if 0 <= emoji_index < len(all_emojis):
emoji_to_delete = all_emojis[emoji_index]
# 删除选定的表情包
logger.info(f"[决策] 决定删除表情包: {emoji_to_delete.description}")
delete_success = await self.delete_emoji(emoji_to_delete.hash)
if delete_success:
# 修复:等待异步注册完成
register_success = await new_emoji.register_to_db()
@@ -669,26 +657,26 @@ class EmojiManager:
logger.error(f"[错误] 注册表情包到数据库失败: {new_emoji.filename}")
return False
else:
logger.error(f"[错误] 删除表情包失败,无法完成替换")
logger.error("[错误] 删除表情包失败,无法完成替换")
return False
else:
logger.error(f"[错误] 无效的表情包编号: {emoji_index+1}")
logger.error(f"[错误] 无效的表情包编号: {emoji_index + 1}")
else:
logger.error(f"[错误] 无法从决策中提取表情包编号: {decision}")
return False
except Exception as e:
logger.error(f"[错误] 替换表情包失败: {str(e)}")
logger.error(traceback.format_exc())
return False
async def build_emoji_description(self, image_base64: str) -> Tuple[str, list]:
"""获取表情包描述和情感列表
Args:
image_base64: 图片的base64编码
Returns:
Tuple[str, list]: 返回表情包描述和情感列表
"""
@@ -705,7 +693,7 @@ class EmojiManager:
else:
prompt = "这是一个表情包,请详细描述一下表情包所表达的情感和内容,请关注其幽默和讽刺意味"
description, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format)
# 审核表情包
if global_config.EMOJI_CHECK:
prompt = f'''
@@ -721,31 +709,30 @@ class EmojiManager:
return None, []
# 分析情感含义
emotion_prompt = f'''
emotion_prompt = f"""
基于这个表情包的描述:'{description}'请列出1-3个可能的情感标签每个标签用一个词组表示格式如下
幽默的讽刺
悲伤的无奈
愤怒的抗议
愤怒的讽刺
直接输出词组,词组检用逗号分隔。'''
直接输出词组,词组检用逗号分隔。"""
emotions_text, _ = await self.llm_emotion_judge.generate_response_async(emotion_prompt, temperature=0.7)
# 处理情感列表
emotions = [e.strip() for e in emotions_text.split(',') if e.strip()]
emotions = [e.strip() for e in emotions_text.split(",") if e.strip()]
return f"[表情包:{description}]", emotions
except Exception as e:
logger.error(f"获取表情包描述失败: {str(e)}")
return "", []
async def register_emoji_by_filename(self, filename: str) -> bool:
"""读取指定文件名的表情包图片,分析并注册到数据库
Args:
filename: 表情包文件名必须位于EMOJI_DIR目录下
Returns:
bool: 注册是否成功
"""
@@ -765,7 +752,7 @@ class EmojiManager:
if await self.get_emoji_from_manager(new_emoji.hash):
logger.warning(f"[警告] 表情包已存在: {filename}")
return False
if self.emoji_num >= self.emoji_num_max:
logger.warning(f"表情包数量已达到上限({self.emoji_num}/{self.emoji_num_max})")
replaced = await self.replace_a_emoji(new_emoji)
@@ -783,7 +770,7 @@ class EmojiManager:
else:
logger.error(f"[错误] 注册表情包到数据库失败: {filename}")
return False
except Exception as e:
logger.error(f"[错误] 注册表情包失败: {str(e)}")
logger.error(traceback.format_exc())