From d4824e35ad94dc92458e114697bc901471f2b0c4 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Sat, 8 Nov 2025 10:46:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(message-manager):=20=E7=94=A8=E6=B5=81?= =?UTF-8?q?=E5=BE=AA=E7=8E=AF=E7=AE=A1=E7=90=86=E5=99=A8=E6=9B=BF=E6=8D=A2?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=99=A8/=E5=88=86=E6=B4=BE=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除 scheduler_dispatcher 模块,并用 distribution_manager 替换 - 实现StreamLoopManager,以改进消息分发和中断处理 - 将消息缓存系统直接添加到StreamContext中,并配置缓存设置 - 使用具有缓存感知的消息处理来增强SingleStreamContextManager - 更新`message_manager`,使用`stream_loop_manager`替代`scheduler_dispatcher` - 在StreamContext数据模型中添加缓存统计和刷新方法 - 通过适当的任务取消和重新处理来改进中断处理 - 为ChatManager添加get_all_stream方法,以实现更优的流管理 - 更新亲和聊天规划器,以更可靠地处理专注/正常模式切换 --- src/chat/message_manager/__init__.py | 10 +- src/chat/message_manager/context_manager.py | 124 ++-- .../message_manager/distribution_manager.py | 682 ++++++++++++++++++ src/chat/message_manager/message_manager.py | 325 +++++---- .../message_manager/scheduler_dispatcher.py | 628 ---------------- src/chat/message_receive/chat_stream.py | 8 + .../data_models/message_manager_data_model.py | 169 ++++- .../core/affinity_chatter.py | 2 +- .../affinity_flow_chatter/planner/planner.py | 138 ++-- 9 files changed, 1178 insertions(+), 908 deletions(-) create mode 100644 src/chat/message_manager/distribution_manager.py delete mode 100644 src/chat/message_manager/scheduler_dispatcher.py diff --git a/src/chat/message_manager/__init__.py b/src/chat/message_manager/__init__.py index 92453eafd..175320774 100644 --- a/src/chat/message_manager/__init__.py +++ b/src/chat/message_manager/__init__.py @@ -1,16 +1,16 @@ """ 消息管理器模块 -提供统一的消息管理、上下文管理和基于 scheduler 的消息分发功能 +提供统一的消息管理、上下文管理和流循环调度功能 """ from .context_manager import SingleStreamContextManager +from .distribution_manager import StreamLoopManager, stream_loop_manager from .message_manager import MessageManager, message_manager -from .scheduler_dispatcher import SchedulerDispatcher, scheduler_dispatcher __all__ = [ "MessageManager", - "SchedulerDispatcher", "SingleStreamContextManager", + "StreamLoopManager", "message_manager", - "scheduler_dispatcher", -] + "stream_loop_manager", +] \ No newline at end of file diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index ce5aeb876..c9ad3a674 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -63,78 +63,39 @@ class SingleStreamContextManager: bool: 是否成功添加 """ try: - # 尝试使用MessageManager的内置缓存系统 - use_cache_system = False - message_manager = None - try: - from .message_manager import message_manager as mm - message_manager = mm - # 检查配置是否启用消息缓存系统 - cache_enabled = global_config.chat.enable_message_cache - use_cache_system = message_manager.is_running and cache_enabled - if not cache_enabled: - logger.debug("消息缓存系统已在配置中禁用") - except Exception as e: - logger.debug(f"MessageManager不可用,使用直接添加: {e}") - use_cache_system = False + # 检查并配置StreamContext的缓存系统 + cache_enabled = global_config.chat.enable_message_cache + if cache_enabled and not self.context.is_cache_enabled: + self.context.enable_cache(True) + logger.debug(f"为StreamContext {self.stream_id} 启用缓存系统") - if use_cache_system and message_manager: - # 使用缓存系统 - try: - # 先计算兴趣值(需要在缓存前计算) - await self._calculate_message_interest(message) - message.is_read = False + # 先计算兴趣值(需要在缓存前计算) + await self._calculate_message_interest(message) + message.is_read = False - # 添加到缓存而不是直接添加到未读消息 - cache_success = message_manager.add_message_to_cache(self.stream_id, message) - - if cache_success: - # 自动检测和更新chat type - self._detect_chat_type(message) - - self.total_messages += 1 - self.last_access_time = time.time() - - # 检查当前是否正在处理消息 - is_processing = message_manager.get_stream_processing_status(self.stream_id) - - if not is_processing: - # 如果当前没有在处理,立即刷新缓存到未读消息 - cached_messages = message_manager.flush_cached_messages(self.stream_id) - for cached_msg in cached_messages: - self.context.unread_messages.append(cached_msg) - logger.debug(f"立即刷新缓存到未读消息: stream={self.stream_id}, 数量={len(cached_messages)}") - else: - logger.debug(f"消息已缓存,等待当前处理完成: stream={self.stream_id}") - - logger.debug(f"添加消息到缓存系统: {self.stream_id}") - return True - else: - logger.warning(f"消息缓存系统添加失败,回退到直接添加: {self.stream_id}") - use_cache_system = False - except Exception as e: - logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}") - use_cache_system = False - - # 回退方案:直接添加到未读消息 - # 这部分代码在缓存系统失败或不可用时执行 - if not use_cache_system: - message.is_read = False - self.context.unread_messages.append(message) + # 使用StreamContext的智能缓存功能 + success = self.context.add_message_with_cache_check(message, force_direct=not cache_enabled) + if success: # 自动检测和更新chat type self._detect_chat_type(message) - # 在上下文管理器中计算兴趣值 - await self._calculate_message_interest(message) self.total_messages += 1 self.last_access_time = time.time() - logger.debug(f"添加消息{message.processed_plain_text}到单流上下文: {self.stream_id}") - return True + # 如果使用了缓存系统,输出调试信息 + if cache_enabled and self.context.is_cache_enabled: + if self.context.is_chatter_processing: + logger.debug(f"消息已缓存到StreamContext,等待处理完成: stream={self.stream_id}") + else: + logger.debug(f"消息直接添加到StreamContext未读列表: stream={self.stream_id}") + else: + logger.debug(f"消息添加到StreamContext(缓存禁用): {self.stream_id}") - # 不应该到达这里,但为了类型检查添加返回值 - return True + return True + else: + logger.error(f"StreamContext消息添加失败: {self.stream_id}") + return False except Exception as e: logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True) @@ -272,7 +233,7 @@ class SingleStreamContextManager: unread_messages = getattr(self.context, "unread_messages", []) history_messages = getattr(self.context, "history_messages", []) - return { + stats = { "stream_id": self.stream_id, "context_type": type(self.context).__name__, "total_messages": len(history_messages) + len(unread_messages), @@ -288,10 +249,47 @@ class SingleStreamContextManager: "uptime_seconds": uptime, "idle_seconds": current_time - self.last_access_time, } + + # 添加缓存统计信息 + if hasattr(self.context, "get_cache_stats"): + stats["cache_stats"] = self.context.get_cache_stats() + + return stats except Exception as e: logger.error(f"获取单流统计失败 {self.stream_id}: {e}", exc_info=True) return {} + def flush_cached_messages(self) -> list[DatabaseMessages]: + """ + 刷新StreamContext中的缓存消息到未读列表 + + Returns: + list[DatabaseMessages]: 刷新的消息列表 + """ + try: + if hasattr(self.context, "flush_cached_messages"): + cached_messages = self.context.flush_cached_messages() + if cached_messages: + logger.debug(f"从StreamContext刷新缓存消息: stream={self.stream_id}, 数量={len(cached_messages)}") + return cached_messages + else: + logger.debug(f"StreamContext不支持缓存刷新: stream={self.stream_id}") + return [] + except Exception as e: + logger.error(f"刷新StreamContext缓存失败: stream={self.stream_id}, error={e}") + return [] + + def get_cache_stats(self) -> dict[str, Any]: + """获取StreamContext的缓存统计信息""" + try: + if hasattr(self.context, "get_cache_stats"): + return self.context.get_cache_stats() + else: + return {"error": "StreamContext不支持缓存统计"} + except Exception as e: + logger.error(f"获取StreamContext缓存统计失败: stream={self.stream_id}, error={e}") + return {"error": str(e)} + def validate_integrity(self) -> bool: """验证上下文完整性""" try: diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py new file mode 100644 index 000000000..6a09326ea --- /dev/null +++ b/src/chat/message_manager/distribution_manager.py @@ -0,0 +1,682 @@ +""" +流循环管理器 +为每个聊天流创建独立的无限循环任务,主动轮询处理消息 +""" + +import asyncio +import time +from typing import Any + +from src.chat.chatter_manager import ChatterManager +from src.chat.energy_system import energy_manager +from src.common.data_models.message_manager_data_model import StreamContext +from src.common.logger import get_logger +from src.config.config import global_config +from src.plugin_system.apis.chat_api import get_chat_manager + +logger = get_logger("stream_loop_manager") + + +class StreamLoopManager: + """流循环管理器 - 每个流一个独立的无限循环任务""" + + def __init__(self, max_concurrent_streams: int | None = None): + # 统计信息 + self.stats: dict[str, Any] = { + "active_streams": 0, + "total_loops": 0, + "total_process_cycles": 0, + "total_failures": 0, + "start_time": time.time(), + } + + # 配置参数 + self.max_concurrent_streams = max_concurrent_streams or global_config.chat.max_concurrent_distributions + + # 强制分发策略 + self.force_dispatch_unread_threshold: int | None = getattr( + global_config.chat, "force_dispatch_unread_threshold", 20 + ) + self.force_dispatch_min_interval: float = getattr(global_config.chat, "force_dispatch_min_interval", 0.1) + + # Chatter管理器 + self.chatter_manager: ChatterManager | None = None + + # 状态控制 + self.is_running = False + + # 每个流的上一次间隔值(用于日志去重) + self._last_intervals: dict[str, float] = {} + + logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})") + + async def start(self) -> None: + """启动流循环管理器""" + if self.is_running: + logger.warning("流循环管理器已经在运行") + return + + self.is_running = True + + async def stop(self) -> None: + """停止流循环管理器""" + if not self.is_running: + return + + self.is_running = False + + # 取消所有流循环 + try: + # 获取所有活跃的流 + from src.plugin_system.apis.chat_api import get_chat_manager + + chat_manager = get_chat_manager() + all_streams = chat_manager.get_all_streams() + + # 创建任务列表以便并发取消 + cancel_tasks = [] + for chat_stream in all_streams.values(): + context = chat_stream.context_manager.context + if context.stream_loop_task and not context.stream_loop_task.done(): + context.stream_loop_task.cancel() + cancel_tasks.append((chat_stream.stream_id, context.stream_loop_task)) + + # 并发等待所有任务取消 + if cancel_tasks: + logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") + await asyncio.gather( + *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], + return_exceptions=True, + ) + + logger.info("所有流循环已清理") + except Exception as e: + logger.error(f"停止管理器时出错: {e}") + + logger.info("流循环管理器已停止") + + async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool: + """启动指定流的循环任务 - 优化版本使用自适应管理器 + + Args: + stream_id: 流ID + force: 是否强制启动 + + Returns: + bool: 是否成功启动 + """ + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + return False + + # 快速路径:如果流已存在且不是强制启动,无需处理 + if not force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.debug(f"流 {stream_id} 循环已在运行") + return True + + # 如果是强制启动且任务仍在运行,先取消旧任务 + if force and context.stream_loop_task and not context.stream_loop_task.done(): + logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}") + old_task = context.stream_loop_task + old_task.cancel() + try: + await asyncio.wait_for(old_task, timeout=2.0) + logger.debug(f"旧流循环任务已结束: {stream_id}") + except (asyncio.TimeoutError, asyncio.CancelledError): + logger.debug(f"旧流循环任务已取消或超时: {stream_id}") + except Exception as e: + logger.warning(f"等待旧任务结束时出错: {e}") + + # 创建流循环任务 + try: + loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") + + # 将任务记录到 StreamContext 中 + context.stream_loop_task = loop_task + + # 更新统计信息 + self.stats["active_streams"] += 1 + self.stats["total_loops"] += 1 + + logger.debug(f"启动流循环任务: {stream_id}") + return True + + except Exception as e: + logger.error(f"启动流循环任务失败 {stream_id}: {e}") + return False + + async def stop_stream_loop(self, stream_id: str) -> bool: + """停止指定流的循环任务 + + Args: + stream_id: 流ID + + Returns: + bool: 是否成功停止 + """ + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.debug(f"流 {stream_id} 上下文不存在,无需停止") + return False + + # 检查是否有 stream_loop_task + if not context.stream_loop_task or context.stream_loop_task.done(): + logger.debug(f"流 {stream_id} 循环不存在或已结束,无需停止") + return False + + task = context.stream_loop_task + if not task.done(): + task.cancel() + try: + # 设置取消超时,避免无限等待 + await asyncio.wait_for(task, timeout=5.0) + except asyncio.CancelledError: + logger.debug(f"流循环任务已取消: {stream_id}") + except asyncio.TimeoutError: + logger.warning(f"流循环任务取消超时: {stream_id}") + except Exception as e: + logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") + + # 清空 StreamContext 中的任务记录 + context.stream_loop_task = None + + logger.debug(f"停止流循环: {stream_id}") + return True + + async def _stream_loop_worker(self, stream_id: str) -> None: + """单个流的工作循环 - 优化版本 + + Args: + stream_id: 流ID + """ + logger.debug(f"流循环工作器启动: {stream_id}") + + try: + while self.is_running: + try: + # 1. 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + await asyncio.sleep(10.0) + continue + + # 2. 检查是否有消息需要处理 + chat_stream = await get_chat_manager().get_stream(stream_id) + if chat_stream: + chat_stream.context_manager.flush_cached_messages() + unread_count = self._get_unread_count(context) + force_dispatch = self._needs_force_dispatch_for_context(context, unread_count) + + has_messages = force_dispatch or await self._has_messages_to_process(context) + + if has_messages: + if force_dispatch: + logger.info("流 %s 未读消息 %d 条,触发强制分发", stream_id, unread_count) + + # 3. 在处理前更新能量值(用于下次间隔计算) + try: + await self._update_stream_energy(stream_id, context) + except Exception as e: + logger.debug(f"更新流能量失败 {stream_id}: {e}") + + # 4. 激活chatter处理 + success = await self._process_stream_messages(stream_id, context) + + # 更新统计 + self.stats["total_process_cycles"] += 1 + if success: + logger.debug(f"流处理成功: {stream_id}") + else: + self.stats["total_failures"] += 1 + logger.warning(f"流处理失败: {stream_id}") + + # 5. 计算下次检查间隔 + interval = await self._calculate_interval(stream_id, has_messages) + + # 6. sleep等待下次检查 + # 只在间隔发生变化时输出日志,避免刷屏 + last_interval = self._last_intervals.get(stream_id) + if last_interval is None or abs(interval - last_interval) > 0.01: + logger.info(f"流 {stream_id} 等待周期变化: {interval:.2f}s") + self._last_intervals[stream_id] = interval + await asyncio.sleep(interval) + + except asyncio.CancelledError: + logger.debug(f"流循环被取消: {stream_id}") + break + except Exception as e: + logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) + self.stats["total_failures"] += 1 + await asyncio.sleep(5.0) # 错误时等待5秒再重试 + + finally: + # 清理 StreamContext 中的任务记录 + try: + context = await self._get_stream_context(stream_id) + if context and context.stream_loop_task: + context.stream_loop_task = None + logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}") + except Exception as e: + logger.debug(f"清理 StreamContext 任务记录失败: {e}") + + # 清理间隔记录 + self._last_intervals.pop(stream_id, None) + + logger.debug(f"流循环结束: {stream_id}") + + async def _get_stream_context(self, stream_id: str) -> StreamContext | None: + """获取流上下文 + + Args: + stream_id: 流ID + + Returns: + Optional[StreamContext]: 流上下文,如果不存在返回None + """ + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if chat_stream: + return chat_stream.context_manager.context + return None + except Exception as e: + logger.error(f"获取流上下文失败 {stream_id}: {e}") + return None + + async def _has_messages_to_process(self, context: StreamContext) -> bool: + """检查是否有消息需要处理 + + Args: + context: 流上下文 + + Returns: + bool: 是否有未读消息 + """ + try: + # 检查是否有未读消息 + if hasattr(context, "unread_messages") and context.unread_messages: + return True + + return False + except Exception as e: + logger.error(f"检查消息状态失败: {e}") + return False + + async def _process_stream_messages(self, stream_id: str, context: StreamContext) -> bool: + """处理流消息 - 支持子任务管理 + + Args: + stream_id: 流ID + context: 流上下文 + + Returns: + bool: 是否处理成功 + """ + if not self.chatter_manager: + logger.warning(f"Chatter管理器未设置: {stream_id}") + return False + + # 设置处理状态为正在处理 + self._set_stream_processing_status(stream_id, True) + + # 子任务跟踪 + child_tasks = set() + + try: + start_time = time.time() + + # 注意:缓存消息刷新已移至planner开始时执行(动作修改器之后),此处不再刷新 + + # 设置触发用户ID,以实现回复保护 + last_message = context.get_last_message() + if last_message: + context.triggering_user_id = last_message.user_info.user_id + + # 创建子任务用于刷新能量(不阻塞主流程) + energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id)) + child_tasks.add(energy_task) + energy_task.add_done_callback(lambda t: child_tasks.discard(t)) + + # 设置 Chatter 正在处理的标志 + context.is_chatter_processing = True + logger.debug(f"设置 Chatter 处理标志: {stream_id}") + + # 直接调用chatter_manager处理流上下文 + results = await self.chatter_manager.process_stream_context(stream_id, context) + success = results.get("success", False) + + if success: + # 处理成功后,再次刷新缓存中可能的新消息 + additional_messages = await self._flush_cached_messages_to_unread(stream_id) + if additional_messages: + logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") + + process_time = time.time() - start_time + logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") + else: + logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}") + + return success + + except asyncio.CancelledError: + logger.debug(f"流处理被取消: {stream_id}") + # 取消所有子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + raise + except Exception as e: + logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) + # 异常时也要清理子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + return False + finally: + # 清除 Chatter 处理标志 + context.is_chatter_processing = False + logger.debug(f"清除 Chatter 处理标志: {stream_id}") + + # 无论成功或失败,都要设置处理状态为未处理 + self._set_stream_processing_status(stream_id, False) + + def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None: + """设置流的处理状态""" + try: + from .message_manager import message_manager + + if message_manager.is_running: + message_manager.set_stream_processing_status(stream_id, is_processing) + logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") + + except ImportError: + logger.debug("MessageManager不可用,跳过状态设置") + except Exception as e: + logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}") + + async def _flush_cached_messages_to_unread(self, stream_id: str) -> list: + """将缓存消息刷新到未读消息列表""" + try: + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + return [] + + # 使用StreamContext的缓存刷新功能 + if hasattr(context, "flush_cached_messages"): + cached_messages = context.flush_cached_messages() + if cached_messages: + logger.debug(f"从StreamContext刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") + return cached_messages + else: + logger.debug(f"StreamContext不支持缓存刷新: stream={stream_id}") + return [] + + except Exception as e: + logger.warning(f"刷新StreamContext缓存失败: stream={stream_id}, error={e}") + return [] + + async def _update_stream_energy(self, stream_id: str, context: Any) -> None: + """更新流的能量值 + + Args: + stream_id: 流ID + context: 流上下文 (StreamContext) + """ + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + # 获取聊天流 + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + + if not chat_stream: + logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新") + return + + # 从 context_manager 获取消息(包括未读和历史消息) + # 合并未读消息和历史消息 + all_messages = [] + + # 添加历史消息 + history_messages = context.get_history_messages(limit=global_config.chat.max_context_size) + all_messages.extend(history_messages) + + # 添加未读消息 + unread_messages = context.get_unread_messages() + all_messages.extend(unread_messages) + + # 按时间排序并限制数量 + all_messages.sort(key=lambda m: m.time) + messages = all_messages[-global_config.chat.max_context_size:] + + # 获取用户ID + user_id = None + if context.triggering_user_id: + user_id = context.triggering_user_id + + # 使用能量管理器计算并缓存能量值 + energy = await energy_manager.calculate_focus_energy( + stream_id=stream_id, + messages=messages, + user_id=user_id + ) + + # 同步更新到 ChatStream + chat_stream._focus_energy = energy + + logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}") + + except Exception as e: + logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False) + + async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float: + """计算下次检查间隔 + + Args: + stream_id: 流ID + has_messages: 本次是否有消息处理 + + Returns: + float: 间隔时间(秒) + """ + # 基础间隔 + base_interval = getattr(global_config.chat, "distribution_interval", 5.0) + + # 如果没有消息,使用更长的间隔 + if not has_messages: + return base_interval * 2.0 # 无消息时间隔加倍 + + # 尝试使用能量管理器计算间隔 + try: + # 获取当前focus_energy + focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0] + + # 使用能量管理器计算间隔 + interval = energy_manager.get_distribution_interval(focus_energy) + + logger.debug(f"流 {stream_id} 动态间隔: {interval:.2f}s (能量: {focus_energy:.3f})") + return interval + + except Exception as e: + logger.debug(f"流 {stream_id} 使用默认间隔: {base_interval:.2f}s ({e})") + return base_interval + + def get_queue_status(self) -> dict[str, Any]: + """获取队列状态 + + Returns: + Dict[str, Any]: 队列状态信息 + """ + current_time = time.time() + uptime = current_time - self.stats["start_time"] if self.is_running else 0 + + # 从统计信息中获取活跃流数量 + active_streams = self.stats.get("active_streams", 0) + + return { + "active_streams": active_streams, + "total_loops": self.stats["total_loops"], + "max_concurrent": self.max_concurrent_streams, + "is_running": self.is_running, + "uptime": uptime, + "total_process_cycles": self.stats["total_process_cycles"], + "total_failures": self.stats["total_failures"], + "stats": self.stats.copy(), + } + + def set_chatter_manager(self, chatter_manager: ChatterManager) -> None: + """设置chatter管理器 + + Args: + chatter_manager: chatter管理器实例 + """ + self.chatter_manager = chatter_manager + logger.debug(f"设置chatter管理器: {chatter_manager.__class__.__name__}") + + async def _should_force_dispatch_for_stream(self, stream_id: str) -> bool: + if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: + return False + + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if not chat_stream: + return False + + unread = getattr(chat_stream.context_manager.context, "unread_messages", []) + return len(unread) > self.force_dispatch_unread_threshold + except Exception as e: + logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}") + return False + + def _get_unread_count(self, context: StreamContext) -> int: + try: + unread_messages = context.unread_messages + if unread_messages is None: + return 0 + return len(unread_messages) + except Exception: + return 0 + + def _needs_force_dispatch_for_context(self, context: StreamContext, unread_count: int | None = None) -> bool: + if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: + return False + + count = unread_count if unread_count is not None else self._get_unread_count(context) + return count > self.force_dispatch_unread_threshold + + def get_performance_summary(self) -> dict[str, Any]: + """获取性能摘要 + + Returns: + Dict[str, Any]: 性能摘要 + """ + current_time = time.time() + uptime = current_time - self.stats["start_time"] + + # 计算吞吐量 + throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数 + + # 从统计信息中获取活跃流数量 + active_streams = self.stats.get("active_streams", 0) + + return { + "uptime_hours": uptime / 3600, + "active_streams": active_streams, + "total_process_cycles": self.stats["total_process_cycles"], + "total_failures": self.stats["total_failures"], + "throughput_per_hour": throughput, + "max_concurrent_streams": self.max_concurrent_streams, + } + + async def _refresh_focus_energy(self, stream_id: str) -> None: + """分发完成后基于历史消息刷新能量值""" + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if not chat_stream: + logger.debug(f"刷新能量时未找到聊天流: {stream_id}") + return + + await chat_stream.context_manager.refresh_focus_energy_from_history() + logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量") + except Exception as e: + logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") + + async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None: + """等待任务取消完成,带有超时控制 + + Args: + stream_id: 流ID + task: 要等待取消的任务 + """ + try: + await asyncio.wait_for(task, timeout=5.0) + logger.debug(f"流循环任务已正常结束: {stream_id}") + except asyncio.CancelledError: + logger.debug(f"流循环任务已取消: {stream_id}") + except asyncio.TimeoutError: + logger.warning(f"流循环任务取消超时: {stream_id}") + except Exception as e: + logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") + + async def _force_dispatch_stream(self, stream_id: str) -> None: + """强制分发流处理 + + 当流的未读消息超过阈值时,强制触发分发处理 + 这个方法主要用于突破并发限制时的紧急处理 + + 注意:此方法目前未被使用,相关功能已集成到 start_stream_loop 方法中 + + Args: + stream_id: 流ID + """ + logger.debug(f"强制分发流处理: {stream_id}") + + try: + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"强制分发时未找到流上下文: {stream_id}") + return + + # 检查是否有现有的 stream_loop_task + if context.stream_loop_task and not context.stream_loop_task.done(): + logger.debug(f"发现现有流循环 {stream_id},将先取消再重新创建") + existing_task = context.stream_loop_task + existing_task.cancel() + # 创建异步任务来等待取消完成,并添加异常处理 + cancel_task = asyncio.create_task( + self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}" + ) + # 为取消任务添加异常处理,避免孤儿任务 + cancel_task.add_done_callback( + lambda task: logger.debug(f"取消任务完成: {stream_id}") + if not task.exception() + else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") + ) + + # 检查未读消息数量 + unread_count = self._get_unread_count(context) + logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") + + # 使用 start_stream_loop 重新创建流循环任务 + success = await self.start_stream_loop(stream_id, force=True) + + if success: + logger.info(f"已创建强制分发流循环: {stream_id}") + else: + logger.warning(f"创建强制分发流循环失败: {stream_id}") + + except Exception as e: + logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True) + + +# 全局流循环管理器实例 +stream_loop_manager = StreamLoopManager() \ No newline at end of file diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index df6186fa2..6b1b15659 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -4,11 +4,12 @@ """ import asyncio +import random import time -from collections import defaultdict, deque from typing import TYPE_CHECKING, Any from src.chat.chatter_manager import ChatterManager +from src.chat.message_receive.chat_stream import ChatStream from src.chat.planner_actions.action_manager import ChatterActionManager from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.message_manager_data_model import MessageManagerStats, StreamStats @@ -16,8 +17,8 @@ from src.common.logger import get_logger from src.config.config import global_config from src.plugin_system.apis.chat_api import get_chat_manager +from .distribution_manager import stream_loop_manager from .global_notice_manager import NoticeScope, global_notice_manager -from .scheduler_dispatcher import scheduler_dispatcher if TYPE_CHECKING: pass @@ -40,14 +41,6 @@ class MessageManager: self.action_manager = ChatterActionManager() self.chatter_manager = ChatterManager(self.action_manager) - # 消息缓存系统 - 直接集成到消息管理器 - self.message_caches: dict[str, deque] = defaultdict(deque) # 每个流的消息缓存 - self.stream_processing_status: dict[str, bool] = defaultdict(bool) # 流的处理状态 - self.cache_stats = { - "total_cached_messages": 0, - "total_flushed_messages": 0, - } - # 不再需要全局上下文管理器,直接通过 ChatManager 访问各个 ChatStream 的 context_manager # 全局Notice管理器 @@ -69,19 +62,11 @@ class MessageManager: except Exception as e: logger.error(f"启动批量数据库写入器失败: {e}") - # 启动消息缓存系统(内置) - logger.debug("消息缓存系统已启动") + # 启动流循环管理器并设置chatter_manager + await stream_loop_manager.start() + stream_loop_manager.set_chatter_manager(self.chatter_manager) - # 启动基于 scheduler 的消息分发器 - await scheduler_dispatcher.start() - scheduler_dispatcher.set_chatter_manager(self.chatter_manager) - - # 保留旧的流循环管理器(暂时)以便平滑过渡 - # TODO: 在确认新机制稳定后移除 - # await stream_loop_manager.start() - # stream_loop_manager.set_chatter_manager(self.chatter_manager) - - logger.info("消息管理器已启动(使用 Scheduler 分发器)") + logger.info("消息管理器已启动") async def stop(self): """停止消息管理器""" @@ -99,34 +84,15 @@ class MessageManager: except Exception as e: logger.error(f"停止批量数据库写入器失败: {e}") - # 停止消息缓存系统(内置) - self.message_caches.clear() - self.stream_processing_status.clear() - logger.debug("消息缓存系统已停止") - - # 停止基于 scheduler 的消息分发器 - await scheduler_dispatcher.stop() - - # 停止旧的流循环管理器(如果启用) - # await stream_loop_manager.stop() + # 停止流循环管理器 + await stream_loop_manager.stop() logger.info("消息管理器已停止") async def add_message(self, stream_id: str, message: DatabaseMessages): - """添加消息到指定聊天流 - - 新的流程: - 1. 检查 notice 消息 - 2. 将消息添加到上下文(缓存) - 3. 通知 scheduler_dispatcher 处理(检查打断、创建/更新 schedule) - """ + """添加消息到指定聊天流""" try: - # 硬编码过滤表情包消息 - if message.processed_plain_text and message.processed_plain_text.startswith("[表情包"): - logger.info(f"检测到表情包消息,已过滤: {message.processed_plain_text}") - return - # 检查是否为notice消息 if self._is_notice_message(message): # Notice消息处理 - 添加到全局管理器 @@ -147,14 +113,11 @@ class MessageManager: if not chat_stream: logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在") return - - # 将消息添加到上下文 + # 启动steam loop任务(如果尚未启动) + await stream_loop_manager.start_stream_loop(stream_id) + await self._check_and_handle_interruption(chat_stream, message) await chat_stream.context_manager.add_message(message) - # 通知 scheduler_dispatcher 处理消息接收事件 - # dispatcher 会检查是否需要打断、创建或更新 schedule - await scheduler_dispatcher.on_message_received(stream_id) - except Exception as e: logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}") @@ -321,9 +284,122 @@ class MessageManager: except Exception as e: logger.error(f"清理不活跃聊天流时发生错误: {e}") - # === 已废弃的方法已移除 === - # _check_and_handle_interruption 和 _trigger_reprocess 已由 scheduler_dispatcher 接管 - # 如需查看历史代码,请参考 git 历史记录 + async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None): + """检查并处理消息打断 - 通过取消 stream_loop_task 实现""" + if not global_config.chat.interruption_enabled or not chat_stream or not message: + return + + # 检查是否正在回复,以及是否允许在回复时打断 + if chat_stream.context_manager.context.is_replying: + if not global_config.chat.allow_reply_interruption: + logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,且配置不允许回复时打断,跳过打断检查") + return + else: + logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,但配置允许回复时打断") + + # 检查是否为表情包消息 + if message.is_picid or message.is_emoji: + logger.info(f"消息 {message.message_id} 是表情包或Emoji,跳过打断检查") + return + + # 检查上下文 + context = chat_stream.context_manager.context + + # 只有当 Chatter 真正在处理时才检查打断 + if not context.is_chatter_processing: + logger.debug(f"聊天流 {chat_stream.stream_id} Chatter 未在处理,跳过打断检查") + return + + # 检查是否有 stream_loop_task 在运行 + stream_loop_task = context.stream_loop_task + + if stream_loop_task and not stream_loop_task.done(): + # 检查触发用户ID + triggering_user_id = context.triggering_user_id + if triggering_user_id and message.user_info.user_id != triggering_user_id: + logger.info(f"消息来自非触发用户 {message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查") + return + + # 计算打断概率 + interruption_probability = context.calculate_interruption_probability( + global_config.chat.interruption_max_limit + ) + + # 检查是否已达到最大打断次数 + if context.interruption_count >= global_config.chat.interruption_max_limit: + logger.debug( + f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查" + ) + return + + # 根据概率决定是否打断 + if random.random() < interruption_probability: + logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") + + # 取消 stream_loop_task,子任务会通过 try-catch 自动取消 + try: + stream_loop_task.cancel() + + # 等待任务真正结束(设置超时避免死锁) + try: + await asyncio.wait_for(stream_loop_task, timeout=2.0) + logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}") + except asyncio.TimeoutError: + logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}") + except asyncio.CancelledError: + logger.info(f"流循环任务已被取消: {chat_stream.stream_id}") + except Exception as e: + logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}") + + # 增加打断计数 + await context.increment_interruption_count() + + # 打断后重新创建 stream_loop 任务 + await self._trigger_reprocess(chat_stream) + + # 检查是否已达到最大次数 + if context.interruption_count >= global_config.chat.interruption_max_limit: + logger.warning( + f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断" + ) + else: + logger.info( + f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}" + ) + else: + logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") + + async def _trigger_reprocess(self, chat_stream: ChatStream): + """重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务""" + try: + stream_id = chat_stream.stream_id + + logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}") + + # 等待一小段时间确保当前消息已经添加到未读消息中 + await asyncio.sleep(0.1) + + # 获取当前的stream context + context = chat_stream.context_manager.context + + # 确保有未读消息需要处理 + unread_messages = context.get_unread_messages() + if not unread_messages: + logger.debug(f"聊天流 {stream_id} 没有未读消息,跳过重新处理") + return + + logger.debug(f"准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}") + + # 重新创建 stream_loop 任务 + success = await stream_loop_manager.start_stream_loop(stream_id, force=True) + + if success: + logger.debug(f"成功重新创建流循环任务: {stream_id}") + else: + logger.warning(f"重新创建流循环任务失败: {stream_id}") + + except Exception as e: + logger.error(f"触发重新处理时出错: {e}") async def clear_all_unread_messages(self, stream_id: str): """清除指定上下文中的所有未读消息,在消息处理完成后调用""" @@ -374,71 +450,44 @@ class MessageManager: except Exception as e: logger.error(f"清除流 {stream_id} 的未读消息时发生错误: {e}") - # ===== 消息缓存系统方法 ===== - - def add_message_to_cache(self, stream_id: str, message: DatabaseMessages) -> bool: - """添加消息到缓存 - - Args: - stream_id: 流ID - message: 消息对象 - - Returns: - bool: 是否成功添加到缓存 - """ - try: - if not self.is_running: - return False - - self.message_caches[stream_id].append(message) - self.cache_stats["total_cached_messages"] += 1 - - if message.processed_plain_text: - logger.debug(f"消息已添加到缓存: stream={stream_id}, content={message.processed_plain_text[:50]}...") - return True - except Exception as e: - logger.error(f"添加消息到缓存失败: stream={stream_id}, error={e}") - return False - - def flush_cached_messages(self, stream_id: str) -> list[DatabaseMessages]: - """刷新缓存消息到未读消息列表 - - Args: - stream_id: 流ID - - Returns: - List[DatabaseMessages]: 缓存的消息列表 - """ - try: - if stream_id not in self.message_caches: - return [] - - cached_messages = list(self.message_caches[stream_id]) - self.message_caches[stream_id].clear() - - self.cache_stats["total_flushed_messages"] += len(cached_messages) - - logger.debug(f"刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") - return cached_messages - except Exception as e: - logger.error(f"刷新缓存消息失败: stream={stream_id}, error={e}") - return [] + # ===== 流处理状态相关方法(用于向后兼容) ===== def set_stream_processing_status(self, stream_id: str, is_processing: bool): - """设置流的处理状态 + """设置流的处理状态 - 已迁移到StreamContext,此方法仅用于向后兼容 Args: stream_id: 流ID is_processing: 是否正在处理 """ try: - self.stream_processing_status[stream_id] = is_processing - logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") + # 尝试更新StreamContext的处理状态 + import asyncio + async def _update_context(): + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if chat_stream and hasattr(chat_stream.context_manager.context, 'is_chatter_processing'): + chat_stream.context_manager.context.is_chatter_processing = is_processing + logger.debug(f"设置StreamContext处理状态: stream={stream_id}, processing={is_processing}") + except Exception as e: + logger.debug(f"更新StreamContext状态失败: stream={stream_id}, error={e}") + + # 在当前事件循环中执行(如果可能) + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.create_task(_update_context()) + else: + # 如果事件循环未运行,则跳过 + logger.debug("事件循环未运行,跳过StreamContext状态更新") + except RuntimeError: + logger.debug("无法获取事件循环,跳过StreamContext状态更新") + except Exception as e: - logger.error(f"设置流处理状态失败: stream={stream_id}, error={e}") + logger.debug(f"设置流处理状态失败(向后兼容模式): stream={stream_id}, error={e}") def get_stream_processing_status(self, stream_id: str) -> bool: - """获取流的处理状态 + """获取流的处理状态 - 已迁移到StreamContext,此方法仅用于向后兼容 Args: stream_id: 流ID @@ -446,43 +495,33 @@ class MessageManager: Returns: bool: 是否正在处理 """ - return self.stream_processing_status.get(stream_id, False) + try: + # 尝试从StreamContext获取处理状态 + import asyncio + async def _get_context_status(): + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if chat_stream and hasattr(chat_stream.context_manager.context, 'is_chatter_processing'): + return chat_stream.context_manager.context.is_chatter_processing + except Exception: + pass + return False - def has_cached_messages(self, stream_id: str) -> bool: - """检查流是否有缓存消息 + # 同步获取状态(如果可能) + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # 如果事件循环正在运行,我们无法在这里等待,返回默认值 + return False + else: + # 如果事件循环未运行,运行它来获取状态 + return loop.run_until_complete(_get_context_status()) + except RuntimeError: + return False - Args: - stream_id: 流ID - - Returns: - bool: 是否有缓存消息 - """ - return stream_id in self.message_caches and len(self.message_caches[stream_id]) > 0 - - def get_cached_messages_count(self, stream_id: str) -> int: - """获取流的缓存消息数量 - - Args: - stream_id: 流ID - - Returns: - int: 缓存消息数量 - """ - return len(self.message_caches.get(stream_id, [])) - - def get_cache_stats(self) -> dict[str, Any]: - """获取缓存统计信息 - - Returns: - Dict[str, Any]: 缓存统计信息 - """ - return { - "total_cached_messages": self.cache_stats["total_cached_messages"], - "total_flushed_messages": self.cache_stats["total_flushed_messages"], - "active_caches": len(self.message_caches), - "cached_streams": len([s for s in self.message_caches.keys() if self.message_caches[s]]), - "processing_streams": len([s for s in self.stream_processing_status.keys() if self.stream_processing_status[s]]), - } + except Exception: + return False # ===== Notice管理相关方法 ===== @@ -623,4 +662,4 @@ class MessageManager: # 创建全局消息管理器实例 -message_manager = MessageManager() +message_manager = MessageManager() \ No newline at end of file diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py deleted file mode 100644 index ccc237324..000000000 --- a/src/chat/message_manager/scheduler_dispatcher.py +++ /dev/null @@ -1,628 +0,0 @@ -""" -基于 unified_scheduler 的消息分发管理器 -替代原有的 stream_loop_task 循环机制,使用统一的调度器来管理消息处理时机 -""" - -import asyncio -import time -from typing import Any - -from src.chat.chatter_manager import ChatterManager -from src.chat.energy_system import energy_manager -from src.common.data_models.message_manager_data_model import StreamContext -from src.common.logger import get_logger -from src.config.config import global_config -from src.plugin_system.apis.chat_api import get_chat_manager -from src.schedule.unified_scheduler import TriggerType, unified_scheduler - -logger = get_logger("scheduler_dispatcher") - - -class SchedulerDispatcher: - """基于 scheduler 的消息分发器 - - 工作流程: - 1. 接收消息时,将消息添加到聊天流上下文 - 2. 检查是否有活跃的 schedule,如果没有则创建 - 3. 如果有,检查打断判定,成功则移除旧 schedule 并创建新的 - 4. schedule 到期时,激活 chatter 处理 - 5. 处理完成后,计算下次间隔并注册新 schedule - """ - - def __init__(self): - # 追踪每个流的 task_name - self.stream_schedules: dict[str, str] = {} # stream_id -> task_name - - # Chatter 管理器 - self.chatter_manager: ChatterManager | None = None - - # 统计信息 - self.stats = { - "total_schedules_created": 0, - "total_schedules_cancelled": 0, - "total_interruptions": 0, - "total_process_cycles": 0, - "total_failures": 0, - "start_time": time.time(), - } - - self.is_running = False - - logger.info("基于 Scheduler 的消息分发器初始化完成") - - async def start(self) -> None: - """启动分发器""" - if self.is_running: - logger.warning("分发器已在运行") - return - - self.is_running = True - logger.info("基于 Scheduler 的消息分发器已启动") - - async def stop(self) -> None: - """停止分发器""" - if not self.is_running: - return - - self.is_running = False - - # 取消所有活跃的 schedule - task_names = list(self.stream_schedules.values()) - for task_name in task_names: - try: - await unified_scheduler.remove_schedule_by_name(task_name) - except Exception as e: - logger.error(f"移除 schedule {task_name} 失败: {e}") - - self.stream_schedules.clear() - logger.info("基于 Scheduler 的消息分发器已停止") - - def set_chatter_manager(self, chatter_manager: ChatterManager) -> None: - """设置 Chatter 管理器""" - self.chatter_manager = chatter_manager - logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}") - - async def on_message_received(self, stream_id: str) -> None: - """消息接收时的处理逻辑 - - Args: - stream_id: 聊天流ID - """ - if not self.is_running: - logger.warning("分发器未运行,忽略消息") - return - - try: - # 1. 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"无法获取流上下文: {stream_id}") - return - - # 2. 检查是否有活跃的 schedule - task_name = self.stream_schedules.get(stream_id) - has_active_schedule = False - - if task_name: - # 验证schedule是否真的还在scheduler中活跃 - schedule_id = await unified_scheduler.find_schedule_by_name(task_name) - if schedule_id: - # 进一步检查任务是否正在执行或即将执行 - task = await unified_scheduler.get_task_info(schedule_id) - if task and task['is_active']: - has_active_schedule = True - logger.debug(f"验证到活跃schedule: 流={stream_id[:8]}..., task={task_name}") - else: - logger.warning(f"发现不活跃的schedule记录,将清理: 流={stream_id[:8]}..., task={task_name}") - # 清理无效记录 - self.stream_schedules.pop(stream_id, None) - else: - logger.warning(f"发现无效的schedule记录,将清理: 流={stream_id[:8]}..., task={task_name}") - # 清理无效记录 - self.stream_schedules.pop(stream_id, None) - - if not has_active_schedule: - # 4. 创建新的 schedule(在锁内,避免重复创建) - await self._create_schedule(stream_id, context) - return - - # 3. 检查打断判定 - if has_active_schedule: - should_interrupt = await self._check_interruption(stream_id, context) - - if should_interrupt: - # 移除旧 schedule 并创建新的 - await self._cancel_and_recreate_schedule(stream_id, context) - logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule") - else: - logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...") - - except Exception as e: - logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True) - - async def _get_stream_context(self, stream_id: str) -> StreamContext | None: - """获取流上下文""" - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if chat_stream: - return chat_stream.context_manager.context - return None - except Exception as e: - logger.error(f"获取流上下文失败 {stream_id}: {e}") - return None - - async def _check_interruption(self, stream_id: str, context: StreamContext) -> bool: - """检查是否应该打断当前处理 - - Args: - stream_id: 流ID - context: 流上下文 - - Returns: - bool: 是否应该打断 - """ - # 检查是否启用打断 - if not global_config.chat.interruption_enabled: - return False - - # 检查是否正在回复,以及是否允许在回复时打断 - if context.is_replying: - if not global_config.chat.allow_reply_interruption: - logger.debug(f"聊天流 {stream_id} 正在回复中,且配置不允许回复时打断") - return False - else: - logger.debug(f"聊天流 {stream_id} 正在回复中,但配置允许回复时打断") - - # 只有当 Chatter 真正在处理时才检查打断 - if not context.is_chatter_processing: - logger.debug(f"聊天流 {stream_id} Chatter 未在处理,无需打断") - return False - - # 检查最后一条消息 - last_message = context.get_last_message() - if not last_message: - return False - - # 检查是否为表情包消息 - if last_message.is_picid or last_message.is_emoji: - logger.info(f"消息 {last_message.message_id} 是表情包或Emoji,跳过打断检查") - return False - - # 检查触发用户ID - triggering_user_id = context.triggering_user_id - if triggering_user_id and last_message.user_info.user_id != triggering_user_id: - logger.info(f"消息来自非触发用户 {last_message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查") - return False - - # 检查是否已达到最大打断次数 - if context.interruption_count >= global_config.chat.interruption_max_limit: - logger.debug( - f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit}" - ) - return False - - # 计算打断概率 - interruption_probability = context.calculate_interruption_probability( - global_config.chat.interruption_max_limit - ) - - # 根据概率决定是否打断 - import random - if random.random() < interruption_probability: - logger.debug(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") - - # 增加打断计数 - await context.increment_interruption_count() - self.stats["total_interruptions"] += 1 - - # 检查是否已达到最大次数 - if context.interruption_count >= global_config.chat.interruption_max_limit: - logger.warning( - f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断" - ) - else: - logger.info( - f"聊天流 {stream_id} 已打断,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}" - ) - - return True - else: - logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - return False - - async def _cancel_and_recreate_schedule(self, stream_id: str, context: StreamContext) -> None: - """取消旧的 schedule 并创建新的(打断模式,使用极短延迟) - - Args: - stream_id: 流ID - context: 流上下文 - """ - # 移除旧的 schedule - old_task_name = self.stream_schedules.get(stream_id) - if old_task_name: - success = await unified_scheduler.remove_schedule_by_name(old_task_name) - if success: - logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., task={old_task_name}") - self.stats["total_schedules_cancelled"] += 1 - # 只有成功移除后才从追踪中删除 - self.stream_schedules.pop(stream_id) - else: - logger.error( - f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., " - f"task={old_task_name}, 放弃创建新 schedule 避免重复" - ) - # 移除失败,不创建新 schedule,避免重复 - return - - # 创建新的 schedule,使用即时处理模式(极短延迟) - await self._create_schedule(stream_id, context, immediate_mode=True) - - async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None: - """为聊天流创建新的 schedule - - Args: - stream_id: 流ID - context: 流上下文 - immediate_mode: 是否使用即时处理模式(打断时使用极短延迟) - """ - try: - # 检查是否已有活跃的 schedule,如果有则先移除 - if stream_id in self.stream_schedules: - old_task_name = self.stream_schedules[stream_id] - logger.warning( - f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_task_name}, " - f"这不应该发生,将先移除旧 schedule" - ) - await unified_scheduler.remove_schedule_by_name(old_task_name) - del self.stream_schedules[stream_id] - - # 如果是即时处理模式(打断时),使用固定的1秒延迟立即重新处理 - if immediate_mode: - delay = 1.0 # 硬编码1秒延迟,确保打断后能快速重新处理 - logger.debug( - f"⚡ 打断模式启用: 流={stream_id[:8]}..., " - f"使用即时延迟={delay:.1f}s 立即重新处理" - ) - else: - # 常规模式:计算初始延迟 - delay = await self._calculate_initial_delay(stream_id, context) - - # 获取未读消息数量用于日志 - unread_count = len(context.unread_messages) if context.unread_messages else 0 - - # 生成任务名称 - 使用stream_id确保唯一性 - task_name = f"dispatch_{stream_id}" - - # 创建 schedule - 使用force_overwrite确保可以覆盖 - schedule_id = await unified_scheduler.create_schedule( - callback=self._on_schedule_triggered, - trigger_type=TriggerType.TIME, - trigger_config={"delay_seconds": delay}, - is_recurring=False, # 一次性任务,处理完后会创建新的 - task_name=task_name, - callback_args=(stream_id,), - force_overwrite=True, # 允许覆盖同名任务 - ) - - # 追踪 task_name - self.stream_schedules[stream_id] = task_name - self.stats["total_schedules_created"] += 1 - - mode_indicator = "⚡打断" if immediate_mode else "📅常规" - - logger.info( - f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., " - f"延迟={delay:.3f}s, 未读={unread_count}, " - f"task={task_name}, ID={schedule_id[:8]}..." - ) - - except Exception as e: - logger.error(f"创建 schedule 失败 {stream_id}: {e}", exc_info=True) - - async def _calculate_initial_delay(self, stream_id: str, context: StreamContext) -> float: - """计算初始延迟时间 - - Args: - stream_id: 流ID - context: 流上下文 - - Returns: - float: 延迟时间(秒) - """ - # 基础间隔 - base_interval = getattr(global_config.chat, "distribution_interval", 5.0) - - # 检查是否有未读消息 - unread_count = len(context.unread_messages) if context.unread_messages else 0 - - # 强制分发阈值 - force_dispatch_threshold = getattr(global_config.chat, "force_dispatch_unread_threshold", 20) - - # 如果未读消息过多,使用最小间隔 - if force_dispatch_threshold and unread_count > force_dispatch_threshold: - min_interval = getattr(global_config.chat, "force_dispatch_min_interval", 0.1) - logger.warning( - f"⚠️ 强制分发触发: 流={stream_id[:8]}..., " - f"未读={unread_count} (阈值={force_dispatch_threshold}), " - f"使用最小间隔={min_interval}s" - ) - return min_interval - - # 尝试使用能量管理器计算间隔 - try: - # 更新能量值 - await self._update_stream_energy(stream_id, context) - - # 获取当前 focus_energy - focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0] - - # 使用能量管理器计算间隔 - interval = energy_manager.get_distribution_interval(focus_energy) - - logger.info( - f"📊 动态间隔计算: 流={stream_id[:8]}..., " - f"能量={focus_energy:.3f}, 间隔={interval:.2f}s" - ) - return interval - - except Exception as e: - logger.info( - f"📊 使用默认间隔: 流={stream_id[:8]}..., " - f"间隔={base_interval:.2f}s (动态计算失败: {e})" - ) - return base_interval - - async def _update_stream_energy(self, stream_id: str, context: StreamContext) -> None: - """更新流的能量值 - - Args: - stream_id: 流ID - context: 流上下文 - """ - try: - from src.chat.message_receive.chat_stream import get_chat_manager - - # 获取聊天流 - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - - if not chat_stream: - logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新") - return - - # 合并未读消息和历史消息 - all_messages = [] - - # 添加历史消息 - history_messages = context.get_history_messages(limit=global_config.chat.max_context_size) - all_messages.extend(history_messages) - - # 添加未读消息 - unread_messages = context.get_unread_messages() - all_messages.extend(unread_messages) - - # 按时间排序并限制数量 - all_messages.sort(key=lambda m: m.time) - messages = all_messages[-global_config.chat.max_context_size:] - - # 获取用户ID - user_id = context.triggering_user_id - - # 使用能量管理器计算并缓存能量值 - energy = await energy_manager.calculate_focus_energy( - stream_id=stream_id, - messages=messages, - user_id=user_id - ) - - # 同步更新到 ChatStream - chat_stream._focus_energy = energy - - logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}") - - except Exception as e: - logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False) - - async def _on_schedule_triggered(self, stream_id: str) -> None: - """schedule 触发时的回调 - - Args: - stream_id: 流ID - """ - try: - task_name = self.stream_schedules.get(stream_id) - - logger.info( - f"⏰ Schedule 触发: 流={stream_id[:8]}..., " - f"task={task_name or 'None'}, " - f"开始处理消息" - ) - - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"Schedule 触发时无法获取流上下文: {stream_id}") - return - - # 检查是否有未读消息 - if not context.unread_messages: - logger.debug(f"流 {stream_id} 没有未读消息,跳过处理") - return - - # 激活 chatter 处理(不需要锁,允许并发处理) - success = await self._process_stream(stream_id, context) - - # 更新统计 - self.stats["total_process_cycles"] += 1 - if not success: - self.stats["total_failures"] += 1 - - # 清理schedule记录 - 处理完成后总是清理本地记录 - removed_task_name = self.stream_schedules.pop(stream_id, None) - if removed_task_name: - logger.debug(f"清理schedule记录: 流={stream_id[:8]}..., task={removed_task_name}") - - # 检查缓存中是否有待处理的消息 - from src.chat.message_manager.message_manager import message_manager - - has_cached = message_manager.has_cached_messages(stream_id) - - if has_cached: - # 有缓存消息,立即创建新 schedule 继续处理 - logger.info( - f"🔁 处理完成但有缓存消息: 流={stream_id[:8]}..., " - f"立即创建新 schedule 继续处理" - ) - await self._create_schedule(stream_id, context) - else: - # 没有缓存消息,不创建 schedule,等待新消息到达 - logger.debug( - f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., " - f"等待新消息到达" - ) - - except Exception as e: - logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True) - - async def _process_stream(self, stream_id: str, context: StreamContext) -> bool: - """处理流消息 - - Args: - stream_id: 流ID - context: 流上下文 - - Returns: - bool: 是否处理成功 - """ - if not self.chatter_manager: - logger.warning(f"Chatter 管理器未设置: {stream_id}") - return False - - # 设置处理状态 - self._set_stream_processing_status(stream_id, True) - - try: - start_time = time.time() - - # 设置触发用户ID - last_message = context.get_last_message() - if last_message: - context.triggering_user_id = last_message.user_info.user_id - - # 创建异步任务刷新能量(不阻塞主流程) - energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id)) - - # 设置 Chatter 正在处理的标志 - context.is_chatter_processing = True - logger.debug(f"设置 Chatter 处理标志: {stream_id}") - - try: - # 调用 chatter_manager 处理流上下文 - results = await self.chatter_manager.process_stream_context(stream_id, context) - success = results.get("success", False) - - if success: - process_time = time.time() - start_time - logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") - else: - logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}") - - return success - - finally: - # 清除 Chatter 处理标志 - context.is_chatter_processing = False - logger.debug(f"清除 Chatter 处理标志: {stream_id}") - - # 等待能量刷新任务完成 - try: - await asyncio.wait_for(energy_task, timeout=5.0) - except asyncio.TimeoutError: - logger.warning(f"等待能量刷新超时: {stream_id}") - except Exception as e: - logger.debug(f"能量刷新任务异常: {e}") - - except Exception as e: - logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) - return False - - finally: - # 设置处理状态为未处理 - self._set_stream_processing_status(stream_id, False) - - def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None: - """设置流的处理状态""" - try: - from src.chat.message_manager.message_manager import message_manager - - if message_manager.is_running: - message_manager.set_stream_processing_status(stream_id, is_processing) - logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") - - except ImportError: - logger.debug("MessageManager 不可用,跳过状态设置") - except Exception as e: - logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}") - - async def _refresh_focus_energy(self, stream_id: str) -> None: - """分发完成后刷新能量值""" - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if not chat_stream: - logger.debug(f"刷新能量时未找到聊天流: {stream_id}") - return - - await chat_stream.context_manager.refresh_focus_energy_from_history() - logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量") - except Exception as e: - logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") - - async def cleanup_invalid_schedules(self) -> int: - """清理无效的schedule记录 - - Returns: - int: 清理的记录数量 - """ - cleaned_count = 0 - invalid_streams = [] - - for stream_id, task_name in list(self.stream_schedules.items()): - # 验证schedule是否真的还在scheduler中活跃 - schedule_id = await unified_scheduler.find_schedule_by_name(task_name) - if not schedule_id: - invalid_streams.append(stream_id) - continue - - # 检查任务是否还在活跃状态 - task = await unified_scheduler.get_task_info(schedule_id) - if not task or not task['is_active']: - invalid_streams.append(stream_id) - - # 清理无效记录 - for stream_id in invalid_streams: - task_name = self.stream_schedules.pop(stream_id, None) - if task_name: - logger.info(f"清理无效schedule记录: 流={stream_id[:8]}..., task={task_name}") - cleaned_count += 1 - - return cleaned_count - - def get_statistics(self) -> dict[str, Any]: - """获取统计信息""" - uptime = time.time() - self.stats["start_time"] - return { - "is_running": self.is_running, - "active_schedules": len(self.stream_schedules), - "total_schedules_created": self.stats["total_schedules_created"], - "total_schedules_cancelled": self.stats["total_schedules_cancelled"], - "total_interruptions": self.stats["total_interruptions"], - "total_process_cycles": self.stats["total_process_cycles"], - "total_failures": self.stats["total_failures"], - "uptime": uptime, - } - - -# 全局实例 -scheduler_dispatcher = SchedulerDispatcher() diff --git a/src/chat/message_receive/chat_stream.py b/src/chat/message_receive/chat_stream.py index 36ec0dfbc..f021ddebe 100644 --- a/src/chat/message_receive/chat_stream.py +++ b/src/chat/message_receive/chat_stream.py @@ -565,6 +565,14 @@ class ChatManager: else: return None + def get_all_streams(self) -> dict[str, ChatStream]: + """获取所有聊天流 + + Returns: + dict[str, ChatStream]: 包含所有聊天流的字典,key为stream_id,value为ChatStream对象 + """ + return self.streams.copy() # 返回副本以防止外部修改 + @staticmethod def _prepare_stream_data(stream_data_dict: dict) -> dict: """准备聊天流保存数据""" diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index 5eb7f0f7b..801dc881b 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -5,6 +5,7 @@ import asyncio import time +from collections import deque from dataclasses import dataclass, field from enum import Enum from typing import TYPE_CHECKING, Optional @@ -66,6 +67,16 @@ class StreamContext(BaseDataModel): processing_message_id: str | None = None # 当前正在规划/处理的目标消息ID,用于防止重复回复 decision_history: list["DecisionRecord"] = field(default_factory=list) # 决策历史 + # 消息缓存系统相关字段 + message_cache: deque["DatabaseMessages"] = field(default_factory=deque) # 消息缓存队列 + is_cache_enabled: bool = False # 是否为此流启用缓存 + cache_stats: dict = field(default_factory=lambda: { + "total_cached_messages": 0, + "total_flushed_messages": 0, + "cache_hits": 0, + "cache_misses": 0 + }) # 缓存统计信息 + def add_action_to_message(self, message_id: str, action: str): """ 向指定消息添加执行的动作 @@ -189,13 +200,12 @@ class StreamContext(BaseDataModel): and hasattr(self.current_message, "additional_config") and self.current_message.additional_config ): + import orjson try: - import json - - config = json.loads(self.current_message.additional_config) + config = orjson.loads(self.current_message.additional_config) if config.get("template_info") and not config.get("template_default", True): return config.get("template_name") - except (json.JSONDecodeError, AttributeError): + except (orjson.JSONDecodeError, AttributeError): pass return None @@ -231,9 +241,8 @@ class StreamContext(BaseDataModel): # 优先从additional_config中获取format_info if hasattr(self.current_message, "additional_config") and self.current_message.additional_config: + import orjson try: - import orjson - logger.debug(f"[check_types] additional_config 类型: {type(self.current_message.additional_config)}") config = orjson.loads(self.current_message.additional_config) logger.debug(f"[check_types] 解析后的 config 键: {config.keys() if isinstance(config, dict) else 'N/A'}") @@ -311,6 +320,145 @@ class StreamContext(BaseDataModel): """获取优先级信息""" return self.priority_info + # ==================== 消息缓存系统方法 ==================== + + def enable_cache(self, enabled: bool = True): + """ + 启用或禁用消息缓存系统 + + Args: + enabled: 是否启用缓存 + """ + self.is_cache_enabled = enabled + logger.debug(f"StreamContext {self.stream_id} 缓存系统已{'启用' if enabled else '禁用'}") + + def add_message_to_cache(self, message: "DatabaseMessages") -> bool: + """ + 添加消息到缓存队列 + + Args: + message: 要缓存的消息 + + Returns: + bool: 是否成功添加到缓存 + """ + if not self.is_cache_enabled: + self.cache_stats["cache_misses"] += 1 + logger.debug(f"StreamContext {self.stream_id} 缓存未启用,消息无法缓存") + return False + + try: + self.message_cache.append(message) + self.cache_stats["total_cached_messages"] += 1 + self.cache_stats["cache_hits"] += 1 + logger.debug(f"消息已添加到缓存: stream={self.stream_id}, message_id={message.message_id}, 缓存大小={len(self.message_cache)}") + return True + except Exception as e: + logger.error(f"添加消息到缓存失败: stream={self.stream_id}, error={e}") + return False + + def flush_cached_messages(self) -> list["DatabaseMessages"]: + """ + 刷新缓存消息到未读消息列表 + + Returns: + list[DatabaseMessages]: 刷新的消息列表 + """ + if not self.message_cache: + logger.debug(f"StreamContext {self.stream_id} 缓存为空,无需刷新") + return [] + + try: + cached_messages = list(self.message_cache) + cache_size = len(cached_messages) + + # 清空缓存队列 + self.message_cache.clear() + + # 将缓存消息添加到未读消息列表 + self.unread_messages.extend(cached_messages) + + # 更新统计信息 + self.cache_stats["total_flushed_messages"] += cache_size + + logger.debug(f"缓存消息已刷新到未读列表: stream={self.stream_id}, 数量={cache_size}") + return cached_messages + + except Exception as e: + logger.error(f"刷新缓存消息失败: stream={self.stream_id}, error={e}") + return [] + + def get_cache_size(self) -> int: + """ + 获取当前缓存大小 + + Returns: + int: 缓存中的消息数量 + """ + return len(self.message_cache) + + def clear_cache(self): + """清空消息缓存""" + cache_size = len(self.message_cache) + self.message_cache.clear() + logger.debug(f"消息缓存已清空: stream={self.stream_id}, 清空数量={cache_size}") + + def has_cached_messages(self) -> bool: + """ + 检查是否有缓存的消息 + + Returns: + bool: 是否有缓存消息 + """ + return len(self.message_cache) > 0 + + def get_cache_stats(self) -> dict: + """ + 获取缓存统计信息 + + Returns: + dict: 缓存统计数据 + """ + stats = self.cache_stats.copy() + stats.update({ + "current_cache_size": len(self.message_cache), + "is_cache_enabled": self.is_cache_enabled, + "stream_id": self.stream_id + }) + return stats + + def add_message_with_cache_check(self, message: "DatabaseMessages", force_direct: bool = False) -> bool: + """ + 智能添加消息:根据缓存状态决定是缓存还是直接添加到未读列表 + + Args: + message: 要添加的消息 + force_direct: 是否强制直接添加到未读列表(跳过缓存) + + Returns: + bool: 是否成功添加 + """ + try: + # 如果强制直接添加或缓存未启用,直接添加到未读列表 + if force_direct or not self.is_cache_enabled: + self.unread_messages.append(message) + logger.debug(f"消息直接添加到未读列表: stream={self.stream_id}, message_id={message.message_id}") + return True + + # 如果正在处理中,添加到缓存 + if self.is_chatter_processing: + return self.add_message_to_cache(message) + + # 如果没有在处理,先刷新缓存再添加到未读列表 + self.flush_cached_messages() + self.unread_messages.append(message) + logger.debug(f"消息添加到未读列表(已刷新缓存): stream={self.stream_id}, message_id={message.message_id}") + return True + + except Exception as e: + logger.error(f"智能添加消息失败: stream={self.stream_id}, error={e}") + return False + def __deepcopy__(self, memo): """自定义深拷贝,跳过不可序列化的 asyncio.Task (processing_task)。 @@ -332,9 +480,16 @@ class StreamContext(BaseDataModel): memo[obj_id] = new for k, v in self.__dict__.items(): - if k == "processing_task": + if k in ["processing_task", "stream_loop_task"]: # 不复制 asyncio.Task,避免无法 pickling setattr(new, k, None) + elif k == "message_cache": + # 深拷贝消息缓存队列 + try: + setattr(new, k, copy.deepcopy(v, memo)) + except Exception: + # 如果拷贝失败,创建新的空队列 + setattr(new, k, deque()) else: try: setattr(new, k, copy.deepcopy(v, memo)) diff --git a/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py index aebe1254a..2f24d458a 100644 --- a/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py +++ b/src/plugins/built_in/affinity_flow_chatter/core/affinity_chatter.py @@ -87,7 +87,7 @@ class AffinityChatter(BaseChatter): self.stats["successful_executions"] += 1 self.last_activity_time = time.time() - result: ClassVar = { + result = { "success": True, "stream_id": self.stream_id, "plan_created": True, diff --git a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py index 4f74a002b..587378f2a 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner/planner.py @@ -84,14 +84,12 @@ class ChatterActionPlanner: return [], None async def _enhanced_plan_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: - """执行增强版规划流程""" + """执行增强版规划流程,根据模式分发到对应的处理函数""" try: - - - # 1. 生成初始 Plan + # 1. 确定当前模式 chat_mode = context.chat_mode if context else ChatMode.FOCUS - # 如果禁用了Normal模式,则强制将任何处于Normal模式的会话切换回Focus模式。 + # 2. 如果禁用了Normal模式,则强制切换回Focus模式 if not global_config.affinity_flow.enable_normal_mode and chat_mode == ChatMode.NORMAL: logger.info("Normal模式已禁用,强制切换回Focus模式") chat_mode = ChatMode.FOCUS @@ -99,12 +97,29 @@ class ChatterActionPlanner: context.chat_mode = ChatMode.FOCUS await self._sync_chat_mode_to_stream(context) - # Normal模式下使用简化流程 + # 3. 根据模式分发到对应的处理流程 if chat_mode == ChatMode.NORMAL: return await self._normal_mode_flow(context) + else: + return await self._focus_mode_flow(context) + except Exception as e: + logger.error(f"增强版规划流程出错: {e}") + self.planner_stats["failed_plans"] += 1 + # 清理处理标记 + if context: + context.processing_message_id = None + return [], None + + async def _focus_mode_flow(self, context: "StreamContext | None") -> tuple[list[dict[str, Any]], Any | None]: + """Focus模式下的完整plan流程 + + 执行完整的生成→筛选→执行流程,支持所有类型的动作,包括非回复动作。 + """ + try: unread_messages = context.get_unread_messages() if context else [] - # 2. 使用新的兴趣度管理系统进行评分 + + # 1. 使用新的兴趣度管理系统进行评分 max_message_interest = 0.0 reply_not_available = True aggregate_should_act = False @@ -123,7 +138,7 @@ class ChatterActionPlanner: message_should_act = getattr(message, "should_act", False) logger.debug( - f"消息 {message.message_id} 预计算标志: interest={message_interest:.3f}, " + f"Focus模式 - 消息 {message.message_id} 预计算标志: interest={message_interest:.3f}, " f"should_reply={message_should_reply}, should_act={message_should_act}" ) @@ -136,22 +151,22 @@ class ChatterActionPlanner: aggregate_should_act = True except Exception as e: - logger.warning(f"处理消息 {message.message_id} 失败: {e}") + logger.warning(f"Focus模式 - 处理消息 {message.message_id} 失败: {e}") message.interest_value = 0.0 message.should_reply = False message.should_act = False - # 检查兴趣度是否达到非回复动作阈值 + # 2. 检查兴趣度是否达到非回复动作阈值 non_reply_action_interest_threshold = global_config.affinity_flow.non_reply_action_interest_threshold if not aggregate_should_act: - logger.info("所有未读消息低于兴趣度阈值,不执行动作") + logger.info("Focus模式 - 所有未读消息低于兴趣度阈值,不执行动作") # 直接返回 no_action from src.common.data_models.info_data_model import ActionPlannerInfo no_action = ActionPlannerInfo( action_type="no_action", reasoning=( - "所有未读消息兴趣度未达阈值 " + "Focus模式 - 所有未读消息兴趣度未达阈值 " f"{non_reply_action_interest_threshold:.3f}" f"(最高兴趣度 {max_message_interest:.3f})" ), @@ -162,28 +177,27 @@ class ChatterActionPlanner: # 更新连续不回复计数 await self._update_interest_calculator_state(replied=False) - initial_plan = await self.generator.generate(chat_mode) + initial_plan = await self.generator.generate(ChatMode.FOCUS) filtered_plan = initial_plan filtered_plan.decided_actions = [no_action] else: - # 在规划前,先进行动作修改 + # 3. 在规划前,先进行动作修改 from src.chat.planner_actions.action_modifier import ActionModifier action_modifier = ActionModifier(self.action_manager, self.chat_id) await action_modifier.modify_actions() - # 在生成初始计划前,刷新缓存消息到未读列表 - await self._flush_cached_messages_to_unread(context) + # 4. 生成初始计划 + initial_plan = await self.generator.generate(ChatMode.FOCUS) - initial_plan = await self.generator.generate(chat_mode) - - # 确保Plan中包含所有当前可用的动作 + # 5. 确保Plan中包含所有当前可用的动作 initial_plan.available_actions = self.action_manager.get_using_actions() - # 4. 筛选 Plan + + # 6. 筛选 Plan available_actions = list(initial_plan.available_actions.keys()) plan_filter = ChatterPlanFilter(self.chat_id, available_actions) filtered_plan = await plan_filter.filter(reply_not_available, initial_plan) - # 4.5 检查是否正在处理相同的目标消息,防止重复回复 + # 7. 检查是否正在处理相同的目标消息,防止重复回复 target_message_id = None if filtered_plan and filtered_plan.decided_actions: for action in filtered_plan.decided_actions: @@ -195,17 +209,17 @@ class ChatterActionPlanner: target_message_id = action.action_message.get("message_id") break - # 如果找到目标消息ID,检查是否已经在处理中 + # 8. 如果找到目标消息ID,检查是否已经在处理中 if target_message_id and context: if context.processing_message_id == target_message_id: logger.warning( - f"目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复" + f"Focus模式 - 目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复" ) # 返回 no_action,避免重复处理 from src.common.data_models.info_data_model import ActionPlannerInfo no_action = ActionPlannerInfo( action_type="no_action", - reasoning=f"目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复", + reasoning=f"Focus模式 - 目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复", action_data={}, action_message=None, ) @@ -213,15 +227,15 @@ class ChatterActionPlanner: else: # 记录当前正在处理的消息ID context.processing_message_id = target_message_id - logger.debug(f"开始处理目标消息: {target_message_id}") + logger.debug(f"Focus模式 - 开始处理目标消息: {target_message_id}") - # 5. 使用 PlanExecutor 执行 Plan + # 9. 使用 PlanExecutor 执行 Plan execution_result = await self.executor.execute(filtered_plan) - # 6. 根据执行结果更新统计信息 + # 10. 根据执行结果更新统计信息 self._update_stats_from_execution_result(execution_result) - # 7. 更新兴趣计算器状态 + # 11. 更新兴趣计算器状态 if filtered_plan.decided_actions: has_reply = any( action.action_type in ["reply", "proactive_reply"] @@ -231,21 +245,20 @@ class ChatterActionPlanner: has_reply = False await self._update_interest_calculator_state(replied=has_reply) - # 8. Focus模式下如果执行了reply动作,根据focus_energy概率切换到Normal模式 - if chat_mode == ChatMode.FOCUS and context: - if has_reply and global_config.affinity_flow.enable_normal_mode: - await self._check_enter_normal_mode(context) + # 12. Focus模式下如果执行了reply动作,根据focus_energy概率切换到Normal模式 + if has_reply and context and global_config.affinity_flow.enable_normal_mode: + await self._check_enter_normal_mode(context) - # 9. 清理处理标记 + # 13. 清理处理标记 if context: context.processing_message_id = None - logger.debug("已清理处理标记,完成规划流程") + logger.debug("Focus模式 - 已清理处理标记,完成规划流程") - # 10. 返回结果 + # 14. 返回结果 return self._build_return_result(filtered_plan) except Exception as e: - logger.error(f"增强版规划流程出错: {e}") + logger.error(f"Focus模式流程出错: {e}") self.planner_stats["failed_plans"] += 1 # 清理处理标记 if context: @@ -258,32 +271,33 @@ class ChatterActionPlanner: 只计算兴趣值并判断是否达到reply阈值,不执行完整的plan流程。 根据focus_energy决定退出normal模式回到focus模式的概率。 """ - # 最后的保障措施,以防意外进入此流程 + # 安全检查:确保Normal模式已启用 if not global_config.affinity_flow.enable_normal_mode: - logger.warning("意外进入了Normal模式流程,但该模式已被禁用!将强制切换回Focus模式进行完整规划。") + logger.warning("Normal模式 - 意外进入了Normal模式流程,但该模式已被禁用!将强制切换回Focus模式进行完整规划。") if context: context.chat_mode = ChatMode.FOCUS await self._sync_chat_mode_to_stream(context) # 重新运行主规划流程,这次将正确使用Focus模式 return await self._enhanced_plan_flow(context) - try: - # Normal模式开始时,刷新缓存消息到未读列表 - await self._flush_cached_messages_to_unread(context) + try: unread_messages = context.get_unread_messages() if context else [] + # 1. 检查是否有未读消息 if not unread_messages: - logger.debug("Normal模式: 没有未读消息") + logger.debug("Normal模式 - 没有未读消息") from src.common.data_models.info_data_model import ActionPlannerInfo no_action = ActionPlannerInfo( action_type="no_action", - reasoning="Normal模式: 没有未读消息", + reasoning="Normal模式 - 没有未读消息", action_data={}, action_message=None, ) + # 检查是否需要退出Normal模式 + await self._check_exit_normal_mode(context) return [asdict(no_action)], None - # 检查是否有消息达到reply阈值 + # 2. 检查是否有消息达到reply阈值 should_reply = False target_message = None @@ -292,32 +306,34 @@ class ChatterActionPlanner: if message_should_reply: should_reply = True target_message = message - logger.info(f"Normal模式: 消息 {message.message_id} 达到reply阈值") + logger.info(f"Normal模式 - 消息 {message.message_id} 达到reply阈值,准备回复") break if should_reply and target_message: - # 检查是否正在处理相同的目标消息,防止重复回复 + # 3. 防重复检查:检查是否正在处理相同的目标消息 target_message_id = target_message.message_id if context and context.processing_message_id == target_message_id: logger.warning( - f"Normal模式: 目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复" + f"Normal模式 - 目标消息 {target_message_id} 已经在处理中,跳过本次规划以防止重复回复" ) # 返回 no_action,避免重复处理 from src.common.data_models.info_data_model import ActionPlannerInfo no_action = ActionPlannerInfo( action_type="no_action", - reasoning=f"目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复", + reasoning=f"Normal模式 - 目标消息 {target_message_id} 已经在处理中,跳过以防止重复回复", action_data={}, action_message=None, ) + # 检查是否需要退出Normal模式 + await self._check_exit_normal_mode(context) return [asdict(no_action)], None # 记录当前正在处理的消息ID if context: context.processing_message_id = target_message_id - logger.debug(f"Normal模式: 开始处理目标消息: {target_message_id}") + logger.debug(f"Normal模式 - 开始处理目标消息: {target_message_id}") - # 达到reply阈值,直接进入回复流程 + # 4. 构建回复动作(Normal模式的简化流程) from src.common.data_models.info_data_model import ActionPlannerInfo, Plan from src.plugin_system.base.component_types import ChatType @@ -326,7 +342,7 @@ class ChatterActionPlanner: reply_action = ActionPlannerInfo( action_type="reply", - reasoning="Normal模式: 兴趣度达到阈值,直接回复", + reasoning="Normal模式 - 兴趣度达到阈值,直接回复(简化流程)", action_data={"target_message_id": target_message.message_id}, action_message=target_message, should_quote_reply=False, # Normal模式默认不引用回复,保持对话流畅 @@ -341,31 +357,31 @@ class ChatterActionPlanner: decided_actions=[reply_action], ) - # 执行reply动作 + # 5. 执行reply动作 execution_result = await self.executor.execute(minimal_plan) self._update_stats_from_execution_result(execution_result) - logger.info("Normal模式: 执行reply动作完成") + logger.info("Normal模式 - 执行reply动作完成") - # 更新兴趣计算器状态(回复成功,重置不回复计数) + # 6. 更新兴趣计算器状态(回复成功,重置不回复计数) await self._update_interest_calculator_state(replied=True) - # 清理处理标记 + # 7. 清理处理标记 if context: context.processing_message_id = None - logger.debug("Normal模式: 已清理处理标记") + logger.debug("Normal模式 - 已清理处理标记") - # 无论是否回复,都进行退出normal模式的判定 + # 8. 检查是否需要退出Normal模式 await self._check_exit_normal_mode(context) return [asdict(reply_action)], target_message_dict else: # 未达到reply阈值 - logger.debug("Normal模式: 未达到reply阈值") + logger.debug("Normal模式 - 未达到reply阈值,不执行回复") from src.common.data_models.info_data_model import ActionPlannerInfo no_action = ActionPlannerInfo( action_type="no_action", - reasoning="Normal模式: 兴趣度未达到阈值", + reasoning="Normal模式 - 兴趣度未达到阈值", action_data={}, action_message=None, ) @@ -373,13 +389,13 @@ class ChatterActionPlanner: # 更新连续不回复计数 await self._update_interest_calculator_state(replied=False) - # 无论是否回复,都进行退出normal模式的判定 + # 检查是否需要退出Normal模式 await self._check_exit_normal_mode(context) return [asdict(no_action)], None except Exception as e: - logger.error(f"Normal模式流程出错: {e}") + logger.error(f"Normal模式 - 流程出错: {e}") self.planner_stats["failed_plans"] += 1 # 清理处理标记 if context: