From 2e7b434537975415191c190fb4d778444f088631 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Thu, 4 Dec 2025 22:40:12 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BD=BF=E7=94=A8=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E7=94=9F=E6=88=90=E5=99=A8=E8=BF=81=E7=A7=BB=E5=88=B0?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E9=A9=B1=E5=8A=A8=E6=A8=A1=E5=9E=8B=E4=BB=A5?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E8=81=8A=E5=A4=A9=E6=B5=81=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 用异步生成器替换了无限循环任务,以处理聊天流事件。 引入了`ConversationTick`数据模型来表示会话事件。 - 更新了`StreamLoopManager`,以使用新的基于生成器的方法来管理聊天流。 - 在聊天流处理过程中增强了错误处理和日志记录功能。 - 改进了聊天流的生命周期管理,包括启动和停止方法。 - 删除了与之前的循环工作线程实现相关的遗留代码。 --- src/chat/message_manager/__init__.py | 13 +- .../message_manager/distribution_manager.py | 919 ++++++++---------- 2 files changed, 415 insertions(+), 517 deletions(-) diff --git a/src/chat/message_manager/__init__.py b/src/chat/message_manager/__init__.py index 7b67424f9..07b4e5795 100644 --- a/src/chat/message_manager/__init__.py +++ b/src/chat/message_manager/__init__.py @@ -1,14 +1,25 @@ """ 消息管理器模块 提供统一的消息管理、上下文管理和流循环调度功能 + +基于 Generator + Tick 的事件驱动模式 """ -from .distribution_manager import StreamLoopManager, stream_loop_manager +from .distribution_manager import ( + ConversationTick, + StreamLoopManager, + conversation_loop, + run_chat_stream, + stream_loop_manager, +) from .message_manager import MessageManager, message_manager __all__ = [ + "ConversationTick", "MessageManager", "StreamLoopManager", + "conversation_loop", "message_manager", + "run_chat_stream", "stream_loop_manager", ] diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 90c9929d1..be774081d 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -1,11 +1,18 @@ """ -流循环管理器 -为每个聊天流创建独立的无限循环任务,主动轮询处理消息 +流循环管理器 - 基于 Generator + Tick 的事件驱动模式 + +采用异步生成器替代无限循环任务,实现更简洁可控的消息处理流程。 + +核心概念: +- ConversationTick: 表示一次待处理的会话事件 +- conversation_loop: 异步生成器,按需产出 Tick 事件 +- run_chat_stream: 驱动器,消费 Tick 并调用 Chatter """ import asyncio import time -from typing import TYPE_CHECKING, Any +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, AsyncIterator, Callable, Awaitable from src.chat.chatter_manager import ChatterManager from src.chat.energy_system import energy_manager @@ -14,13 +21,221 @@ from src.config.config import global_config from src.chat.message_receive.chat_stream import get_chat_manager if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream from src.common.data_models.message_manager_data_model import StreamContext logger = get_logger("stream_loop_manager") +# ============================================================================ +# Tick 数据模型 +# ============================================================================ + + +@dataclass +class ConversationTick: + """ + 会话事件标记 - 表示一次待处理的会话事件 + + 这是一个轻量级的事件信号,不存储消息数据。 + 未读消息由 StreamContext 管理,能量值由 energy_manager 管理。 + """ + stream_id: str + tick_time: float = field(default_factory=time.time) + force_dispatch: bool = False # 是否为强制分发(未读消息超阈值) + tick_count: int = 0 # 当前流的 tick 计数 + + +# ============================================================================ +# 异步生成器 - 核心循环逻辑 +# ============================================================================ + + +async def conversation_loop( + stream_id: str, + get_context_func: Callable[[str], Awaitable["StreamContext | None"]], + calculate_interval_func: Callable[[str, bool], Awaitable[float]], + flush_cache_func: Callable[[str], Awaitable[None]], + check_force_dispatch_func: Callable[["StreamContext", int], bool], + is_running_func: Callable[[], bool], +) -> AsyncIterator[ConversationTick]: + """ + 会话循环生成器 - 按需产出 Tick 事件 + + 替代原有的无限循环任务,改为事件驱动的生成器模式。 + 只有调用 __anext__() 时才会执行,完全由消费者控制节奏。 + + Args: + stream_id: 流ID + get_context_func: 获取 StreamContext 的异步函数 + calculate_interval_func: 计算等待间隔的异步函数 + flush_cache_func: 刷新缓存消息的异步函数 + check_force_dispatch_func: 检查是否需要强制分发的函数 + is_running_func: 检查是否继续运行的函数 + + Yields: + ConversationTick: 会话事件 + """ + tick_count = 0 + last_interval = None + + while is_running_func(): + try: + # 1. 获取流上下文 + context = await get_context_func(stream_id) + if not context: + logger.warning(f" [生成器] stream={stream_id[:8]}, 无法获取流上下文") + await asyncio.sleep(10.0) + continue + + # 2. 刷新缓存消息到未读列表 + await flush_cache_func(stream_id) + + # 3. 检查是否有消息需要处理 + unread_messages = context.get_unread_messages() + unread_count = len(unread_messages) if unread_messages else 0 + + # 4. 检查是否需要强制分发 + force_dispatch = check_force_dispatch_func(context, unread_count) + + # 5. 如果有消息,产出 Tick + if unread_count > 0 or force_dispatch: + tick_count += 1 + yield ConversationTick( + stream_id=stream_id, + force_dispatch=force_dispatch, + tick_count=tick_count, + ) + + # 6. 计算并等待下次检查间隔 + has_messages = unread_count > 0 + interval = await calculate_interval_func(stream_id, has_messages) + + # 只在间隔发生变化时输出日志 + if last_interval is None or abs(interval - last_interval) > 0.01: + logger.debug(f"[生成器] stream={stream_id[:8]}, 等待间隔: {interval:.2f}s") + last_interval = interval + + await asyncio.sleep(interval) + + except asyncio.CancelledError: + logger.info(f" [生成器] stream={stream_id[:8]}, 被取消") + break + except Exception as e: + logger.error(f" [生成器] stream={stream_id[:8]}, 出错: {e}") + await asyncio.sleep(5.0) + + +# ============================================================================ +# 聊天流驱动器 +# ============================================================================ + + +async def run_chat_stream( + stream_id: str, + manager: "StreamLoopManager", +) -> None: + """ + 聊天流驱动器 - 消费 Tick 事件并调用 Chatter + + 替代原有的 _stream_loop_worker,结构更清晰。 + + Args: + stream_id: 流ID + manager: StreamLoopManager 实例 + """ + task_id = id(asyncio.current_task()) + logger.debug(f" [驱动器] stream={stream_id[:8]}, 任务ID={task_id}, 启动") + + try: + # 创建生成器 + tick_generator = conversation_loop( + stream_id=stream_id, + get_context_func=manager._get_stream_context, + calculate_interval_func=manager._calculate_interval, + flush_cache_func=manager._flush_cached_messages_to_unread, + check_force_dispatch_func=manager._needs_force_dispatch_for_context, + is_running_func=lambda: manager.is_running, + ) + + # 消费 Tick 事件 + async for tick in tick_generator: + try: + # 获取上下文 + context = await manager._get_stream_context(stream_id) + if not context: + continue + + # 并发保护:检查是否正在处理 + if context.is_chatter_processing: + if manager._recover_stale_chatter_state(stream_id, context): + logger.warning(f" [驱动器] stream={stream_id[:8]}, 处理标志残留已修复") + else: + logger.debug(f" [驱动器] stream={stream_id[:8]}, Chatter正在处理,跳过此Tick") + continue + + # 日志 + if tick.force_dispatch: + logger.info(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 强制分发") + else: + logger.debug(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 开始处理") + + # 更新能量值 + try: + await manager._update_stream_energy(stream_id, context) + except Exception as e: + logger.debug(f"更新能量失败: {e}") + + # 处理消息 + assert global_config is not None + try: + success = await asyncio.wait_for( + manager._process_stream_messages(stream_id, context), + global_config.chat.thinking_timeout + ) + except asyncio.TimeoutError: + logger.warning(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 处理超时") + success = False + + # 更新统计 + manager.stats["total_process_cycles"] += 1 + if success: + logger.debug(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 处理成功") + await asyncio.sleep(0.1) # 等待清理操作完成 + else: + manager.stats["total_failures"] += 1 + logger.debug(f" [驱动器] stream={stream_id[:8]}, Tick#{tick.tick_count}, 处理失败") + + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f" [驱动器] stream={stream_id[:8]}, 处理Tick时出错: {e}") + manager.stats["total_failures"] += 1 + + except asyncio.CancelledError: + logger.info(f" [驱动器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消") + finally: + # 清理任务记录 + try: + context = await manager._get_stream_context(stream_id) + if context and context.stream_loop_task: + context.stream_loop_task = None + logger.debug(f" [驱动器] stream={stream_id[:8]}, 清理任务记录") + except Exception as e: + logger.debug(f"清理任务记录失败: {e}") + + +# ============================================================================ +# 流循环管理器 +# ============================================================================ + + class StreamLoopManager: - """流循环管理器 - 每个流一个独立的无限循环任务""" + """ + 流循环管理器 - 基于 Generator + Tick 的事件驱动模式 + + 管理所有聊天流的生命周期,为每个流创建独立的驱动器任务。 + """ def __init__(self, max_concurrent_streams: int | None = None): if global_config is None: @@ -50,21 +265,22 @@ class StreamLoopManager: # 状态控制 self.is_running = False - # 每个流的上一次间隔值(用于日志去重) - self._last_intervals: dict[str, float] = {} - - # 流循环启动锁:防止并发启动同一个流的多个循环任务 + # 流启动锁:防止并发启动同一个流的多个任务 self._stream_start_locks: dict[str, asyncio.Lock] = {} logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})") + # ======================================================================== + # 生命周期管理 + # ======================================================================== + async def start(self) -> None: """启动流循环管理器""" if self.is_running: logger.warning("流循环管理器已经在运行") return - self.is_running = True + logger.info("流循环管理器已启动") async def stop(self) -> None: """停止流循环管理器""" @@ -75,13 +291,9 @@ class StreamLoopManager: # 取消所有流循环 try: - # 获取所有活跃的流 - from src.plugin_system.apis.chat_api import get_chat_manager - chat_manager = get_chat_manager() all_streams = chat_manager.get_all_streams() - # 创建任务列表以便并发取消 cancel_tasks = [] for chat_stream in all_streams.values(): context = chat_stream.context @@ -89,7 +301,6 @@ class StreamLoopManager: context.stream_loop_task.cancel() cancel_tasks.append((chat_stream.stream_id, context.stream_loop_task)) - # 并发等待所有任务取消 if cancel_tasks: logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") await asyncio.gather( @@ -103,13 +314,18 @@ class StreamLoopManager: logger.info("流循环管理器已停止") - async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool: - """启动指定流的循环任务 - 优化版本使用自适应管理器 + # ======================================================================== + # 流循环控制 + # ======================================================================== + async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool: + """ + 启动指定流的驱动器任务 + Args: stream_id: 流ID - force: 是否强制启动 - + force: 是否强制启动(会先取消现有任务) + Returns: bool: 是否成功启动 """ @@ -119,207 +335,83 @@ class StreamLoopManager: logger.warning(f"无法获取流上下文: {stream_id}") return False - # 快速路径:如果流已存在且不是强制启动,无需处理 + # 快速路径:如果流已存在且不是强制启动 if not force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动") + logger.debug(f" [管理器] stream={stream_id[:8]}, 任务已在运行") return True - # 获取或创建该流的启动锁 + # 获取或创建启动锁 if stream_id not in self._stream_start_locks: self._stream_start_locks[stream_id] = asyncio.Lock() - lock = self._stream_start_locks[stream_id] - # 使用锁防止并发启动同一个流的多个循环任务 async with lock: - # 如果是强制启动且任务仍在运行,先取消旧任务 + # 强制启动时先取消旧任务 if force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.warning(f"⚠️ [流循环] stream={stream_id[:8]}, 强制启动模式:先取消现有任务") + logger.warning(f" [管理器] stream={stream_id[:8]}, 强制启动:取消现有任务") old_task = context.stream_loop_task old_task.cancel() try: await asyncio.wait_for(old_task, timeout=2.0) - logger.debug(f"✅ [流循环] stream={stream_id[:8]}, 旧任务已结束") except (asyncio.TimeoutError, asyncio.CancelledError): - logger.debug(f"⏱️ [流循环] stream={stream_id[:8]}, 旧任务已取消或超时") + pass except Exception as e: - logger.warning(f"❌ [流循环] stream={stream_id[:8]}, 等待旧任务结束时出错: {e}") + logger.warning(f"等待旧任务结束时出错: {e}") - # 创建流循环任务 + # 创建新的驱动器任务 try: - # 检查是否有旧任务残留 - if context.stream_loop_task and not context.stream_loop_task.done(): - logger.error(f"🚨 [流循环] stream={stream_id[:8]}, 错误:旧任务仍在运行!这不应该发生!") - # 紧急取消 - context.stream_loop_task.cancel() - await asyncio.sleep(0.1) - - loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") - - # 将任务记录到 StreamContext 中 + loop_task = asyncio.create_task( + run_chat_stream(stream_id, self), + name=f"chat_stream_{stream_id}" + ) context.stream_loop_task = loop_task - # 更新统计信息 self.stats["active_streams"] += 1 self.stats["total_loops"] += 1 - logger.info(f"🚀 [流循环] stream={stream_id[:8]}, 启动新的流循环任务,任务ID: {id(loop_task)}") + logger.debug(f" [管理器] stream={stream_id[:8]}, 启动驱动器任务") return True except Exception as e: - logger.error(f"❌ [流循环] stream={stream_id[:8]}, 启动失败: {e}") + logger.error(f" [管理器] stream={stream_id[:8]}, 启动失败: {e}") return False async def stop_stream_loop(self, stream_id: str) -> bool: - """停止指定流的循环任务 - + """ + 停止指定流的驱动器任务 + Args: stream_id: 流ID - + Returns: bool: 是否成功停止 """ - # 获取流上下文 context = await self._get_stream_context(stream_id) if not context: - logger.debug(f"流 {stream_id} 上下文不存在,无需停止") return False - # 检查是否有 stream_loop_task if not context.stream_loop_task or context.stream_loop_task.done(): - logger.debug(f"流 {stream_id} 循环不存在或已结束,无需停止") return False task = context.stream_loop_task - if not task.done(): - task.cancel() - try: - # 设置取消超时,避免无限等待 - await asyncio.wait_for(task, timeout=5.0) - except asyncio.CancelledError: - logger.debug(f"流循环任务已取消: {stream_id}") - except asyncio.TimeoutError: - logger.warning(f"流循环任务取消超时: {stream_id}") - except Exception as e: - logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") + task.cancel() + try: + await asyncio.wait_for(task, timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + except Exception as e: + logger.error(f"停止任务时出错: {e}") - # 清空 StreamContext 中的任务记录 context.stream_loop_task = None - logger.debug(f"停止流循环: {stream_id}") return True - async def _stream_loop_worker(self, stream_id: str) -> None: - """单个流的工作循环 - 优化版本 - - Args: - stream_id: 流ID - """ - task_id = id(asyncio.current_task()) - logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动") - - try: - while self.is_running: - try: - # 1. 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文") - await asyncio.sleep(10.0) - continue - - # 2. 检查是否有消息需要处理 - await self._flush_cached_messages_to_unread(stream_id) - unread_count = self._get_unread_count(context) - force_dispatch = self._needs_force_dispatch_for_context(context, unread_count) - - has_messages = force_dispatch or await self._has_messages_to_process(context) - - if has_messages: - # 🔒 并发保护:如果 Chatter 正在处理中,跳过本轮 - # 这可能发生在:1) 打断后重启循环 2) 处理时间超过轮询间隔 - if context.is_chatter_processing: - if self._recover_stale_chatter_state(stream_id, context): - logger.warning(f"🔄 [流工作器] stream={stream_id[:8]}, 处理标志疑似残留,已尝试自动修复") - else: - logger.debug(f"🔒 [流工作器] stream={stream_id[:8]}, Chatter正在处理中,跳过本轮") - # 不打印"开始处理"日志,直接进入下一轮等待 - # 使用较短的等待时间,等待当前处理完成 - await asyncio.sleep(1.0) - continue - - if force_dispatch: - logger.info(f"⚡ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 未读消息 {unread_count} 条,触发强制分发") - else: - logger.info(f"📨 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 开始处理消息") - - # 3. 在处理前更新能量值(用于下次间隔计算) - try: - asyncio.create_task(self._update_stream_energy(stream_id, context)) - except Exception as e: - logger.debug(f"更新流能量失败 {stream_id}: {e}") - - # 4. 激活chatter处理 - try: - success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout) - except asyncio.TimeoutError: - logger.warning(f"⏱️ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理超时") - success = False - # 更新统计 - self.stats["total_process_cycles"] += 1 - if success: - logger.info(f"✅ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理成功") - - # 🔒 处理成功后,等待一小段时间确保清理操作完成 - # 这样可以避免在 chatter_manager 清除未读消息之前就进入下一轮循环 - await asyncio.sleep(0.1) - else: - self.stats["total_failures"] += 1 - logger.debug(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败") - - # 5. 计算下次检查间隔 - interval = await self._calculate_interval(stream_id, has_messages) - - # 6. sleep等待下次检查 - # 只在间隔发生变化时输出日志,避免刷屏 - last_interval = self._last_intervals.get(stream_id) - if last_interval is None or abs(interval - last_interval) > 0.01: - logger.info(f"流 {stream_id} 等待周期变化: {interval:.2f}s") - self._last_intervals[stream_id] = interval - await asyncio.sleep(interval) - - except asyncio.CancelledError: - logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消") - break - except Exception as e: - logger.error(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 出错: {e}") - self.stats["total_failures"] += 1 - await asyncio.sleep(5.0) # 错误时等待5秒再重试 - - finally: - # 清理 StreamContext 中的任务记录 - try: - context = await self._get_stream_context(stream_id) - if context and context.stream_loop_task: - context.stream_loop_task = None - logger.info(f"🧹 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 清理任务记录") - except Exception as e: - logger.debug(f"清理 StreamContext 任务记录失败: {e}") - - # 清理间隔记录 - self._last_intervals.pop(stream_id, None) - - logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束") + # ======================================================================== + # 内部方法 - 上下文管理 + # ======================================================================== async def _get_stream_context(self, stream_id: str) -> "StreamContext | None": - """获取流上下文 - - Args: - stream_id: 流ID - - Returns: - Optional[StreamContext]: 流上下文,如果不存在返回None - """ + """获取流上下文""" try: chat_manager = get_chat_manager() chat_stream = await chat_manager.get_stream(stream_id) @@ -330,32 +422,35 @@ class StreamLoopManager: logger.error(f"获取流上下文失败 {stream_id}: {e}") return None - async def _has_messages_to_process(self, context: "StreamContext") -> bool: - """检查是否有消息需要处理 - - Args: - context: 流上下文 - - Returns: - bool: 是否有未读消息 - """ + async def _flush_cached_messages_to_unread(self, stream_id: str) -> list: + """将缓存消息刷新到未读消息列表""" try: - # 检查是否有未读消息 - if hasattr(context, "unread_messages") and context.unread_messages: - return True + context = await self._get_stream_context(stream_id) + if not context: + return [] - return False + if hasattr(context, "flush_cached_messages"): + cached_messages = context.flush_cached_messages() + if cached_messages: + logger.debug(f"刷新缓存消息: stream={stream_id[:8]}, 数量={len(cached_messages)}") + return cached_messages + return [] except Exception as e: - logger.error(f"检查消息状态失败: {e}") - return False + logger.warning(f"刷新缓存失败: {e}") + return [] + + # ======================================================================== + # 内部方法 - 消息处理 + # ======================================================================== async def _process_stream_messages(self, stream_id: str, context: "StreamContext") -> bool: - """处理流消息 - 支持子任务管理 - + """ + 处理流消息 + Args: stream_id: 流ID context: 流上下文 - + Returns: bool: 是否处理成功 """ @@ -363,338 +458,259 @@ class StreamLoopManager: logger.warning(f"Chatter管理器未设置: {stream_id}") return False - # 🔒 二次并发保护(防御性检查) - # 正常情况下不应该触发,如果触发说明有竞态条件 + # 二次并发保护 if context.is_chatter_processing: - logger.warning(f"🔒 [并发保护] stream={stream_id[:8]}, Chatter正在处理中(二次检查触发,可能存在竞态)") + logger.warning(f" [并发保护] stream={stream_id[:8]}, 二次检查触发") return False - # 设置处理状态为正在处理 self._set_stream_processing_status(stream_id, True) chatter_task = None try: start_time = time.time() - # 检查未读消息,如果为空则直接返回(优化:避免无效的 chatter 调用) + + # 检查未读消息 unread_messages = context.get_unread_messages() if not unread_messages: - logger.debug(f"流 {stream_id} 未读消息为空,跳过 chatter 处理") - return True # 返回 True 表示处理完成(虽然没有实际处理) - - # 🔇 静默群组检查:在静默群组中,只有提到 Bot 名字/别名才响应 - if await self._should_skip_for_mute_group(stream_id, unread_messages): - # 清空未读消息,不触发 chatter - from .message_manager import message_manager - await message_manager.clear_stream_unread_messages(stream_id) - logger.debug(f"🔇 流 {stream_id} 在静默列表中且未提及Bot,跳过处理") + logger.debug(f"未读消息为空,跳过处理: {stream_id}") return True - logger.debug(f"流 {stream_id} 有 {len(unread_messages)} 条未读消息,开始处理") + # 静默群组检查 + if await self._should_skip_for_mute_group(stream_id, unread_messages): + from .message_manager import message_manager + await message_manager.clear_stream_unread_messages(stream_id) + logger.debug(f" 静默群组跳过: {stream_id}") + return True - # 设置触发用户ID,以实现回复保护 + logger.debug(f"处理 {len(unread_messages)} 条未读消息: {stream_id}") + + # 设置触发用户ID last_message = context.get_last_message() if last_message: context.triggering_user_id = last_message.user_info.user_id - # 设置 Chatter 正在处理的标志 + # 设置处理标志 context.is_chatter_processing = True - logger.debug(f"设置 Chatter 处理标志: {stream_id}") - # 创建 chatter 处理任务,以便可以在打断时取消 + # 创建 chatter 任务 chatter_task = asyncio.create_task( self.chatter_manager.process_stream_context(stream_id, context), - name=f"chatter_process_{stream_id}" + name=f"chatter_{stream_id}" ) - - # 记录任务句柄,便于后续检测/自愈 context.processing_task = chatter_task - def _cleanup_processing_flag(task: asyncio.Task) -> None: + # 任务完成回调 + def _cleanup(task: asyncio.Task) -> None: try: context.processing_task = None if context.is_chatter_processing: context.is_chatter_processing = False self._set_stream_processing_status(stream_id, False) - logger.debug(f"🔄 [并发保护] stream={stream_id[:8]}, chatter任务结束自动清理处理标志") - except Exception as callback_error: - logger.debug(f"清理chatter处理标志失败: {callback_error}") + except Exception: + pass - chatter_task.add_done_callback(_cleanup_processing_flag) + chatter_task.add_done_callback(_cleanup) - # 等待 chatter 任务完成 + # 等待完成 results = await chatter_task success = results.get("success", False) if success: - process_time = time.time() - start_time - logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") + logger.debug(f"处理成功: {stream_id} (耗时: {time.time() - start_time:.2f}s)") else: - logger.warning(f"流处理失败: {stream_id} - {results.get('error_message', '未知错误')}") + logger.warning(f"处理失败: {stream_id} - {results.get('error_message', '未知错误')}") return success + except asyncio.CancelledError: if chatter_task and not chatter_task.done(): chatter_task.cancel() raise except Exception as e: - logger.error(f"流处理异常: {stream_id} - {e}") + logger.error(f"处理异常: {stream_id} - {e}") return False finally: - # 清除 Chatter 处理标志 context.is_chatter_processing = False context.processing_task = None - logger.debug(f"清除 Chatter 处理标志: {stream_id}") - - # 无论成功或失败,都要设置处理状态为未处理 self._set_stream_processing_status(stream_id, False) async def _should_skip_for_mute_group(self, stream_id: str, unread_messages: list) -> bool: - """检查是否应该因静默群组而跳过处理 - - 在静默群组中,只有当消息提及 Bot(@、回复、包含名字/别名)时才响应。 - - Args: - stream_id: 流ID - unread_messages: 未读消息列表 - - Returns: - bool: True 表示应该跳过,False 表示正常处理 - """ + """检查是否应该因静默群组而跳过处理""" if global_config is None: return False - - # 获取静默群组列表 + mute_group_list = getattr(global_config.message_receive, "mute_group_list", []) if not mute_group_list: return False - + try: - # 获取 chat_stream 来检查群组信息 chat_manager = get_chat_manager() chat_stream = await chat_manager.get_stream(stream_id) - + if not chat_stream or not chat_stream.group_info: - # 不是群聊,不适用静默规则 return False - + group_id = str(chat_stream.group_info.group_id) if group_id not in mute_group_list: - # 不在静默列表中 return False - - # 在静默列表中,检查是否有消息提及 Bot + + # 检查是否有消息提及 Bot bot_name = getattr(global_config.bot, "nickname", "") bot_aliases = getattr(global_config.bot, "alias_names", []) - bot_qq = str(getattr(global_config.bot, "qq_account", "")) - - # 构建需要检测的关键词列表 mention_keywords = [bot_name] + list(bot_aliases) if bot_name else list(bot_aliases) - mention_keywords = [k for k in mention_keywords if k] # 过滤空字符串 - + mention_keywords = [k for k in mention_keywords if k] + for msg in unread_messages: - # 检查是否被 @ 或回复 if getattr(msg, "is_at", False) or getattr(msg, "is_mentioned", False): - logger.debug(f"🔇 静默群组 {group_id}: 消息被@或回复,允许响应") return False - - # 检查消息内容是否包含 Bot 名字或别名 + content = getattr(msg, "processed_plain_text", "") or getattr(msg, "display_message", "") or "" for keyword in mention_keywords: if keyword and keyword in content: - logger.debug(f"🔇 静默群组 {group_id}: 消息包含关键词 '{keyword}',允许响应") return False - - # 没有任何消息提及 Bot - logger.debug(f"🔇 静默群组 {group_id}: {len(unread_messages)} 条消息均未提及Bot,跳过") + return True - + except Exception as e: - logger.warning(f"检查静默群组时出错: {stream_id}, error={e}") + logger.warning(f"检查静默群组出错: {e}") return False def _set_stream_processing_status(self, stream_id: str, is_processing: bool) -> None: """设置流的处理状态""" try: from .message_manager import message_manager - if message_manager.is_running: message_manager.set_stream_processing_status(stream_id, is_processing) - logger.debug(f"设置流处理状态: stream={stream_id}, processing={is_processing}") + except Exception: + pass - except ImportError: - logger.debug("MessageManager不可用,跳过状态设置") - except Exception as e: - logger.warning(f"设置流处理状态失败: stream={stream_id}, error={e}") - - async def _flush_cached_messages_to_unread(self, stream_id: str) -> list: - """将缓存消息刷新到未读消息列表""" + def _recover_stale_chatter_state(self, stream_id: str, context: "StreamContext") -> bool: + """检测并修复 Chatter 处理标志的假死状态""" try: - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"无法获取流上下文: {stream_id}") - return [] + processing_task = getattr(context, "processing_task", None) - # 使用StreamContext的缓存刷新功能 - if hasattr(context, "flush_cached_messages"): - cached_messages = context.flush_cached_messages() - if cached_messages: - logger.debug(f"从StreamContext刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") - return cached_messages - else: - logger.debug(f"StreamContext不支持缓存刷新: stream={stream_id}") - return [] + if processing_task is None: + context.is_chatter_processing = False + self._set_stream_processing_status(stream_id, False) + logger.warning(f" [自愈] stream={stream_id[:8]}, 无任务但标志为真") + return True - except Exception as e: - logger.warning(f"刷新StreamContext缓存失败: stream={stream_id}, error={e}") - return [] - async def _update_stream_energy(self, stream_id: str, context: Any) -> None: - """更新流的能量值 + if processing_task.done(): + context.is_chatter_processing = False + context.processing_task = None + self._set_stream_processing_status(stream_id, False) + logger.warning(f" [自愈] stream={stream_id[:8]}, 任务已结束但标志未清") + return True - Args: - stream_id: 流ID - context: 流上下文 (StreamContext) - """ - if global_config is None: - raise RuntimeError("Global config is not initialized") + return False + except Exception: + return False + # ======================================================================== + # 内部方法 - 能量与间隔计算 + # ======================================================================== + + async def _update_stream_energy(self, stream_id: str, context: "StreamContext") -> None: + """更新流的能量值""" try: - from src.chat.message_receive.chat_stream import get_chat_manager - - # 获取聊天流 chat_manager = get_chat_manager() chat_stream = await chat_manager.get_stream(stream_id) if not chat_stream: - logger.debug(f"无法找到聊天流 {stream_id},跳过能量更新") return - # 从 context 获取消息(包括未读和历史消息) - # 合并未读消息和历史消息 + assert global_config is not None + # 合并消息 all_messages = [] - - # 添加历史消息 history_messages = context.get_history_messages(limit=global_config.chat.max_context_size) all_messages.extend(history_messages) - - # 添加未读消息 - unread_messages = context.get_unread_messages() - all_messages.extend(unread_messages) - - # 按时间排序并限制数量 + all_messages.extend(context.get_unread_messages()) all_messages.sort(key=lambda m: m.time) messages = all_messages[-global_config.chat.max_context_size:] - # 获取用户ID - user_id = None - if context.triggering_user_id: - user_id = context.triggering_user_id + user_id = context.triggering_user_id - # 使用能量管理器计算并缓存能量值 energy = await energy_manager.calculate_focus_energy( stream_id=stream_id, messages=messages, user_id=user_id ) - # 同步更新到 ChatStream chat_stream._focus_energy = energy - - logger.debug(f"已更新流 {stream_id} 的能量值: {energy:.3f}") + logger.debug(f"更新能量: {stream_id[:8]} -> {energy:.3f}") except Exception as e: - logger.warning(f"更新流能量失败 {stream_id}: {e}", exc_info=False) + logger.warning(f"更新能量失败: {e}") async def _calculate_interval(self, stream_id: str, has_messages: bool) -> float: - """计算下次检查间隔 - - Args: - stream_id: 流ID - has_messages: 本次是否有消息处理 - - Returns: - float: 间隔时间(秒) - """ + """计算下次检查间隔""" if global_config is None: - raise RuntimeError("Global config is not initialized") + return 5.0 - # 私聊使用最小间隔,快速响应 + # 私聊快速响应 try: chat_manager = get_chat_manager() chat_stream = await chat_manager.get_stream(stream_id) if chat_stream and not chat_stream.group_info: - # 私聊:有消息时快速响应,空转时稍微等待 - min_interval = 0.5 if has_messages else 5.0 - logger.debug(f"流 {stream_id} 私聊模式,使用最小间隔: {min_interval:.2f}s") - return min_interval - except Exception as e: - logger.debug(f"检查流 {stream_id} 是否为私聊失败: {e}") + return 0.5 if has_messages else 5.0 + except Exception: + pass - # 基础间隔 base_interval = getattr(global_config.chat, "distribution_interval", 5.0) - # 如果没有消息,使用更长的间隔 if not has_messages: - return base_interval * 2.0 # 无消息时间隔加倍 + return base_interval * 2.0 - # 尝试使用能量管理器计算间隔 + # 基于能量计算间隔 try: - # 获取当前focus_energy focus_energy = energy_manager.energy_cache.get(stream_id, (0.5, 0))[0] - - # 使用能量管理器计算间隔 interval = energy_manager.get_distribution_interval(focus_energy) - - logger.debug(f"流 {stream_id} 动态间隔: {interval:.2f}s (能量: {focus_energy:.3f})") return interval - - except Exception as e: - logger.debug(f"流 {stream_id} 使用默认间隔: {base_interval:.2f}s ({e})") + except Exception: return base_interval - def _recover_stale_chatter_state(self, stream_id: str, context: "StreamContext") -> bool: - """ - 检测并修复 Chatter 处理标志的假死状态。 + def _needs_force_dispatch_for_context(self, context: "StreamContext", unread_count: int | None = None) -> bool: + """检查是否需要强制分发""" + if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: + return False - 返回 True 表示已发现并修复了异常状态;False 表示未发现异常。 - """ + if unread_count is None: + try: + unread_count = len(context.unread_messages) if context.unread_messages else 0 + except Exception: + return False + + return unread_count > self.force_dispatch_unread_threshold + + # ======================================================================== + # 辅助方法 + # ======================================================================== + + async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None: + """等待任务取消完成""" try: - processing_task = getattr(context, "processing_task", None) - - # 标志为 True 但没有任务句柄,直接修复 - if processing_task is None: - context.is_chatter_processing = False - self._set_stream_processing_status(stream_id, False) - logger.warning(f"🛠️ [自愈] stream={stream_id[:8]}, 发现无任务但标志为真,已重置") - return True - - # 标志为 True 但任务已经结束/被取消 - if processing_task.done(): - context.is_chatter_processing = False - context.processing_task = None - self._set_stream_processing_status(stream_id, False) - logger.warning(f"🛠️ [自愈] stream={stream_id[:8]}, 任务已结束但标志未清,已重置") - return True - - return False + await asyncio.wait_for(task, timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass except Exception as e: - logger.debug(f"检测 Chatter 状态异常失败: stream={stream_id}, error={e}") - return False + logger.error(f"等待任务取消出错: {e}") + + def set_chatter_manager(self, chatter_manager: ChatterManager) -> None: + """设置 Chatter 管理器""" + self.chatter_manager = chatter_manager + logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}") + + # ======================================================================== + # 统计信息 + # ======================================================================== def get_queue_status(self) -> dict[str, Any]: - """获取队列状态 - - Returns: - Dict[str, Any]: 队列状态信息 - """ + """获取队列状态""" current_time = time.time() uptime = current_time - self.stats["start_time"] if self.is_running else 0 - # 从统计信息中获取活跃流数量 - active_streams = self.stats.get("active_streams", 0) - return { - "active_streams": active_streams, + "active_streams": self.stats.get("active_streams", 0), "total_loops": self.stats["total_loops"], "max_concurrent": self.max_concurrent_streams, "is_running": self.is_running, @@ -704,153 +720,24 @@ class StreamLoopManager: "stats": self.stats.copy(), } - def set_chatter_manager(self, chatter_manager: ChatterManager) -> None: - """设置chatter管理器 - - Args: - chatter_manager: chatter管理器实例 - """ - self.chatter_manager = chatter_manager - logger.debug(f"设置chatter管理器: {chatter_manager.__class__.__name__}") - - async def _should_force_dispatch_for_stream(self, stream_id: str) -> bool: - if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: - return False - - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if not chat_stream: - return False - - unread = getattr(chat_stream.context, "unread_messages", []) - return len(unread) > self.force_dispatch_unread_threshold - except Exception as e: - logger.debug(f"检查流 {stream_id} 是否需要强制分发失败: {e}") - return False - - def _get_unread_count(self, context: "StreamContext") -> int: - try: - unread_messages = context.unread_messages - if unread_messages is None: - return 0 - return len(unread_messages) - except Exception: - return 0 - - def _needs_force_dispatch_for_context(self, context: "StreamContext", unread_count: int | None = None) -> bool: - if not self.force_dispatch_unread_threshold or self.force_dispatch_unread_threshold <= 0: - return False - - count = unread_count if unread_count is not None else self._get_unread_count(context) - return count > self.force_dispatch_unread_threshold - def get_performance_summary(self) -> dict[str, Any]: - """获取性能摘要 - - Returns: - Dict[str, Any]: 性能摘要 - """ + """获取性能摘要""" current_time = time.time() uptime = current_time - self.stats["start_time"] - - # 计算吞吐量 - throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) # 每小时处理次数 - - # 从统计信息中获取活跃流数量 - active_streams = self.stats.get("active_streams", 0) + throughput = self.stats["total_process_cycles"] / max(1, uptime / 3600) return { "uptime_hours": uptime / 3600, - "active_streams": active_streams, + "active_streams": self.stats.get("active_streams", 0), "total_process_cycles": self.stats["total_process_cycles"], "total_failures": self.stats["total_failures"], "throughput_per_hour": throughput, "max_concurrent_streams": self.max_concurrent_streams, } - async def _refresh_focus_energy(self, stream_id: str) -> None: - """分发完成后基于历史消息刷新能量值""" - try: - chat_manager = get_chat_manager() - chat_stream = await chat_manager.get_stream(stream_id) - if not chat_stream: - logger.debug(f"刷新能量时未找到聊天流: {stream_id}") - return - await chat_stream.context.refresh_focus_energy_from_history() - logger.debug(f"已刷新聊天流 {stream_id} 的聚焦能量") - except Exception as e: - logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") +# ============================================================================ +# 全局实例 +# ============================================================================ - async def _wait_for_task_cancel(self, stream_id: str, task: asyncio.Task) -> None: - """等待任务取消完成,带有超时控制 - - Args: - stream_id: 流ID - task: 要等待取消的任务 - """ - try: - await asyncio.wait_for(task, timeout=5.0) - logger.debug(f"流循环任务已正常结束: {stream_id}") - except asyncio.CancelledError: - logger.debug(f"流循环任务已取消: {stream_id}") - except asyncio.TimeoutError: - logger.warning(f"流循环任务取消超时: {stream_id}") - except Exception as e: - logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - - async def _force_dispatch_stream(self, stream_id: str) -> None: - """强制分发流处理 - - 当流的未读消息超过阈值时,强制触发分发处理 - 这个方法主要用于突破并发限制时的紧急处理 - - 注意:此方法目前未被使用,相关功能已集成到 start_stream_loop 方法中 - - Args: - stream_id: 流ID - """ - logger.debug(f"强制分发流处理: {stream_id}") - - try: - # 获取流上下文 - context = await self._get_stream_context(stream_id) - if not context: - logger.warning(f"强制分发时未找到流上下文: {stream_id}") - return - - # 检查是否有现有的 stream_loop_task - if context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"发现现有流循环 {stream_id},将先取消再重新创建") - existing_task = context.stream_loop_task - existing_task.cancel() - # 创建异步任务来等待取消完成,并添加异常处理 - cancel_task = asyncio.create_task( - self._wait_for_task_cancel(stream_id, existing_task), name=f"cancel_existing_loop_{stream_id}" - ) - # 为取消任务添加异常处理,避免孤儿任务 - cancel_task.add_done_callback( - lambda task: logger.debug(f"取消任务完成: {stream_id}") - if not task.exception() - else logger.error(f"取消任务异常: {stream_id} - {task.exception()}") - ) - - # 检查未读消息数量 - unread_count = self._get_unread_count(context) - logger.info(f"流 {stream_id} 当前未读消息数: {unread_count}") - - # 使用 start_stream_loop 重新创建流循环任务 - success = await self.start_stream_loop(stream_id, force=True) - - if success: - logger.info(f"已创建强制分发流循环: {stream_id}") - else: - logger.warning(f"创建强制分发流循环失败: {stream_id}") - - except Exception as e: - logger.error(f"强制分发流处理失败 {stream_id}: {e}") - - -# 全局流循环管理器实例 stream_loop_manager = StreamLoopManager()