diff --git a/src/chat/affinity_flow/chatter.py b/src/chat/affinity_flow/chatter.py index e31fd6d60..8514350b2 100644 --- a/src/chat/affinity_flow/chatter.py +++ b/src/chat/affinity_flow/chatter.py @@ -47,14 +47,24 @@ class AffinityFlowChatter: 处理单个消息 Args: - message_data: 消息数据字典 + message_data: 消息数据字典,包含: + - message_info: 消息基本信息 + - processed_plain_text: 处理后的纯文本 + - context_messages: 上下文消息(历史+未读) + - unread_messages: 未读消息列表 Returns: 处理结果字典 """ try: - # 使用增强版规划器处理消息 - actions, target_message = await self.planner.plan(mode=ChatMode.FOCUS) + # 提取未读消息用于兴趣度计算 + unread_messages = message_data.get("unread_messages", []) + + # 使用增强版规划器处理消息,传递未读消息用于兴趣度计算 + actions, target_message = await self.planner.plan( + mode=ChatMode.FOCUS, + unread_messages=unread_messages + ) self.stats["plans_created"] += 1 # 执行动作(如果规划器返回了动作) @@ -75,10 +85,11 @@ class AffinityFlowChatter: "plan_created": True, "actions_count": len(actions) if actions else 0, "has_target_message": target_message is not None, + "unread_messages_processed": len(unread_messages), **execution_result, } - logger.info(f"聊天流 {self.stream_id} 消息处理成功: 动作数={result['actions_count']}") + logger.info(f"聊天流 {self.stream_id} 消息处理成功: 动作数={result['actions_count']}, 未读消息={result['unread_messages_processed']}") return result diff --git a/src/chat/affinity_flow/interest_scoring.py b/src/chat/affinity_flow/interest_scoring.py index 5515952d1..6e44ccd21 100644 --- a/src/chat/affinity_flow/interest_scoring.py +++ b/src/chat/affinity_flow/interest_scoring.py @@ -29,10 +29,9 @@ class InterestScoringSystem: # 评分权重 self.score_weights = { - "interest_match": 0.4, # 兴趣匹配度权重 + "interest_match": 0.5, # 兴趣匹配度权重 "relationship": 0.3, # 关系分权重 "mentioned": 0.2, # 是否提及bot权重 - "time_factor": 0.1, # 时间因子权重 } # 评分阈值 @@ -68,24 +67,19 @@ class InterestScoringSystem: relationship_score = self._calculate_relationship_score(message.user_info.user_id) # 3. 计算提及分数 - mentioned_score = self._calculate_mentioned_score(message.processed_plain_text, bot_nickname) - - # 4. 计算时间因子 - time_factor_score = self._calculate_time_factor_score(message.time) + mentioned_score = self._calculate_mentioned_score(message, bot_nickname) # 5. 计算总分 total_score = ( interest_match_score * self.score_weights["interest_match"] + relationship_score * self.score_weights["relationship"] + - mentioned_score * self.score_weights["mentioned"] + - time_factor_score * self.score_weights["time_factor"] + mentioned_score * self.score_weights["mentioned"] ) details = { "interest_match": f"兴趣匹配度: {interest_match_score:.2f}", "relationship": f"关系分: {relationship_score:.2f}", "mentioned": f"提及分数: {mentioned_score:.2f}", - "time_factor": f"时间因子: {time_factor_score:.2f}", } return InterestScore( @@ -94,7 +88,6 @@ class InterestScoringSystem: interest_match_score=interest_match_score, relationship_score=relationship_score, mentioned_score=mentioned_score, - time_factor_score=time_factor_score, details=details ) @@ -132,39 +125,16 @@ class InterestScoringSystem: return min(relationship_value, 1.0) return 0.3 # 默认新用户的基础分 - def _calculate_mentioned_score(self, content: str, bot_nickname: str) -> float: + def _calculate_mentioned_score(self, msg: DatabaseMessages, bot_nickname: str) -> float: """计算提及分数""" - if not content: + if not msg.processed_plain_text: return 0.0 - content_lower = content.lower() - bot_name_lower = bot_nickname.lower() - - if bot_name_lower in content_lower: + if msg.is_mentioned or (bot_nickname and bot_nickname in msg.processed_plain_text): return 1.0 - - # 检查是否被@提及 - if "@" in content and any(alias.lower() in content_lower for alias in global_config.bot.alias_names or []): - return 1.0 - + return 0.0 - - def _calculate_time_factor_score(self, timestamp: float) -> float: - """计算时间因子分数""" - message_time = datetime.fromtimestamp(timestamp) - current_time = datetime.now() - time_diff_hours = (current_time - message_time).total_seconds() / 3600 - - # 24小时内消息时间因子为1.0,之后逐渐衰减 - if time_diff_hours <= 24: - return 1.0 - elif time_diff_hours <= 72: # 3天内 - return 0.8 - elif time_diff_hours <= 168: # 7天内 - return 0.6 - else: - return 0.3 - + def should_reply(self, score: InterestScore) -> bool: """判断是否应该回复""" base_threshold = self.reply_threshold diff --git a/src/chat/message_manager/__init__.py b/src/chat/message_manager/__init__.py new file mode 100644 index 000000000..c52e7f1b2 --- /dev/null +++ b/src/chat/message_manager/__init__.py @@ -0,0 +1,16 @@ +""" +消息管理模块 +管理每个聊天流的上下文信息,包含历史记录和未读消息,定期检查并处理新消息 +""" + +from .message_manager import MessageManager, message_manager +from src.common.data_models.message_manager_data_model import StreamContext, MessageStatus, MessageManagerStats, StreamStats + +__all__ = [ + "MessageManager", + "message_manager", + "StreamContext", + "MessageStatus", + "MessageManagerStats", + "StreamStats" +] \ No newline at end of file diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py new file mode 100644 index 000000000..4d073ac10 --- /dev/null +++ b/src/chat/message_manager/message_manager.py @@ -0,0 +1,232 @@ +""" +消息管理模块 +管理每个聊天流的上下文信息,包含历史记录和未读消息,定期检查并处理新消息 +""" +import asyncio +import time +import traceback +from typing import Dict, Optional, Any + +from src.common.logger import get_logger +from src.common.data_models.database_data_model import DatabaseMessages +from src.common.data_models.message_manager_data_model import StreamContext, MessageManagerStats, StreamStats +from src.chat.affinity_flow.afc_manager import afc_manager + +logger = get_logger("message_manager") + + +class MessageManager: + """消息管理器""" + + def __init__(self, check_interval: float = 2.0): + self.stream_contexts: Dict[str, StreamContext] = {} + self.check_interval = check_interval # 检查间隔(秒) + self.is_running = False + self.manager_task: Optional[asyncio.Task] = None + + # 统计信息 + self.stats = MessageManagerStats() + + async def start(self): + """启动消息管理器""" + if self.is_running: + logger.warning("消息管理器已经在运行") + return + + self.is_running = True + self.manager_task = asyncio.create_task(self._manager_loop()) + logger.info("消息管理器已启动") + + async def stop(self): + """停止消息管理器""" + if not self.is_running: + return + + self.is_running = False + + # 停止所有流处理任务 + for context in self.stream_contexts.values(): + if context.processing_task and not context.processing_task.done(): + context.processing_task.cancel() + + # 停止管理器任务 + if self.manager_task and not self.manager_task.done(): + self.manager_task.cancel() + + logger.info("消息管理器已停止") + + def add_message(self, stream_id: str, message: DatabaseMessages): + """添加消息到指定聊天流""" + # 获取或创建流上下文 + if stream_id not in self.stream_contexts: + self.stream_contexts[stream_id] = StreamContext(stream_id=stream_id) + self.stats.total_streams += 1 + + context = self.stream_contexts[stream_id] + context.add_message(message) + + logger.debug(f"添加消息到聊天流 {stream_id}: {message.message_id}") + + async def _manager_loop(self): + """管理器主循环""" + while self.is_running: + try: + await self._check_all_streams() + await asyncio.sleep(self.check_interval) + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"消息管理器循环出错: {e}") + traceback.print_exc() + + async def _check_all_streams(self): + """检查所有聊天流""" + active_streams = 0 + total_unread = 0 + + for stream_id, context in self.stream_contexts.items(): + if not context.is_active: + continue + + active_streams += 1 + + # 检查是否有未读消息 + unread_messages = context.get_unread_messages() + if unread_messages: + total_unread += len(unread_messages) + + # 如果没有处理任务,创建一个 + if not context.processing_task or context.processing_task.done(): + context.processing_task = asyncio.create_task( + self._process_stream_messages(stream_id) + ) + + # 更新统计 + self.stats.active_streams = active_streams + self.stats.total_unread_messages = total_unread + + async def _process_stream_messages(self, stream_id: str): + """处理指定聊天流的消息""" + if stream_id not in self.stream_contexts: + return + + context = self.stream_contexts[stream_id] + + try: + # 获取未读消息 + unread_messages = context.get_unread_messages() + if not unread_messages: + return + + logger.debug(f"开始处理聊天流 {stream_id} 的 {len(unread_messages)} 条未读消息") + + # 获取上下文消息 + context_messages = context.get_context_messages() + + # 批量处理消息 + messages_data = [] + for msg in unread_messages: + message_data = { + "message_info": { + "platform": msg.user_info.platform, + "user_info": { + "user_id": msg.user_info.user_id, + "user_nickname": msg.user_info.user_nickname, + "user_cardname": msg.user_info.user_cardname, + "platform": msg.user_info.platform + }, + "group_info": { + "group_id": msg.group_info.group_id, + "group_name": msg.group_info.group_name, + "group_platform": msg.group_info.group_platform + } if msg.group_info else None + }, + "processed_plain_text": msg.processed_plain_text, + "context_messages": [ctx_msg.flatten() for ctx_msg in context_messages], + "unread_messages": unread_messages # 传递原始对象而不是字典 + } + messages_data.append(message_data) + + # 发送到AFC处理器 + if messages_data: + results = await afc_manager.process_messages_batch(stream_id, messages_data) + + # 处理结果,标记消息为已读 + for i, result in enumerate(results): + if result.get("success", False): + msg_id = unread_messages[i].message_id + context.mark_message_as_read(msg_id) + self.stats.total_processed_messages += 1 + logger.debug(f"消息 {msg_id} 处理完成,标记为已读") + + logger.debug(f"聊天流 {stream_id} 消息处理完成") + + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f"处理聊天流 {stream_id} 消息时出错: {e}") + traceback.print_exc() + + def deactivate_stream(self, stream_id: str): + """停用聊天流""" + if stream_id in self.stream_contexts: + context = self.stream_contexts[stream_id] + context.is_active = False + + # 取消处理任务 + if context.processing_task and not context.processing_task.done(): + context.processing_task.cancel() + + logger.info(f"停用聊天流: {stream_id}") + + def activate_stream(self, stream_id: str): + """激活聊天流""" + if stream_id in self.stream_contexts: + self.stream_contexts[stream_id].is_active = True + logger.info(f"激活聊天流: {stream_id}") + + def get_stream_stats(self, stream_id: str) -> Optional[StreamStats]: + """获取聊天流统计""" + if stream_id not in self.stream_contexts: + return None + + context = self.stream_contexts[stream_id] + return StreamStats( + stream_id=stream_id, + is_active=context.is_active, + unread_count=len(context.get_unread_messages()), + history_count=len(context.history_messages), + last_check_time=context.last_check_time, + has_active_task=context.processing_task and not context.processing_task.done() + ) + + def get_manager_stats(self) -> Dict[str, Any]: + """获取管理器统计""" + return { + "total_streams": self.stats.total_streams, + "active_streams": self.stats.active_streams, + "total_unread_messages": self.stats.total_unread_messages, + "total_processed_messages": self.stats.total_processed_messages, + "uptime": self.stats.uptime, + "start_time": self.stats.start_time + } + + def cleanup_inactive_streams(self, max_inactive_hours: int = 24): + """清理不活跃的聊天流""" + current_time = time.time() + max_inactive_seconds = max_inactive_hours * 3600 + + inactive_streams = [] + for stream_id, context in self.stream_contexts.items(): + if (current_time - context.last_check_time > max_inactive_seconds and + not context.get_unread_messages()): + inactive_streams.append(stream_id) + + for stream_id in inactive_streams: + self.deactivate_stream(stream_id) + del self.stream_contexts[stream_id] + logger.info(f"清理不活跃聊天流: {stream_id}") + + +# 创建全局消息管理器实例 +message_manager = MessageManager() \ No newline at end of file diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 6e593035c..e5cf72aad 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -11,12 +11,12 @@ from src.mood.mood_manager import mood_manager # 导入情绪管理器 from src.chat.message_receive.chat_stream import get_chat_manager, ChatStream from src.chat.message_receive.message import MessageRecv, MessageRecvS4U from src.chat.message_receive.storage import MessageStorage -from src.chat.affinity_flow.afc_manager import afc_manager +from src.chat.message_manager import message_manager from src.chat.utils.prompt import Prompt, global_prompt_manager from src.plugin_system.core import component_registry, event_manager, global_announcement_manager from src.plugin_system.base import BaseCommand, EventType from src.mais4u.mais4u_chat.s4u_msg_processor import S4UMessageProcessor - +from src.chat.utils.utils import is_mentioned_bot_in_message # 导入反注入系统 from src.chat.antipromptinjector import initialize_anti_injector @@ -80,6 +80,9 @@ class ChatBot: # 初始化反注入系统 self._initialize_anti_injector() + # 启动消息管理器 + self._message_manager_started = False + def _initialize_anti_injector(self): """初始化反注入系统""" try: @@ -98,6 +101,12 @@ class ChatBot: if not self._started: logger.debug("确保ChatBot所有任务已启动") + # 启动消息管理器 + if not self._message_manager_started: + await message_manager.start() + self._message_manager_started = True + logger.info("消息管理器已启动") + self._started = True async def _process_plus_commands(self, message: MessageRecv): @@ -398,7 +407,8 @@ class ChatBot: # print(message_data) # logger.debug(str(message_data)) message = MessageRecv(message_data) - + + message.is_mentioned, _ = is_mentioned_bot_in_message(message) group_info = message.message_info.group_info user_info = message.message_info.user_info if message.message_info.additional_config: @@ -464,13 +474,45 @@ class ChatBot: template_group_name = None async def preprocess(): - # 使用亲和力流系统处理消息 - message_data = { - "message_info": message.message_info.__dict__, - "processed_plain_text": message.processed_plain_text, - "chat_stream": message.chat_stream.__dict__ if message.chat_stream else None - } - await afc_manager.process_message(message.chat_stream.stream_id, message_data) + # 使用消息管理器处理消息 + from src.common.data_models.database_data_model import DatabaseMessages + + # 创建数据库消息对象 + db_message = DatabaseMessages( + message_id=message.message_info.message_id, + time=message.message_info.time, + chat_id=message.chat_stream.stream_id, + processed_plain_text=message.processed_plain_text, + display_message=message.processed_plain_text, + is_mentioned=message.is_mentioned, + is_at=message.is_at, + is_emoji=message.is_emoji, + is_picid=message.is_picid, + is_command=message.is_command, + is_notify=message.is_notify, + user_id=message.message_info.user_info.user_id, + user_nickname=message.message_info.user_info.user_nickname, + user_cardname=message.message_info.user_info.user_cardname, + user_platform=message.message_info.user_info.platform, + chat_info_stream_id=message.chat_stream.stream_id, + chat_info_platform=message.chat_stream.platform, + chat_info_create_time=message.chat_stream.create_time, + chat_info_last_active_time=message.chat_stream.last_active_time, + chat_info_user_id=message.chat_stream.user_info.user_id, + chat_info_user_nickname=message.chat_stream.user_info.user_nickname, + chat_info_user_cardname=message.chat_stream.user_info.user_cardname, + chat_info_user_platform=message.chat_stream.user_info.platform + ) + + # 如果是群聊,添加群组信息 + if message.chat_stream.group_info: + db_message.chat_info_group_id = message.chat_stream.group_info.group_id + db_message.chat_info_group_name = message.chat_stream.group_info.group_name + db_message.chat_info_group_platform = message.chat_stream.group_info.platform + + # 添加消息到消息管理器 + message_manager.add_message(message.chat_stream.stream_id, db_message) + logger.debug(f"消息已添加到消息管理器: {message.chat_stream.stream_id}") if template_group_name: async with global_prompt_manager.async_message_scope(template_group_name): diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 1df006a1c..5ed08e096 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -114,7 +114,7 @@ class MessageRecv(Message): self.is_video = False self.is_mentioned = None self.is_notify = False - + self.is_at = False self.is_command = False self.priority_mode = "interest" diff --git a/src/chat/planner_actions/planner.py b/src/chat/planner_actions/planner.py index 3e973b398..8bdd21464 100644 --- a/src/chat/planner_actions/planner.py +++ b/src/chat/planner_actions/planner.py @@ -2,7 +2,6 @@ 主规划器入口,负责协调 PlanGenerator, PlanFilter, 和 PlanExecutor。 集成兴趣度评分系统和用户关系追踪机制,实现智能化的聊天决策。 """ -import time from dataclasses import asdict from typing import Dict, List, Optional, Tuple @@ -65,13 +64,13 @@ class ActionPlanner: "other_actions_executed": 0, } - async def plan(self, mode: ChatMode = ChatMode.FOCUS) -> Tuple[List[Dict], Optional[Dict]]: + async def plan(self, mode: ChatMode = ChatMode.FOCUS, unread_messages: List[Dict] = None) -> Tuple[List[Dict], Optional[Dict]]: """ 执行完整的增强版规划流程。 Args: mode (ChatMode): 当前的聊天模式,默认为 FOCUS。 - use_enhanced (bool): 是否使用增强功能,默认为 True。 + unread_messages (List[Dict]): 未读消息列表,用于兴趣度计算。 Returns: Tuple[List[Dict], Optional[Dict]]: 一个元组,包含: @@ -81,7 +80,7 @@ class ActionPlanner: try: self.planner_stats["total_plans"] += 1 - return await self._enhanced_plan_flow(mode) + return await self._enhanced_plan_flow(mode, unread_messages or []) except Exception as e: @@ -89,17 +88,17 @@ class ActionPlanner: self.planner_stats["failed_plans"] += 1 return [], None - async def _enhanced_plan_flow(self, mode: ChatMode) -> Tuple[List[Dict], Optional[Dict]]: + async def _enhanced_plan_flow(self, mode: ChatMode, unread_messages: List[Dict]) -> Tuple[List[Dict], Optional[Dict]]: """执行增强版规划流程""" try: # 1. 生成初始 Plan initial_plan = await self.generator.generate(mode) - # 2. 兴趣度评分 - if initial_plan.chat_history: + # 2. 兴趣度评分 - 只对未读消息进行评分 + if unread_messages: bot_nickname = global_config.bot.nickname interest_scores = self.interest_scoring.calculate_interest_scores( - initial_plan.chat_history, bot_nickname + unread_messages, bot_nickname ) # 3. 根据兴趣度调整可用动作 @@ -132,18 +131,12 @@ class ActionPlanner: logger.error(f"增强版规划流程出错: {e}") self.planner_stats["failed_plans"] += 1 return [], None - - except Exception as e: - logger.error(f"增强版规划流程出错: {e}") - self.planner_stats["failed_plans"] += 1 - return [], None def _update_stats_from_execution_result(self, execution_result: Dict[str, any]): """根据执行结果更新规划器统计""" if not execution_result: return - - executed_count = execution_result.get("executed_count", 0) + successful_count = execution_result.get("successful_count", 0) # 更新成功执行计数 diff --git a/src/chat/planner_actions/planner_prompts.py b/src/chat/planner_actions/planner_prompts.py index 9f052a787..29ef4b916 100644 --- a/src/chat/planner_actions/planner_prompts.py +++ b/src/chat/planner_actions/planner_prompts.py @@ -31,15 +31,17 @@ def init_prompts(): **任务: 构建一个完整的响应** 你的任务是根据当前的聊天内容,构建一个完整的、人性化的响应。一个完整的响应由两部分组成: -1. **主要动作**: 这是响应的核心,通常是 `reply`(文本回复)。 +1. **主要动作**: 这是响应的核心,通常是 `reply`(如果有)。 2. **辅助动作 (可选)**: 这是为了增强表达效果的附加动作,例如 `emoji`(发送表情包)或 `poke_user`(戳一戳)。 **决策流程:** -1. 首先,决定是否要进行 `reply`。 +1. 首先,决定是否要进行 `reply`(如果有)。 2. 然后,评估当前的对话气氛和用户情绪,判断是否需要一个**辅助动作**来让你的回应更生动、更符合你的性格。 -3. 如果需要,选择一个最合适的辅助动作与 `reply` 组合。 +3. 如果需要,选择一个最合适的辅助动作与 `reply`(如果有) 组合。 4. 如果用户明确要求了某个动作,请务必优先满足。 +**如果可选动作中没有reply,请不要使用** + **可用动作:** {actions_before_now_block} diff --git a/src/common/data_models/database_data_model.py b/src/common/data_models/database_data_model.py index bf4a5f527..7167c64cb 100644 --- a/src/common/data_models/database_data_model.py +++ b/src/common/data_models/database_data_model.py @@ -79,6 +79,7 @@ class DatabaseMessages(BaseDataModel): is_command: bool = False, is_notify: bool = False, selected_expressions: Optional[str] = None, + is_read: bool = False, user_id: str = "", user_nickname: str = "", user_cardname: Optional[str] = None, @@ -122,6 +123,7 @@ class DatabaseMessages(BaseDataModel): self.is_notify = is_notify self.selected_expressions = selected_expressions + self.is_read = is_read self.group_info: Optional[DatabaseGroupInfo] = None self.user_info = DatabaseUserInfo( @@ -188,6 +190,7 @@ class DatabaseMessages(BaseDataModel): "is_command": self.is_command, "is_notify": self.is_notify, "selected_expressions": self.selected_expressions, + "is_read": self.is_read, "user_id": self.user_info.user_id, "user_nickname": self.user_info.user_nickname, "user_cardname": self.user_info.user_cardname, diff --git a/src/common/data_models/info_data_model.py b/src/common/data_models/info_data_model.py index c3eb3ec31..0e3cfd35d 100644 --- a/src/common/data_models/info_data_model.py +++ b/src/common/data_models/info_data_model.py @@ -33,7 +33,6 @@ class InterestScore(BaseDataModel): interest_match_score: float relationship_score: float mentioned_score: float - time_factor_score: float details: Dict[str, str] diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py new file mode 100644 index 000000000..82919d3b6 --- /dev/null +++ b/src/common/data_models/message_manager_data_model.py @@ -0,0 +1,82 @@ +""" +消息管理模块数据模型 +定义消息管理器使用的数据结构 +""" +import asyncio +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import List, Optional, TYPE_CHECKING + +from . import BaseDataModel + +if TYPE_CHECKING: + from .database_data_model import DatabaseMessages + + +class MessageStatus(Enum): + """消息状态枚举""" + UNREAD = "unread" # 未读消息 + READ = "read" # 已读消息 + PROCESSING = "processing" # 处理中 + + +@dataclass +class StreamContext(BaseDataModel): + """聊天流上下文信息""" + stream_id: str + unread_messages: List["DatabaseMessages"] = field(default_factory=list) + history_messages: List["DatabaseMessages"] = field(default_factory=list) + last_check_time: float = field(default_factory=time.time) + is_active: bool = True + processing_task: Optional[asyncio.Task] = None + + def add_message(self, message: "DatabaseMessages"): + """添加消息到上下文""" + message.is_read = False + self.unread_messages.append(message) + + def get_unread_messages(self) -> List["DatabaseMessages"]: + """获取未读消息""" + return [msg for msg in self.unread_messages if not msg.is_read] + + def mark_message_as_read(self, message_id: str): + """标记消息为已读""" + for msg in self.unread_messages: + if msg.message_id == message_id: + msg.is_read = True + self.history_messages.append(msg) + self.unread_messages.remove(msg) + break + + def get_context_messages(self, limit: int = 20) -> List["DatabaseMessages"]: + """获取上下文消息(历史消息+未读消息)""" + # 优先返回最近的历史消息和所有未读消息 + recent_history = self.history_messages[-limit:] if len(self.history_messages) > limit else self.history_messages + return recent_history + self.unread_messages + + +@dataclass +class MessageManagerStats(BaseDataModel): + """消息管理器统计信息""" + total_streams: int = 0 + active_streams: int = 0 + total_unread_messages: int = 0 + total_processed_messages: int = 0 + start_time: float = field(default_factory=time.time) + + @property + def uptime(self) -> float: + """运行时间""" + return time.time() - self.start_time + + +@dataclass +class StreamStats(BaseDataModel): + """聊天流统计信息""" + stream_id: str + is_active: bool + unread_count: int + history_count: int + last_check_time: float + has_active_task: bool \ No newline at end of file diff --git a/src/main.py b/src/main.py index 103884c10..145cbbffe 100644 --- a/src/main.py +++ b/src/main.py @@ -113,11 +113,24 @@ class MainSystem: def _cleanup(self): """清理资源""" + try: + # 停止消息管理器 + from src.chat.message_manager import message_manager + import asyncio + + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.create_task(message_manager.stop()) + else: + loop.run_until_complete(message_manager.stop()) + logger.info("🛑 消息管理器已停止") + except Exception as e: + logger.error(f"停止消息管理器时出错: {e}") + try: # 停止消息重组器 from src.plugin_system.core.event_manager import event_manager from src.plugin_system import EventType - import asyncio asyncio.run(event_manager.trigger_event(EventType.ON_STOP,permission_group="SYSTEM")) from src.utils.message_chunker import reassembler @@ -276,6 +289,11 @@ MoFox_Bot(第三方修改版) await reassembler.start_cleanup_task() logger.info("消息重组器已启动") + # 启动消息管理器 + from src.chat.message_manager import message_manager + await message_manager.start() + logger.info("消息管理器已启动") + # 初始化个体特征 await self.individuality.initialize()