diff --git a/docs/message_dispatcher_refactoring.md b/docs/message_dispatcher_refactoring.md new file mode 100644 index 000000000..de70b3522 --- /dev/null +++ b/docs/message_dispatcher_refactoring.md @@ -0,0 +1,210 @@ +# 消息分发器重构文档 + +## 重构日期 +2025-11-04 + +## 重构目标 +将基于异步任务循环的消息分发机制改为使用统一的 `unified_scheduler`,实现更优雅和可维护的消息处理流程。 + +## 重构内容 + +### 1. 修改 unified_scheduler 以支持完全并发执行 + +**文件**: `src/schedule/unified_scheduler.py` + +**主要改动**: +- 修改 `_check_and_trigger_tasks` 方法,使用 `asyncio.create_task` 为每个到期任务创建独立的异步任务 +- 新增 `_execute_task_callback` 方法,用于并发执行单个任务 +- 使用 `asyncio.gather` 并发等待所有任务完成,确保不同 schedule 之间完全异步执行,不会相互阻塞 + +**关键改进**: +```python +# 为每个任务创建独立的异步任务,确保并发执行 +execution_tasks = [] +for task in tasks_to_trigger: + execution_task = asyncio.create_task( + self._execute_task_callback(task, current_time), + name=f"execute_{task.task_name}" + ) + execution_tasks.append(execution_task) + +# 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务) +results = await asyncio.gather(*execution_tasks, return_exceptions=True) +``` + +### 2. 创建新的 SchedulerDispatcher + +**文件**: `src/chat/message_manager/scheduler_dispatcher.py` + +**功能**: +基于 `unified_scheduler` 的消息分发器,替代原有的 `stream_loop_task` 循环机制。 + +**工作流程**: +1. **接收消息时**: 将消息添加到聊天流上下文(缓存) +2. **检查 schedule**: 查看该聊天流是否有活跃的 schedule +3. **打断判定**: 如果有活跃 schedule,检查是否需要打断 + - 如果需要打断,移除旧 schedule 并创建新的 + - 如果不需要打断,保持原有 schedule +4. **创建 schedule**: 如果没有活跃 schedule,创建新的 +5. **Schedule 触发**: 当 schedule 到期时,激活 chatter 进行处理 +6. **处理完成**: 计算下次间隔并根据需要注册新的 schedule + +**关键方法**: +- `on_message_received(stream_id)`: 消息接收时的处理入口 +- `_check_interruption(stream_id, context)`: 检查是否应该打断 +- `_create_schedule(stream_id, context)`: 创建新的 schedule +- `_cancel_and_recreate_schedule(stream_id, context)`: 取消并重新创建 schedule +- `_on_schedule_triggered(stream_id)`: schedule 触发时的回调 +- `_process_stream(stream_id, context)`: 激活 chatter 处理消息 + +### 3. 修改 MessageManager 集成新分发器 + +**文件**: `src/chat/message_manager/message_manager.py` + +**主要改动**: +1. 导入 `scheduler_dispatcher` +2. 启动时初始化 `scheduler_dispatcher` 而非 `stream_loop_manager` +3. 修改 `add_message` 方法: + - 将消息添加到上下文后 + - 调用 `scheduler_dispatcher.on_message_received(stream_id)` 处理消息接收事件 +4. 废弃 `_check_and_handle_interruption` 方法(打断逻辑已集成到 dispatcher) + +**新的消息接收流程**: +```python +async def add_message(self, stream_id: str, message: DatabaseMessages): + # 1. 检查 notice 消息 + if self._is_notice_message(message): + await self._handle_notice_message(stream_id, message) + if not global_config.notice.enable_notice_trigger_chat: + return + + # 2. 将消息添加到上下文 + chat_stream = await chat_manager.get_stream(stream_id) + await chat_stream.context_manager.add_message(message) + + # 3. 通知 scheduler_dispatcher 处理 + await scheduler_dispatcher.on_message_received(stream_id) +``` + +### 4. 更新模块导出 + +**文件**: `src/chat/message_manager/__init__.py` + +**改动**: +- 导出 `SchedulerDispatcher` 和 `scheduler_dispatcher` + +## 架构对比 + +### 旧架构 (基于 stream_loop_task) +``` +消息到达 -> add_message -> 添加到上下文 -> 检查打断 -> 取消 stream_loop_task + -> 重新创建 stream_loop_task + +stream_loop_task: while True: + 检查未读消息 -> 处理消息 -> 计算间隔 -> sleep(间隔) +``` + +**问题**: +- 每个聊天流维护一个独立的异步循环任务 +- 即使没有消息也需要持续轮询 +- 打断逻辑通过取消和重建任务实现,较为复杂 +- 难以统一管理和监控 + +### 新架构 (基于 unified_scheduler) +``` +消息到达 -> add_message -> 添加到上下文 -> dispatcher.on_message_received + -> 检查是否有活跃 schedule + -> 打断判定 + -> 创建/更新 schedule + +schedule 到期 -> _on_schedule_triggered -> 处理消息 -> 计算间隔 -> 创建新 schedule (如果需要) +``` + +**优势**: +- 使用统一的调度器管理所有聊天流 +- 按需创建 schedule,没有消息时不会创建 +- 打断逻辑清晰:移除旧 schedule + 创建新 schedule +- 易于监控和统计(统一的 scheduler 统计) +- 完全异步并发,多个 schedule 可以同时触发而不相互阻塞 + +## 兼容性 + +### 保留的组件 +- `stream_loop_manager`: 暂时保留但不启动,以便需要时回滚 +- `_check_and_handle_interruption`: 保留方法签名但不执行,避免破坏现有调用 + +### 移除的组件 +- 无(本次重构采用渐进式方式,先添加新功能,待稳定后再移除旧代码) + +## 配置项 + +所有配置项保持不变,新分发器完全兼容现有配置: +- `chat.interruption_enabled`: 是否启用打断 +- `chat.allow_reply_interruption`: 是否允许回复时打断 +- `chat.interruption_max_limit`: 最大打断次数 +- `chat.distribution_interval`: 基础分发间隔 +- `chat.force_dispatch_unread_threshold`: 强制分发阈值 +- `chat.force_dispatch_min_interval`: 强制分发最小间隔 + +## 测试建议 + +1. **基本功能测试** + - 单个聊天流接收消息并正常处理 + - 多个聊天流同时接收消息并并发处理 + +2. **打断测试** + - 在 chatter 处理过程中发送新消息,验证打断逻辑 + - 验证打断次数限制 + - 验证打断概率计算 + +3. **间隔计算测试** + - 验证基于能量的动态间隔计算 + - 验证强制分发阈值触发 + +4. **并发测试** + - 多个聊天流的 schedule 同时到期,验证并发执行 + - 验证不同 schedule 之间不会相互阻塞 + +5. **长时间稳定性测试** + - 运行较长时间,观察是否有内存泄漏 + - 观察 schedule 创建和销毁是否正常 + +## 回滚方案 + +如果新机制出现问题,可以通过以下步骤回滚: + +1. 在 `message_manager.py` 的 `start()` 方法中: + ```python + # 注释掉新分发器 + # await scheduler_dispatcher.start() + # scheduler_dispatcher.set_chatter_manager(self.chatter_manager) + + # 启用旧分发器 + await stream_loop_manager.start() + stream_loop_manager.set_chatter_manager(self.chatter_manager) + ``` + +2. 在 `add_message()` 方法中: + ```python + # 注释掉新逻辑 + # await scheduler_dispatcher.on_message_received(stream_id) + + # 恢复旧逻辑 + await self._check_and_handle_interruption(chat_stream, message) + ``` + +3. 在 `_check_and_handle_interruption()` 方法中移除开头的 `return` 语句 + +## 后续工作 + +1. 在确认新机制稳定后,完全移除 `stream_loop_manager` 相关代码 +2. 清理 `StreamContext` 中的 `stream_loop_task` 字段 +3. 移除 `_check_and_handle_interruption` 方法 +4. 更新相关文档和注释 + +## 性能预期 + +- **资源占用**: 减少(不再为每个流维护独立循环) +- **响应延迟**: 不变(仍基于相同的间隔计算) +- **并发能力**: 提升(完全异步执行,无阻塞) +- **可维护性**: 提升(逻辑更清晰,统一管理) diff --git a/src/chat/message_manager/__init__.py b/src/chat/message_manager/__init__.py index c8bd18a08..0edf5d1d3 100644 --- a/src/chat/message_manager/__init__.py +++ b/src/chat/message_manager/__init__.py @@ -1,16 +1,16 @@ """ 消息管理器模块 -提供统一的消息管理、上下文管理和流循环调度功能 +提供统一的消息管理、上下文管理和基于 scheduler 的消息分发功能 """ from .context_manager import SingleStreamContextManager -from .distribution_manager import StreamLoopManager, stream_loop_manager from .message_manager import MessageManager, message_manager +from .scheduler_dispatcher import SchedulerDispatcher, scheduler_dispatcher __all__ = [ "MessageManager", "SingleStreamContextManager", - "StreamLoopManager", + "SchedulerDispatcher", "message_manager", - "stream_loop_manager", + "scheduler_dispatcher", ] diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index 8d092f760..35434128a 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -15,11 +15,9 @@ from src.common.logger import get_logger from src.config.config import global_config from src.plugin_system.base.component_types import ChatType -from .distribution_manager import stream_loop_manager - logger = get_logger("context_manager") -# 全局背景任务集合 +# 全局背景任务集合(用于异步初始化等后台任务) _background_tasks = set() @@ -59,18 +57,26 @@ class SingleStreamContextManager: Args: message: 消息对象 - skip_energy_update: 是否跳过能量更新(兼容参数,当前忽略) + skip_energy_update: 是否跳过能量更新(兼容参数,当前忽略) Returns: bool: 是否成功添加 """ try: - # 使用MessageManager的内置缓存系统 + # 尝试使用MessageManager的内置缓存系统 + use_cache_system = False + message_manager = None try: - from .message_manager import message_manager + from .message_manager import message_manager as mm + message_manager = mm + use_cache_system = message_manager.is_running + except Exception as e: + logger.debug(f"MessageManager不可用,使用直接添加: {e}") + use_cache_system = False - # 如果MessageManager正在运行,使用缓存系统 - if message_manager.is_running: + if use_cache_system and message_manager: + # 使用缓存系统 + try: # 先计算兴趣值(需要在缓存前计算) await self._calculate_message_interest(message) message.is_read = False @@ -97,18 +103,18 @@ class SingleStreamContextManager: else: logger.debug(f"消息已缓存,等待当前处理完成: stream={self.stream_id}") - # 启动流的循环任务(如果还未启动) - task = asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id)) - _background_tasks.add(task) - task.add_done_callback(_background_tasks.discard) logger.debug(f"添加消息到缓存系统: {self.stream_id}") return True else: logger.warning(f"消息缓存系统添加失败,回退到直接添加: {self.stream_id}") - except Exception as e: - logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}") + use_cache_system = False + except Exception as e: + logger.warning(f"消息缓存系统异常,回退到直接添加: {self.stream_id}, error={e}") + use_cache_system = False - # 回退方案:直接添加到未读消息 + # 回退方案:直接添加到未读消息 + # 这部分代码在缓存系统失败或不可用时执行 + if not use_cache_system: message.is_read = False self.context.unread_messages.append(message) @@ -119,12 +125,13 @@ class SingleStreamContextManager: await self._calculate_message_interest(message) self.total_messages += 1 self.last_access_time = time.time() - # 启动流的循环任务(如果还未启动) - task = asyncio.create_task(stream_loop_manager.start_stream_loop(self.stream_id)) - _background_tasks.add(task) - task.add_done_callback(_background_tasks.discard) + logger.debug(f"添加消息{message.processed_plain_text}到单流上下文: {self.stream_id}") return True + + # 不应该到达这里,但为了类型检查添加返回值 + return True + except Exception as e: logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True) return False diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py deleted file mode 100644 index e74062fce..000000000 --- a/src/chat/message_manager/distribution_manager.py +++ /dev/null @@ -1,694 +0,0 @@ -""" -流循环管理器 -为每个聊天流创建独立的无限循环任务,主动轮询处理消息 -""" - -import asyncio -import time -from typing import Any - -from src.chat.chatter_manager import ChatterManager -from src.chat.energy_system import energy_manager -from src.common.data_models.message_manager_data_model import StreamContext -from src.common.logger import get_logger -from src.config.config import global_config -from src.plugin_system.apis.chat_api import get_chat_manager - -logger = get_logger("stream_loop_manager") - - -class StreamLoopManager: - """流循环管理器 - 每个流一个独立的无限循环任务""" - - def __init__(self, max_concurrent_streams: int | None = None): - # 统计信息 - self.stats: dict[str, Any] = { - "active_streams": 0, - "total_loops": 0, - "total_process_cycles": 0, - "total_failures": 0, - "start_time": time.time(), - } - - # 配置参数 - self.max_concurrent_streams = max_concurrent_streams or global_config.chat.max_concurrent_distributions - - # 强制分发策略 - self.force_dispatch_unread_threshold: int | None = getattr( - global_config.chat, "force_dispatch_unread_threshold", 20 - ) - self.force_dispatch_min_interval: float = getattr(global_config.chat, "force_dispatch_min_interval", 0.1) - - # Chatter管理器 - self.chatter_manager: ChatterManager | None = None - - # 状态控制 - self.is_running = False - - # 每个流的上一次间隔值(用于日志去重) - self._last_intervals: dict[str, float] = {} - - logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})") - - async def start(self) -> None: - """启动流循环管理器""" - if self.is_running: - logger.warning("流循环管理器已经在运行") - return - - self.is_running = True - - async def stop(self) -> None: - """停止流循环管理器""" - if not self.is_running: - return - - self.is_running = False - - # 取消所有流循环 - try: - # 获取所有活跃的流 - from src.plugin_system.apis.chat_api import get_chat_manager - - chat_manager = get_chat_manager() - all_streams = await chat_manager.get_all_streams() - - # 创建任务列表以便并发取消 - cancel_tasks = [] - for chat_stream in all_streams: - context = chat_stream.context_manager.context - if context.stream_loop_task and not context.stream_loop_task.done(): - context.stream_loop_task.cancel() - cancel_tasks.append((chat_stream.stream_id, context.stream_loop_task)) - - # 并发等待所有任务取消 - if cancel_tasks: - logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") - await asyncio.gather( - *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], - return_exceptions=True, - ) - - logger.info("所有流循环已清理") - except Exception as e: - logger.error(f"停止管理器时出错: {e}") - - logger.info("流循环管理器已停止") - - async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool: - """启动指定流的循环任务 - 优化版本使用自适应管理器 - - Args: - stream_id: 流ID - force: 是否强制启动 - - Returns: - bool: 是否成功启动 - """ - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"无法获取流上下文: {stream_id}") - return False - - # 快速路径:如果流已存在且不是强制启动,无需处理 - if not force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"流 {stream_id} 循环已在运行") - return True - - # 如果是强制启动且任务仍在运行,先取消旧任务 - if force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}") - old_task = context.stream_loop_task - old_task.cancel() - try: - await asyncio.wait_for(old_task, timeout=2.0) - logger.debug(f"旧流循环任务已结束: {stream_id}") - except (asyncio.TimeoutError, asyncio.CancelledError): - logger.debug(f"旧流循环任务已取消或超时: {stream_id}") - except Exception as e: - logger.warning(f"等待旧任务结束时出错: {e}") - - # 创建流循环任务 - try: - loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") - - # 将任务记录到 StreamContext 中 - context.stream_loop_task = loop_task - - # 更新统计信息 - self.stats["active_streams"] += 1 - self.stats["total_loops"] += 1 - - logger.debug(f"启动流循环任务: {stream_id}") - return True - - except Exception as e: - logger.error(f"启动流循环任务失败 {stream_id}: {e}") - return False - - async def stop_stream_loop(self, stream_id: str) -> bool: - """停止指定流的循环任务 - - Args: - stream_id: 流ID - - Returns: - bool: 是否成功停止 - """ - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.debug(f"流 {stream_id} 上下文不存在,无需停止") - return False - - # 检查是否有 stream_loop_task - if not context.stream_loop_task or context.stream_loop_task.done(): - logger.debug(f"流 {stream_id} 循环不存在或已结束,无需停止") - return False - - task = context.stream_loop_task - if not task.done(): - task.cancel() - try: - # 设置取消超时,避免无限等待 - await asyncio.wait_for(task, timeout=5.0) - except asyncio.CancelledError: - logger.debug(f"流循环任务已取消: {stream_id}") - except asyncio.TimeoutError: - logger.warning(f"流循环任务取消超时: {stream_id}") - except Exception as e: - logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - - # 清空 StreamContext 中的任务记录 - context.stream_loop_task = None - - logger.debug(f"停止流循环: {stream_id}") - return True - - async def _stream_loop_worker(self, stream_id: str) -> None: - """单个流的工作循环 - 优化版本 - - Args: - stream_id: 流ID - """ - logger.debug(f"流循环工作器启动: {stream_id}") - - try: - while self.is_running: - try: - # 1. 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"无法获取流上下文: {stream_id}") - await asyncio.sleep(10.0) - continue - - # 2. 检查是否有消息需要处理 - unread_count = self._get_unread_count(context) - force_dispatch = self._needs_force_dispatch_for_context(context, unread_count) - - has_messages = force_dispatch or await self._has_messages_to_process(context) - - if has_messages: - if force_dispatch: - logger.info("流 %s 未读消息 %d 条,触发强制分发", stream_id, unread_count) - - # 3. 在处理前更新能量值(用于下次间隔计算) - try: - await self._update_stream_energy(stream_id, context) - except Exception as e: - logger.debug(f"更新流能量失败 {stream_id}: {e}") - - # 4. 激活chatter处理 - success = await self._process_stream_messages(stream_id, context) - - # 更新统计 - self.stats["total_process_cycles"] += 1 - if success: - logger.debug(f"流处理成功: {stream_id}") - else: - self.stats["total_failures"] += 1 - logger.warning(f"流处理失败: {stream_id}") - - # 5. 计算下次检查间隔 - interval = await self._calculate_interval(stream_id, has_messages) - - # 6. sleep等待下次检查 - # 只在间隔发生变化时输出日志,避免刷屏 - last_interval = self._last_intervals.get(stream_id) - if last_interval is None or abs(interval - last_interval) > 0.01: - logger.info(f"流 {stream_id} 等待周期变化: {interval:.2f}s") - self._last_intervals[stream_id] = interval - await asyncio.sleep(interval) - - except asyncio.CancelledError: - logger.debug(f"流循环被取消: {stream_id}") - break - except Exception as e: - logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) - self.stats["total_failures"] += 1 - await asyncio.sleep(5.0) # 错误时等待5秒再重试 - - finally: - # 清理 StreamContext 中的任务记录 - try: - context = await self._get_stream_context(stream_id) - if context and context.stream_loop_task: - context.stream_loop_task = None - logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}") - except Exception as e: - logger.debug(f"清理 StreamContext 任务记录失败: {e}") - - # 清理间隔记录 - self._last_intervals.pop(stream_id, None) - - logger.debug(f"流循环结束: {stream_id}") - - async def _get_stream_context(self, stream_id: str) -> Any | None: - """获取流上下文 - - Args: - stream_id: 流ID - - Returns: - Optional[Any]: 流上下文,如果不存在返回None - """ - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if chat_stream: - return chat_stream.context_manager.context - return None - except Exception as e: - logger.error(f"获取流上下文失败 {stream_id}: {e}") - return None - - async def _has_messages_to_process(self, context: StreamContext) -> bool: - """检查是否有消息需要处理 - - Args: - context: 流上下文 - - Returns: - bool: 是否有未读消息 - """ - try: - # 检查是否有未读消息 - if hasattr(context, "unread_messages") and context.unread_messages: - return True - - # 检查其他需要处理的条件 - if hasattr(context, "has_pending_messages") and context.has_pending_messages: - return True - - return False - except Exception as e: - logger.error(f"检查消息状态失败: {e}") - return False - - async def _process_stream_messages(self, stream_id: str, context: StreamContext) -> bool: - """处理流消息 - 支持子任务管理 - - Args: - stream_id: 流ID - context: 流上下文 - - Returns: - bool: 是否处理成功 - """ - if not self.chatter_manager: - logger.warning(f"Chatter管理器未设置: {stream_id}") - return False - - # 设置处理状态为正在处理 - self._set_stream_processing_status(stream_id, True) - - # 子任务跟踪 - child_tasks = set() - - try: - start_time = time.time() - - # 注意:缓存消息刷新已移至planner开始时执行(动作修改器之后),此处不再刷新 - - # 设置触发用户ID,以实现回复保护 - last_message = context.get_last_message() - if last_message: - context.triggering_user_id = last_message.user_info.user_id - - # 创建子任务用于刷新能量(不阻塞主流程) - energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id)) - child_tasks.add(energy_task) - energy_task.add_done_callback(lambda t: child_tasks.discard(t)) - - # 设置 Chatter 正在处理的标志 - context.is_chatter_processing = True - logger.debug(f"设置 Chatter 处理标志: {stream_id}") - - # 直接调用chatter_manager处理流上下文 - results = await self.chatter_manager.process_stream_context(stream_id, context) - success = results.get("success", False) - - if success: - # 处理成功后,再次刷新缓存中可能的新消息 - additional_messages = await self._flush_cached_messages_to_unread(stream_id) - if additional_messages: - logger.debug(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") - - process_time = time.time() - start_time - logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") - else: - logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}") - - return success - - except asyncio.CancelledError: - logger.debug(f"流处理被取消: {stream_id}") - # 取消所有子任务 - for child_task in child_tasks: - if not child_task.done(): - child_task.cancel() - raise - except Exception as e: - logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) - # 异常时也要清理子任务 - for child_task in child_tasks: - if not child_task.done(): - child_task.cancel() - return False - finally: - # 清除 Chatter 处理标志 - context.is_chatter_processing = False - logger.debug(f"清除 Chatter 处理标志: {stream_id}") - - # 无论成功或失败,都要设置处理状态为未处理 - self._set_stream_processing_status(stream_id, False) - - def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None: - """设置流的处理状态""" - try: - from .message_manager import message_manager - - if message_manager.is_running: - message_manager.set_stream_processing_status(stream_id, is_processing) - logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") - - except ImportError: - logger.debug("MessageManager不可用,跳过状态设置") - except Exception as e: - logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}") - - async def _flush_cached_messages_to_unread(self, stream_id: str) -> list: - """将缓存消息刷新到未读消息列表""" - try: - from .message_manager import message_manager - - if message_manager.is_running and message_manager.has_cached_messages(stream_id): - # 获取缓存消息 - cached_messages = message_manager.flush_cached_messages(stream_id) - - if cached_messages: - # 获取聊天流并添加到未读消息 - from src.plugin_system.apis.chat_api import get_chat_manager - - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - - if chat_stream: - for message in cached_messages: - chat_stream.context_manager.context.unread_messages.append(message) - logger.debug(f"刷新缓存消息到未读列表: stream={stream_id}, 数量={len(cached_messages)}") - else: - logger.warning(f"无法找到聊天流: {stream_id}") - - return cached_messages - - return [] - - except ImportError: - logger.debug("MessageManager不可用,跳过缓存刷新") - return [] - except Exception as e: - logger.warning(f"刷新缓存消息失败: stream={stream_id}, error={e}") - return [] - - async def _update_stream_energy(self, stream_id: str, context: Any) -> None: - """更新流的能量值 - - Args: - stream_id: 流ID - context: 流上下文 (StreamContext) - """ - try: - from src.chat.message_receive.chat_stream import get_chat_manager - - # 获取聊天流 - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - - if not chat_stream: - logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新") - return - - # 从 context_manager 获取消息(包括未读和历史消息) - # 合并未读消息和历史消息 - all_messages = [] - - # 添加历史消息 - history_messages = context.get_history_messages(limit=global_config.chat.max_context_size) - all_messages.extend(history_messages) - - # 添加未读消息 - unread_messages = context.get_unread_messages() - all_messages.extend(unread_messages) - - # 按时间排序并限制数量 - all_messages.sort(key=lambda m: m.time) - messages = all_messages[-global_config.chat.max_context_size:] - - # 获取用户ID - user_id = None - if context.triggering_user_id: - user_id = context.triggering_user_id - - # 使用能量管理器计算并缓存能量值 - energy = await energy_manager.calculate_focus_energy( - stream_id=stream_id, - messages=messages, - user_id=user_id - ) - - # 同步更新到 ChatStream - chat_stream._focus_energy = energy - - logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}") - - except Exception as e: - logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False) - - async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float: - """计算下次检查间隔 - - Args: - stream_id: 流ID - has_messages: 本次是否有消息处理 - - Returns: - float: 间隔时间(秒) - """ - # 基础间隔 - base_interval = getattr(global_config.chat, "distribution_interval", 5.0) - - # 如果没有消息,使用更长的间隔 - if not has_messages: - return base_interval * 2.0 # 无消息时间隔加倍 - - # 尝试使用能量管理器计算间隔 - try: - # 获取当前focus_energy - focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0] - - # 使用能量管理器计算间隔 - interval = energy_manager.get_distribution_interval(focus_energy) - - logger.debug(f"流 {stream_id} 动态间隔: {interval:.2f}s (能量: {focus_energy:.3f})") - return interval - - except Exception as e: - logger.debug(f"流 {stream_id} 使用默认间隔: {base_interval:.2f}s ({e})") - return base_interval - - def get_queue_status(self) -> dict[str, Any]: - """获取队列状态 - - Returns: - Dict[str, Any]: 队列状态信息 - """ - current_time = time.time() - uptime = current_time - self.stats["start_time"] if self.is_running else 0 - - # 从统计信息中获取活跃流数量 - active_streams = self.stats.get("active_streams", 0) - - return { - "active_streams": active_streams, - "total_loops": self.stats["total_loops"], - "max_concurrent": self.max_concurrent_streams, - "is_running": self.is_running, - "uptime": uptime, - "total_process_cycles": self.stats["total_process_cycles"], - "total_failures": self.stats["total_failures"], - "stats": self.stats.copy(), - } - - def set_chatter_manager(self, chatter_manager: ChatterManager) -> None: - """设置chatter管理器 - - Args: - chatter_manager: chatter管理器实例 - """ - self.chatter_manager = chatter_manager - logger.debug(f"设置chatter管理器: {chatter_manager.__class__.__name__}") - - async def _should_force_dispatch_for_stream(self, stream_id: str) -> bool: - if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: - return False - - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if not chat_stream: - return False - - unread = getattr(chat_stream.context_manager.context, "unread_messages", []) - return len(unread) > self.force_dispatch_unread_threshold - except Exception as e: - logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}") - return False - - def _get_unread_count(self, context: StreamContext) -> int: - try: - unread_messages = context.unread_messages - if unread_messages is None: - return 0 - return len(unread_messages) - except Exception: - return 0 - - def _needs_force_dispatch_for_context(self, context: StreamContext, unread_count: int | None = None) -> bool: - if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: - return False - - count = unread_count if unread_count is not None else self._get_unread_count(context) - return count > self.force_dispatch_unread_threshold - - def get_performance_summary(self) -> dict[str, Any]: - """获取性能摘要 - - Returns: - Dict[str, Any]: 性能摘要 - """ - current_time = time.time() - uptime = current_time - self.stats["start_time"] - - # 计算吞吐量 - throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数 - - # 从统计信息中获取活跃流数量 - active_streams = self.stats.get("active_streams", 0) - - return { - "uptime_hours": uptime / 3600, - "active_streams": active_streams, - "total_process_cycles": self.stats["total_process_cycles"], - "total_failures": self.stats["total_failures"], - "throughput_per_hour": throughput, - "max_concurrent_streams": self.max_concurrent_streams, - } - - async def _refresh_focus_energy(self, stream_id: str) -> None: - """分发完成后基于历史消息刷新能量值""" - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if not chat_stream: - logger.debug(f"刷新能量时未找到聊天流: {stream_id}") - return - - await chat_stream.context_manager.refresh_focus_energy_from_history() - logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量") - except Exception as e: - logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") - - async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None: - """等待任务取消完成,带有超时控制 - - Args: - stream_id: 流ID - task: 要等待取消的任务 - """ - try: - await asyncio.wait_for(task, timeout=5.0) - logger.debug(f"流循环任务已正常结束: {stream_id}") - except asyncio.CancelledError: - logger.debug(f"流循环任务已取消: {stream_id}") - except asyncio.TimeoutError: - logger.warning(f"流循环任务取消超时: {stream_id}") - except Exception as e: - logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - - async def _force_dispatch_stream(self, stream_id: str) -> None: - """强制分发流处理 - - 当流的未读消息超过阈值时,强制触发分发处理 - 这个方法主要用于突破并发限制时的紧急处理 - - 注意:此方法目前未被使用,相关功能已集成到 start_stream_loop 方法中 - - Args: - stream_id: 流ID - """ - logger.debug(f"强制分发流处理: {stream_id}") - - try: - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"强制分发时未找到流上下文: {stream_id}") - return - - # 检查是否有现有的 stream_loop_task - if context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"发现现有流循环 {stream_id},将先取消再重新创建") - existing_task = context.stream_loop_task - existing_task.cancel() - # 创建异步任务来等待取消完成,并添加异常处理 - cancel_task = asyncio.create_task( - self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}" - ) - # 为取消任务添加异常处理,避免孤儿任务 - cancel_task.add_done_callback( - lambda task: logger.debug(f"取消任务完成: {stream_id}") - if not task.exception() - else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") - ) - - # 检查未读消息数量 - unread_count = self._get_unread_count(context) - logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") - - # 使用 start_stream_loop 重新创建流循环任务 - success = await self.start_stream_loop(stream_id, force=True) - - if success: - logger.info(f"已创建强制分发流循环: {stream_id}") - else: - logger.warning(f"创建强制分发流循环失败: {stream_id}") - - except Exception as e: - logger.error(f"强制分发流处理失败 {stream_id}: {e}", exc_info=True) - - -# 全局流循环管理器实例 -stream_loop_manager = StreamLoopManager() diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 762c6d164..2a3082b0f 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -18,8 +18,8 @@ from src.common.logger import get_logger from src.config.config import global_config from src.plugin_system.apis.chat_api import get_chat_manager -from .distribution_manager import stream_loop_manager from .global_notice_manager import NoticeScope, global_notice_manager +from .scheduler_dispatcher import scheduler_dispatcher if TYPE_CHECKING: pass @@ -74,11 +74,16 @@ class MessageManager: # 启动消息缓存系统(内置) logger.debug("消息缓存系统已启动") - # 启动流循环管理器并设置chatter_manager - await stream_loop_manager.start() - stream_loop_manager.set_chatter_manager(self.chatter_manager) + # 启动基于 scheduler 的消息分发器 + await scheduler_dispatcher.start() + scheduler_dispatcher.set_chatter_manager(self.chatter_manager) + + # 保留旧的流循环管理器(暂时)以便平滑过渡 + # TODO: 在确认新机制稳定后移除 + # await stream_loop_manager.start() + # stream_loop_manager.set_chatter_manager(self.chatter_manager) - logger.info("消息管理器已启动") + logger.info("消息管理器已启动(使用 Scheduler 分发器)") async def stop(self): """停止消息管理器""" @@ -101,13 +106,22 @@ class MessageManager: self.stream_processing_status.clear() logger.debug("消息缓存系统已停止") - # 停止流循环管理器 - await stream_loop_manager.stop() + # 停止基于 scheduler 的消息分发器 + await scheduler_dispatcher.stop() + + # 停止旧的流循环管理器(如果启用) + # await stream_loop_manager.stop() logger.info("消息管理器已停止") async def add_message(self, stream_id: str, message: DatabaseMessages): - """添加消息到指定聊天流""" + """添加消息到指定聊天流 + + 新的流程: + 1. 检查 notice 消息 + 2. 将消息添加到上下文(缓存) + 3. 通知 scheduler_dispatcher 处理(检查打断、创建/更新 schedule) + """ try: # 检查是否为notice消息 @@ -130,8 +144,13 @@ class MessageManager: if not chat_stream: logger.warning(f"MessageManager.add_message: 聊天流 {stream_id} 不存在") return - await self._check_and_handle_interruption(chat_stream, message) + + # 将消息添加到上下文 await chat_stream.context_manager.add_message(message) + + # 通知 scheduler_dispatcher 处理消息接收事件 + # dispatcher 会检查是否需要打断、创建或更新 schedule + await scheduler_dispatcher.on_message_received(stream_id) except Exception as e: logger.error(f"添加消息到聊天流 {stream_id} 时发生错误: {e}") @@ -299,122 +318,9 @@ class MessageManager: except Exception as e: logger.error(f"清理不活跃聊天流时发生错误: {e}") - async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None): - """检查并处理消息打断 - 通过取消 stream_loop_task 实现""" - if not global_config.chat.interruption_enabled or not chat_stream or not message: - return - - # 检查是否正在回复,以及是否允许在回复时打断 - if chat_stream.context_manager.context.is_replying: - if not global_config.chat.allow_reply_interruption: - logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,且配置不允许回复时打断,跳过打断检查") - return - else: - logger.debug(f"聊天流 {chat_stream.stream_id} 正在回复中,但配置允许回复时打断") - - # 检查是否为表情包消息 - if message.is_picid or message.is_emoji: - logger.info(f"消息 {message.message_id} 是表情包或Emoji,跳过打断检查") - return - - # 检查上下文 - context = chat_stream.context_manager.context - - # 只有当 Chatter 真正在处理时才检查打断 - if not context.is_chatter_processing: - logger.debug(f"聊天流 {chat_stream.stream_id} Chatter 未在处理,跳过打断检查") - return - - # 检查是否有 stream_loop_task 在运行 - stream_loop_task = context.stream_loop_task - - if stream_loop_task and not stream_loop_task.done(): - # 检查触发用户ID - triggering_user_id = context.triggering_user_id - if triggering_user_id and message.user_info.user_id != triggering_user_id: - logger.info(f"消息来自非触发用户 {message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查") - return - - # 计算打断概率 - interruption_probability = context.calculate_interruption_probability( - global_config.chat.interruption_max_limit - ) - - # 检查是否已达到最大打断次数 - if context.interruption_count >= global_config.chat.interruption_max_limit: - logger.debug( - f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},跳过打断检查" - ) - return - - # 根据概率决定是否打断 - if random.random() < interruption_probability: - logger.info(f"聊天流 {chat_stream.stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") - - # 取消 stream_loop_task,子任务会通过 try-catch 自动取消 - try: - stream_loop_task.cancel() - - # 等待任务真正结束(设置超时避免死锁) - try: - await asyncio.wait_for(stream_loop_task, timeout=2.0) - logger.info(f"流循环任务已完全结束: {chat_stream.stream_id}") - except asyncio.TimeoutError: - logger.warning(f"等待流循环任务结束超时: {chat_stream.stream_id}") - except asyncio.CancelledError: - logger.info(f"流循环任务已被取消: {chat_stream.stream_id}") - except Exception as e: - logger.warning(f"取消流循环任务失败: {chat_stream.stream_id} - {e}") - - # 增加打断计数 - await context.increment_interruption_count() - - # 打断后重新创建 stream_loop 任务 - await self._trigger_reprocess(chat_stream) - - # 检查是否已达到最大次数 - if context.interruption_count >= global_config.chat.interruption_max_limit: - logger.warning( - f"聊天流 {chat_stream.stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断" - ) - else: - logger.info( - f"聊天流 {chat_stream.stream_id} 已打断并重新进入处理流程,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}" - ) - else: - logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - - async def _trigger_reprocess(self, chat_stream: ChatStream): - """重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务""" - try: - stream_id = chat_stream.stream_id - - logger.info(f"🚀 打断后重新创建流循环任务: {stream_id}") - - # 等待一小段时间确保当前消息已经添加到未读消息中 - await asyncio.sleep(0.1) - - # 获取当前的stream context - context = chat_stream.context_manager.context - - # 确保有未读消息需要处理 - unread_messages = context.get_unread_messages() - if not unread_messages: - logger.debug(f"聊天流 {stream_id} 没有未读消息,跳过重新处理") - return - - logger.debug(f"准备重新处理 {len(unread_messages)} 条未读消息: {stream_id}") - - # 重新创建 stream_loop 任务 - success = await stream_loop_manager.start_stream_loop(stream_id, force=True) - - if success: - logger.debug(f"成功重新创建流循环任务: {stream_id}") - else: - logger.warning(f"重新创建流循环任务失败: {stream_id}") - - except Exception as e: - logger.error(f"触发重新处理时出错: {e}") + # === 已废弃的方法已移除 === + # _check_and_handle_interruption 和 _trigger_reprocess 已由 scheduler_dispatcher 接管 + # 如需查看历史代码,请参考 git 历史记录 async def clear_all_unread_messages(self, stream_id: str): """清除指定上下文中的所有未读消息,在消息处理完成后调用""" diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py new file mode 100644 index 000000000..dec8ee940 --- /dev/null +++ b/src/chat/message_manager/scheduler_dispatcher.py @@ -0,0 +1,534 @@ +""" +基于 unified_scheduler 的消息分发管理器 +替代原有的 stream_loop_task 循环机制,使用统一的调度器来管理消息处理时机 +""" + +import asyncio +import time +from typing import Any + +from src.chat.chatter_manager import ChatterManager +from src.chat.energy_system import energy_manager +from src.common.data_models.message_manager_data_model import StreamContext +from src.common.logger import get_logger +from src.config.config import global_config +from src.plugin_system.apis.chat_api import get_chat_manager +from src.schedule.unified_scheduler import TriggerType, unified_scheduler + +logger = get_logger("scheduler_dispatcher") + + +class SchedulerDispatcher: + """基于 scheduler 的消息分发器 + + 工作流程: + 1. 接收消息时,将消息添加到聊天流上下文 + 2. 检查是否有活跃的 schedule,如果没有则创建 + 3. 如果有,检查打断判定,成功则移除旧 schedule 并创建新的 + 4. schedule 到期时,激活 chatter 处理 + 5. 处理完成后,计算下次间隔并注册新 schedule + """ + + def __init__(self): + # 追踪每个流的 schedule_id + self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id + + # Chatter 管理器 + self.chatter_manager: ChatterManager | None = None + + # 统计信息 + self.stats = { + "total_schedules_created": 0, + "total_schedules_cancelled": 0, + "total_interruptions": 0, + "total_process_cycles": 0, + "total_failures": 0, + "start_time": time.time(), + } + + self.is_running = False + + logger.info("基于 Scheduler 的消息分发器初始化完成") + + async def start(self) -> None: + """启动分发器""" + if self.is_running: + logger.warning("分发器已在运行") + return + + self.is_running = True + logger.info("基于 Scheduler 的消息分发器已启动") + + async def stop(self) -> None: + """停止分发器""" + if not self.is_running: + return + + self.is_running = False + + # 取消所有活跃的 schedule + schedule_ids = list(self.stream_schedules.values()) + for schedule_id in schedule_ids: + try: + await unified_scheduler.remove_schedule(schedule_id) + except Exception as e: + logger.error(f"移除 schedule {schedule_id} 失败: {e}") + + self.stream_schedules.clear() + logger.info("基于 Scheduler 的消息分发器已停止") + + def set_chatter_manager(self, chatter_manager: ChatterManager) -> None: + """设置 Chatter 管理器""" + self.chatter_manager = chatter_manager + logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}") + + async def on_message_received(self, stream_id: str) -> None: + """消息接收时的处理逻辑 + + Args: + stream_id: 聊天流ID + """ + if not self.is_running: + logger.warning("分发器未运行,忽略消息") + return + + try: + # 1. 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"无法获取流上下文: {stream_id}") + return + + # 2. 检查是否有活跃的 schedule + has_active_schedule = stream_id in self.stream_schedules + + if has_active_schedule: + # 3. 检查打断判定 + should_interrupt = await self._check_interruption(stream_id, context) + + if should_interrupt: + # 移除旧 schedule 并创建新的 + await self._cancel_and_recreate_schedule(stream_id, context) + logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule") + else: + logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...") + else: + # 4. 创建新的 schedule + await self._create_schedule(stream_id, context) + + except Exception as e: + logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True) + + async def _get_stream_context(self, stream_id: str) -> StreamContext | None: + """获取流上下文""" + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if chat_stream: + return chat_stream.context_manager.context + return None + except Exception as e: + logger.error(f"获取流上下文失败 {stream_id}: {e}") + return None + + async def _check_interruption(self, stream_id: str, context: StreamContext) -> bool: + """检查是否应该打断当前处理 + + Args: + stream_id: 流ID + context: 流上下文 + + Returns: + bool: 是否应该打断 + """ + # 检查是否启用打断 + if not global_config.chat.interruption_enabled: + return False + + # 检查是否正在回复,以及是否允许在回复时打断 + if context.is_replying: + if not global_config.chat.allow_reply_interruption: + logger.debug(f"聊天流 {stream_id} 正在回复中,且配置不允许回复时打断") + return False + else: + logger.debug(f"聊天流 {stream_id} 正在回复中,但配置允许回复时打断") + + # 只有当 Chatter 真正在处理时才检查打断 + if not context.is_chatter_processing: + logger.debug(f"聊天流 {stream_id} Chatter 未在处理,无需打断") + return False + + # 检查最后一条消息 + last_message = context.get_last_message() + if not last_message: + return False + + # 检查是否为表情包消息 + if last_message.is_picid or last_message.is_emoji: + logger.info(f"消息 {last_message.message_id} 是表情包或Emoji,跳过打断检查") + return False + + # 检查触发用户ID + triggering_user_id = context.triggering_user_id + if triggering_user_id and last_message.user_info.user_id != triggering_user_id: + logger.info(f"消息来自非触发用户 {last_message.user_info.user_id},实际触发用户为 {triggering_user_id},跳过打断检查") + return False + + # 检查是否已达到最大打断次数 + if context.interruption_count >= global_config.chat.interruption_max_limit: + logger.debug( + f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit}" + ) + return False + + # 计算打断概率 + interruption_probability = context.calculate_interruption_probability( + global_config.chat.interruption_max_limit + ) + + # 根据概率决定是否打断 + import random + if random.random() < interruption_probability: + logger.debug(f"聊天流 {stream_id} 触发消息打断,打断概率: {interruption_probability:.2f}") + + # 增加打断计数 + await context.increment_interruption_count() + self.stats["total_interruptions"] += 1 + + # 检查是否已达到最大次数 + if context.interruption_count >= global_config.chat.interruption_max_limit: + logger.warning( + f"聊天流 {stream_id} 已达到最大打断次数 {context.interruption_count}/{global_config.chat.interruption_max_limit},后续消息将不再打断" + ) + else: + logger.info( + f"聊天流 {stream_id} 已打断,当前打断次数: {context.interruption_count}/{global_config.chat.interruption_max_limit}" + ) + + return True + else: + logger.debug(f"聊天流 {stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") + return False + + async def _cancel_and_recreate_schedule(self, stream_id: str, context: StreamContext) -> None: + """取消旧的 schedule 并创建新的(打断模式,使用极短延迟) + + Args: + stream_id: 流ID + context: 流上下文 + """ + # 移除旧的 schedule + old_schedule_id = self.stream_schedules.get(stream_id) + if old_schedule_id: + success = await unified_scheduler.remove_schedule(old_schedule_id) + if success: + logger.debug(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...") + self.stats["total_schedules_cancelled"] += 1 + else: + logger.warning(f"移除旧 schedule 失败: {stream_id}") + + # 从追踪中删除 + del self.stream_schedules[stream_id] + + # 创建新的 schedule,使用即时处理模式(极短延迟) + await self._create_schedule(stream_id, context, immediate_mode=True) + + async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None: + """为聊天流创建新的 schedule + + Args: + stream_id: 流ID + context: 流上下文 + immediate_mode: 是否使用即时处理模式(打断时使用极短延迟) + """ + try: + # 如果是即时处理模式(打断时),使用固定的1秒延迟立即重新处理 + if immediate_mode: + delay = 1.0 # 硬编码1秒延迟,确保打断后能快速重新处理 + logger.debug( + f"⚡ 打断模式启用: 流={stream_id[:8]}..., " + f"使用即时延迟={delay:.1f}s 立即重新处理" + ) + else: + # 常规模式:计算初始延迟 + delay = await self._calculate_initial_delay(stream_id, context) + + # 获取未读消息数量用于日志 + unread_count = len(context.unread_messages) if context.unread_messages else 0 + + # 创建 schedule + schedule_id = await unified_scheduler.create_schedule( + callback=self._on_schedule_triggered, + trigger_type=TriggerType.TIME, + trigger_config={"delay_seconds": delay}, + is_recurring=False, # 一次性任务,处理完后会创建新的 + task_name=f"dispatch_{stream_id[:8]}", + callback_args=(stream_id,), + ) + + # 追踪 schedule + self.stream_schedules[stream_id] = schedule_id + self.stats["total_schedules_created"] += 1 + + mode_indicator = "⚡打断" if immediate_mode else "📅常规" + logger.debug( + f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., " + f"延迟={delay:.3f}s, 未读={unread_count}, " + f"ID={schedule_id[:8]}..." + ) + + except Exception as e: + logger.error(f"创建 schedule 失败 {stream_id}: {e}", exc_info=True) + + async def _calculate_initial_delay(self, stream_id: str, context: StreamContext) -> float: + """计算初始延迟时间 + + Args: + stream_id: 流ID + context: 流上下文 + + Returns: + float: 延迟时间(秒) + """ + # 基础间隔 + base_interval = getattr(global_config.chat, "distribution_interval", 5.0) + + # 检查是否有未读消息 + unread_count = len(context.unread_messages) if context.unread_messages else 0 + + # 强制分发阈值 + force_dispatch_threshold = getattr(global_config.chat, "force_dispatch_unread_threshold", 20) + + # 如果未读消息过多,使用最小间隔 + if force_dispatch_threshold and unread_count > force_dispatch_threshold: + min_interval = getattr(global_config.chat, "force_dispatch_min_interval", 0.1) + logger.warning( + f"⚠️ 强制分发触发: 流={stream_id[:8]}..., " + f"未读={unread_count} (阈值={force_dispatch_threshold}), " + f"使用最小间隔={min_interval}s" + ) + return min_interval + + # 尝试使用能量管理器计算间隔 + try: + # 更新能量值 + await self._update_stream_energy(stream_id, context) + + # 获取当前 focus_energy + focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0] + + # 使用能量管理器计算间隔 + interval = energy_manager.get_distribution_interval(focus_energy) + + logger.info( + f"📊 动态间隔计算: 流={stream_id[:8]}..., " + f"能量={focus_energy:.3f}, 间隔={interval:.2f}s" + ) + return interval + + except Exception as e: + logger.info( + f"📊 使用默认间隔: 流={stream_id[:8]}..., " + f"间隔={base_interval:.2f}s (动态计算失败: {e})" + ) + return base_interval + + async def _update_stream_energy(self, stream_id: str, context: StreamContext) -> None: + """更新流的能量值 + + Args: + stream_id: 流ID + context: 流上下文 + """ + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + # 获取聊天流 + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + + if not chat_stream: + logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新") + return + + # 合并未读消息和历史消息 + all_messages = [] + + # 添加历史消息 + history_messages = context.get_history_messages(limit=global_config.chat.max_context_size) + all_messages.extend(history_messages) + + # 添加未读消息 + unread_messages = context.get_unread_messages() + all_messages.extend(unread_messages) + + # 按时间排序并限制数量 + all_messages.sort(key=lambda m: m.time) + messages = all_messages[-global_config.chat.max_context_size:] + + # 获取用户ID + user_id = context.triggering_user_id + + # 使用能量管理器计算并缓存能量值 + energy = await energy_manager.calculate_focus_energy( + stream_id=stream_id, + messages=messages, + user_id=user_id + ) + + # 同步更新到 ChatStream + chat_stream._focus_energy = energy + + logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}") + + except Exception as e: + logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False) + + async def _on_schedule_triggered(self, stream_id: str) -> None: + """schedule 触发时的回调 + + Args: + stream_id: 流ID + """ + try: + logger.info(f"⏰ Schedule 触发: 流={stream_id[:8]}..., 开始处理消息") + + # 从追踪中移除(因为是一次性任务) + self.stream_schedules.pop(stream_id, None) + + # 获取流上下文 + context = await self._get_stream_context(stream_id) + if not context: + logger.warning(f"Schedule 触发时无法获取流上下文: {stream_id}") + return + + # 检查是否有未读消息 + if not context.unread_messages: + logger.debug(f"流 {stream_id} 没有未读消息,跳过处理") + return + + # 激活 chatter 处理 + success = await self._process_stream(stream_id, context) + + # 更新统计 + self.stats["total_process_cycles"] += 1 + if not success: + self.stats["total_failures"] += 1 + + # 处理完成后,创建新的 schedule + await self._create_schedule(stream_id, context) + + except Exception as e: + logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True) + + async def _process_stream(self, stream_id: str, context: StreamContext) -> bool: + """处理流消息 + + Args: + stream_id: 流ID + context: 流上下文 + + Returns: + bool: 是否处理成功 + """ + if not self.chatter_manager: + logger.warning(f"Chatter 管理器未设置: {stream_id}") + return False + + # 设置处理状态 + self._set_stream_processing_status(stream_id, True) + + try: + start_time = time.time() + + # 设置触发用户ID + last_message = context.get_last_message() + if last_message: + context.triggering_user_id = last_message.user_info.user_id + + # 创建异步任务刷新能量(不阻塞主流程) + energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id)) + + # 设置 Chatter 正在处理的标志 + context.is_chatter_processing = True + logger.debug(f"设置 Chatter 处理标志: {stream_id}") + + try: + # 调用 chatter_manager 处理流上下文 + results = await self.chatter_manager.process_stream_context(stream_id, context) + success = results.get("success", False) + + if success: + process_time = time.time() - start_time + logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") + else: + logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}") + + return success + + finally: + # 清除 Chatter 处理标志 + context.is_chatter_processing = False + logger.debug(f"清除 Chatter 处理标志: {stream_id}") + + # 等待能量刷新任务完成 + try: + await asyncio.wait_for(energy_task, timeout=5.0) + except asyncio.TimeoutError: + logger.warning(f"等待能量刷新超时: {stream_id}") + except Exception as e: + logger.debug(f"能量刷新任务异常: {e}") + + except Exception as e: + logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) + return False + + finally: + # 设置处理状态为未处理 + self._set_stream_processing_status(stream_id, False) + + def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None: + """设置流的处理状态""" + try: + from src.chat.message_manager.message_manager import message_manager + + if message_manager.is_running: + message_manager.set_stream_processing_status(stream_id, is_processing) + logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") + + except ImportError: + logger.debug("MessageManager 不可用,跳过状态设置") + except Exception as e: + logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}") + + async def _refresh_focus_energy(self, stream_id: str) -> None: + """分发完成后刷新能量值""" + try: + chat_manager = get_chat_manager() + chat_stream = await chat_manager.get_stream(stream_id) + if not chat_stream: + logger.debug(f"刷新能量时未找到聊天流: {stream_id}") + return + + await chat_stream.context_manager.refresh_focus_energy_from_history() + logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量") + except Exception as e: + logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") + + def get_statistics(self) -> dict[str, Any]: + """获取统计信息""" + uptime = time.time() - self.stats["start_time"] + return { + "is_running": self.is_running, + "active_schedules": len(self.stream_schedules), + "total_schedules_created": self.stats["total_schedules_created"], + "total_schedules_cancelled": self.stats["total_schedules_cancelled"], + "total_interruptions": self.stats["total_interruptions"], + "total_process_cycles": self.stats["total_process_cycles"], + "total_failures": self.stats["total_failures"], + "uptime": uptime, + } + + +# 全局实例 +scheduler_dispatcher = SchedulerDispatcher() diff --git a/src/chat/planner_actions/action_manager.py b/src/chat/planner_actions/action_manager.py index f3d058efb..4c2cada69 100644 --- a/src/chat/planner_actions/action_manager.py +++ b/src/chat/planner_actions/action_manager.py @@ -204,20 +204,19 @@ class ChatterActionManager: action_prompt_display=reason, ) else: - asyncio.create_task( # noqa: RUF006 - database_api.store_action_info( - chat_stream=chat_stream, - action_build_into_prompt=False, - action_prompt_display=reason, - action_done=True, - thinking_id=thinking_id or "", - action_data={"reason": reason}, - action_name="no_reply", - ) + # 改为同步等待,确保存储完成 + await database_api.store_action_info( + chat_stream=chat_stream, + action_build_into_prompt=False, + action_prompt_display=reason, + action_done=True, + thinking_id=thinking_id or "", + action_data={"reason": reason}, + action_name="no_reply", ) - # 自动清空所有未读消息 - asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "no_reply")) # noqa: RUF006 + # 自动清空所有未读消息(改为同步等待) + await self._clear_all_unread_messages(chat_stream.stream_id, "no_reply") return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""} @@ -233,16 +232,14 @@ class ChatterActionManager: target_message, ) - # 记录执行的动作到目标消息 + # 记录执行的动作到目标消息(改为同步等待) if success: - asyncio.create_task( # noqa: RUF006 - self._record_action_to_message(chat_stream, action_name, target_message, action_data) - ) + await self._record_action_to_message(chat_stream, action_name, target_message, action_data) # 自动清空所有未读消息 if clear_unread_messages: - asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, action_name)) # noqa: RUF006 + await self._clear_all_unread_messages(chat_stream.stream_id, action_name) # 重置打断计数 - asyncio.create_task(self._reset_interruption_count_after_action(chat_stream.stream_id)) # noqa: RUF006 + await self._reset_interruption_count_after_action(chat_stream.stream_id) return { "action_type": action_name, @@ -292,14 +289,14 @@ class ChatterActionManager: should_quote_reply, # 传递should_quote_reply参数 ) - # 记录回复动作到目标消息 - asyncio.create_task(self._record_action_to_message(chat_stream, "reply", target_message, action_data)) # noqa: RUF006 + # 记录回复动作到目标消息(改为同步等待) + await self._record_action_to_message(chat_stream, "reply", target_message, action_data) if clear_unread_messages: - asyncio.create_task(self._clear_all_unread_messages(chat_stream.stream_id, "reply")) # noqa: RUF006 + await self._clear_all_unread_messages(chat_stream.stream_id, "reply") - # 回复成功,重置打断计数 - asyncio.create_task(self._reset_interruption_count_after_action(chat_stream.stream_id)) # noqa: RUF006 + # 回复成功,重置打断计数(改为同步等待) + await self._reset_interruption_count_after_action(chat_stream.stream_id) return {"action_type": "reply", "success": True, "reply_text": reply_text, "loop_info": loop_info} diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index d8d9ccc59..cae135d02 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -81,6 +81,7 @@ class UnifiedScheduler: self._check_task: asyncio.Task | None = None self._lock = asyncio.Lock() self._event_subscriptions: set[str] = set() # 追踪已订阅的事件 + self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务 async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None: """处理来自 event_manager 的事件通知 @@ -182,9 +183,20 @@ class UnifiedScheduler: except ImportError: pass + # 取消所有正在执行的任务 + executing_tasks = list(self._executing_tasks.values()) + if executing_tasks: + logger.debug(f"取消 {len(executing_tasks)} 个正在执行的任务") + for task in executing_tasks: + if not task.done(): + task.cancel() + # 等待所有任务取消完成 + await asyncio.gather(*executing_tasks, return_exceptions=True) + logger.info("统一调度器已停止") self._tasks.clear() self._event_subscriptions.clear() + self._executing_tasks.clear() async def _check_loop(self): """主循环:每秒检查一次所有任务""" @@ -202,7 +214,7 @@ class UnifiedScheduler: async def _check_and_trigger_tasks(self): """检查并触发到期任务 - 注意:为了避免死锁,回调执行必须在锁外进行 + 注意:为了避免死锁和阻塞,回调执行必须在锁外并且并发进行 """ current_time = datetime.now() @@ -221,34 +233,71 @@ class UnifiedScheduler: except Exception as e: logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True) - # 第二阶段:在锁外执行回调(避免死锁) - tasks_to_remove = [] + # 第二阶段:在锁外并发执行所有回调(避免死锁和阻塞) + if not tasks_to_trigger: + return + # 为每个任务创建独立的异步任务,确保并发执行 + execution_tasks = [] for task in tasks_to_trigger: - try: - logger.debug(f"[调度器] 触发定时任务: {task.task_name}") + execution_task = asyncio.create_task( + self._execute_task_callback(task, current_time), + name=f"execute_{task.task_name}" + ) + execution_tasks.append(execution_task) + + # 追踪正在执行的任务,以便在 remove_schedule 时可以取消 + self._executing_tasks[task.schedule_id] = execution_task - # 执行回调 - await self._execute_callback(task) + # 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务) + results = await asyncio.gather(*execution_tasks, return_exceptions=True) + + # 清理执行追踪 + for task in tasks_to_trigger: + self._executing_tasks.pop(task.schedule_id, None) - # 更新任务状态 - task.last_triggered_at = current_time - task.trigger_count += 1 + # 第三阶段:收集需要移除的任务并在锁内移除 + tasks_to_remove = [] + for task, result in zip(tasks_to_trigger, results): + if isinstance(result, Exception): + logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {result}", exc_info=result) + elif result is True and not task.is_recurring: + # 成功执行且是一次性任务,标记为删除 + tasks_to_remove.append(task.schedule_id) + logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") - # 如果不是循环任务,标记为删除 - if not task.is_recurring: - tasks_to_remove.append(task.schedule_id) - logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") - - except Exception as e: - logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True) - - # 第三阶段:在锁内移除已完成的任务 if tasks_to_remove: async with self._lock: for schedule_id in tasks_to_remove: await self._remove_task_internal(schedule_id) + async def _execute_task_callback(self, task: ScheduleTask, current_time: datetime) -> bool: + """执行单个任务的回调(用于并发执行) + + Args: + task: 要执行的任务 + current_time: 当前时间 + + Returns: + bool: 执行是否成功 + """ + try: + logger.debug(f"[调度器] 触发任务: {task.task_name}") + + # 执行回调 + await self._execute_callback(task) + + # 更新任务状态 + task.last_triggered_at = current_time + task.trigger_count += 1 + + logger.debug(f"[调度器] 任务 {task.task_name} 执行完成") + return True + + except Exception as e: + logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True) + return False + async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool: """判断任务是否应该触发""" if task.trigger_type == TriggerType.TIME: @@ -375,13 +424,25 @@ class UnifiedScheduler: return schedule_id async def remove_schedule(self, schedule_id: str) -> bool: - """移除调度任务""" + """移除调度任务 + + 如果任务正在执行,会取消执行中的任务 + """ async with self._lock: if schedule_id not in self._tasks: logger.warning(f"尝试移除不存在的任务: {schedule_id}") return False task = self._tasks[schedule_id] + + # 检查是否有正在执行的任务 + executing_task = self._executing_tasks.get(schedule_id) + if executing_task and not executing_task.done(): + logger.debug(f"取消正在执行的任务: {task.task_name}") + executing_task.cancel() + # 不需要等待,让它在后台取消 + self._executing_tasks.pop(schedule_id, None) + await self._remove_task_internal(schedule_id) logger.debug(f"移除调度任务: {task.task_name}") return True