From c46df81bcafbeb78cd2b59075d25eebdd10297f1 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Mon, 10 Nov 2025 20:03:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(stream=5Floop=5Fmanager):=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=B5=81=E5=BE=AA=E7=8E=AF=E7=AE=A1=E7=90=86=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95=E5=92=8C?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86=EF=BC=8C=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E4=BF=9D=E6=8A=A4=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message_manager/distribution_manager.py | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 7f7b1e65f..966e6ea10 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -124,24 +124,31 @@ class StreamLoopManager: # 快速路径:如果流已存在且不是强制启动,无需处理 if not force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"流 {stream_id} 循环已在运行") + logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动") return True # 如果是强制启动且任务仍在运行,先取消旧任务 if force and context.stream_loop_task and not context.stream_loop_task.done(): - logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}") + logger.warning(f"⚠️ [流循环] stream={stream_id[:8]}, 强制启动模式:先取消现有任务") old_task = context.stream_loop_task old_task.cancel() try: await asyncio.wait_for(old_task, timeout=2.0) - logger.debug(f"旧流循环任务已结束: {stream_id}") + logger.debug(f"✅ [流循环] stream={stream_id[:8]}, 旧任务已结束") except (asyncio.TimeoutError, asyncio.CancelledError): - logger.debug(f"旧流循环任务已取消或超时: {stream_id}") + logger.debug(f"⏱️ [流循环] stream={stream_id[:8]}, 旧任务已取消或超时") except Exception as e: - logger.warning(f"等待旧任务结束时出错: {e}") + logger.warning(f"❌ [流循环] stream={stream_id[:8]}, 等待旧任务结束时出错: {e}") # 创建流循环任务 try: + # 检查是否有旧任务残留 + if context.stream_loop_task and not context.stream_loop_task.done(): + logger.error(f"🚨 [流循环] stream={stream_id[:8]}, 错误:旧任务仍在运行!这不应该发生!") + # 紧急取消 + context.stream_loop_task.cancel() + await asyncio.sleep(0.1) + loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}") # 将任务记录到 StreamContext 中 @@ -151,11 +158,11 @@ class StreamLoopManager: self.stats["active_streams"] += 1 self.stats["total_loops"] += 1 - logger.debug(f"启动流循环任务: {stream_id}") + logger.info(f"🚀 [流循环] stream={stream_id[:8]}, 启动新的流循环任务,任务ID: {id(loop_task)}") return True except Exception as e: - logger.error(f"启动流循环任务失败 {stream_id}: {e}") + logger.error(f"❌ [流循环] stream={stream_id[:8]}, 启动失败: {e}") return False async def stop_stream_loop(self, stream_id: str) -> bool: @@ -203,7 +210,8 @@ class StreamLoopManager: Args: stream_id: 流ID """ - logger.debug(f"流循环工作器启动: {stream_id}") + task_id = id(asyncio.current_task()) + logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动") try: while self.is_running: @@ -211,7 +219,7 @@ class StreamLoopManager: # 1. 获取流上下文 context = await self._get_stream_context(stream_id) if not context: - logger.warning(f"无法获取流上下文: {stream_id}") + logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文") await asyncio.sleep(10.0) continue @@ -224,7 +232,9 @@ class StreamLoopManager: if has_messages: if force_dispatch: - logger.info("流 %s 未读消息 %d 条,触发强制分发", stream_id, unread_count) + logger.info(f"⚡ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 未读消息 {unread_count} 条,触发强制分发") + else: + logger.info(f"📨 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 开始处理消息") # 3. 在处理前更新能量值(用于下次间隔计算) try: @@ -238,10 +248,10 @@ class StreamLoopManager: # 更新统计 self.stats["total_process_cycles"] += 1 if success: - logger.debug(f"流处理成功: {stream_id}") + logger.info(f"✅ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理成功") else: self.stats["total_failures"] += 1 - logger.warning(f"流处理失败: {stream_id}") + logger.warning(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败") # 5. 计算下次检查间隔 interval = await self._calculate_interval(stream_id, has_messages) @@ -255,10 +265,10 @@ class StreamLoopManager: await asyncio.sleep(interval) except asyncio.CancelledError: - logger.debug(f"流循环被取消: {stream_id}") + logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消") break except Exception as e: - logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) + logger.error(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 出错: {e}", exc_info=True) self.stats["total_failures"] += 1 await asyncio.sleep(5.0) # 错误时等待5秒再重试 @@ -268,14 +278,14 @@ class StreamLoopManager: context = await self._get_stream_context(stream_id) if context and context.stream_loop_task: context.stream_loop_task = None - logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}") + logger.info(f"🧹 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 清理任务记录") except Exception as e: logger.debug(f"清理 StreamContext 任务记录失败: {e}") # 清理间隔记录 self._last_intervals.pop(stream_id, None) - logger.debug(f"流循环结束: {stream_id}") + logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束") async def _get_stream_context(self, stream_id: str) -> StreamContext | None: """获取流上下文 @@ -329,6 +339,11 @@ class StreamLoopManager: logger.warning(f"Chatter管理器未设置: {stream_id}") return False + # 🔒 防止并发处理:如果已经在处理中,直接返回 + if context.is_chatter_processing: + logger.warning(f"🔒 [并发保护] stream={stream_id[:8]}, Chatter 正在处理中,跳过本次处理请求") + return False + # 设置处理状态为正在处理 self._set_stream_processing_status(stream_id, True)