feat:新的表情包系统,表情包成为类,且含义更丰富,发送更快
This commit is contained in:
@@ -1,71 +0,0 @@
|
||||
{
|
||||
"测试时间": "2025-04-24 13:22:36",
|
||||
"测试迭代次数": 3,
|
||||
"不使用工具调用": {
|
||||
"平均耗时": 3.1020479996999106,
|
||||
"最短耗时": 2.980656862258911,
|
||||
"最长耗时": 3.2487313747406006,
|
||||
"标准差": 0.13581516492157006,
|
||||
"所有耗时": [
|
||||
2.98,
|
||||
3.08,
|
||||
3.25
|
||||
]
|
||||
},
|
||||
"不使用工具调用_详细响应": [
|
||||
{
|
||||
"内容摘要": "那个猫猫头表情包真的太可爱了,墨墨发的表情包也好萌,感觉可以分享一下我收藏的猫猫头系列",
|
||||
"推理内容摘要": ""
|
||||
},
|
||||
{
|
||||
"内容摘要": "那个猫猫头表情包确实很魔性,我存了好多张,每次看到都觉得特别治愈。墨墨好像也喜欢这种可爱的表情包,可以分享一下我收藏的。",
|
||||
"推理内容摘要": ""
|
||||
},
|
||||
{
|
||||
"内容摘要": "那个猫猫头表情包真的超可爱,我存了好多张,每次看到都会忍不住笑出来。墨墨发的表情包也好萌,感觉可以和大家分享一下我收藏的猫猫头。\n\n工具:无",
|
||||
"推理内容摘要": ""
|
||||
}
|
||||
],
|
||||
"使用工具调用": {
|
||||
"平均耗时": 7.927528937657674,
|
||||
"最短耗时": 5.714647531509399,
|
||||
"最长耗时": 11.046205997467041,
|
||||
"标准差": 2.778799784731646,
|
||||
"所有耗时": [
|
||||
7.02,
|
||||
11.05,
|
||||
5.71
|
||||
]
|
||||
},
|
||||
"使用工具调用_详细响应": [
|
||||
{
|
||||
"内容摘要": "这个猫猫头表情包确实挺有意思的,不过他们好像还在讨论版本问题。小千石在问3.8和3.11谁大,这挺简单的。",
|
||||
"推理内容摘要": "",
|
||||
"工具调用数量": 1,
|
||||
"工具调用详情": [
|
||||
{
|
||||
"工具名称": "compare_numbers",
|
||||
"参数": "{\"num1\":3.8,\"num2\":3.11}"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"内容摘要": "3.8和3.11谁大这个问题有点突然,不过可以简单比较一下。可能小千石在测试我或者真的想知道答案。现在群里的话题有点分散,既有技术讨论又有表情包的话题,我还是先回答数字比较的问题好了,毕竟比较直接。",
|
||||
"推理内容摘要": "",
|
||||
"工具调用数量": 1,
|
||||
"工具调用详情": [
|
||||
{
|
||||
"工具名称": "compare_numbers",
|
||||
"参数": "{\"num1\":3.8,\"num2\":3.11}"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"内容摘要": "他们还在纠结调试消息的事儿,不过好像讨论得差不多了。猫猫头表情包确实挺有意思的,但感觉聊得有点散了哦。小千石问3.8和3.11谁大,这个问题可以回答一下。",
|
||||
"推理内容摘要": "",
|
||||
"工具调用数量": 0,
|
||||
"工具调用详情": []
|
||||
}
|
||||
],
|
||||
"差异百分比": 155.56
|
||||
}
|
||||
@@ -464,7 +464,7 @@ EMOJI_STYLE_CONFIG = {
|
||||
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 表情 | {message}",
|
||||
},
|
||||
"simple": {
|
||||
"console_format": "<level>{time:MM-DD HH:mm}</level> | <light-blue>表情 | {message} </light-blue>", # noqa: E501
|
||||
"console_format": "<level>{time:MM-DD HH:mm}</level> | <yellow>表情 | {message} </yellow>", # noqa: E501
|
||||
"file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 表情 | {message}",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -222,7 +222,11 @@ class BotConfig:
|
||||
max_reach_deletion: bool = True # 开启则在达到最大数量时删除表情包,关闭则不会继续收集表情包
|
||||
EMOJI_CHECK_INTERVAL: int = 120 # 表情包检查间隔(分钟)
|
||||
EMOJI_REGISTER_INTERVAL: int = 10 # 表情包注册间隔(分钟)
|
||||
EMOJI_SAVE: bool = True # 偷表情包
|
||||
|
||||
save_pic: bool = False # 是否保存图片
|
||||
save_emoji: bool = False # 是否保存表情包
|
||||
steal_emoji: bool = True # 是否偷取表情包,让麦麦可以发送她保存的这些表情包
|
||||
|
||||
EMOJI_CHECK: bool = False # 是否开启过滤
|
||||
EMOJI_CHECK_PROMPT: str = "符合公序良俗" # 表情包过滤要求
|
||||
|
||||
@@ -392,12 +396,16 @@ class BotConfig:
|
||||
config.EMOJI_CHECK_INTERVAL = emoji_config.get("check_interval", config.EMOJI_CHECK_INTERVAL)
|
||||
config.EMOJI_REGISTER_INTERVAL = emoji_config.get("register_interval", config.EMOJI_REGISTER_INTERVAL)
|
||||
config.EMOJI_CHECK_PROMPT = emoji_config.get("check_prompt", config.EMOJI_CHECK_PROMPT)
|
||||
config.EMOJI_SAVE = emoji_config.get("auto_save", config.EMOJI_SAVE)
|
||||
config.EMOJI_CHECK = emoji_config.get("enable_check", config.EMOJI_CHECK)
|
||||
if config.INNER_VERSION in SpecifierSet(">=1.1.1"):
|
||||
config.max_emoji_num = emoji_config.get("max_emoji_num", config.max_emoji_num)
|
||||
config.max_reach_deletion = emoji_config.get("max_reach_deletion", config.max_reach_deletion)
|
||||
|
||||
if config.INNER_VERSION in SpecifierSet(">=1.4.2"):
|
||||
config.save_pic = emoji_config.get("save_pic", config.save_pic)
|
||||
config.save_emoji = emoji_config.get("save_emoji", config.save_emoji)
|
||||
config.steal_emoji = emoji_config.get("steal_emoji", config.steal_emoji)
|
||||
|
||||
|
||||
def bot(parent: dict):
|
||||
# 机器人基础配置
|
||||
bot_config = parent["bot"]
|
||||
|
||||
@@ -21,18 +21,15 @@ logger = get_module_logger("subheartflow", config=subheartflow_config)
|
||||
|
||||
def init_prompt():
|
||||
prompt = ""
|
||||
# prompt += f"麦麦的总体想法是:{self.main_heartflow_info}\n\n"
|
||||
prompt += "{extra_info}\n"
|
||||
# prompt += "{prompt_schedule}\n"
|
||||
# prompt += "{relation_prompt_all}\n"
|
||||
prompt += "{prompt_personality}\n"
|
||||
prompt += "刚刚你的想法是:\n我是{bot_name},我想,{current_thinking_info}\n"
|
||||
prompt += "刚刚你的内心想法是:{current_thinking_info}\n"
|
||||
prompt += "-----------------------------------\n"
|
||||
prompt += "现在是{time_now},你正在上网,和qq群里的网友们聊天,群里正在聊的话题是:\n{chat_observe_info}\n"
|
||||
prompt += "现在是{time_now},你正在上网,和qq群里的网友们聊天,以下是正在进行的聊天内容:\n{chat_observe_info}\n"
|
||||
prompt += "\n你现在{mood_info}\n"
|
||||
prompt += "现在请你生成你的内心想法,要求思考群里正在进行的话题,之前大家聊过的话题,群里成员的关系。"
|
||||
prompt += "请你思考,要不要对群里的话题进行回复,以及如何对群聊内容进行回复\n"
|
||||
prompt += "回复的要求是:平淡一些,简短一些,如果你要回复,最好只回复一个人的一个话题\n"
|
||||
prompt += "回复的要求是:不要总是重复自己提到过的话题,如果你要回复,最好只回复一个人的一个话题\n"
|
||||
prompt += "请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要回复自己的发言\n"
|
||||
prompt += "现在请你先输出想法,{hf_do_next},不要分点输出,文字不要浮夸"
|
||||
prompt += "在输出完想法后,请你思考应该使用什么工具。工具可以帮你取得一些你不知道的信息,或者进行一些操作。"
|
||||
|
||||
@@ -3,7 +3,7 @@ import time
|
||||
from .plugins.utils.statistic import LLMStatistics
|
||||
from .plugins.moods.moods import MoodManager
|
||||
from .plugins.schedule.schedule_generator import bot_schedule
|
||||
from .plugins.chat.emoji_manager import emoji_manager
|
||||
from .plugins.emoji_system.emoji_manager import emoji_manager
|
||||
from .plugins.person_info.person_info import person_info_manager
|
||||
from .plugins.willing.willing_manager import willing_manager
|
||||
from .plugins.chat.chat_stream import chat_manager
|
||||
@@ -128,7 +128,6 @@ class MainSystem:
|
||||
self.print_mood_task(),
|
||||
self.remove_recalled_message_task(),
|
||||
emoji_manager.start_periodic_check_register(),
|
||||
# emoji_manager.start_periodic_register(),
|
||||
self.app.run(),
|
||||
self.server.run(),
|
||||
]
|
||||
|
||||
@@ -4,7 +4,7 @@ MaiMBot插件系统
|
||||
"""
|
||||
|
||||
from .chat.chat_stream import chat_manager
|
||||
from .chat.emoji_manager import emoji_manager
|
||||
from .emoji_system.emoji_manager import emoji_manager
|
||||
from .person_info.relationship_manager import relationship_manager
|
||||
from .moods.moods import MoodManager
|
||||
from .willing.willing_manager import willing_manager
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from .emoji_manager import emoji_manager
|
||||
from ..emoji_system.emoji_manager import emoji_manager
|
||||
from ..person_info.relationship_manager import relationship_manager
|
||||
from .chat_stream import chat_manager
|
||||
from .message_sender import message_manager
|
||||
|
||||
@@ -1,595 +0,0 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import hashlib
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
from typing import Optional, Tuple
|
||||
from PIL import Image
|
||||
import io
|
||||
|
||||
from ...common.database import db
|
||||
from ...config.config import global_config
|
||||
from ..chat.utils import get_embedding
|
||||
from ..chat.utils_image import ImageManager, image_path_to_base64
|
||||
from ..models.utils_model import LLMRequest
|
||||
from src.common.logger import get_module_logger, LogConfig, EMOJI_STYLE_CONFIG
|
||||
|
||||
emoji_log_config = LogConfig(
|
||||
console_format=EMOJI_STYLE_CONFIG["console_format"],
|
||||
file_format=EMOJI_STYLE_CONFIG["file_format"],
|
||||
)
|
||||
|
||||
logger = get_module_logger("emoji", config=emoji_log_config)
|
||||
|
||||
|
||||
image_manager = ImageManager()
|
||||
|
||||
|
||||
class EmojiManager:
|
||||
_instance = None
|
||||
EMOJI_DIR = os.path.join("data", "emoji") # 表情包存储目录
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
self._scan_task = None
|
||||
self.vlm = LLMRequest(model=global_config.vlm, temperature=0.3, max_tokens=1000, request_type="emoji")
|
||||
self.llm_emotion_judge = LLMRequest(
|
||||
model=global_config.llm_emotion_judge, max_tokens=600, temperature=0.8, request_type="emoji"
|
||||
) # 更高的温度,更少的token(后续可以根据情绪来调整温度)
|
||||
|
||||
self.emoji_num = 0
|
||||
self.emoji_num_max = global_config.max_emoji_num
|
||||
self.emoji_num_max_reach_deletion = global_config.max_reach_deletion
|
||||
|
||||
logger.info("启动表情包管理器")
|
||||
|
||||
def _ensure_emoji_dir(self):
|
||||
"""确保表情存储目录存在"""
|
||||
os.makedirs(self.EMOJI_DIR, exist_ok=True)
|
||||
|
||||
def _update_emoji_count(self):
|
||||
"""更新表情包数量统计
|
||||
|
||||
检查数据库中的表情包数量并更新到 self.emoji_num
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
self.emoji_num = db.emoji.count_documents({})
|
||||
logger.info(f"[统计] 当前表情包数量: {self.emoji_num}")
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 更新表情包数量失败: {str(e)}")
|
||||
|
||||
def initialize(self):
|
||||
"""初始化数据库连接和表情目录"""
|
||||
if not self._initialized:
|
||||
try:
|
||||
self._ensure_emoji_collection()
|
||||
self._ensure_emoji_dir()
|
||||
self._initialized = True
|
||||
# 更新表情包数量
|
||||
self._update_emoji_count()
|
||||
# 启动时执行一次完整性检查
|
||||
self.check_emoji_file_integrity()
|
||||
except Exception:
|
||||
logger.exception("初始化表情管理器失败")
|
||||
|
||||
def _ensure_db(self):
|
||||
"""确保数据库已初始化"""
|
||||
if not self._initialized:
|
||||
self.initialize()
|
||||
if not self._initialized:
|
||||
raise RuntimeError("EmojiManager not initialized")
|
||||
|
||||
@staticmethod
|
||||
def _ensure_emoji_collection():
|
||||
"""确保emoji集合存在并创建索引
|
||||
|
||||
这个函数用于确保MongoDB数据库中存在emoji集合,并创建必要的索引。
|
||||
|
||||
索引的作用是加快数据库查询速度:
|
||||
- embedding字段的2dsphere索引: 用于加速向量相似度搜索,帮助快速找到相似的表情包
|
||||
- tags字段的普通索引: 加快按标签搜索表情包的速度
|
||||
- filename字段的唯一索引: 确保文件名不重复,同时加快按文件名查找的速度
|
||||
|
||||
没有索引的话,数据库每次查询都需要扫描全部数据,建立索引后可以大大提高查询效率。
|
||||
"""
|
||||
if "emoji" not in db.list_collection_names():
|
||||
db.create_collection("emoji")
|
||||
db.emoji.create_index([("embedding", "2dsphere")])
|
||||
db.emoji.create_index([("filename", 1)], unique=True)
|
||||
|
||||
def record_usage(self, emoji_id: str):
|
||||
"""记录表情使用次数"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
db.emoji.update_one({"_id": emoji_id}, {"$inc": {"usage_count": 1}})
|
||||
except Exception as e:
|
||||
logger.error(f"记录表情使用失败: {str(e)}")
|
||||
|
||||
async def get_emoji_for_text(self, text: str) -> Optional[Tuple[str, str]]:
|
||||
"""根据文本内容获取相关表情包
|
||||
Args:
|
||||
text: 输入文本
|
||||
Returns:
|
||||
Optional[str]: 表情包文件路径,如果没有找到则返回None
|
||||
|
||||
|
||||
可不可以通过 配置文件中的指令 来自定义使用表情包的逻辑?
|
||||
我觉得可行
|
||||
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
|
||||
# 获取文本的embedding
|
||||
text_for_search = await self._get_kimoji_for_text(text)
|
||||
if not text_for_search:
|
||||
logger.error("无法获取文本的情绪")
|
||||
return None
|
||||
text_embedding = await get_embedding(text_for_search, request_type="emoji")
|
||||
if not text_embedding:
|
||||
logger.error("无法获取文本的embedding")
|
||||
return None
|
||||
|
||||
try:
|
||||
# 获取所有表情包
|
||||
all_emojis = [
|
||||
e
|
||||
for e in db.emoji.find({}, {"_id": 1, "path": 1, "embedding": 1, "description": 1, "blacklist": 1})
|
||||
if "blacklist" not in e
|
||||
]
|
||||
|
||||
if not all_emojis:
|
||||
logger.warning("数据库中没有任何表情包")
|
||||
return None
|
||||
|
||||
# 计算余弦相似度并排序
|
||||
def cosine_similarity(v1, v2):
|
||||
if not v1 or not v2:
|
||||
return 0
|
||||
dot_product = sum(a * b for a, b in zip(v1, v2))
|
||||
norm_v1 = sum(a * a for a in v1) ** 0.5
|
||||
norm_v2 = sum(b * b for b in v2) ** 0.5
|
||||
if norm_v1 == 0 or norm_v2 == 0:
|
||||
return 0
|
||||
return dot_product / (norm_v1 * norm_v2)
|
||||
|
||||
# 计算所有表情包与输入文本的相似度
|
||||
emoji_similarities = [
|
||||
(emoji, cosine_similarity(text_embedding, emoji.get("embedding", []))) for emoji in all_emojis
|
||||
]
|
||||
|
||||
# 按相似度降序排序
|
||||
emoji_similarities.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# 获取前3个最相似的表情包
|
||||
top_10_emojis = emoji_similarities[: 10 if len(emoji_similarities) > 10 else len(emoji_similarities)]
|
||||
|
||||
if not top_10_emojis:
|
||||
logger.warning("未找到匹配的表情包")
|
||||
return None
|
||||
|
||||
# 从前3个中随机选择一个
|
||||
selected_emoji, similarity = random.choice(top_10_emojis)
|
||||
|
||||
if selected_emoji and "path" in selected_emoji:
|
||||
# 更新使用次数
|
||||
db.emoji.update_one({"_id": selected_emoji["_id"]}, {"$inc": {"usage_count": 1}})
|
||||
|
||||
logger.info(
|
||||
f"[匹配] 找到表情包: {selected_emoji.get('description', '无描述')} (相似度: {similarity:.4f})"
|
||||
)
|
||||
# 稍微改一下文本描述,不然容易产生幻觉,描述已经包含 表情包 了
|
||||
return selected_emoji["path"], "[ %s ]" % selected_emoji.get("description", "无描述")
|
||||
|
||||
except Exception as search_error:
|
||||
logger.error(f"[错误] 搜索表情包失败: {str(search_error)}")
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 获取表情包失败: {str(e)}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
async def _get_emoji_description(image_base64: str) -> str:
|
||||
"""获取表情包的标签,使用image_manager的描述生成功能"""
|
||||
|
||||
try:
|
||||
# 使用image_manager获取描述,去掉前后的方括号和"表情包:"前缀
|
||||
description = await image_manager.get_emoji_description(image_base64)
|
||||
# 去掉[表情包:xxx]的格式,只保留描述内容
|
||||
description = description.strip("[]").replace("表情包:", "")
|
||||
return description
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 获取表情包描述失败: {str(e)}")
|
||||
return None
|
||||
|
||||
async def _check_emoji(self, image_base64: str, image_format: str) -> str:
|
||||
try:
|
||||
prompt = (
|
||||
f'这是一个表情包,请回答这个表情包是否满足"{global_config.EMOJI_CHECK_PROMPT}"的要求,是则回答是,'
|
||||
f"否则回答否,不要出现任何其他内容"
|
||||
)
|
||||
|
||||
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format)
|
||||
logger.debug(f"[检查] 表情包检查结果: {content}")
|
||||
return content
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 表情包检查失败: {str(e)}")
|
||||
return None
|
||||
|
||||
async def _get_kimoji_for_text(self, text: str):
|
||||
try:
|
||||
prompt = (
|
||||
f"这是{global_config.BOT_NICKNAME}将要发送的消息内容:\n{text}\n若要为其配上表情包,"
|
||||
f"请你输出这个表情包应该表达怎样的情感,应该给人什么样的感觉,不要太简洁也不要太长,"
|
||||
f'注意不要输出任何对消息内容的分析内容,只输出"一种什么样的感觉"中间的形容词部分。'
|
||||
)
|
||||
|
||||
content, _ = await self.llm_emotion_judge.generate_response_async(prompt, temperature=1.5)
|
||||
logger.info(f"[情感] 表情包情感描述: {content}")
|
||||
return content
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 获取表情包情感失败: {str(e)}")
|
||||
return None
|
||||
|
||||
async def scan_new_emojis(self):
|
||||
"""扫描新的表情包"""
|
||||
try:
|
||||
emoji_dir = self.EMOJI_DIR
|
||||
os.makedirs(emoji_dir, exist_ok=True)
|
||||
|
||||
# 获取所有支持的图片文件
|
||||
files_to_process = [
|
||||
f for f in os.listdir(emoji_dir) if f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
|
||||
]
|
||||
|
||||
# 检查当前表情包数量
|
||||
self._update_emoji_count()
|
||||
if self.emoji_num >= self.emoji_num_max:
|
||||
logger.warning(f"[警告] 表情包数量已达到上限({self.emoji_num}/{self.emoji_num_max}),跳过注册")
|
||||
return
|
||||
|
||||
# 计算还可以注册的数量
|
||||
remaining_slots = self.emoji_num_max - self.emoji_num
|
||||
logger.info(f"[注册] 还可以注册 {remaining_slots} 个表情包")
|
||||
|
||||
for filename in files_to_process:
|
||||
# 如果已经达到上限,停止注册
|
||||
if self.emoji_num >= self.emoji_num_max:
|
||||
logger.warning(f"[警告] 表情包数量已达到上限({self.emoji_num}/{self.emoji_num_max}),停止注册")
|
||||
break
|
||||
|
||||
image_path = os.path.join(emoji_dir, filename)
|
||||
|
||||
# 获取图片的base64编码和哈希值
|
||||
image_base64 = image_path_to_base64(image_path)
|
||||
if image_base64 is None:
|
||||
os.remove(image_path)
|
||||
continue
|
||||
|
||||
image_bytes = base64.b64decode(image_base64)
|
||||
image_hash = hashlib.md5(image_bytes).hexdigest()
|
||||
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||||
# 检查是否已经注册过
|
||||
existing_emoji_by_path = db["emoji"].find_one({"filename": filename})
|
||||
existing_emoji_by_hash = db["emoji"].find_one({"hash": image_hash})
|
||||
if existing_emoji_by_path and existing_emoji_by_hash:
|
||||
if existing_emoji_by_path["_id"] != existing_emoji_by_hash["_id"]:
|
||||
logger.error(f"[错误] 表情包已存在但记录不一致: {filename}")
|
||||
db.emoji.delete_one({"_id": existing_emoji_by_path["_id"]})
|
||||
db.emoji.delete_one({"_id": existing_emoji_by_hash["_id"]})
|
||||
existing_emoji = None
|
||||
else:
|
||||
existing_emoji = existing_emoji_by_hash
|
||||
elif existing_emoji_by_hash:
|
||||
logger.error(f"[错误] 表情包hash已存在但path不存在: {filename}")
|
||||
db.emoji.delete_one({"_id": existing_emoji_by_hash["_id"]})
|
||||
existing_emoji = None
|
||||
elif existing_emoji_by_path:
|
||||
logger.error(f"[错误] 表情包path已存在但hash不存在: {filename}")
|
||||
db.emoji.delete_one({"_id": existing_emoji_by_path["_id"]})
|
||||
existing_emoji = None
|
||||
else:
|
||||
existing_emoji = None
|
||||
|
||||
description = None
|
||||
|
||||
if existing_emoji:
|
||||
# 即使表情包已存在,也检查是否需要同步到images集合
|
||||
description = existing_emoji.get("description")
|
||||
# 检查是否在images集合中存在
|
||||
existing_image = db.images.find_one({"hash": image_hash})
|
||||
if not existing_image:
|
||||
# 同步到images集合
|
||||
image_doc = {
|
||||
"hash": image_hash,
|
||||
"path": image_path,
|
||||
"type": "emoji",
|
||||
"description": description,
|
||||
"timestamp": int(time.time()),
|
||||
}
|
||||
db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
|
||||
# 保存描述到image_descriptions集合
|
||||
image_manager._save_description_to_db(image_hash, description, "emoji")
|
||||
logger.success(f"[同步] 已同步表情包到images集合: {filename}")
|
||||
continue
|
||||
|
||||
# 检查是否在images集合中已有描述
|
||||
existing_description = image_manager._get_description_from_db(image_hash, "emoji")
|
||||
|
||||
if existing_description:
|
||||
description = existing_description
|
||||
else:
|
||||
# 获取表情包的描述
|
||||
description = await self._get_emoji_description(image_base64)
|
||||
|
||||
if global_config.EMOJI_CHECK:
|
||||
check = await self._check_emoji(image_base64, image_format)
|
||||
if "是" not in check:
|
||||
os.remove(image_path)
|
||||
logger.info(f"[过滤] 表情包描述: {description}")
|
||||
logger.info(f"[过滤] 表情包不满足规则,已移除: {check}")
|
||||
continue
|
||||
logger.info(f"[检查] 表情包检查通过: {check}")
|
||||
|
||||
if description is not None:
|
||||
embedding = await get_embedding(description, request_type="emoji")
|
||||
if not embedding:
|
||||
logger.error("获取消息嵌入向量失败")
|
||||
raise ValueError("获取消息嵌入向量失败")
|
||||
# 准备数据库记录
|
||||
emoji_record = {
|
||||
"filename": filename,
|
||||
"path": image_path,
|
||||
"embedding": embedding,
|
||||
"description": description,
|
||||
"hash": image_hash,
|
||||
"timestamp": int(time.time()),
|
||||
}
|
||||
|
||||
# 保存到emoji数据库
|
||||
db["emoji"].insert_one(emoji_record)
|
||||
logger.success(f"[注册] 新表情包: {filename}")
|
||||
logger.info(f"[描述] {description}")
|
||||
|
||||
# 更新当前表情包数量
|
||||
self.emoji_num += 1
|
||||
logger.info(f"[统计] 当前表情包数量: {self.emoji_num}/{self.emoji_num_max}")
|
||||
|
||||
# 保存到images数据库
|
||||
image_doc = {
|
||||
"hash": image_hash,
|
||||
"path": image_path,
|
||||
"type": "emoji",
|
||||
"description": description,
|
||||
"timestamp": int(time.time()),
|
||||
}
|
||||
db.images.update_one({"hash": image_hash}, {"$set": image_doc}, upsert=True)
|
||||
# 保存描述到image_descriptions集合
|
||||
image_manager._save_description_to_db(image_hash, description, "emoji")
|
||||
logger.success(f"[同步] 已保存到images集合: {filename}")
|
||||
else:
|
||||
logger.warning(f"[跳过] 表情包: {filename}")
|
||||
|
||||
except Exception:
|
||||
logger.exception("[错误] 扫描表情包失败")
|
||||
|
||||
def check_emoji_file_integrity(self):
|
||||
"""检查表情包文件完整性
|
||||
如果文件已被删除,则从数据库中移除对应记录
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
# 获取所有表情包记录
|
||||
all_emojis = list(db.emoji.find())
|
||||
removed_count = 0
|
||||
total_count = len(all_emojis)
|
||||
|
||||
for emoji in all_emojis:
|
||||
try:
|
||||
if "path" not in emoji:
|
||||
logger.warning(f"[检查] 发现无效记录(缺少path字段),ID: {emoji.get('_id', 'unknown')}")
|
||||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
if "embedding" not in emoji:
|
||||
logger.warning(f"[检查] 发现过时记录(缺少embedding字段),ID: {emoji.get('_id', 'unknown')}")
|
||||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
# 检查文件是否存在
|
||||
if not os.path.exists(emoji["path"]):
|
||||
logger.warning(f"[检查] 表情包文件已被删除: {emoji['path']}")
|
||||
# 从数据库中删除记录
|
||||
result = db.emoji.delete_one({"_id": emoji["_id"]})
|
||||
if result.deleted_count > 0:
|
||||
logger.debug(f"[清理] 成功删除数据库记录: {emoji['_id']}")
|
||||
removed_count += 1
|
||||
else:
|
||||
logger.error(f"[错误] 删除数据库记录失败: {emoji['_id']}")
|
||||
continue
|
||||
|
||||
if "hash" not in emoji:
|
||||
logger.warning(f"[检查] 发现缺失记录(缺少hash字段),ID: {emoji.get('_id', 'unknown')}")
|
||||
hash = hashlib.md5(open(emoji["path"], "rb").read()).hexdigest()
|
||||
db.emoji.update_one({"_id": emoji["_id"]}, {"$set": {"hash": hash}})
|
||||
else:
|
||||
file_hash = hashlib.md5(open(emoji["path"], "rb").read()).hexdigest()
|
||||
if emoji["hash"] != file_hash:
|
||||
logger.warning(f"[检查] 表情包文件hash不匹配,ID: {emoji.get('_id', 'unknown')}")
|
||||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||||
removed_count += 1
|
||||
|
||||
# 修复拼写错误
|
||||
if "discription" in emoji:
|
||||
desc = emoji["discription"]
|
||||
db.emoji.update_one(
|
||||
{"_id": emoji["_id"]}, {"$unset": {"discription": ""}, "$set": {"description": desc}}
|
||||
)
|
||||
|
||||
except Exception as item_error:
|
||||
logger.error(f"[错误] 处理表情包记录时出错: {str(item_error)}")
|
||||
continue
|
||||
|
||||
# 验证清理结果
|
||||
remaining_count = db.emoji.count_documents({})
|
||||
if removed_count > 0:
|
||||
logger.success(f"[清理] 已清理 {removed_count} 个失效的表情包记录")
|
||||
logger.info(f"[统计] 清理前: {total_count} | 清理后: {remaining_count}")
|
||||
else:
|
||||
logger.info(f"[检查] 已检查 {total_count} 个表情包记录")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 检查表情包完整性失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
def check_emoji_file_full(self):
|
||||
"""检查表情包文件是否完整,如果数量超出限制且允许删除,则删除多余的表情包
|
||||
|
||||
删除规则:
|
||||
1. 优先删除创建时间更早的表情包
|
||||
2. 优先删除使用次数少的表情包,但使用次数多的也有小概率被删除
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
# 更新表情包数量
|
||||
self._update_emoji_count()
|
||||
|
||||
# 检查是否超出限制
|
||||
if self.emoji_num <= self.emoji_num_max:
|
||||
return
|
||||
|
||||
# 如果超出限制但不允许删除,则只记录警告
|
||||
if not global_config.max_reach_deletion:
|
||||
logger.warning(f"[警告] 表情包数量({self.emoji_num})超出限制({self.emoji_num_max}),但未开启自动删除")
|
||||
return
|
||||
|
||||
# 计算需要删除的数量
|
||||
delete_count = self.emoji_num - self.emoji_num_max
|
||||
logger.info(f"[清理] 需要删除 {delete_count} 个表情包")
|
||||
|
||||
# 获取所有表情包,按时间戳升序(旧的在前)排序
|
||||
all_emojis = list(db.emoji.find().sort([("timestamp", 1)]))
|
||||
|
||||
# 计算权重:使用次数越多,被删除的概率越小
|
||||
weights = []
|
||||
max_usage = max((emoji.get("usage_count", 0) for emoji in all_emojis), default=1)
|
||||
for emoji in all_emojis:
|
||||
usage_count = emoji.get("usage_count", 0)
|
||||
# 使用指数衰减函数计算权重,使用次数越多权重越小
|
||||
weight = 1.0 / (1.0 + usage_count / max(1, max_usage))
|
||||
weights.append(weight)
|
||||
|
||||
# 根据权重随机选择要删除的表情包
|
||||
to_delete = []
|
||||
remaining_indices = list(range(len(all_emojis)))
|
||||
|
||||
while len(to_delete) < delete_count and remaining_indices:
|
||||
# 计算当前剩余表情包的权重
|
||||
current_weights = [weights[i] for i in remaining_indices]
|
||||
# 归一化权重
|
||||
total_weight = sum(current_weights)
|
||||
if total_weight == 0:
|
||||
break
|
||||
normalized_weights = [w / total_weight for w in current_weights]
|
||||
|
||||
# 随机选择一个表情包
|
||||
selected_idx = random.choices(remaining_indices, weights=normalized_weights, k=1)[0]
|
||||
to_delete.append(all_emojis[selected_idx])
|
||||
remaining_indices.remove(selected_idx)
|
||||
|
||||
# 删除选中的表情包
|
||||
deleted_count = 0
|
||||
for emoji in to_delete:
|
||||
try:
|
||||
# 删除文件
|
||||
if "path" in emoji and os.path.exists(emoji["path"]):
|
||||
os.remove(emoji["path"])
|
||||
logger.info(f"[删除] 文件: {emoji['path']} (使用次数: {emoji.get('usage_count', 0)})")
|
||||
|
||||
# 删除数据库记录
|
||||
db.emoji.delete_one({"_id": emoji["_id"]})
|
||||
deleted_count += 1
|
||||
|
||||
# 同时从images集合中删除
|
||||
if "hash" in emoji:
|
||||
db.images.delete_one({"hash": emoji["hash"]})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 删除表情包失败: {str(e)}")
|
||||
continue
|
||||
|
||||
# 更新表情包数量
|
||||
self._update_emoji_count()
|
||||
logger.success(f"[清理] 已删除 {deleted_count} 个表情包,当前数量: {self.emoji_num}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 检查表情包数量失败: {str(e)}")
|
||||
|
||||
async def start_periodic_check_register(self):
|
||||
"""定期检查表情包完整性和数量"""
|
||||
while True:
|
||||
logger.info("[扫描] 开始检查表情包完整性...")
|
||||
self.check_emoji_file_integrity()
|
||||
logger.info("[扫描] 开始删除所有图片缓存...")
|
||||
await self.delete_all_images()
|
||||
logger.info("[扫描] 开始扫描新表情包...")
|
||||
if self.emoji_num < self.emoji_num_max:
|
||||
await self.scan_new_emojis()
|
||||
if self.emoji_num > self.emoji_num_max:
|
||||
logger.warning(f"[警告] 表情包数量超过最大限制: {self.emoji_num} > {self.emoji_num_max},跳过注册")
|
||||
if not global_config.max_reach_deletion:
|
||||
logger.warning("表情包数量超过最大限制,终止注册")
|
||||
break
|
||||
else:
|
||||
logger.warning("表情包数量超过最大限制,开始删除表情包")
|
||||
self.check_emoji_file_full()
|
||||
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
|
||||
|
||||
@staticmethod
|
||||
async def delete_all_images():
|
||||
"""删除 data/image 目录下的所有文件"""
|
||||
try:
|
||||
image_dir = os.path.join("data", "image")
|
||||
if not os.path.exists(image_dir):
|
||||
logger.warning(f"[警告] 目录不存在: {image_dir}")
|
||||
return
|
||||
|
||||
deleted_count = 0
|
||||
failed_count = 0
|
||||
|
||||
# 遍历目录下的所有文件
|
||||
for filename in os.listdir(image_dir):
|
||||
file_path = os.path.join(image_dir, filename)
|
||||
try:
|
||||
if os.path.isfile(file_path):
|
||||
os.remove(file_path)
|
||||
deleted_count += 1
|
||||
logger.debug(f"[删除] 文件: {file_path}")
|
||||
except Exception as e:
|
||||
failed_count += 1
|
||||
logger.error(f"[错误] 删除文件失败 {file_path}: {str(e)}")
|
||||
|
||||
logger.success(f"[清理] 已删除 {deleted_count} 个文件,失败 {failed_count} 个")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 删除图片目录失败: {str(e)}")
|
||||
|
||||
|
||||
# 创建全局单例
|
||||
emoji_manager = EmojiManager()
|
||||
@@ -127,12 +127,12 @@ class MessageRecv(Message):
|
||||
# 如果是base64图片数据
|
||||
if isinstance(seg.data, str):
|
||||
return await image_manager.get_image_description(seg.data)
|
||||
return "[图片]"
|
||||
return "[发了一张图片,网卡了加载不出来]"
|
||||
elif seg.type == "emoji":
|
||||
self.is_emoji = True
|
||||
if isinstance(seg.data, str):
|
||||
return await image_manager.get_emoji_description(seg.data)
|
||||
return "[表情]"
|
||||
return "[发了一个表情包,网卡了加载不出来]"
|
||||
else:
|
||||
return f"[{seg.type}:{str(seg.data)}]"
|
||||
except Exception as e:
|
||||
@@ -141,14 +141,8 @@ class MessageRecv(Message):
|
||||
|
||||
def _generate_detailed_text(self) -> str:
|
||||
"""生成详细文本,包含时间和用户信息"""
|
||||
# time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(self.message_info.time))
|
||||
timestamp = self.message_info.time
|
||||
user_info = self.message_info.user_info
|
||||
# name = (
|
||||
# f"{user_info.user_nickname}(ta的昵称:{user_info.user_cardname},ta的id:{user_info.user_id})"
|
||||
# if user_info.user_cardname != None
|
||||
# else f"{user_info.user_nickname}(ta的id:{user_info.user_id})"
|
||||
# )
|
||||
name = f"<{self.message_info.platform}:{user_info.user_id}:{user_info.user_nickname}:{user_info.user_cardname}>"
|
||||
return f"[{timestamp}] {name}: {self.processed_plain_text}\n"
|
||||
|
||||
@@ -222,11 +216,11 @@ class MessageProcessBase(Message):
|
||||
# 如果是base64图片数据
|
||||
if isinstance(seg.data, str):
|
||||
return await image_manager.get_image_description(seg.data)
|
||||
return "[图片]"
|
||||
return "[图片,网卡了加载不出来]"
|
||||
elif seg.type == "emoji":
|
||||
if isinstance(seg.data, str):
|
||||
return await image_manager.get_emoji_description(seg.data)
|
||||
return "[表情]"
|
||||
return "[表情,网卡了加载不出来]"
|
||||
elif seg.type == "at":
|
||||
return f"[@{seg.data}]"
|
||||
elif seg.type == "reply":
|
||||
|
||||
@@ -121,7 +121,7 @@ class ImageManager:
|
||||
prompt = "这是一个动态图表情包,每一张图代表了动态图的某一帧,黑色背景代表透明,使用1-2个词描述一下表情包表达的情感和内容,简短一些"
|
||||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, "jpg")
|
||||
else:
|
||||
prompt = "这是一个表情包,请用使用1-2个词描述一下表情包所表达的情感和内容,简短一些"
|
||||
prompt = "这是一个表情包,请用使用几个词描述一下表情包所表达的情感和内容,简短一些"
|
||||
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
|
||||
|
||||
cached_description = self._get_description_from_db(image_hash, "emoji")
|
||||
@@ -130,7 +130,7 @@ class ImageManager:
|
||||
return f"[表达了:{cached_description}]"
|
||||
|
||||
# 根据配置决定是否保存图片
|
||||
if global_config.EMOJI_SAVE:
|
||||
if global_config.save_emoji:
|
||||
# 生成文件名和路径
|
||||
timestamp = int(time.time())
|
||||
filename = f"{timestamp}_{image_hash[:8]}.{image_format}"
|
||||
@@ -196,7 +196,7 @@ class ImageManager:
|
||||
return "[图片]"
|
||||
|
||||
# 根据配置决定是否保存图片
|
||||
if global_config.EMOJI_SAVE:
|
||||
if global_config.save_pic:
|
||||
# 生成文件名和路径
|
||||
timestamp = int(time.time())
|
||||
filename = f"{timestamp}_{image_hash[:8]}.{image_format}"
|
||||
|
||||
794
src/plugins/emoji_system/emoji_manager.py
Normal file
794
src/plugins/emoji_system/emoji_manager.py
Normal file
@@ -0,0 +1,794 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import hashlib
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
from typing import Optional, Tuple
|
||||
from PIL import Image
|
||||
import io
|
||||
import re
|
||||
|
||||
from ...common.database import db
|
||||
from ...config.config import global_config
|
||||
from ..chat.utils import get_embedding
|
||||
from ..chat.utils_image import image_path_to_base64, image_manager
|
||||
from ..models.utils_model import LLMRequest
|
||||
from src.common.logger import get_module_logger, LogConfig, EMOJI_STYLE_CONFIG
|
||||
|
||||
|
||||
emoji_log_config = LogConfig(
|
||||
console_format=EMOJI_STYLE_CONFIG["console_format"],
|
||||
file_format=EMOJI_STYLE_CONFIG["file_format"],
|
||||
)
|
||||
|
||||
logger = get_module_logger("emoji", config=emoji_log_config)
|
||||
|
||||
EMOJI_DIR = os.path.join("data", "emoji") # 表情包存储目录
|
||||
EMOJI_REGISTED_DIR = os.path.join("data", "emoji_registed") # 已注册的表情包注册目录
|
||||
|
||||
|
||||
class MaiEmoji:
|
||||
"""定义一个表情包"""
|
||||
def __init__(self, filename: str, path: str):
|
||||
self.path = path # 存储目录路径
|
||||
self.filename = filename
|
||||
self.embedding = []
|
||||
self.hash = "" # 初始为空,在创建实例时会计算
|
||||
self.description = ""
|
||||
self.emotion = []
|
||||
self.usage_count = 0
|
||||
self.last_used_time = time.time()
|
||||
self.register_time = time.time()
|
||||
self.is_deleted = False # 标记是否已被删除
|
||||
self.format = ""
|
||||
|
||||
async def initialize_hash_format(self):
|
||||
"""从文件创建表情包实例
|
||||
|
||||
参数:
|
||||
file_path: 文件的完整路径
|
||||
|
||||
返回:
|
||||
MaiEmoji: 创建的表情包实例,如果失败则返回None
|
||||
"""
|
||||
try:
|
||||
file_path = os.path.join(self.path, self.filename)
|
||||
if not os.path.exists(file_path):
|
||||
logger.error(f"[错误] 表情包文件不存在: {file_path}")
|
||||
return None
|
||||
|
||||
image_base64 = image_path_to_base64(file_path)
|
||||
if image_base64 is None:
|
||||
logger.error(f"[错误] 无法读取图片: {file_path}")
|
||||
return None
|
||||
|
||||
|
||||
# 计算哈希值
|
||||
image_bytes = base64.b64decode(image_base64)
|
||||
self.hash = hashlib.md5(image_bytes).hexdigest()
|
||||
|
||||
# 获取图片格式
|
||||
self.format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 初始化表情包失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
return None
|
||||
|
||||
async def register_to_db(self):
|
||||
"""
|
||||
注册表情包
|
||||
将表情包对应的文件,从当前路径移动到EMOJI_REGISTED_DIR目录下
|
||||
并修改对应的实例属性,然后将表情包信息保存到数据库中
|
||||
"""
|
||||
try:
|
||||
# 确保目标目录存在
|
||||
os.makedirs(EMOJI_REGISTED_DIR, exist_ok=True)
|
||||
|
||||
# 源路径是当前实例的完整路径
|
||||
source_path = os.path.join(self.path, self.filename)
|
||||
# 目标路径
|
||||
destination_path = os.path.join(EMOJI_REGISTED_DIR, self.filename)
|
||||
|
||||
# 检查源文件是否存在
|
||||
if not os.path.exists(source_path):
|
||||
logger.error(f"[错误] 源文件不存在: {source_path}")
|
||||
return False
|
||||
|
||||
# --- 文件移动 ---
|
||||
try:
|
||||
# 如果目标文件已存在,先删除 (确保移动成功)
|
||||
if os.path.exists(destination_path):
|
||||
os.remove(destination_path)
|
||||
|
||||
os.rename(source_path, destination_path)
|
||||
logger.info(f"[移动] 文件从 {source_path} 移动到 {destination_path}")
|
||||
# 更新实例的路径属性为新目录
|
||||
self.path = EMOJI_REGISTED_DIR
|
||||
except Exception as move_error:
|
||||
logger.error(f"[错误] 移动文件失败: {str(move_error)}")
|
||||
return False # 文件移动失败,不继续
|
||||
|
||||
# --- 数据库操作 ---
|
||||
try:
|
||||
# 准备数据库记录 for emoji collection
|
||||
emoji_record = {
|
||||
"filename": self.filename,
|
||||
"path": os.path.join(self.path, self.filename), # 使用更新后的路径
|
||||
"embedding": self.embedding,
|
||||
"description": self.description,
|
||||
"emotion": self.emotion, # 添加情感标签字段
|
||||
"hash": self.hash,
|
||||
"format": self.format,
|
||||
"timestamp": int(self.register_time), # 使用实例的注册时间
|
||||
"usage_count": self.usage_count,
|
||||
"last_used_time": self.last_used_time
|
||||
}
|
||||
|
||||
# 使用upsert确保记录存在或被更新
|
||||
db["emoji"].update_one(
|
||||
{"hash": self.hash},
|
||||
{"$set": emoji_record},
|
||||
upsert=True
|
||||
)
|
||||
logger.success(f"[注册] 表情包信息保存到数据库: {self.description}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as db_error:
|
||||
logger.error(f"[错误] 保存数据库失败: {str(db_error)}")
|
||||
# 考虑是否需要将文件移回?为了简化,暂时只记录错误
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 注册表情包失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
return False
|
||||
|
||||
async def delete(self):
|
||||
"""删除表情包
|
||||
|
||||
删除表情包的文件和数据库记录
|
||||
|
||||
返回:
|
||||
bool: 是否成功删除
|
||||
"""
|
||||
try:
|
||||
# 1. 删除文件
|
||||
if os.path.exists(os.path.join(self.path, self.filename)):
|
||||
try:
|
||||
os.remove(os.path.join(self.path, self.filename))
|
||||
logger.info(f"[删除] 文件: {os.path.join(self.path, self.filename)}")
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 删除文件失败 {os.path.join(self.path, self.filename)}: {str(e)}")
|
||||
# 继续执行,即使文件删除失败也尝试删除数据库记录
|
||||
|
||||
# 2. 删除数据库记录
|
||||
result = db.emoji.delete_one({"hash": self.hash})
|
||||
deleted_in_db = result.deleted_count > 0
|
||||
|
||||
if deleted_in_db:
|
||||
logger.success(f"[删除] 成功删除表情包记录: {self.description}")
|
||||
|
||||
# 3. 标记对象已被删除
|
||||
self.is_deleted = True
|
||||
return True
|
||||
else:
|
||||
logger.error(f"[错误] 删除表情包记录失败: {self.hash}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 删除表情包失败: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
class EmojiManager:
|
||||
_instance = None
|
||||
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
self._scan_task = None
|
||||
self.vlm = LLMRequest(model=global_config.vlm, temperature=0.3, max_tokens=1000, request_type="emoji")
|
||||
self.llm_emotion_judge = LLMRequest(
|
||||
model=global_config.llm_emotion_judge, max_tokens=600, temperature=0.8, request_type="emoji"
|
||||
) # 更高的温度,更少的token(后续可以根据情绪来调整温度)
|
||||
|
||||
self.emoji_num = 0
|
||||
self.emoji_num_max = global_config.max_emoji_num
|
||||
self.emoji_num_max_reach_deletion = global_config.max_reach_deletion
|
||||
self.emoji_objects: list[MaiEmoji] = [] # 存储MaiEmoji对象的列表,使用类型注解明确列表元素类型
|
||||
|
||||
logger.info("启动表情包管理器")
|
||||
|
||||
def _ensure_emoji_dir(self):
|
||||
"""确保表情存储目录存在"""
|
||||
os.makedirs(EMOJI_DIR, exist_ok=True)
|
||||
|
||||
|
||||
def initialize(self):
|
||||
"""初始化数据库连接和表情目录"""
|
||||
if not self._initialized:
|
||||
try:
|
||||
self._ensure_emoji_collection()
|
||||
self._ensure_emoji_dir()
|
||||
self._initialized = True
|
||||
# 更新表情包数量
|
||||
# 启动时执行一次完整性检查
|
||||
self.check_emoji_file_integrity()
|
||||
except Exception:
|
||||
logger.exception("初始化表情管理器失败")
|
||||
|
||||
def _ensure_db(self):
|
||||
"""确保数据库已初始化"""
|
||||
if not self._initialized:
|
||||
self.initialize()
|
||||
if not self._initialized:
|
||||
raise RuntimeError("EmojiManager not initialized")
|
||||
|
||||
@staticmethod
|
||||
def _ensure_emoji_collection():
|
||||
"""确保emoji集合存在并创建索引
|
||||
|
||||
这个函数用于确保MongoDB数据库中存在emoji集合,并创建必要的索引。
|
||||
|
||||
索引的作用是加快数据库查询速度:
|
||||
- embedding字段的2dsphere索引: 用于加速向量相似度搜索,帮助快速找到相似的表情包
|
||||
- tags字段的普通索引: 加快按标签搜索表情包的速度
|
||||
- filename字段的唯一索引: 确保文件名不重复,同时加快按文件名查找的速度
|
||||
|
||||
没有索引的话,数据库每次查询都需要扫描全部数据,建立索引后可以大大提高查询效率。
|
||||
"""
|
||||
if "emoji" not in db.list_collection_names():
|
||||
db.create_collection("emoji")
|
||||
db.emoji.create_index([("embedding", "2dsphere")])
|
||||
db.emoji.create_index([("filename", 1)], unique=True)
|
||||
|
||||
def record_usage(self, hash: str):
|
||||
"""记录表情使用次数"""
|
||||
try:
|
||||
for emoji in self.emoji_objects:
|
||||
if emoji.hash == hash:
|
||||
emoji.usage_count += 1
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"记录表情使用失败: {str(e)}")
|
||||
|
||||
async def get_emoji_for_text(self, text_emotion: str) -> Optional[Tuple[str, str]]:
|
||||
"""根据文本内容获取相关表情包
|
||||
Args:
|
||||
text_emotion: 输入的情感描述文本
|
||||
Returns:
|
||||
Optional[Tuple[str, str]]: (表情包文件路径, 表情包描述),如果没有找到则返回None
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
time_start = time.time()
|
||||
|
||||
# 获取所有表情包
|
||||
all_emojis = self.emoji_objects
|
||||
|
||||
if not all_emojis:
|
||||
logger.warning("数据库中没有任何表情包")
|
||||
return None
|
||||
|
||||
# 计算每个表情包与输入文本的最大情感相似度
|
||||
emoji_similarities = []
|
||||
for emoji in all_emojis:
|
||||
emotions = emoji.emotion
|
||||
if not emotions:
|
||||
continue
|
||||
|
||||
# 计算与每个emotion标签的相似度,取最大值
|
||||
max_similarity = 0
|
||||
for emotion in emotions:
|
||||
# 使用编辑距离计算相似度
|
||||
distance = self._levenshtein_distance(text_emotion, emotion)
|
||||
max_len = max(len(text_emotion), len(emotion))
|
||||
similarity = 1 - (distance / max_len if max_len > 0 else 0)
|
||||
max_similarity = max(max_similarity, similarity)
|
||||
|
||||
emoji_similarities.append((emoji, max_similarity))
|
||||
|
||||
# 按相似度降序排序
|
||||
emoji_similarities.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# 获取前5个最相似的表情包
|
||||
top_5_emojis = emoji_similarities[:5] if len(emoji_similarities) > 5 else emoji_similarities
|
||||
|
||||
if not top_5_emojis:
|
||||
logger.warning("未找到匹配的表情包")
|
||||
return None
|
||||
|
||||
# 从前5个中随机选择一个
|
||||
selected_emoji, similarity = random.choice(top_5_emojis)
|
||||
|
||||
# 更新使用次数
|
||||
db.emoji.update_one({"hash": selected_emoji.hash}, {"$inc": {"usage_count": 1}})
|
||||
|
||||
logger.info(
|
||||
f"[匹配] 找到表情包: {selected_emoji.description} (相似度: {similarity:.4f})"
|
||||
)
|
||||
|
||||
time_end = time.time()
|
||||
logger.info(f"[匹配] 搜索表情包用时: {time_end - time_start:.2f} 秒")
|
||||
return os.path.join(selected_emoji.path, selected_emoji.filename), f"[ {selected_emoji.description} ]"
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 获取表情包失败: {str(e)}")
|
||||
return None
|
||||
|
||||
def _levenshtein_distance(self, s1: str, s2: str) -> int:
|
||||
"""计算两个字符串的编辑距离
|
||||
|
||||
Args:
|
||||
s1: 第一个字符串
|
||||
s2: 第二个字符串
|
||||
|
||||
Returns:
|
||||
int: 编辑距离
|
||||
"""
|
||||
if len(s1) < len(s2):
|
||||
return self._levenshtein_distance(s2, s1)
|
||||
|
||||
if len(s2) == 0:
|
||||
return len(s1)
|
||||
|
||||
previous_row = range(len(s2) + 1)
|
||||
for i, c1 in enumerate(s1):
|
||||
current_row = [i + 1]
|
||||
for j, c2 in enumerate(s2):
|
||||
insertions = previous_row[j + 1] + 1
|
||||
deletions = current_row[j] + 1
|
||||
substitutions = previous_row[j] + (c1 != c2)
|
||||
current_row.append(min(insertions, deletions, substitutions))
|
||||
previous_row = current_row
|
||||
|
||||
return previous_row[-1]
|
||||
|
||||
async def check_emoji_file_integrity(self):
|
||||
"""检查表情包文件完整性
|
||||
遍历self.emoji_objects中的所有对象,检查文件是否存在
|
||||
如果文件已被删除,则执行对象的删除方法并从列表中移除
|
||||
"""
|
||||
try:
|
||||
if not self.emoji_objects:
|
||||
logger.warning("[检查] emoji_objects为空,跳过完整性检查")
|
||||
return
|
||||
|
||||
total_count = len(self.emoji_objects)
|
||||
removed_count = 0
|
||||
# 使用列表复制进行遍历,因为我们会在遍历过程中修改列表
|
||||
for emoji in self.emoji_objects[:]:
|
||||
try:
|
||||
# 检查文件是否存在
|
||||
if not os.path.exists(emoji.path):
|
||||
logger.warning(f"[检查] 表情包文件已被删除: {emoji.path}")
|
||||
# 执行表情包对象的删除方法
|
||||
await emoji.delete()
|
||||
# 从列表中移除该对象
|
||||
self.emoji_objects.remove(emoji)
|
||||
# 更新计数
|
||||
self.emoji_num -= 1
|
||||
removed_count += 1
|
||||
continue
|
||||
|
||||
except Exception as item_error:
|
||||
logger.error(f"[错误] 处理表情包记录时出错: {str(item_error)}")
|
||||
continue
|
||||
|
||||
# 输出清理结果
|
||||
if removed_count > 0:
|
||||
logger.success(f"[清理] 已清理 {removed_count} 个失效的表情包记录")
|
||||
logger.info(f"[统计] 清理前: {total_count} | 清理后: {len(self.emoji_objects)}")
|
||||
else:
|
||||
logger.info(f"[检查] 已检查 {total_count} 个表情包记录,全部完好")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 检查表情包完整性失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
async def start_periodic_check_register(self):
|
||||
"""定期检查表情包完整性和数量"""
|
||||
await self.get_all_emoji_from_db()
|
||||
while True:
|
||||
logger.info("[扫描] 开始检查表情包完整性...")
|
||||
self.check_emoji_file_integrity()
|
||||
logger.info("[扫描] 开始扫描新表情包...")
|
||||
|
||||
# 检查表情包目录是否存在
|
||||
if not os.path.exists(EMOJI_DIR):
|
||||
logger.warning(f"[警告] 表情包目录不存在: {EMOJI_DIR}")
|
||||
os.makedirs(EMOJI_DIR, exist_ok=True)
|
||||
logger.info(f"[创建] 已创建表情包目录: {EMOJI_DIR}")
|
||||
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
|
||||
continue
|
||||
|
||||
# 检查目录是否为空
|
||||
files = os.listdir(EMOJI_DIR)
|
||||
if not files:
|
||||
logger.warning(f"[警告] 表情包目录为空: {EMOJI_DIR}")
|
||||
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
|
||||
continue
|
||||
|
||||
# 检查是否需要处理表情包(数量超过最大值或不足)
|
||||
if (self.emoji_num > self.emoji_num_max and global_config.max_reach_deletion) or (self.emoji_num < self.emoji_num_max):
|
||||
try:
|
||||
# 获取目录下所有图片文件
|
||||
files_to_process = [
|
||||
f for f in files
|
||||
if os.path.isfile(os.path.join(EMOJI_DIR, f))
|
||||
and f.lower().endswith((".jpg", ".jpeg", ".png", ".gif"))
|
||||
]
|
||||
|
||||
# 处理每个符合条件的文件
|
||||
for filename in files_to_process:
|
||||
# 尝试注册表情包
|
||||
success = await self.register_emoji_by_filename(filename)
|
||||
if success:
|
||||
# 注册成功则跳出循环
|
||||
break
|
||||
else:
|
||||
# 注册失败则删除对应文件
|
||||
file_path = os.path.join(EMOJI_DIR, filename)
|
||||
os.remove(file_path)
|
||||
logger.warning(f"[清理] 删除注册失败的表情包文件: {filename}")
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 扫描表情包目录失败: {str(e)}")
|
||||
|
||||
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
|
||||
|
||||
async def get_all_emoji_from_db(self):
|
||||
"""获取所有表情包并初始化为MaiEmoji类对象
|
||||
|
||||
参数:
|
||||
hash: 可选,如果提供则只返回指定哈希值的表情包
|
||||
|
||||
返回:
|
||||
list[MaiEmoji]: 表情包对象列表
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
|
||||
# 获取所有表情包
|
||||
all_emoji_data = list(db.emoji.find())
|
||||
|
||||
# 将数据库记录转换为MaiEmoji对象
|
||||
emoji_objects = []
|
||||
for emoji_data in all_emoji_data:
|
||||
emoji = MaiEmoji(
|
||||
filename=emoji_data.get("filename", ""),
|
||||
path=emoji_data.get("path", ""),
|
||||
)
|
||||
|
||||
# 设置额外属性
|
||||
emoji.usage_count = emoji_data.get("usage_count", 0)
|
||||
emoji.last_used_time = emoji_data.get("last_used_time", emoji_data.get("timestamp", time.time()))
|
||||
emoji.register_time = emoji_data.get("timestamp", time.time())
|
||||
emoji.description = emoji_data.get("description", "")
|
||||
emoji.emotion = emoji_data.get("emotion", []) # 添加情感标签的加载
|
||||
emoji_objects.append(emoji)
|
||||
|
||||
# 存储到EmojiManager中
|
||||
self.emoji_objects = emoji_objects
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 获取所有表情包对象失败: {str(e)}")
|
||||
|
||||
async def get_emoji_from_db(self, hash=None):
|
||||
"""获取所有表情包并初始化为MaiEmoji类对象
|
||||
|
||||
参数:
|
||||
hash: 可选,如果提供则只返回指定哈希值的表情包
|
||||
|
||||
返回:
|
||||
list[MaiEmoji]: 表情包对象列表
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
|
||||
# 准备查询条件
|
||||
query = {}
|
||||
if hash:
|
||||
query = {"hash": hash}
|
||||
|
||||
# 获取所有表情包
|
||||
all_emoji_data = list(db.emoji.find(query))
|
||||
|
||||
# 将数据库记录转换为MaiEmoji对象
|
||||
emoji_objects = []
|
||||
for emoji_data in all_emoji_data:
|
||||
emoji = MaiEmoji(
|
||||
filename=emoji_data.get("filename", ""),
|
||||
path=emoji_data.get("path", ""),
|
||||
)
|
||||
|
||||
# 设置额外属性
|
||||
emoji.usage_count = emoji_data.get("usage_count", 0)
|
||||
emoji.last_used_time = emoji_data.get("last_used_time", emoji_data.get("timestamp", time.time()))
|
||||
emoji.register_time = emoji_data.get("timestamp", time.time())
|
||||
emoji.description = emoji_data.get("description", "")
|
||||
emoji.emotion = emoji_data.get("emotion", []) # 添加情感标签的加载
|
||||
|
||||
emoji_objects.append(emoji)
|
||||
|
||||
# 存储到EmojiManager中
|
||||
self.emoji_objects = emoji_objects
|
||||
|
||||
return emoji_objects
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 获取所有表情包对象失败: {str(e)}")
|
||||
return []
|
||||
|
||||
async def get_emoji_from_manager(self, hash) -> MaiEmoji:
|
||||
"""从EmojiManager中获取表情包
|
||||
|
||||
参数:
|
||||
hash:如果提供则只返回指定哈希值的表情包
|
||||
"""
|
||||
for emoji in self.emoji_objects:
|
||||
if emoji.hash == hash:
|
||||
return emoji
|
||||
return None
|
||||
|
||||
|
||||
|
||||
async def delete_emoji(self, emoji_hash: str) -> bool:
|
||||
"""根据哈希值删除表情包
|
||||
|
||||
Args:
|
||||
emoji_hash: 表情包的哈希值
|
||||
|
||||
Returns:
|
||||
bool: 是否成功删除
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
|
||||
# 从emoji_objects中查找表情包对象
|
||||
emoji = await self.get_emoji_from_manager(emoji_hash)
|
||||
|
||||
if not emoji:
|
||||
logger.warning(f"[警告] 未找到哈希值为 {emoji_hash} 的表情包")
|
||||
return False
|
||||
|
||||
# 使用MaiEmoji对象的delete方法删除表情包
|
||||
success = await emoji.delete()
|
||||
|
||||
if success:
|
||||
# 从emoji_objects列表中移除该对象
|
||||
self.emoji_objects = [e for e in self.emoji_objects if e.hash != emoji_hash]
|
||||
# 更新计数
|
||||
self.emoji_num -= 1
|
||||
logger.info(f"[统计] 当前表情包数量: {self.emoji_num}")
|
||||
|
||||
return True
|
||||
else:
|
||||
logger.error(f"[错误] 删除表情包失败: {emoji_hash}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 删除表情包失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
return False
|
||||
|
||||
def _emoji_objects_to_readable_list(self, emoji_objects):
|
||||
"""将表情包对象列表转换为可读的字符串列表
|
||||
|
||||
参数:
|
||||
emoji_objects: MaiEmoji对象列表
|
||||
|
||||
返回:
|
||||
list[str]: 可读的表情包信息字符串列表
|
||||
"""
|
||||
emoji_info_list = []
|
||||
for i, emoji in enumerate(emoji_objects):
|
||||
# 转换时间戳为可读时间
|
||||
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(emoji.register_time))
|
||||
# 构建每个表情包的信息字符串
|
||||
emoji_info = (
|
||||
f"编号: {i+1}\n"
|
||||
f"描述: {emoji.description}\n"
|
||||
f"使用次数: {emoji.usage_count}\n"
|
||||
f"添加时间: {time_str}\n"
|
||||
)
|
||||
emoji_info_list.append(emoji_info)
|
||||
return emoji_info_list
|
||||
|
||||
async def replace_a_emoji(self, new_emoji: MaiEmoji):
|
||||
"""替换一个表情包
|
||||
|
||||
Args:
|
||||
new_emoji: 新表情包对象
|
||||
|
||||
Returns:
|
||||
bool: 是否成功替换表情包
|
||||
"""
|
||||
try:
|
||||
self._ensure_db()
|
||||
|
||||
# 获取所有表情包对象
|
||||
all_emojis = self.emoji_objects
|
||||
|
||||
# 将表情包信息转换为可读的字符串
|
||||
emoji_info_list = self._emoji_objects_to_readable_list(all_emojis)
|
||||
|
||||
# 构建提示词
|
||||
prompt = (
|
||||
f"{global_config.BOT_NICKNAME}的表情包存储已满({self.emoji_num}/{self.emoji_num_max}),"
|
||||
f"需要决定是否删除一个旧表情包来为新表情包腾出空间。\n\n"
|
||||
f"新表情包信息:\n"
|
||||
f"描述: {new_emoji.description}\n\n"
|
||||
f"现有表情包列表:\n" + "\n".join(emoji_info_list) + "\n\n"
|
||||
f"请决定:\n"
|
||||
f"1. 是否要删除某个现有表情包来为新表情包腾出空间?\n"
|
||||
f"2. 如果要删除,应该删除哪一个(给出编号)?\n"
|
||||
f"请只回答:'不删除'或'删除编号X'(X为表情包编号)。"
|
||||
)
|
||||
|
||||
# 调用大模型进行决策
|
||||
decision, _ = await self.llm_emotion_judge.generate_response_async(prompt, temperature=0.8)
|
||||
logger.info(f"[决策] 大模型决策结果: {decision}")
|
||||
|
||||
# 解析决策结果
|
||||
if "不删除" in decision:
|
||||
logger.info("[决策] 决定不删除任何表情包")
|
||||
return False
|
||||
|
||||
# 尝试从决策中提取表情包编号
|
||||
match = re.search(r'删除编号(\d+)', decision)
|
||||
if match:
|
||||
emoji_index = int(match.group(1)) - 1 # 转换为0-based索引
|
||||
|
||||
# 检查索引是否有效
|
||||
if 0 <= emoji_index < len(all_emojis):
|
||||
emoji_to_delete = all_emojis[emoji_index]
|
||||
|
||||
# 删除选定的表情包
|
||||
logger.info(f"[决策] 决定删除表情包: {emoji_to_delete.description}")
|
||||
delete_success = await self.delete_emoji(emoji_to_delete.hash)
|
||||
|
||||
if delete_success:
|
||||
# 修复:等待异步注册完成
|
||||
register_success = await new_emoji.register_to_db()
|
||||
if register_success:
|
||||
self.emoji_objects.append(new_emoji)
|
||||
self.emoji_num += 1
|
||||
logger.success(f"[成功] 注册表情包: {new_emoji.description}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"[错误] 注册表情包到数据库失败: {new_emoji.filename}")
|
||||
return False
|
||||
else:
|
||||
logger.error(f"[错误] 删除表情包失败,无法完成替换")
|
||||
return False
|
||||
else:
|
||||
logger.error(f"[错误] 无效的表情包编号: {emoji_index+1}")
|
||||
else:
|
||||
logger.error(f"[错误] 无法从决策中提取表情包编号: {decision}")
|
||||
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 替换表情包失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
return False
|
||||
|
||||
async def build_emoji_description(self, image_base64: str) -> Tuple[str, list]:
|
||||
"""获取表情包描述和情感列表
|
||||
|
||||
Args:
|
||||
image_base64: 图片的base64编码
|
||||
|
||||
Returns:
|
||||
Tuple[str, list]: 返回表情包描述和情感列表
|
||||
"""
|
||||
try:
|
||||
# 解码图片并获取格式
|
||||
image_bytes = base64.b64decode(image_base64)
|
||||
image_format = Image.open(io.BytesIO(image_bytes)).format.lower()
|
||||
|
||||
# 调用AI获取描述
|
||||
if image_format == "gif" or image_format == "GIF":
|
||||
image_base64 = image_manager.transform_gif(image_base64)
|
||||
prompt = "这是一个动态图表情包,每一张图代表了动态图的某一帧,黑色背景代表透明,详细描述一下表情包表达的情感和内容,请关注其幽默和讽刺意味"
|
||||
description, _ = await self.vlm.generate_response_for_image(prompt, image_base64, "jpg")
|
||||
else:
|
||||
prompt = "这是一个表情包,请详细描述一下表情包所表达的情感和内容,请关注其幽默和讽刺意味"
|
||||
description, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format)
|
||||
|
||||
# 审核表情包
|
||||
if global_config.EMOJI_CHECK:
|
||||
prompt = f'''
|
||||
这是一个表情包,请对这个表情包进行审核,标准如下:
|
||||
1. 必须符合"{global_config.EMOJI_CHECK_PROMPT}"的要求
|
||||
2. 不能是色情、暴力、等违法违规内容,必须符合公序良俗
|
||||
3. 不能是任何形式的截图,聊天记录或视频截图
|
||||
4. 不要出现5个以上文字
|
||||
请回答这个表情包是否满足上述要求,是则回答是,否则回答否,不要出现任何其他内容
|
||||
'''
|
||||
content, _ = await self.vlm.generate_response_for_image(prompt, image_base64, image_format)
|
||||
if content == "否":
|
||||
return None, []
|
||||
|
||||
# 分析情感含义
|
||||
emotion_prompt = f'''
|
||||
基于这个表情包的描述:'{description}',请列出1-3个可能的情感标签,每个标签用一个词组表示,格式如下:
|
||||
幽默的讽刺
|
||||
悲伤的无奈
|
||||
愤怒的抗议
|
||||
愤怒的讽刺
|
||||
直接输出词组,词组检用逗号分隔。'''
|
||||
emotions_text, _ = await self.llm_emotion_judge.generate_response_async(emotion_prompt, temperature=0.7)
|
||||
|
||||
# 处理情感列表
|
||||
emotions = [e.strip() for e in emotions_text.split(',') if e.strip()]
|
||||
|
||||
return f"[表情包:{description}]", emotions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取表情包描述失败: {str(e)}")
|
||||
return "", []
|
||||
|
||||
|
||||
async def register_emoji_by_filename(self, filename: str) -> bool:
|
||||
"""读取指定文件名的表情包图片,分析并注册到数据库
|
||||
|
||||
Args:
|
||||
filename: 表情包文件名,必须位于EMOJI_DIR目录下
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
"""
|
||||
try:
|
||||
# 使用MaiEmoji类创建表情包实例
|
||||
new_emoji = MaiEmoji(filename, EMOJI_DIR)
|
||||
await new_emoji.initialize_hash_format()
|
||||
emoji_base64 = image_path_to_base64(os.path.join(EMOJI_DIR, filename))
|
||||
description, emotions = await self.build_emoji_description(emoji_base64)
|
||||
if description == "":
|
||||
return False
|
||||
new_emoji.description = description
|
||||
new_emoji.emotion = emotions
|
||||
|
||||
# 检查是否已经注册过
|
||||
# 对比数据库中是否存在相同哈希值的表情包
|
||||
if await self.get_emoji_from_manager(new_emoji.hash):
|
||||
logger.warning(f"[警告] 表情包已存在: {filename}")
|
||||
return False
|
||||
|
||||
if self.emoji_num >= self.emoji_num_max:
|
||||
logger.warning(f"表情包数量已达到上限({self.emoji_num}/{self.emoji_num_max})")
|
||||
replaced = await self.replace_a_emoji(new_emoji)
|
||||
if not replaced:
|
||||
logger.error("[错误] 替换表情包失败,无法完成注册")
|
||||
return False
|
||||
else:
|
||||
# 修复:等待异步注册完成
|
||||
register_success = await new_emoji.register_to_db()
|
||||
if register_success:
|
||||
self.emoji_objects.append(new_emoji)
|
||||
self.emoji_num += 1
|
||||
logger.success(f"[成功] 注册表情包: {filename}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"[错误] 注册表情包到数据库失败: {filename}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[错误] 注册表情包失败: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
return False
|
||||
|
||||
|
||||
# 创建全局单例
|
||||
emoji_manager = EmojiManager()
|
||||
@@ -15,7 +15,7 @@ from src.plugins.utils.timer_calculater import Timer # <--- Import Timer
|
||||
from src.plugins.heartFC_chat.heartFC_generator import HeartFCGenerator
|
||||
from src.do_tool.tool_use import ToolUser
|
||||
from ..chat.message_sender import message_manager # <-- Import the global manager
|
||||
from src.plugins.chat.emoji_manager import emoji_manager
|
||||
from src.plugins.emoji_system.emoji_manager import emoji_manager
|
||||
from src.plugins.utils.json_utils import process_llm_tool_response # 导入新的JSON工具
|
||||
from src.heart_flow.sub_mind import SubMind
|
||||
from src.heart_flow.observation import Observation
|
||||
|
||||
@@ -27,13 +27,15 @@ def init_prompt():
|
||||
{chat_talking_prompt}
|
||||
现在你想要在群里发言或者回复。\n
|
||||
你需要扮演一位网名叫{bot_name}的人进行回复,这个人的特点是:"{prompt_personality} {prompt_identity}"。
|
||||
你正在{chat_target_2},现在请你读读之前的聊天记录,然后给出日常且口语化的回复,平淡一些,你可以参考贴吧,小红书或者微博的回复风格。
|
||||
你刚刚脑子里在想:
|
||||
你正在{chat_target_2},现在请你读读之前的聊天记录,然后给出日常且口语化的回复,平淡一些,你可以参考贴吧,知乎或者微博的回复风格。
|
||||
看到以上聊天记录,你刚刚在想:
|
||||
|
||||
{current_mind_info}
|
||||
{reason}
|
||||
因为上述想法,你决定发言,原因是:{reason}
|
||||
|
||||
回复尽量简短一些。请注意把握聊天内容,不要回复的太有条理,可以有个性。请一次只回复一个话题,不要同时回复多个人,不用指出你回复的是谁。{prompt_ger}
|
||||
请回复的平淡一些,简短一些,说中文,不要刻意突出自身学科背景,不要说你说过的话题 ,注意只输出回复内容。
|
||||
{moderation_prompt}。注意:不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""",
|
||||
{moderation_prompt}。注意:回复不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""",
|
||||
"heart_flow_prompt",
|
||||
)
|
||||
|
||||
@@ -47,8 +49,12 @@ def init_prompt():
|
||||
请结合你的内心想法和观察到的聊天内容,分析情况并使用 'decide_reply_action' 工具来决定你的最终行动。
|
||||
决策依据:
|
||||
1. 如果聊天内容无聊、与你无关、或者你的内心想法认为不适合回复(例如在讨论你不懂或不感兴趣的话题),选择 'no_reply'。
|
||||
2. 如果聊天内容值得回应,且适合用文字表达(参考你的内心想法),选择 'text_reply'。如果你有情绪想表达,想在文字后追加一个表达情绪的表情,请同时提供 'emoji_query' (例如:'开心的'、'惊讶的')。
|
||||
3. 如果聊天内容或你的内心想法适合用一个表情来回应(例如表示赞同、惊讶、无语等),选择 'emoji_reply' 并提供表情主题 'emoji_query'。
|
||||
2. 如果聊天内容值得回应,且适合用文字表达(参考你的内心想法),选择 'text_reply'。如果你有情绪想表达,想在文字后追加一个表达情绪的表情,请同时提供 'emoji_query' (每个标签用一个词组表示,格式如下:
|
||||
幽默的讽刺
|
||||
悲伤的无奈
|
||||
愤怒的抗议
|
||||
愤怒的讽刺)。
|
||||
3. 如果聊天内容或你的内心想法适合用一个表情来回应,选择 'emoji_reply' 并提供表情主题 'emoji_query'。
|
||||
4. 如果最后一条消息是你自己发的,观察到的内容只有你自己的发言,并且之后没有人回复你,通常选择 'no_reply',除非有特殊原因需要追问。
|
||||
5. 如果聊天记录中最新的消息是你自己发送的,并且你还想继续回复,你应该紧紧衔接你发送的消息,进行话题的深入,补充,或追问等等;。
|
||||
6. 表情包是用来表达情绪的,不要直接回复或评价别人的表情包,而是根据对话内容和情绪选择是否用表情回应。
|
||||
|
||||
@@ -6,7 +6,7 @@ from typing import List, Optional # 导入 Optional
|
||||
|
||||
from ..moods.moods import MoodManager
|
||||
from ...config.config import global_config
|
||||
from ..chat.emoji_manager import emoji_manager
|
||||
from ..emoji_system.emoji_manager import emoji_manager
|
||||
from .normal_chat_generator import NormalChatGenerator
|
||||
from ..chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet
|
||||
from ..chat.message_sender import message_manager
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[inner]
|
||||
version = "1.4.1"
|
||||
version = "1.4.2"
|
||||
|
||||
#----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读----
|
||||
#如果你想要修改配置文件,请在修改后将version的值进行变更
|
||||
@@ -125,8 +125,13 @@ at_bot_inevitable_reply = false # @bot 必然回复
|
||||
max_emoji_num = 90 # 表情包最大数量
|
||||
max_reach_deletion = true # 开启则在达到最大数量时删除表情包,关闭则达到最大数量时不删除,只是不会继续收集表情包
|
||||
check_interval = 30 # 检查表情包(注册,破损,删除)的时间间隔(分钟)
|
||||
|
||||
auto_save = true # 是否保存表情包和图片
|
||||
|
||||
save_pic = false # 是否保存图片
|
||||
save_emoji = false # 是否保存表情包
|
||||
steal_emoji = true # 是否偷取表情包,让麦麦可以发送她保存的这些表情包
|
||||
|
||||
enable_check = false # 是否启用表情包过滤,只有符合该要求的表情包才会被保存
|
||||
check_prompt = "符合公序良俗" # 表情包过滤要求,只有符合该要求的表情包才会被保存
|
||||
|
||||
|
||||
Reference in New Issue
Block a user