Merge branch 'refactor' into msg_buffer

This commit is contained in:
meng_xi_pan
2025-04-04 18:07:17 +08:00
23 changed files with 761 additions and 199 deletions

View File

@@ -75,25 +75,48 @@ class ChatBot:
- 表情包处理
- 性能计时
"""
message = MessageRecv(message_data)
groupinfo = message.message_info.group_info
try:
message = MessageRecv(message_data)
groupinfo = message.message_info.group_info
logger.debug(f"处理消息:{str(message_data)[:50]}...")
if global_config.enable_pfc_chatting:
try:
if global_config.enable_pfc_chatting:
try:
if groupinfo is None and global_config.enable_friend_chat:
userinfo = message.message_info.user_info
messageinfo = message.message_info
# 创建聊天流
chat = await chat_manager.get_or_create_stream(
platform=messageinfo.platform,
user_info=userinfo,
group_info=groupinfo,
)
message.update_chat_stream(chat)
await self.only_process_chat.process_message(message)
await self._create_PFC_chat(message)
else:
if groupinfo.group_id in global_config.talk_allowed_groups:
logger.debug(f"开始群聊模式{message_data}")
if global_config.response_mode == "heart_flow":
await self.think_flow_chat.process_message(message_data)
elif global_config.response_mode == "reasoning":
logger.debug(f"开始推理模式{message_data}")
await self.reasoning_chat.process_message(message_data)
else:
logger.error(f"未知的回复模式,请检查配置文件!!: {global_config.response_mode}")
except Exception as e:
logger.error(f"处理PFC消息失败: {e}")
else:
if groupinfo is None and global_config.enable_friend_chat:
userinfo = message.message_info.user_info
messageinfo = message.message_info
# 创建聊天流
chat = await chat_manager.get_or_create_stream(
platform=messageinfo.platform,
user_info=userinfo,
group_info=groupinfo,
)
message.update_chat_stream(chat)
await self.only_process_chat.process_message(message)
await self._create_PFC_chat(message)
else:
# 私聊处理流程
# await self._handle_private_chat(message)
if global_config.response_mode == "heart_flow":
await self.think_flow_chat.process_message(message_data)
elif global_config.response_mode == "reasoning":
await self.reasoning_chat.process_message(message_data)
else:
logger.error(f"未知的回复模式,请检查配置文件!!: {global_config.response_mode}")
else: # 群聊处理
if groupinfo.group_id in global_config.talk_allowed_groups:
if global_config.response_mode == "heart_flow":
await self.think_flow_chat.process_message(message_data)
@@ -101,26 +124,8 @@ class ChatBot:
await self.reasoning_chat.process_message(message_data)
else:
logger.error(f"未知的回复模式,请检查配置文件!!: {global_config.response_mode}")
except Exception as e:
logger.error(f"处理PFC消息失败: {e}")
else:
if groupinfo is None and global_config.enable_friend_chat:
# 私聊处理流程
# await self._handle_private_chat(message)
if global_config.response_mode == "heart_flow":
await self.think_flow_chat.process_message(message_data)
elif global_config.response_mode == "reasoning":
await self.reasoning_chat.process_message(message_data)
else:
logger.error(f"未知的回复模式,请检查配置文件!!: {global_config.response_mode}")
else: # 群聊处理
if groupinfo.group_id in global_config.talk_allowed_groups:
if global_config.response_mode == "heart_flow":
await self.think_flow_chat.process_message(message_data)
elif global_config.response_mode == "reasoning":
await self.reasoning_chat.process_message(message_data)
else:
logger.error(f"未知的回复模式,请检查配置文件!!: {global_config.response_mode}")
except Exception as e:
logger.error(f"处理消息失败: {e}")
# 创建全局ChatBot实例

View File

@@ -39,12 +39,28 @@ class EmojiManager:
model=global_config.llm_emotion_judge, max_tokens=600, temperature=0.8, request_type="emoji"
) # 更高的温度更少的token后续可以根据情绪来调整温度
self.emoji_num = 0
self.emoji_num_max = global_config.max_emoji_num
self.emoji_num_max_reach_deletion = global_config.max_reach_deletion
logger.info("启动表情包管理器")
def _ensure_emoji_dir(self):
"""确保表情存储目录存在"""
os.makedirs(self.EMOJI_DIR, exist_ok=True)
def _update_emoji_count(self):
"""更新表情包数量统计
检查数据库中的表情包数量并更新到 self.emoji_num
"""
try:
self._ensure_db()
self.emoji_num = db.emoji.count_documents({})
logger.info(f"[统计] 当前表情包数量: {self.emoji_num}")
except Exception as e:
logger.error(f"[错误] 更新表情包数量失败: {str(e)}")
def initialize(self):
"""初始化数据库连接和表情目录"""
if not self._initialized:
@@ -52,6 +68,8 @@ class EmojiManager:
self._ensure_emoji_collection()
self._ensure_emoji_dir()
self._initialized = True
# 更新表情包数量
self._update_emoji_count()
# 启动时执行一次完整性检查
self.check_emoji_file_integrity()
except Exception:
@@ -339,13 +357,7 @@ class EmojiManager:
except Exception:
logger.exception("[错误] 扫描表情包失败")
async def start_periodic_register(self):
"""定期扫描新表情包"""
while True:
logger.info("[扫描] 开始扫描新表情包...")
await self.scan_new_emojis()
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
def check_emoji_file_integrity(self):
"""检查表情包文件完整性
@@ -418,12 +430,136 @@ class EmojiManager:
logger.error(f"[错误] 检查表情包完整性失败: {str(e)}")
logger.error(traceback.format_exc())
async def start_periodic_check(self):
def check_emoji_file_full(self):
"""检查表情包文件是否完整,如果数量超出限制且允许删除,则删除多余的表情包
删除规则:
1. 优先删除创建时间更早的表情包
2. 优先删除使用次数少的表情包,但使用次数多的也有小概率被删除
"""
try:
self._ensure_db()
# 更新表情包数量
self._update_emoji_count()
# 检查是否超出限制
if self.emoji_num <= self.emoji_num_max:
return
# 如果超出限制但不允许删除,则只记录警告
if not global_config.max_reach_deletion:
logger.warning(f"[警告] 表情包数量({self.emoji_num})超出限制({self.emoji_num_max}),但未开启自动删除")
return
# 计算需要删除的数量
delete_count = self.emoji_num - self.emoji_num_max
logger.info(f"[清理] 需要删除 {delete_count} 个表情包")
# 获取所有表情包,按时间戳升序(旧的在前)排序
all_emojis = list(db.emoji.find().sort([("timestamp", 1)]))
# 计算权重:使用次数越多,被删除的概率越小
weights = []
max_usage = max((emoji.get("usage_count", 0) for emoji in all_emojis), default=1)
for emoji in all_emojis:
usage_count = emoji.get("usage_count", 0)
# 使用指数衰减函数计算权重,使用次数越多权重越小
weight = 1.0 / (1.0 + usage_count / max(1, max_usage))
weights.append(weight)
# 根据权重随机选择要删除的表情包
to_delete = []
remaining_indices = list(range(len(all_emojis)))
while len(to_delete) < delete_count and remaining_indices:
# 计算当前剩余表情包的权重
current_weights = [weights[i] for i in remaining_indices]
# 归一化权重
total_weight = sum(current_weights)
if total_weight == 0:
break
normalized_weights = [w/total_weight for w in current_weights]
# 随机选择一个表情包
selected_idx = random.choices(remaining_indices, weights=normalized_weights, k=1)[0]
to_delete.append(all_emojis[selected_idx])
remaining_indices.remove(selected_idx)
# 删除选中的表情包
deleted_count = 0
for emoji in to_delete:
try:
# 删除文件
if "path" in emoji and os.path.exists(emoji["path"]):
os.remove(emoji["path"])
logger.info(f"[删除] 文件: {emoji['path']} (使用次数: {emoji.get('usage_count', 0)})")
# 删除数据库记录
db.emoji.delete_one({"_id": emoji["_id"]})
deleted_count += 1
# 同时从images集合中删除
if "hash" in emoji:
db.images.delete_one({"hash": emoji["hash"]})
except Exception as e:
logger.error(f"[错误] 删除表情包失败: {str(e)}")
continue
# 更新表情包数量
self._update_emoji_count()
logger.success(f"[清理] 已删除 {deleted_count} 个表情包,当前数量: {self.emoji_num}")
except Exception as e:
logger.error(f"[错误] 检查表情包数量失败: {str(e)}")
async def start_periodic_check_register(self):
"""定期检查表情包完整性和数量"""
while True:
logger.info("[扫描] 开始检查表情包完整性...")
self.check_emoji_file_integrity()
logger.info("[扫描] 开始删除所有图片缓存...")
await self.delete_all_images()
logger.info("[扫描] 开始扫描新表情包...")
if self.emoji_num < self.emoji_num_max:
await self.scan_new_emojis()
if (self.emoji_num > self.emoji_num_max):
logger.warning(f"[警告] 表情包数量超过最大限制: {self.emoji_num} > {self.emoji_num_max},跳过注册")
if not global_config.max_reach_deletion:
logger.warning("表情包数量超过最大限制,终止注册")
break
else:
logger.warning("表情包数量超过最大限制,开始删除表情包")
self.check_emoji_file_full()
await asyncio.sleep(global_config.EMOJI_CHECK_INTERVAL * 60)
async def delete_all_images(self):
"""删除 data/image 目录下的所有文件"""
try:
image_dir = os.path.join("data", "image")
if not os.path.exists(image_dir):
logger.warning(f"[警告] 目录不存在: {image_dir}")
return
deleted_count = 0
failed_count = 0
# 遍历目录下的所有文件
for filename in os.listdir(image_dir):
file_path = os.path.join(image_dir, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
deleted_count += 1
logger.debug(f"[删除] 文件: {file_path}")
except Exception as e:
failed_count += 1
logger.error(f"[错误] 删除文件失败 {file_path}: {str(e)}")
logger.success(f"[清理] 已删除 {deleted_count} 个文件,失败 {failed_count}")
except Exception as e:
logger.error(f"[错误] 删除图片目录失败: {str(e)}")
# 创建全局单例
emoji_manager = EmojiManager()

View File

@@ -31,7 +31,7 @@ class Message(MessageBase):
def __init__(
self,
message_id: str,
time: int,
time: float,
chat_stream: ChatStream,
user_info: UserInfo,
message_segment: Optional[Seg] = None,

View File

@@ -9,7 +9,7 @@ from .message import MessageSending, MessageThinking, MessageSet
from ..storage.storage import MessageStorage
from ..config.config import global_config
from .utils import truncate_message, calculate_typing_time
from .utils import truncate_message, calculate_typing_time, count_messages_between
from src.common.logger import LogConfig, SENDER_STYLE_CONFIG
@@ -69,9 +69,14 @@ class Message_Sender:
if end_point:
# logger.info(f"发送消息到{end_point}")
# logger.info(message_json)
await global_api.send_message(end_point, message_json)
await global_api.send_message_REST(end_point, message_json)
else:
raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置请检查配置文件")
try:
await global_api.send_message(message)
except Exception as e:
raise ValueError(
f"未找到平台:{message.message_info.platform} 的url配置请检查配置文件"
) from e
logger.success(f"发送消息“{message_preview}”成功")
except Exception as e:
logger.error(f"发送消息“{message_preview}”失败: {str(e)}")
@@ -85,16 +90,16 @@ class MessageContainer:
self.max_size = max_size
self.messages = []
self.last_send_time = 0
self.thinking_timeout = 10 # 思考等待超时时间(秒)
self.thinking_wait_timeout = 20 # 思考等待超时时间(秒)
def get_timeout_messages(self) -> List[MessageSending]:
"""获取所有超时的Message_Sending对象思考时间超过30秒按thinking_start_time排序"""
"""获取所有超时的Message_Sending对象思考时间超过20秒按thinking_start_time排序"""
current_time = time.time()
timeout_messages = []
for msg in self.messages:
if isinstance(msg, MessageSending):
if current_time - msg.thinking_start_time > self.thinking_timeout:
if current_time - msg.thinking_start_time > self.thinking_wait_timeout:
timeout_messages.append(msg)
# 按thinking_start_time排序时间早的在前面
@@ -172,6 +177,7 @@ class MessageManager:
message_earliest = container.get_earliest_message()
if isinstance(message_earliest, MessageThinking):
"""取得了思考消息"""
message_earliest.update_thinking_time()
thinking_time = message_earliest.thinking_time
# print(thinking_time)
@@ -187,14 +193,20 @@ class MessageManager:
container.remove_message(message_earliest)
else:
# print(message_earliest.is_head)
# print(message_earliest.update_thinking_time())
# print(message_earliest.is_private_message())
"""取得了发送消息"""
thinking_time = message_earliest.update_thinking_time()
print(thinking_time)
thinking_start_time = message_earliest.thinking_start_time
now_time = time.time()
thinking_messages_count, thinking_messages_length = count_messages_between(
start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id
)
# print(thinking_time)
# print(thinking_messages_count)
# print(thinking_messages_length)
if (
message_earliest.is_head
and message_earliest.update_thinking_time() > 18
and (thinking_messages_count > 4 or thinking_messages_length > 250)
and not message_earliest.is_private_message() # 避免在私聊时插入reply
):
logger.debug(f"设置回复消息{message_earliest.processed_plain_text}")
@@ -216,12 +228,18 @@ class MessageManager:
continue
try:
# print(msg.is_head)
print(msg.update_thinking_time())
# print(msg.is_private_message())
thinking_time = msg.update_thinking_time()
thinking_start_time = msg.thinking_start_time
now_time = time.time()
thinking_messages_count, thinking_messages_length = count_messages_between(
start_time=thinking_start_time, end_time=now_time, stream_id=msg.chat_stream.stream_id
)
# print(thinking_time)
# print(thinking_messages_count)
# print(thinking_messages_length)
if (
msg.is_head
and msg.update_thinking_time() > 18
and (thinking_messages_count > 4 or thinking_messages_length > 250)
and not msg.is_private_message() # 避免在私聊时插入reply
):
logger.debug(f"设置回复消息{msg.processed_plain_text}")

View File

@@ -487,3 +487,108 @@ def is_western_char(char):
def is_western_paragraph(paragraph):
"""检测是否为西文字符段落"""
return all(is_western_char(char) for char in paragraph if char.isalnum())
def count_messages_between(start_time: float, end_time: float, stream_id: str) -> tuple[int, int]:
"""计算两个时间点之间的消息数量和文本总长度
Args:
start_time (float): 起始时间戳
end_time (float): 结束时间戳
stream_id (str): 聊天流ID
Returns:
tuple[int, int]: (消息数量, 文本总长度)
- 消息数量:包含起始时间的消息,不包含结束时间的消息
- 文本总长度所有消息的processed_plain_text长度之和
"""
try:
# 获取开始时间之前最新的一条消息
start_message = db.messages.find_one(
{
"chat_id": stream_id,
"time": {"$lte": start_time}
},
sort=[("time", -1), ("_id", -1)] # 按时间倒序_id倒序最后插入的在前
)
# 获取结束时间最近的一条消息
# 先找到结束时间点的所有消息
end_time_messages = list(db.messages.find(
{
"chat_id": stream_id,
"time": {"$lte": end_time}
},
sort=[("time", -1)] # 先按时间倒序
).limit(10)) # 限制查询数量,避免性能问题
if not end_time_messages:
logger.warning(f"未找到结束时间 {end_time} 之前的消息")
return 0, 0
# 找到最大时间
max_time = end_time_messages[0]["time"]
# 在最大时间的消息中找最后插入的_id最大的
end_message = max(
[msg for msg in end_time_messages if msg["time"] == max_time],
key=lambda x: x["_id"]
)
if not start_message:
logger.warning(f"未找到开始时间 {start_time} 之前的消息")
return 0, 0
# 调试输出
# print("\n=== 消息范围信息 ===")
# print("Start message:", {
# "message_id": start_message.get("message_id"),
# "time": start_message.get("time"),
# "text": start_message.get("processed_plain_text", ""),
# "_id": str(start_message.get("_id"))
# })
# print("End message:", {
# "message_id": end_message.get("message_id"),
# "time": end_message.get("time"),
# "text": end_message.get("processed_plain_text", ""),
# "_id": str(end_message.get("_id"))
# })
# print("Stream ID:", stream_id)
# 如果结束消息的时间等于开始时间返回0
if end_message["time"] == start_message["time"]:
return 0, 0
# 获取并打印这个时间范围内的所有消息
# print("\n=== 时间范围内的所有消息 ===")
all_messages = list(db.messages.find(
{
"chat_id": stream_id,
"time": {
"$gte": start_message["time"],
"$lte": end_message["time"]
}
},
sort=[("time", 1), ("_id", 1)] # 按时间正序_id正序
))
count = 0
total_length = 0
for msg in all_messages:
count += 1
text_length = len(msg.get("processed_plain_text", ""))
total_length += text_length
# print(f"\n消息 {count}:")
# print({
# "message_id": msg.get("message_id"),
# "time": msg.get("time"),
# "text": msg.get("processed_plain_text", ""),
# "text_length": text_length,
# "_id": str(msg.get("_id"))
# })
# 如果时间不同需要把end_message本身也计入
return count - 1, total_length
except Exception as e:
logger.error(f"计算消息数量时出错: {str(e)}")
return 0, 0

View File

@@ -112,8 +112,13 @@ class ImageManager:
return f"[表情包:{cached_description}]"
# 调用AI获取描述
prompt = "这是一个表情包,使用中文简洁的描述一下表情包的内容和表情包所表达的情感"
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
if image_format == "gif" or image_format == "GIF":
image_base64 = self.transform_gif(image_base64)
prompt = "这是一个动态图表情包,每一张图代表了动态图的某一帧,黑色背景代表透明,使用中文简洁的描述一下表情包的内容和表达的情感,简短一些"
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, "jpg")
else:
prompt = "这是一个表情包,使用中文简洁的描述一下表情包的内容和表情包所表达的情感"
description, _ = await self._llm.generate_response_for_image(prompt, image_base64, image_format)
cached_description = self._get_description_from_db(image_hash, "emoji")
if cached_description:
@@ -221,6 +226,72 @@ class ImageManager:
logger.error(f"获取图片描述失败: {str(e)}")
return "[图片]"
def transform_gif(self, gif_base64: str) -> str:
"""将GIF转换为水平拼接的静态图像
Args:
gif_base64: GIF的base64编码字符串
Returns:
str: 拼接后的JPG图像的base64编码字符串
"""
try:
# 解码base64
gif_data = base64.b64decode(gif_base64)
gif = Image.open(io.BytesIO(gif_data))
# 收集所有帧
frames = []
try:
while True:
gif.seek(len(frames))
frame = gif.convert('RGB')
frames.append(frame.copy())
except EOFError:
pass
if not frames:
raise ValueError("No frames found in GIF")
# 计算需要抽取的帧的索引
total_frames = len(frames)
if total_frames <= 15:
selected_frames = frames
else:
# 均匀抽取10帧
indices = [int(i * (total_frames - 1) / 14) for i in range(15)]
selected_frames = [frames[i] for i in indices]
# 获取单帧的尺寸
frame_width, frame_height = selected_frames[0].size
# 计算目标尺寸,保持宽高比
target_height = 200 # 固定高度
target_width = int((target_height / frame_height) * frame_width)
# 调整所有帧的大小
resized_frames = [frame.resize((target_width, target_height), Image.Resampling.LANCZOS)
for frame in selected_frames]
# 创建拼接图像
total_width = target_width * len(resized_frames)
combined_image = Image.new('RGB', (total_width, target_height))
# 水平拼接图像
for idx, frame in enumerate(resized_frames):
combined_image.paste(frame, (idx * target_width, 0))
# 转换为base64
buffer = io.BytesIO()
combined_image.save(buffer, format='JPEG', quality=85)
result_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
return result_base64
except Exception as e:
logger.error(f"GIF转换失败: {str(e)}")
return None
# 创建全局单例
image_manager = ImageManager()