重构表情管理

This commit is contained in:
LuiKlee
2025-12-15 18:08:59 +08:00
parent 1730a62363
commit 5e708fd1de
7 changed files with 508 additions and 409 deletions

View File

@@ -0,0 +1,22 @@
# 表情替换候选数量说明
## 背景
`MAX_EMOJI_FOR_PROMPT` 用于 `replace_a_emoji` 等场景,限制送入 LLM 的候选表情数量,避免上下文过长导致响应变慢或 token 开销过大。
## 为什么是 20
- 平衡:超过十几项后决策收益递减,但 token/时间成本线性增加。
- 性能在常用模型和硬件下20 个描述可在可接受延迟内返回决策。
- 兼容:历史实现也使用 20保持行为稳定。
## 何时调整
- 设备/模型更强且希望更广覆盖:可提升到 30-40但注意延迟和费用。
- 低算力或对延迟敏感:可下调到 10-15 以加快决策。
- 特殊场景(主题集中、库很小):下调有助于避免无意义的冗余候选。
## 如何修改
- 常量位置:`src/chat/emoji_system/emoji_constants.py` 中的 `MAX_EMOJI_FOR_PROMPT`
- 如需动态配置,可将其迁移到 `global_config.emoji` 下的配置项并在 `emoji_manager` 读取。
## 建议
- 调整后观察:替换决策耗时、模型费用、误删率(删除的表情是否被实际需要)。
- 如继续扩展表情库规模,建议为候选列表增加基于使用频次或时间的预筛选策略。

View File

@@ -0,0 +1,33 @@
# 表情系统重构说明
日期2025-12-15
## 目标
- 拆分单体的 `emoji_manager.py`,将实体、常量、文件工具解耦。
- 减少扫描/注册期间的事件循环阻塞。
- 保留现有行为LLM/VLM 流程、容量替换、缓存查找),同时提升可维护性。
## 新结构
- `src/chat/emoji_system/emoji_constants.py`:共享路径与提示/数量上限。
- `src/chat/emoji_system/emoji_entities.py``MaiEmoji`(哈希、格式检测、入库/删除、缓存失效)。
- `src/chat/emoji_system/emoji_utils.py`目录保证、临时清理、增量文件扫描、DB 行到实体转换。
- `src/chat/emoji_system/emoji_manager.py`负责完整性检查、扫描、注册、VLM/LLM 描述、替换与缓存,现委托给上述模块。
- `src/chat/emoji_system/README.md`:快速使用/生命周期指引。
## 行为变化
- 完整性检查改为游标+批量增量扫描,每处理 50 个让出一次事件循环。
- 循环内的重文件操作exists、listdir、remove、makedirs通过 `asyncio.to_thread` 释放主循环。
- 目录扫描使用 `os.scandir`(经 `list_image_files`),减少重复 stat并返回文件列表与是否为空。
- 快速查找:加载时重建 `_emoji_index`,增删时保持同步;`get_emoji_from_manager` 优先走索引。
- 注册与替换流程在更新索引的同时,异步清理失败/重复文件。
## 迁移提示
- 现有调用继续使用 `get_emoji_manager()``EmojiManager` API外部接口未改动。
- 如曾直接从 `emoji_manager` 引入常量或工具,请改为从 `emoji_constants``emoji_entities``emoji_utils` 引入。
- 依赖同步文件时序的测试/脚本可能观察到不同的耗时,但逻辑等价。
## 后续建议
1.`list_image_files``clean_unused_emojis`、完整性扫描游标行为补充单测。
2. 将 VLM/LLM 提示词模板外置为配置,便于迭代。
3. 暴露扫描耗时、清理数量、注册延迟等指标,便于观测。
4.`replace_a_emoji` 的 LLM 调用添加重试上限,并记录 prompt/决策日志以便审计。

View File

@@ -0,0 +1,37 @@
# 新表情系统概览
本目录存放表情包的采集、注册与选择逻辑。
## 模块
- `emoji_constants.py`:共享路径与数量上限。
- `emoji_entities.py``MaiEmoji` 实体,负责哈希/格式检测、数据库注册与删除。
- `emoji_utils.py`文件系统工具目录保证、临时清理、DB 行转换、文件列表扫描)。
- `emoji_manager.py`核心管理器定期扫描、完整性检查、VLM/LLM 标注、容量替换、缓存查找。
- `emoji_history.py`:按会话保存的内存历史。
## 生命周期
1. 通过 `EmojiManager.start()` 启动后台任务(或在已有事件循环中直接 await `start_periodic_check_register()`)。
2. 循环会加载数据库状态、做完整性清理、清理临时缓存,并扫描 `data/emoji` 中的新文件。
3. 新图片会生成哈希,调用 VLM/LLM 生成描述后注册入库,并移动到 `data/emoji_registed`
4. 达到容量上限时,`replace_a_emoji()` 可能在 LLM 协助下删除低使用量表情再注册新表情。
## 关键行为
- 完整性检查增量扫描,批量让出事件循环避免长阻塞。
- 循环内的文件操作使用 `asyncio.to_thread` 以保持事件循环可响应。
- 哈希索引 `_emoji_index` 加速内存查找;数据库为事实来源,内存为镜像。
- 描述与标签使用缓存(见管理器上的 `@cached`)。
## 常用操作
- `get_emoji_for_text(text_emotion)`:按目标情绪选取表情路径与描述。
- `record_usage(emoji_hash)`:累加使用次数。
- `delete_emoji(emoji_hash)`:删除文件与数据库记录并清缓存。
## 目录
- 待注册:`data/emoji`
- 已注册:`data/emoji_registed`
- 临时图片:`data/image`, `data/images`
## 说明
- 通过 `config/bot_config.toml``config/model_config.toml` 配置上限与模型。
- GIF 支持保留,注册前会提取关键帧再送 VLM。
- 避免直接使用 `Session`,请使用本模块提供的 API。

View File

@@ -0,0 +1,6 @@
import os
BASE_DIR = os.path.join("data")
EMOJI_DIR = os.path.join(BASE_DIR, "emoji")
EMOJI_REGISTERED_DIR = os.path.join(BASE_DIR, "emoji_registed")
MAX_EMOJI_FOR_PROMPT = 20

View File

@@ -0,0 +1,192 @@
import asyncio
import base64
import binascii
import hashlib
import io
import os
import time
import traceback
from PIL import Image
from src.chat.emoji_system.emoji_constants import EMOJI_REGISTERED_DIR
from src.chat.utils.utils_image import image_path_to_base64
from src.common.database.api.crud import CRUDBase
from src.common.database.compatibility import get_db_session
from src.common.database.core.models import Emoji
from src.common.database.optimization.cache_manager import get_cache
from src.common.database.utils.decorators import generate_cache_key
from src.common.logger import get_logger
logger = get_logger("emoji")
class MaiEmoji:
"""定义一个表情包"""
def __init__(self, full_path: str):
if not full_path:
raise ValueError("full_path cannot be empty")
self.full_path = full_path
self.path = os.path.dirname(full_path)
self.filename = os.path.basename(full_path)
self.embedding = []
self.hash = ""
self.description = ""
self.emotion: list[str] = []
self.usage_count = 0
self.last_used_time = time.time()
self.register_time = time.time()
self.is_deleted = False
self.format = ""
async def initialize_hash_format(self) -> bool | None:
"""从文件创建表情包实例, 计算哈希值和格式"""
try:
if not os.path.exists(self.full_path):
logger.error(f"[初始化错误] 表情包文件不存在: {self.full_path}")
self.is_deleted = True
return None
logger.debug(f"[初始化] 正在读取文件: {self.full_path}")
image_base64 = image_path_to_base64(self.full_path)
if image_base64 is None:
logger.error(f"[初始化错误] 无法读取或转换Base64: {self.full_path}")
self.is_deleted = True
return None
logger.debug(f"[初始化] 文件读取成功 (Base64预览: {image_base64[:50]}...)")
logger.debug(f"[初始化] 正在解码Base64并计算哈希: {self.filename}")
if isinstance(image_base64, str):
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
self.hash = hashlib.md5(image_bytes).hexdigest()
logger.debug(f"[初始化] 哈希计算成功: {self.hash}")
logger.debug(f"[初始化] 正在使用Pillow获取格式: {self.filename}")
try:
with Image.open(io.BytesIO(image_bytes)) as img:
self.format = (img.format or "jpeg").lower()
logger.debug(f"[初始化] 格式获取成功: {self.format}")
except Exception as pil_error:
logger.error(f"[初始化错误] Pillow无法处理图片 ({self.filename}): {pil_error}")
logger.error(traceback.format_exc())
self.is_deleted = True
return None
return True
except FileNotFoundError:
logger.error(f"[初始化错误] 文件在处理过程中丢失: {self.full_path}")
self.is_deleted = True
return None
except (binascii.Error, ValueError) as b64_error:
logger.error(f"[初始化错误] Base64解码失败 ({self.filename}): {b64_error}")
self.is_deleted = True
return None
except Exception as e:
logger.error(f"[初始化错误] 初始化表情包时发生未预期错误 ({self.filename}): {e!s}")
logger.error(traceback.format_exc())
self.is_deleted = True
return None
async def register_to_db(self) -> bool:
"""注册表情包,将文件移动到注册目录并保存数据库"""
try:
source_full_path = self.full_path
destination_full_path = os.path.join(EMOJI_REGISTERED_DIR, self.filename)
if not await asyncio.to_thread(os.path.exists, source_full_path):
logger.error(f"[错误] 源文件不存在: {source_full_path}")
return False
try:
if await asyncio.to_thread(os.path.exists, destination_full_path):
await asyncio.to_thread(os.remove, destination_full_path)
await asyncio.to_thread(os.rename, source_full_path, destination_full_path)
logger.debug(f"[移动] 文件从 {source_full_path} 移动到 {destination_full_path}")
self.full_path = destination_full_path
self.path = EMOJI_REGISTERED_DIR
except Exception as move_error:
logger.error(f"[错误] 移动文件失败: {move_error!s}")
return False
try:
async with get_db_session() as session:
emotion_str = ",".join(self.emotion) if self.emotion else ""
emoji = Emoji(
emoji_hash=self.hash,
full_path=self.full_path,
format=self.format,
description=self.description,
emotion=emotion_str,
query_count=0,
is_registered=True,
is_banned=False,
record_time=self.register_time,
register_time=self.register_time,
usage_count=self.usage_count,
last_used_time=self.last_used_time,
)
session.add(emoji)
await session.commit()
logger.info(f"[注册] 表情包信息保存到数据库: {self.filename} ({self.emotion})")
return True
except Exception as db_error:
logger.error(f"[错误] 保存数据库失败 ({self.filename}): {db_error!s}")
return False
except Exception as e:
logger.error(f"[错误] 注册表情包失败 ({self.filename}): {e!s}")
logger.error(traceback.format_exc())
return False
async def delete(self) -> bool:
"""删除表情包文件及数据库记录"""
try:
file_to_delete = self.full_path
if await asyncio.to_thread(os.path.exists, file_to_delete):
try:
await asyncio.to_thread(os.remove, file_to_delete)
logger.debug(f"[删除] 文件: {file_to_delete}")
except Exception as e:
logger.error(f"[错误] 删除文件失败 {file_to_delete}: {e!s}")
try:
crud = CRUDBase(Emoji)
will_delete_emoji = await crud.get_by(emoji_hash=self.hash)
if will_delete_emoji is None:
logger.warning(f"[删除] 数据库中未找到哈希值为 {self.hash} 的表情包记录。")
result = 0
else:
await crud.delete(will_delete_emoji.id)
result = 1
cache = await get_cache()
await cache.delete(generate_cache_key("emoji_by_hash", self.hash))
await cache.delete(generate_cache_key("emoji_description", self.hash))
await cache.delete(generate_cache_key("emoji_tag", self.hash))
except Exception as e:
logger.error(f"[错误] 删除数据库记录时出错: {e!s}")
result = 0
if result > 0:
logger.info(f"[删除] 表情包数据库记录 {self.filename} (Hash: {self.hash})")
self.is_deleted = True
return True
if not os.path.exists(file_to_delete):
logger.warning(
f"[警告] 表情包文件 {file_to_delete} 已删除,但数据库记录删除失败 (Hash: {self.hash})"
)
else:
logger.error(f"[错误] 删除表情包数据库记录失败: {self.hash}")
return False
except Exception as e:
logger.error(f"[错误] 删除表情包失败 ({self.filename}): {e!s}")
return False

View File

@@ -1,6 +1,5 @@
import asyncio
import base64
import binascii
import hashlib
import io
import json
@@ -16,6 +15,16 @@ from PIL import Image
from rich.traceback import install
from sqlalchemy import select
from src.chat.emoji_system.emoji_constants import EMOJI_DIR, EMOJI_REGISTERED_DIR, MAX_EMOJI_FOR_PROMPT
from src.chat.emoji_system.emoji_entities import MaiEmoji
from src.chat.emoji_system.emoji_utils import (
_emoji_objects_to_readable_list,
_to_emoji_objects,
_ensure_emoji_dir,
clear_temp_emoji,
clean_unused_emojis,
list_image_files,
)
from src.chat.utils.utils_image import get_image_manager, image_path_to_base64
from src.common.database.api.crud import CRUDBase
from src.common.database.compatibility import get_db_session
@@ -23,364 +32,6 @@ from src.common.database.core.models import Emoji, Images
from src.common.database.utils.decorators import cached
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
install(extra_lines=3)
logger = get_logger("emoji")
BASE_DIR = os.path.join("data")
EMOJI_DIR = os.path.join(BASE_DIR, "emoji") # 表情包存储目录
EMOJI_REGISTERED_DIR = os.path.join(BASE_DIR, "emoji_registed") # 已注册的表情包注册目录
MAX_EMOJI_FOR_PROMPT = 20 # 最大允许的表情包描述数量于图片替换的 prompt 中
"""
还没经过测试,有些地方数据库和内存数据同步可能不完全
"""
class MaiEmoji:
"""定义一个表情包"""
def __init__(self, full_path: str):
if not full_path:
raise ValueError("full_path cannot be empty")
self.full_path = full_path # 文件的完整路径 (包括文件名)
self.path = os.path.dirname(full_path) # 文件所在的目录路径
self.filename = os.path.basename(full_path) # 文件名
self.embedding = []
self.hash = "" # 初始为空,在创建实例时会计算
self.description = ""
self.emotion: list[str] = []
self.usage_count = 0
self.last_used_time = time.time()
self.register_time = time.time()
self.is_deleted = False # 标记是否已被删除
self.format = ""
async def initialize_hash_format(self) -> bool | None:
"""从文件创建表情包实例, 计算哈希值和格式"""
try:
# 使用 full_path 检查文件是否存在
if not os.path.exists(self.full_path):
logger.error(f"[初始化错误] 表情包文件不存在: {self.full_path}")
self.is_deleted = True
return None
# 使用 full_path 读取文件
logger.debug(f"[初始化] 正在读取文件: {self.full_path}")
image_base64 = image_path_to_base64(self.full_path)
if image_base64 is None:
logger.error(f"[初始化错误] 无法读取或转换Base64: {self.full_path}")
self.is_deleted = True
return None
logger.debug(f"[初始化] 文件读取成功 (Base64预览: {image_base64[:50]}...)")
# 计算哈希值
logger.debug(f"[初始化] 正在解码Base64并计算哈希: {self.filename}")
# 确保base64字符串只包含ASCII字符
if isinstance(image_base64, str):
image_base64 = image_base64.encode("ascii", errors="ignore").decode("ascii")
image_bytes = base64.b64decode(image_base64)
self.hash = hashlib.md5(image_bytes).hexdigest()
logger.debug(f"[初始化] 哈希计算成功: {self.hash}")
# 获取图片格式
logger.debug(f"[初始化] 正在使用Pillow获取格式: {self.filename}")
try:
with Image.open(io.BytesIO(image_bytes)) as img:
self.format = (img.format or "jpeg").lower()
logger.debug(f"[初始化] 格式获取成功: {self.format}")
except Exception as pil_error:
logger.error(f"[初始化错误] Pillow无法处理图片 ({self.filename}): {pil_error}")
logger.error(traceback.format_exc())
self.is_deleted = True
return None
# 如果所有步骤成功,返回 True
return True
except FileNotFoundError:
logger.error(f"[初始化错误] 文件在处理过程中丢失: {self.full_path}")
self.is_deleted = True
return None
except (binascii.Error, ValueError) as b64_error:
logger.error(f"[初始化错误] Base64解码失败 ({self.filename}): {b64_error}")
self.is_deleted = True
return None
except Exception as e:
logger.error(f"[初始化错误] 初始化表情包时发生未预期错误 ({self.filename}): {e!s}")
logger.error(traceback.format_exc())
self.is_deleted = True
return None
async def register_to_db(self) -> bool:
"""
注册表情包
将表情包对应的文件从当前路径移动到EMOJI_REGISTERED_DIR目录下
并修改对应的实例属性,然后将表情包信息保存到数据库中
"""
try:
# 确保目标目录存在
# 源路径是当前实例的完整路径 self.full_path
source_full_path = self.full_path
# 目标完整路径
destination_full_path = os.path.join(EMOJI_REGISTERED_DIR, self.filename)
# 检查源文件是否存在
if not os.path.exists(source_full_path):
logger.error(f"[错误] 源文件不存在: {source_full_path}")
return False
# --- 文件移动 ---
try:
# 如果目标文件已存在,先删除 (确保移动成功)
if os.path.exists(destination_full_path):
os.remove(destination_full_path)
os.rename(source_full_path, destination_full_path)
logger.debug(f"[移动] 文件从 {source_full_path} 移动到 {destination_full_path}")
# 更新实例的路径属性为新路径
self.full_path = destination_full_path
self.path = EMOJI_REGISTERED_DIR
# self.filename 保持不变
except Exception as move_error:
logger.error(f"[错误] 移动文件失败: {move_error!s}")
# 如果移动失败,尝试将实例状态恢复?暂时不处理,仅返回失败
return False
# --- 数据库操作 ---
try:
# 准备数据库记录 for emoji collection
async with get_db_session() as session:
emotion_str = ",".join(self.emotion) if self.emotion else ""
emoji = Emoji(
emoji_hash=self.hash,
full_path=self.full_path,
format=self.format,
description=self.description,
emotion=emotion_str, # Store as comma-separated string
query_count=0, # Default value
is_registered=True,
is_banned=False, # Default value
record_time=self.register_time, # Use MaiEmoji's register_time for DB record_time
register_time=self.register_time,
usage_count=self.usage_count,
last_used_time=self.last_used_time,
)
session.add(emoji)
await session.commit()
logger.info(f"[注册] 表情包信息保存到数据库: {self.filename} ({self.emotion})")
return True
except Exception as db_error:
logger.error(f"[错误] 保存数据库失败 ({self.filename}): {db_error!s}")
return False
except Exception as e:
logger.error(f"[错误] 注册表情包失败 ({self.filename}): {e!s}")
logger.error(traceback.format_exc())
return False
async def delete(self) -> bool:
"""删除表情包
删除表情包的文件和数据库记录
返回:
bool: 是否成功删除
"""
try:
# 1. 删除文件
file_to_delete = self.full_path
if os.path.exists(file_to_delete):
try:
os.remove(file_to_delete)
logger.debug(f"[删除] 文件: {file_to_delete}")
except Exception as e:
logger.error(f"[错误] 删除文件失败 {file_to_delete}: {e!s}")
# 文件删除失败,但仍然尝试删除数据库记录
# 2. 删除数据库记录
try:
# 使用CRUD进行删除
crud = CRUDBase(Emoji)
will_delete_emoji = await crud.get_by(emoji_hash=self.hash)
if will_delete_emoji is None:
logger.warning(f"[删除] 数据库中未找到哈希值为 {self.hash} 的表情包记录。")
result = 0 # Indicate no DB record was deleted
else:
await crud.delete(will_delete_emoji.id)
result = 1 # Successfully deleted one record
# 使缓存失效
from src.common.database.optimization.cache_manager import get_cache
from src.common.database.utils.decorators import generate_cache_key
cache = await get_cache()
await cache.delete(generate_cache_key("emoji_by_hash", self.hash))
await cache.delete(generate_cache_key("emoji_description", self.hash))
await cache.delete(generate_cache_key("emoji_tag", self.hash))
except Exception as e:
logger.error(f"[错误] 删除数据库记录时出错: {e!s}")
result = 0
if result > 0:
logger.info(f"[删除] 表情包数据库记录 {self.filename} (Hash: {self.hash})")
# 3. 标记对象已被删除
self.is_deleted = True
return True
else:
# 如果数据库记录删除失败,但文件可能已删除,记录一个警告
if not os.path.exists(file_to_delete):
logger.warning(
f"[警告] 表情包文件 {file_to_delete} 已删除,但数据库记录删除失败 (Hash: {self.hash})"
)
else:
logger.error(f"[错误] 删除表情包数据库记录失败: {self.hash}")
return False
except Exception as e:
logger.error(f"[错误] 删除表情包失败 ({self.filename}): {e!s}")
return False
def _emoji_objects_to_readable_list(emoji_objects: list["MaiEmoji"]) -> list[str]:
"""将表情包对象列表转换为可读的字符串列表
参数:
emoji_objects: MaiEmoji对象列表
返回:
list[str]: 可读的表情包信息字符串列表
"""
emoji_info_list = []
for i, emoji in enumerate(emoji_objects):
# 转换时间戳为可读时间
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(emoji.register_time))
# 构建每个表情包的信息字符串
emoji_info = f"编号: {i + 1}\n描述: {emoji.description}\n使用次数: {emoji.usage_count}\n添加时间: {time_str}\n"
emoji_info_list.append(emoji_info)
return emoji_info_list
def _to_emoji_objects(data: Any) -> tuple[list["MaiEmoji"], int]:
emoji_objects = []
load_errors = 0
emoji_data_list = list(data)
for emoji_data in emoji_data_list: # emoji_data is an Emoji model instance
full_path = emoji_data.full_path
if not full_path:
logger.warning(
f"[加载错误] 数据库记录缺少 'full_path' 字段: ID {emoji_data.id if hasattr(emoji_data, 'id') else 'Unknown'}"
)
load_errors += 1
continue
try:
emoji = MaiEmoji(full_path=full_path)
emoji.hash = emoji_data.emoji_hash
if not emoji.hash:
logger.warning(f"[加载错误] 数据库记录缺少 'hash' 字段: {full_path}")
load_errors += 1
continue
emoji.description = emoji_data.description
# Deserialize emotion string from DB to list
emoji.emotion = emoji_data.emotion.split(",") if emoji_data.emotion else []
emoji.usage_count = emoji_data.usage_count
db_last_used_time = emoji_data.last_used_time
db_register_time = emoji_data.register_time
# If last_used_time from DB is None, use MaiEmoji's initialized register_time or current time
emoji.last_used_time = db_last_used_time if db_last_used_time is not None else emoji.register_time
# If register_time from DB is None, use MaiEmoji's initialized register_time (which is time.time())
emoji.register_time = db_register_time if db_register_time is not None else emoji.register_time
emoji.format = emoji_data.format
emoji_objects.append(emoji)
except ValueError as ve:
logger.error(f"[加载错误] 初始化 MaiEmoji 失败 ({full_path}): {ve}")
load_errors += 1
except Exception as e:
logger.error(f"[加载错误] 处理数据库记录时出错 ({full_path}): {e!s}")
load_errors += 1
return emoji_objects, load_errors
def _ensure_emoji_dir() -> None:
"""确保表情存储目录存在"""
os.makedirs(EMOJI_DIR, exist_ok=True)
os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True)
async def clear_temp_emoji() -> None:
"""清理临时表情包
清理/data/emoji、/data/image和/data/images目录下的所有文件
当目录中文件数超过100时会全部删除
"""
logger.info("[清理] 开始清理缓存...")
for need_clear in (
os.path.join(BASE_DIR, "emoji"),
os.path.join(BASE_DIR, "image"),
os.path.join(BASE_DIR, "images"),
):
if os.path.exists(need_clear):
files = os.listdir(need_clear)
# 如果文件数超过1000就全部删除
if len(files) > 1000:
for filename in files:
file_path = os.path.join(need_clear, filename)
if os.path.isfile(file_path):
os.remove(file_path)
logger.debug(f"[清理] 删除: {filename}")
async def clean_unused_emojis(emoji_dir: str, emoji_objects: list["MaiEmoji"], removed_count: int) -> int:
"""清理指定目录中未被 emoji_objects 追踪的表情包文件"""
if not os.path.exists(emoji_dir):
logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}")
return removed_count
cleaned_count = 0
try:
# 获取内存中所有有效表情包的完整路径集合
tracked_full_paths = {emoji.full_path for emoji in emoji_objects if not emoji.is_deleted}
# 遍历指定目录中的所有文件
for file_name in os.listdir(emoji_dir):
file_full_path = os.path.join(emoji_dir, file_name)
# 确保处理的是文件而不是子目录
if not os.path.isfile(file_full_path):
continue
# 如果文件不在被追踪的集合中,则删除
if file_full_path not in tracked_full_paths:
try:
os.remove(file_full_path)
logger.info(f"[清理] 删除未追踪的表情包文件: {file_full_path}")
cleaned_count += 1
except Exception as e:
logger.error(f"[错误] 删除文件时出错 ({file_full_path}): {e!s}")
if cleaned_count > 0:
logger.info(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。")
else:
logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。")
except Exception as e:
logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {e!s}")
return removed_count + cleaned_count
@@ -401,6 +52,10 @@ class EmojiManager:
return # 如果已经初始化过,直接返回
self._scan_task = None
self._emoji_index: dict[str, MaiEmoji] = {}
self._integrity_yield_every = 50
self._integrity_cursor = 0
self._integrity_batch_size = 500
if model_config is None:
raise RuntimeError("Model config is not initialized")
@@ -568,34 +223,40 @@ class EmojiManager:
如果文件已被删除,则执行对象的删除方法并从列表中移除
"""
try:
# if not self.emoji_objects:
# logger.warning("[检查] emoji_objects为空跳过完整性检查")
# return
total_count = len(self.emoji_objects)
self.emoji_num = total_count
removed_count = 0
# 使用列表复制进行遍历,因为我们会在遍历过程中修改列表
objects_to_remove = []
for emoji in self.emoji_objects:
if total_count == 0:
return
start = self._integrity_cursor % total_count
end = min(start + self._integrity_batch_size, total_count)
indices: list[int] = list(range(start, end))
if end - start < self._integrity_batch_size and total_count > 0:
wrap_rest = self._integrity_batch_size - (end - start)
if wrap_rest > 0:
indices.extend(range(0, min(wrap_rest, total_count)))
objects_to_remove: list[MaiEmoji] = []
processed = 0
for idx in indices:
if idx >= len(self.emoji_objects):
break
emoji = self.emoji_objects[idx]
try:
# 跳过已经标记为删除的,避免重复处理
if emoji.is_deleted:
objects_to_remove.append(emoji) # 收集起来一次性移除
objects_to_remove.append(emoji)
continue
# 检查文件是否存在
if not os.path.exists(emoji.full_path):
exists = await asyncio.to_thread(os.path.exists, emoji.full_path)
if not exists:
logger.warning(f"[检查] 表情包文件丢失: {emoji.full_path}")
# 执行表情包对象的删除方法
await emoji.delete() # delete 方法现在会标记 is_deleted
objects_to_remove.append(emoji) # 标记删除后,也收集起来移除
# 更新计数
await emoji.delete()
objects_to_remove.append(emoji)
self.emoji_num -= 1
removed_count += 1
continue
# 检查描述是否为空 (如果为空也视为无效)
if not emoji.description:
logger.warning(f"[检查] 表情包描述为空,视为无效: {emoji.filename}")
await emoji.delete()
@@ -604,19 +265,24 @@ class EmojiManager:
removed_count += 1
continue
processed += 1
if processed % self._integrity_yield_every == 0:
await asyncio.sleep(0)
except Exception as item_error:
logger.error(f"[错误] 处理表情包记录时出错 ({emoji.filename}): {item_error!s}")
# 即使出错,也尝试继续检查下一个
continue
# 从 self.emoji_objects 中移除标记的对象
if objects_to_remove:
self.emoji_objects = [e for e in self.emoji_objects if e not in objects_to_remove]
for e in objects_to_remove:
if e.hash in self._emoji_index:
self._emoji_index.pop(e.hash, None)
self._integrity_cursor = (start + processed) % max(1, len(self.emoji_objects))
# 清理 EMOJI_REGISTERED_DIR 目录中未被追踪的文件
removed_count = await clean_unused_emojis(EMOJI_REGISTERED_DIR, self.emoji_objects, removed_count)
# 输出清理结果
if removed_count > 0:
logger.info(f"[清理] 已清理 {removed_count} 个失效/文件丢失的表情包记录")
logger.info(f"[统计] 清理前记录数: {total_count} | 清理后有效记录数: {len(self.emoji_objects)}")
@@ -639,36 +305,30 @@ class EmojiManager:
logger.info("[扫描] 开始扫描新表情包...")
# 检查表情包目录是否存在
if not os.path.exists(EMOJI_DIR):
if not await asyncio.to_thread(os.path.exists, EMOJI_DIR):
logger.warning(f"[警告] 表情包目录不存在: {EMOJI_DIR}")
os.makedirs(EMOJI_DIR, exist_ok=True)
await asyncio.to_thread(os.makedirs, EMOJI_DIR, True)
logger.info(f"[创建] 已创建表情包目录: {EMOJI_DIR}")
await asyncio.sleep(global_config.emoji.check_interval * 60)
continue
# 检查目录是否为空
files = os.listdir(EMOJI_DIR)
if not files:
image_files, is_empty = await list_image_files(EMOJI_DIR)
if is_empty:
logger.warning(f"[警告] 表情包目录为空: {EMOJI_DIR}")
await asyncio.sleep(global_config.emoji.check_interval * 60)
continue
if not image_files:
await asyncio.sleep(global_config.emoji.check_interval * 60)
continue
# 无论steal_emoji是否开启都检查emoji文件夹以支持手动注册
# 只有在需要腾出空间或填充表情库时,才真正执行注册
if (self.emoji_num > self.emoji_num_max and global_config.emoji.do_replace) or (
self.emoji_num < self.emoji_num_max
):
try:
# 获取目录下所有图片文件
files_to_process = [
f
for f in files
if os.path.isfile(os.path.join(EMOJI_DIR, f))
and f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
]
# 处理每个符合条件的文件
for filename in files_to_process:
for filename in image_files:
# 尝试注册表情包
success = await self.register_emoji_by_filename(filename)
if success:
@@ -677,8 +337,9 @@ class EmojiManager:
# 注册失败则删除对应文件
file_path = os.path.join(EMOJI_DIR, filename)
os.remove(file_path)
await asyncio.to_thread(os.remove, file_path)
logger.warning(f"[清理] 删除注册失败的表情包文件: {filename}")
await asyncio.sleep(0)
except Exception as e:
logger.error(f"[错误] 扫描表情包目录失败: {e!s}")
@@ -698,6 +359,7 @@ class EmojiManager:
# 更新内存中的列表和数量
self.emoji_objects = emoji_objects
self.emoji_num = len(emoji_objects)
self._emoji_index = {e.hash: e for e in emoji_objects if getattr(e, "hash", None)}
logger.info(f"[数据库] 加载完成: 共加载 {self.emoji_num} 个表情包记录。")
if load_errors > 0:
@@ -753,11 +415,15 @@ class EmojiManager:
返回:
MaiEmoji 或 None: 如果找到则返回 MaiEmoji 对象,否则返回 None
"""
for emoji in self.emoji_objects:
# 确保对象未被标记为删除且哈希值匹配
if not emoji.is_deleted and emoji.hash == emoji_hash:
return emoji
return None # 如果循环结束还没找到,则返回 None
emoji = self._emoji_index.get(emoji_hash)
if emoji and not emoji.is_deleted:
return emoji
for item in self.emoji_objects:
if not item.is_deleted and item.hash == emoji_hash:
self._emoji_index[emoji_hash] = item
return item
return None
@cached(ttl=1800, key_prefix="emoji_tag") # 缓存30分钟
async def get_emoji_tag_by_hash(self, emoji_hash: str) -> str | None:
@@ -849,6 +515,7 @@ class EmojiManager:
if success:
# 从emoji_objects列表中移除该对象
self.emoji_objects = [e for e in self.emoji_objects if e.hash != emoji_hash]
self._emoji_index.pop(emoji_hash, None)
# 更新计数
self.emoji_num -= 1
logger.info(f"[统计] 当前表情包数量: {self.emoji_num}")
@@ -931,6 +598,7 @@ class EmojiManager:
register_success = await new_emoji.register_to_db()
if register_success:
self.emoji_objects.append(new_emoji)
self._emoji_index[new_emoji.hash] = new_emoji
self.emoji_num += 1
logger.info(f"[成功] 注册: {new_emoji.filename}")
return True
@@ -1099,7 +767,7 @@ class EmojiManager:
bool: 注册是否成功
"""
file_full_path = os.path.join(EMOJI_DIR, filename)
if not os.path.exists(file_full_path):
if not await asyncio.to_thread(os.path.exists, file_full_path):
logger.error(f"[注册失败] 文件不存在: {file_full_path}")
return False
@@ -1117,7 +785,7 @@ class EmojiManager:
logger.warning(f"[注册跳过] 表情包已存在 (Hash: {new_emoji.hash}): {filename}")
# 删除重复的源文件
try:
os.remove(file_full_path)
await asyncio.to_thread(os.remove, file_full_path)
logger.info(f"[清理] 删除重复的待注册文件: {filename}")
except Exception as e:
logger.error(f"[错误] 删除重复文件失败: {e!s}")
@@ -1137,7 +805,7 @@ class EmojiManager:
logger.warning(f"[注册失败] 未能生成有效描述或审核未通过: {filename}")
# 删除未能生成描述的文件
try:
os.remove(file_full_path)
await asyncio.to_thread(os.remove, file_full_path)
logger.info(f"[清理] 删除描述生成失败的文件: {filename}")
except Exception as e:
logger.error(f"[错误] 删除描述生成失败文件时出错: {e!s}")
@@ -1149,7 +817,7 @@ class EmojiManager:
logger.error(f"[注册失败] 生成描述/情感时出错 ({filename}): {build_desc_error}")
# 同样考虑删除文件
try:
os.remove(file_full_path)
await asyncio.to_thread(os.remove, file_full_path)
logger.info(f"[清理] 删除描述生成异常的文件: {filename}")
except Exception as e:
logger.error(f"[错误] 删除描述生成异常文件时出错: {e!s}")
@@ -1163,7 +831,7 @@ class EmojiManager:
logger.error("[注册失败] 替换表情包失败,无法完成注册")
# 替换失败,删除新表情包文件
try:
os.remove(file_full_path) # new_emoji 的 full_path 此时还是源路径
await asyncio.to_thread(os.remove, file_full_path) # new_emoji 的 full_path 此时还是源路径
logger.info(f"[清理] 删除替换失败的新表情文件: {filename}")
except Exception as e:
logger.error(f"[错误] 删除替换失败文件时出错: {e!s}")
@@ -1176,6 +844,7 @@ class EmojiManager:
if register_success:
# 注册成功后,添加到内存列表
self.emoji_objects.append(new_emoji)
self._emoji_index[new_emoji.hash] = new_emoji
self.emoji_num += 1
logger.info(f"[成功] 注册新表情包: {filename} (当前: {self.emoji_num}/{self.emoji_num_max})")
return True
@@ -1183,9 +852,9 @@ class EmojiManager:
logger.error(f"[注册失败] 保存表情包到数据库/移动文件失败: {filename}")
# register_to_db 失败时,内部会尝试清理移动后的文件,源文件可能还在
# 是否需要删除源文件?
if os.path.exists(file_full_path):
if await asyncio.to_thread(os.path.exists, file_full_path):
try:
os.remove(file_full_path)
await asyncio.to_thread(os.remove, file_full_path)
logger.info(f"[清理] 删除注册失败的源文件: {filename}")
except Exception as e:
logger.error(f"[错误] 删除注册失败源文件时出错: {e!s}")
@@ -1195,9 +864,9 @@ class EmojiManager:
logger.error(f"[错误] 注册表情包时发生未预期错误 ({filename}): {e!s}")
logger.error(traceback.format_exc())
# 尝试删除源文件以避免循环处理
if os.path.exists(file_full_path):
if await asyncio.to_thread(os.path.exists, file_full_path):
try:
os.remove(file_full_path)
await asyncio.to_thread(os.remove, file_full_path)
logger.info(f"[清理] 删除处理异常的源文件: {filename}")
except Exception as remove_error:
logger.error(f"[错误] 删除异常处理文件时出错: {remove_error}")

View File

@@ -0,0 +1,140 @@
import asyncio
import os
import time
from typing import Any
from src.chat.emoji_system.emoji_constants import BASE_DIR, EMOJI_DIR, EMOJI_REGISTERED_DIR
from src.chat.emoji_system.emoji_entities import MaiEmoji
from src.common.logger import get_logger
logger = get_logger("emoji")
def _emoji_objects_to_readable_list(emoji_objects: list[MaiEmoji]) -> list[str]:
emoji_info_list = []
for i, emoji in enumerate(emoji_objects):
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(emoji.register_time))
emoji_info = f"编号: {i + 1}\n描述: {emoji.description}\n使用次数: {emoji.usage_count}\n添加时间: {time_str}\n"
emoji_info_list.append(emoji_info)
return emoji_info_list
def _to_emoji_objects(data: Any) -> tuple[list[MaiEmoji], int]:
emoji_objects = []
load_errors = 0
emoji_data_list = list(data)
for emoji_data in emoji_data_list:
full_path = emoji_data.full_path
if not full_path:
logger.warning(
f"[加载错误] 数据库记录缺少 'full_path' 字段: ID {emoji_data.id if hasattr(emoji_data, 'id') else 'Unknown'}"
)
load_errors += 1
continue
try:
emoji = MaiEmoji(full_path=full_path)
emoji.hash = emoji_data.emoji_hash
if not emoji.hash:
logger.warning(f"[加载错误] 数据库记录缺少 'hash' 字段: {full_path}")
load_errors += 1
continue
emoji.description = emoji_data.description
emoji.emotion = emoji_data.emotion.split(",") if emoji_data.emotion else []
emoji.usage_count = emoji_data.usage_count
db_last_used_time = emoji_data.last_used_time
db_register_time = emoji_data.register_time
emoji.last_used_time = db_last_used_time if db_last_used_time is not None else emoji.register_time
emoji.register_time = db_register_time if db_register_time is not None else emoji.register_time
emoji.format = emoji_data.format
emoji_objects.append(emoji)
except ValueError as ve:
logger.error(f"[加载错误] 初始化 MaiEmoji 失败 ({full_path}): {ve}")
load_errors += 1
except Exception as e:
logger.error(f"[加载错误] 处理数据库记录时出错 ({full_path}): {e!s}")
load_errors += 1
return emoji_objects, load_errors
def _ensure_emoji_dir() -> None:
os.makedirs(EMOJI_DIR, exist_ok=True)
os.makedirs(EMOJI_REGISTERED_DIR, exist_ok=True)
async def clear_temp_emoji() -> None:
logger.info("[清理] 开始清理缓存...")
for need_clear in (
os.path.join(BASE_DIR, "emoji"),
os.path.join(BASE_DIR, "image"),
os.path.join(BASE_DIR, "images"),
):
if await asyncio.to_thread(os.path.exists, need_clear):
files = await asyncio.to_thread(os.listdir, need_clear)
if len(files) > 1000:
for i, filename in enumerate(files):
file_path = os.path.join(need_clear, filename)
if await asyncio.to_thread(os.path.isfile, file_path):
try:
await asyncio.to_thread(os.remove, file_path)
logger.debug(f"[清理] 删除: {filename}")
except Exception as e:
logger.debug(f"[清理] 删除失败 {filename}: {e!s}")
if (i + 1) % 100 == 0:
await asyncio.sleep(0)
async def clean_unused_emojis(emoji_dir: str, emoji_objects: list[MaiEmoji], removed_count: int) -> int:
if not await asyncio.to_thread(os.path.exists, emoji_dir):
logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}")
return removed_count
cleaned_count = 0
try:
tracked_full_paths = {emoji.full_path for emoji in emoji_objects if not emoji.is_deleted}
for entry in await asyncio.to_thread(lambda: list(os.scandir(emoji_dir))):
if not entry.is_file():
continue
file_full_path = entry.path
if file_full_path not in tracked_full_paths:
try:
await asyncio.to_thread(os.remove, file_full_path)
logger.info(f"[清理] 删除未追踪的表情包文件: {file_full_path}")
cleaned_count += 1
except Exception as e:
logger.error(f"[错误] 删除文件时出错 ({file_full_path}): {e!s}")
if cleaned_count > 0:
logger.info(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。")
else:
logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。")
except Exception as e:
logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {e!s}")
return removed_count + cleaned_count
async def list_image_files(directory: str) -> tuple[list[str], bool]:
def _scan() -> tuple[list[str], bool]:
entries = list(os.scandir(directory))
files = [
entry.name
for entry in entries
if entry.is_file() and entry.name.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
]
return files, len(entries) == 0
return await asyncio.to_thread(_scan)