diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index e3b3c4070..6689d84ac 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -321,11 +321,14 @@ class StreamLoopManager: # 🔒 并发保护:如果 Chatter 正在处理中,跳过本轮 # 这可能发生在:1) 打断后重启循环 2) 处理时间超过轮询间隔 if context.is_chatter_processing: - logger.debug(f"🔒 [流工作器] stream={stream_id[:8]}, Chatter正在处理中,跳过本轮") - # 不打印"开始处理"日志,直接进入下一轮等待 - # 使用较短的等待时间,等待当前处理完成 - await asyncio.sleep(1.0) - continue + if self._recover_stale_chatter_state(stream_id, context): + logger.warning(f"🔄 [流工作器] stream={stream_id[:8]}, 处理标志疑似残留,已尝试自动修复") + else: + logger.debug(f"🔒 [流工作器] stream={stream_id[:8]}, Chatter正在处理中,跳过本轮") + # 不打印"开始处理"日志,直接进入下一轮等待 + # 使用较短的等待时间,等待当前处理完成 + await asyncio.sleep(1.0) + continue if force_dispatch: logger.info(f"⚡ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 未读消息 {unread_count} 条,触发强制分发") @@ -529,6 +532,21 @@ class StreamLoopManager: name=f"chatter_process_{stream_id}" ) + # 记录任务句柄,便于后续检测/自愈 + context.processing_task = chatter_task + + def _cleanup_processing_flag(task: asyncio.Task) -> None: + try: + context.processing_task = None + if context.is_chatter_processing: + context.is_chatter_processing = False + self._set_stream_processing_status(stream_id, False) + logger.debug(f"🔄 [并发保护] stream={stream_id[:8]}, chatter任务结束自动清理处理标志") + except Exception as callback_error: + logger.debug(f"清理chatter处理标志失败: {callback_error}") + + chatter_task.add_done_callback(_cleanup_processing_flag) + # 等待 chatter 任务完成 results = await chatter_task success = results.get("success", False) @@ -550,6 +568,7 @@ class StreamLoopManager: finally: # 清除 Chatter 处理标志 context.is_chatter_processing = False + context.processing_task = None logger.debug(f"清除 Chatter 处理标志: {stream_id}") # 无论成功或失败,都要设置处理状态为未处理 @@ -759,6 +778,35 @@ class StreamLoopManager: logger.debug(f"流 {stream_id} 使用默认间隔: {base_interval:.2f}s ({e})") return base_interval + def _recover_stale_chatter_state(self, stream_id: str, context: "StreamContext") -> bool: + """ + 检测并修复 Chatter 处理标志的假死状态。 + + 返回 True 表示已发现并修复了异常状态;False 表示未发现异常。 + """ + try: + processing_task = getattr(context, "processing_task", None) + + # 标志为 True 但没有任务句柄,直接修复 + if processing_task is None: + context.is_chatter_processing = False + self._set_stream_processing_status(stream_id, False) + logger.warning(f"🛠️ [自愈] stream={stream_id[:8]}, 发现无任务但标志为真,已重置") + return True + + # 标志为 True 但任务已经结束/被取消 + if processing_task.done(): + context.is_chatter_processing = False + context.processing_task = None + self._set_stream_processing_status(stream_id, False) + logger.warning(f"🛠️ [自愈] stream={stream_id[:8]}, 任务已结束但标志未清,已重置") + return True + + return False + except Exception as e: + logger.debug(f"检测 Chatter 状态异常失败: stream={stream_id}, error={e}") + return False + def get_queue_status(self) -> dict[str, Any]: """获取队列状态