更新表情管理器中的类型注解,确保函数返回类型明确,并调整LLMUsage表初始化日志级别为调试

This commit is contained in:
墨梓柒
2025-05-16 17:45:50 +08:00
parent 335c62c50f
commit 13ae323a1c
2 changed files with 31 additions and 77 deletions

View File

@@ -5,7 +5,7 @@ import os
import random import random
import time import time
import traceback import traceback
from typing import Optional, Tuple from typing import Optional, Tuple, List, Any
from PIL import Image from PIL import Image
import io import io
import re import re
@@ -54,7 +54,7 @@ class MaiEmoji:
self.is_deleted = False # 标记是否已被删除 self.is_deleted = False # 标记是否已被删除
self.format = "" self.format = ""
async def initialize_hash_format(self): async def initialize_hash_format(self) -> Optional[bool]:
"""从文件创建表情包实例, 计算哈希值和格式""" """从文件创建表情包实例, 计算哈希值和格式"""
try: try:
# 使用 full_path 检查文件是否存在 # 使用 full_path 检查文件是否存在
@@ -107,7 +107,7 @@ class MaiEmoji:
self.is_deleted = True self.is_deleted = True
return None return None
async def register_to_db(self): async def register_to_db(self) -> bool:
""" """
注册表情包 注册表情包
将表情包对应的文件从当前路径移动到EMOJI_REGISTED_DIR目录下 将表情包对应的文件从当前路径移动到EMOJI_REGISTED_DIR目录下
@@ -176,7 +176,7 @@ class MaiEmoji:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return False return False
async def delete(self): async def delete(self) -> bool:
"""删除表情包 """删除表情包
删除表情包的文件和数据库记录 删除表情包的文件和数据库记录
@@ -223,7 +223,7 @@ class MaiEmoji:
return False return False
def _emoji_objects_to_readable_list(emoji_objects): def _emoji_objects_to_readable_list(emoji_objects: List['MaiEmoji']) -> List[str]:
"""将表情包对象列表转换为可读的字符串列表 """将表情包对象列表转换为可读的字符串列表
参数: 参数:
@@ -242,7 +242,7 @@ def _emoji_objects_to_readable_list(emoji_objects):
return emoji_info_list return emoji_info_list
def _to_emoji_objects(data): def _to_emoji_objects(data: Any) -> Tuple[List['MaiEmoji'], int]:
emoji_objects = [] emoji_objects = []
load_errors = 0 load_errors = 0
# data is now an iterable of Peewee Emoji model instances # data is now an iterable of Peewee Emoji model instances
@@ -292,13 +292,13 @@ def _to_emoji_objects(data):
return emoji_objects, load_errors return emoji_objects, load_errors
def _ensure_emoji_dir(): def _ensure_emoji_dir() -> None:
"""确保表情存储目录存在""" """确保表情存储目录存在"""
os.makedirs(EMOJI_DIR, exist_ok=True) os.makedirs(EMOJI_DIR, exist_ok=True)
os.makedirs(EMOJI_REGISTED_DIR, exist_ok=True) os.makedirs(EMOJI_REGISTED_DIR, exist_ok=True)
async def clear_temp_emoji(): async def clear_temp_emoji() -> None:
"""清理临时表情包 """清理临时表情包
清理/data/emoji和/data/image目录下的所有文件 清理/data/emoji和/data/image目录下的所有文件
当目录中文件数超过100时会全部删除 当目录中文件数超过100时会全部删除
@@ -320,7 +320,7 @@ async def clear_temp_emoji():
logger.success("[清理] 完成") logger.success("[清理] 完成")
async def clean_unused_emojis(emoji_dir, emoji_objects): async def clean_unused_emojis(emoji_dir: str, emoji_objects: List['MaiEmoji']) -> None:
"""清理指定目录中未被 emoji_objects 追踪的表情包文件""" """清理指定目录中未被 emoji_objects 追踪的表情包文件"""
if not os.path.exists(emoji_dir): if not os.path.exists(emoji_dir):
logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}") logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}")
@@ -360,13 +360,13 @@ async def clean_unused_emojis(emoji_dir, emoji_objects):
class EmojiManager: class EmojiManager:
_instance = None _instance = None
def __new__(cls): def __new__(cls) -> 'EmojiManager':
if cls._instance is None: if cls._instance is None:
cls._instance = super().__new__(cls) cls._instance = super().__new__(cls)
cls._instance._initialized = False cls._instance._initialized = False
return cls._instance return cls._instance
def __init__(self): def __init__(self) -> None:
self._initialized = None self._initialized = None
self._scan_task = None self._scan_task = None
self.vlm = LLMRequest(model=global_config.vlm, temperature=0.3, max_tokens=1000, request_type="emoji") self.vlm = LLMRequest(model=global_config.vlm, temperature=0.3, max_tokens=1000, request_type="emoji")
@@ -377,33 +377,26 @@ class EmojiManager:
self.emoji_num = 0 self.emoji_num = 0
self.emoji_num_max = global_config.max_emoji_num self.emoji_num_max = global_config.max_emoji_num
self.emoji_num_max_reach_deletion = global_config.max_reach_deletion self.emoji_num_max_reach_deletion = global_config.max_reach_deletion
self.emoji_objects: list[MaiEmoji] = [] # 存储MaiEmoji对象的列表使用类型注解明确列表元素类型 self.emoji_objects: List[MaiEmoji] = [] # 存储MaiEmoji对象的列表使用类型注解明确列表元素类型
logger.info("启动表情包管理器") logger.info("启动表情包管理器")
def initialize(self): def initialize(self) -> None:
"""初始化数据库连接和表情目录""" """初始化数据库连接和表情目录"""
peewee_db.connect(reuse_if_open=True) peewee_db.connect(reuse_if_open=True)
if peewee_db.is_closed(): if peewee_db.is_closed():
raise RuntimeError("数据库连接失败") raise RuntimeError("数据库连接失败")
_ensure_emoji_dir() _ensure_emoji_dir()
Emoji.create_table(safe=True) # Ensures table exists Emoji.create_table(safe=True) # Ensures table exists
# if not self._initialized:
# try:
# # Ensure Peewee database connection is up and tables are created
# self._initialized = True def _ensure_db(self) -> None:
# except Exception as e:
# logger.exception(f"初始化表情管理器失败: {e}")
def _ensure_db(self):
"""确保数据库已初始化""" """确保数据库已初始化"""
if not self._initialized: if not self._initialized:
self.initialize() self.initialize()
if not self._initialized: if not self._initialized:
raise RuntimeError("EmojiManager not initialized") raise RuntimeError("EmojiManager not initialized")
def record_usage(self, emoji_hash: str): def record_usage(self, emoji_hash: str) -> None:
"""记录表情使用次数""" """记录表情使用次数"""
try: try:
emoji_update = Emoji.get(Emoji.emoji_hash == emoji_hash) emoji_update = Emoji.get(Emoji.emoji_hash == emoji_hash)
@@ -431,7 +424,6 @@ class EmojiManager:
if not all_emojis: if not all_emojis:
logger.warning("内存中没有任何表情包对象") logger.warning("内存中没有任何表情包对象")
# 可以考虑再查一次数据库?或者依赖定期任务更新
return None return None
# 计算每个表情包与输入文本的最大情感相似度 # 计算每个表情包与输入文本的最大情感相似度
@@ -447,18 +439,18 @@ class EmojiManager:
# 计算与每个emotion标签的相似度取最大值 # 计算与每个emotion标签的相似度取最大值
max_similarity = 0 max_similarity = 0
best_matching_emotion = "" # 记录最匹配的 emotion 喵~ best_matching_emotion = ""
for emotion in emotions: for emotion in emotions:
# 使用编辑距离计算相似度 # 使用编辑距离计算相似度
distance = self._levenshtein_distance(text_emotion, emotion) distance = self._levenshtein_distance(text_emotion, emotion)
max_len = max(len(text_emotion), len(emotion)) max_len = max(len(text_emotion), len(emotion))
similarity = 1 - (distance / max_len if max_len > 0 else 0) similarity = 1 - (distance / max_len if max_len > 0 else 0)
if similarity > max_similarity: # 如果找到更相似的喵~ if similarity > max_similarity:
max_similarity = similarity max_similarity = similarity
best_matching_emotion = emotion # 就记下这个 emotion 喵~ best_matching_emotion = emotion
if best_matching_emotion: # 确保有匹配的情感才添加喵~ if best_matching_emotion:
emoji_similarities.append((emoji, max_similarity, best_matching_emotion)) # 把 emotion 也存起来喵~ emoji_similarities.append((emoji, max_similarity, best_matching_emotion))
# 按相似度降序排序 # 按相似度降序排序
emoji_similarities.sort(key=lambda x: x[1], reverse=True) emoji_similarities.sort(key=lambda x: x[1], reverse=True)
@@ -466,21 +458,21 @@ class EmojiManager:
# 获取前10个最相似的表情包 # 获取前10个最相似的表情包
top_emojis = ( top_emojis = (
emoji_similarities[:10] if len(emoji_similarities) > 10 else emoji_similarities emoji_similarities[:10] if len(emoji_similarities) > 10 else emoji_similarities
) # 改个名字,更清晰喵~ )
if not top_emojis: if not top_emojis:
logger.warning("未找到匹配的表情包") logger.warning("未找到匹配的表情包")
return None return None
# 从前几个中随机选择一个 # 从前几个中随机选择一个
selected_emoji, similarity, matched_emotion = random.choice(top_emojis) # 把匹配的 emotion 也拿出来喵~ selected_emoji, similarity, matched_emotion = random.choice(top_emojis)
# 更新使用次数 # 更新使用次数
self.record_usage(selected_emoji.emoji_hash) self.record_usage(selected_emoji.emoji_hash)
_time_end = time.time() _time_end = time.time()
logger.info( # 使用匹配到的 emotion 记录日志喵~ logger.info(
f"为[{text_emotion}]找到表情包: {matched_emotion} ({selected_emoji.filename}), Similarity: {similarity:.4f}" f"为[{text_emotion}]找到表情包: {matched_emotion} ({selected_emoji.filename}), Similarity: {similarity:.4f}"
) )
# 返回完整文件路径和描述 # 返回完整文件路径和描述
@@ -518,7 +510,7 @@ class EmojiManager:
return previous_row[-1] return previous_row[-1]
async def check_emoji_file_integrity(self): async def check_emoji_file_integrity(self) -> None:
"""检查表情包文件完整性 """检查表情包文件完整性
遍历self.emoji_objects中的所有对象检查文件是否存在 遍历self.emoji_objects中的所有对象检查文件是否存在
如果文件已被删除,则执行对象的删除方法并从列表中移除 如果文件已被删除,则执行对象的删除方法并从列表中移除
@@ -583,7 +575,7 @@ class EmojiManager:
logger.error(f"[错误] 检查表情包完整性失败: {str(e)}") logger.error(f"[错误] 检查表情包完整性失败: {str(e)}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
async def start_periodic_check_register(self): async def start_periodic_check_register(self) -> None:
"""定期检查表情包完整性和数量""" """定期检查表情包完整性和数量"""
await self.get_all_emoji_from_db() await self.get_all_emoji_from_db()
while True: while True:
@@ -637,7 +629,7 @@ class EmojiManager:
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60) await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
async def get_all_emoji_from_db(self): async def get_all_emoji_from_db(self) -> None:
"""获取所有表情包并初始化为MaiEmoji类对象更新 self.emoji_objects""" """获取所有表情包并初始化为MaiEmoji类对象更新 self.emoji_objects"""
try: try:
self._ensure_db() self._ensure_db()
@@ -659,7 +651,7 @@ class EmojiManager:
self.emoji_objects = [] # 加载失败则清空列表 self.emoji_objects = [] # 加载失败则清空列表
self.emoji_num = 0 self.emoji_num = 0
async def get_emoji_from_db(self, emoji_hash=None): async def get_emoji_from_db(self, emoji_hash: Optional[str] = None) -> List['MaiEmoji']:
"""获取指定哈希值的表情包并初始化为MaiEmoji类对象列表 (主要用于调试或特定查找) """获取指定哈希值的表情包并初始化为MaiEmoji类对象列表 (主要用于调试或特定查找)
参数: 参数:
@@ -691,7 +683,7 @@ class EmojiManager:
logger.error(f"[错误] 从数据库获取表情包对象失败: {str(e)}") logger.error(f"[错误] 从数据库获取表情包对象失败: {str(e)}")
return [] return []
async def get_emoji_from_manager(self, emoji_hash) -> Optional[MaiEmoji]: async def get_emoji_from_manager(self, emoji_hash: str) -> Optional['MaiEmoji']:
"""从内存中的 emoji_objects 列表获取表情包 """从内存中的 emoji_objects 列表获取表情包
参数: 参数:
@@ -744,7 +736,7 @@ class EmojiManager:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return False return False
async def replace_a_emoji(self, new_emoji: MaiEmoji): async def replace_a_emoji(self, new_emoji: 'MaiEmoji') -> bool:
"""替换一个表情包 """替换一个表情包
Args: Args:
@@ -833,7 +825,7 @@ class EmojiManager:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return False return False
async def build_emoji_description(self, image_base64: str) -> Tuple[str, list]: async def build_emoji_description(self, image_base64: str) -> Tuple[str, List[str]]:
"""获取表情包描述和情感列表 """获取表情包描述和情感列表
Args: Args:
@@ -894,44 +886,6 @@ class EmojiManager:
logger.error(f"获取表情包描述失败: {str(e)}") logger.error(f"获取表情包描述失败: {str(e)}")
return "", [] return "", []
# async def register_emoji_by_filename(self, filename: str) -> bool:
# if global_config.EMOJI_CHECK:
# prompt = f'''
# 这是一个表情包,请对这个表情包进行审核,标准如下:
# 1. 必须符合"{global_config.EMOJI_CHECK_PROMPT}"的要求
# 2. 不能是色情、暴力、等违法违规内容,必须符合公序良俗
# 3. 不能是任何形式的截图,聊天记录或视频截图
# 4. 不要出现5个以上文字
# 请回答这个表情包是否满足上述要求,是则回答是,否则回答否,不要出现任何其他内容
# '''
# content, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format)
# if content == "否":
# return "", []
# # 分析情感含义
# emotion_prompt = f"""
# 请你识别这个表情包的含义和适用场景给我简短的描述每个描述不要超过15个字
# 这是一个基于这个表情包的描述:'{description}'
# 你可以关注其幽默和讽刺意味,动用贴吧,微博,小红书的知识,必须从互联网梗,meme的角度去分析
# 请直接输出描述,不要出现任何其他内容,如果有多个描述,可以用逗号分隔
# """
# emotions_text, _ = await self.llm_emotion_judge.generate_response_async(emotion_prompt, temperature=0.7)
# # 处理情感列表
# emotions = [e.strip() for e in emotions_text.split(",") if e.strip()]
# # 根据情感标签数量随机选择喵~超过5个选3个超过2个选2个
# if len(emotions) > 5:
# emotions = random.sample(emotions, 3)
# elif len(emotions) > 2:
# emotions = random.sample(emotions, 2)
# return f"[表情包:{description}]", emotions
# except Exception as e:
# logger.error(f"获取表情包描述失败: {str(e)}")
# return "", []
async def register_emoji_by_filename(self, filename: str) -> bool: async def register_emoji_by_filename(self, filename: str) -> bool:
"""读取指定文件名的表情包图片,分析并注册到数据库 """读取指定文件名的表情包图片,分析并注册到数据库

View File

@@ -135,7 +135,7 @@ class LLMRequest:
try: try:
# 使用 Peewee 创建表safe=True 表示如果表已存在则不会抛出错误 # 使用 Peewee 创建表safe=True 表示如果表已存在则不会抛出错误
db.create_tables([LLMUsage], safe=True) db.create_tables([LLMUsage], safe=True)
logger.info("LLMUsage 表已初始化/确保存在。") logger.debug("LLMUsage 表已初始化/确保存在。")
except Exception as e: except Exception as e:
logger.error(f"创建 LLMUsage 表失败: {str(e)}") logger.error(f"创建 LLMUsage 表失败: {str(e)}")