From 87e0a7f079bb8fb1f4cd37d8634ece201cf3cd64 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Thu, 23 Oct 2025 19:31:23 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E6=94=B9=E8=BF=9B=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E4=BB=BB=E5=8A=A1=E7=AE=A1=E7=90=86=E5=92=8C=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 StreamLoopManager、MessageManager 和 DefaultReplyer 中增加子任务跟踪机制 - 统一处理 asyncio.CancelledError 异常,确保任务取消时能正确清理子任务 - 使用 child_tasks 集合管理子任务生命周期,防止任务泄漏 - 优化记忆存储等后台任务的创建方式,支持优雅取消 - 改进错误处理逻辑,确保异常情况下也能清理子任务资源 --- .../message_manager/distribution_manager.py | 22 +++++++- src/chat/message_manager/message_manager.py | 51 ++++++++++-------- src/chat/replyer/default_generator.py | 52 ++++++++++++++++--- 3 files changed, 93 insertions(+), 32 deletions(-) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 8ab231da2..48cc17e23 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -351,7 +351,7 @@ class StreamLoopManager: return False async def _process_stream_messages(self, stream_id: str, context: StreamContext) -> bool: - """处理流消息 + """处理流消息 - 支持子任务管理 Args: stream_id: 流ID @@ -367,6 +367,9 @@ class StreamLoopManager: # 设置处理状态为正在处理 self._set_stream_processing_status(stream_id, True) + # 子任务跟踪 + child_tasks = set() + try: start_time = time.time() @@ -375,6 +378,11 @@ class StreamLoopManager: if cached_messages: logger.info(f"处理开始前刷新缓存消息: stream={stream_id}, 数量={len(cached_messages)}") + # 创建子任务用于刷新能量(不阻塞主流程) + energy_task = asyncio.create_task(self._refresh_focus_energy(stream_id)) + child_tasks.add(energy_task) + energy_task.add_done_callback(lambda t: child_tasks.discard(t)) + # 直接调用chatter_manager处理流上下文 task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) self.chatter_manager.set_processing_task(stream_id, task) @@ -387,7 +395,6 @@ class StreamLoopManager: if additional_messages: logger.info(f"处理完成后刷新新消息: stream={stream_id}, 数量={len(additional_messages)}") - asyncio.create_task(self._refresh_focus_energy(stream_id)) process_time = time.time() - start_time logger.debug(f"流处理成功: {stream_id} (耗时: {process_time:.2f}s)") else: @@ -395,8 +402,19 @@ class StreamLoopManager: return success + except asyncio.CancelledError: + logger.info(f"流处理被取消: {stream_id}") + # 取消所有子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + raise except Exception as e: logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True) + # 异常时也要清理子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() return False finally: # 无论成功或失败,都要设置处理状态为未处理 diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 19579dae4..1ca61c35d 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -434,7 +434,7 @@ class MessageManager: await self._trigger_reprocess(chat_stream) async def _trigger_reprocess(self, chat_stream: ChatStream): - """重新处理聊天流的核心逻辑""" + """重新处理聊天流的核心逻辑 - 支持子任务管理""" try: stream_id = chat_stream.stream_id @@ -454,38 +454,45 @@ class MessageManager: logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}") - # 创建新的处理任务 + # 创建处理任务并使用try-catch实现子任务管理 task = asyncio.create_task( - self.chatter_manager.process_stream_context(stream_id, context), + self._managed_reprocess_with_cleanup(stream_id, context), name=f"reprocess_{stream_id}_{int(time.time())}" ) # 设置处理任务 self.chatter_manager.set_processing_task(stream_id, task) - # 等待处理完成(使用超时防止无限等待) - try: - result = await asyncio.wait_for(task, timeout=30.0) - success = result.get("success", False) - actions_count = result.get("actions_count", 0) - - if success: - logger.info(f"✅ 聊天流 {stream_id} 重新处理成功: 执行了 {actions_count} 个动作") - else: - logger.warning(f"❌ 聊天流 {stream_id} 重新处理失败") - - except asyncio.TimeoutError: - logger.warning(f"⏰ 聊天流 {stream_id} 重新处理超时") - if not task.done(): - task.cancel() - except Exception as e: - logger.error(f"💥 聊天流 {stream_id} 重新处理出错: {e}") - if not task.done(): - task.cancel() + # 不等待完成,让它异步执行 + # 如果需要等待,调用者会等待 chatter_manager.process_stream_context except Exception as e: logger.error(f"🚨 触发重新处理时出错: {e}") + async def _managed_reprocess_with_cleanup(self, stream_id: str, context): + """带清理功能的重新处理""" + child_tasks = set() # 跟踪子任务 + + try: + # 处理流上下文 + result = await self.chatter_manager.process_stream_context(stream_id, context) + return result + + except asyncio.CancelledError: + logger.info(f"重新处理任务被取消: {stream_id}") + # 取消所有子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + raise + except Exception as e: + logger.error(f"重新处理任务执行出错: {stream_id} - {e}") + # 清理子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + raise + async def clear_all_unread_messages(self, stream_id: str): """清除指定上下文中的所有未读消息,在消息处理完成后调用""" try: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 211e345da..0a41f0f0b 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -290,6 +290,9 @@ class DefaultReplyer: # 初始化聊天信息 await self._initialize_chat_info() + # 子任务跟踪 - 用于取消管理 + child_tasks = set() + prompt = None selected_expressions = None if available_actions is None: @@ -366,19 +369,37 @@ class DefaultReplyer: # 回复生成成功后,异步存储聊天记忆(不阻塞返回) try: - await self._store_chat_memory_async(reply_to, reply_message) + # 将记忆存储作为子任务创建,可以被取消 + memory_task = asyncio.create_task( + self._store_chat_memory_async(reply_to, reply_message), + name=f"store_memory_{self.chat_stream.stream_id}" + ) + # 不等待完成,让它在后台运行 + # 如果父任务被取消,这个子任务也会被垃圾回收 + logger.debug(f"创建记忆存储子任务: {memory_task.get_name()}") except Exception as memory_e: # 记忆存储失败不应该影响回复生成的成功返回 logger.warning(f"记忆存储失败,但不影响回复生成: {memory_e}") return True, llm_response, prompt + except asyncio.CancelledError: + logger.info(f"回复生成被取消: {self.chat_stream.stream_id}") + # 取消所有子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + raise except UserWarning as uw: raise uw except Exception as e: logger.error(f"回复生成意外失败: {e}") traceback.print_exc() - return False, None, prompt, selected_expressions + # 异常时也要清理子任务 + for child_task in child_tasks: + if not child_task.done(): + child_task.cancel() + return False, None, prompt async def rewrite_reply_with_context( self, @@ -1385,7 +1406,15 @@ class DefaultReplyer: logger.info(f"为超时任务 {task_name} 提供默认值") return task_name, default_values[task_name], timeout - task_results = await asyncio.gather(*(get_task_result(name, task) for name, task in tasks.items())) + try: + task_results = await asyncio.gather(*(get_task_result(name, task) for name, task in tasks.items())) + except asyncio.CancelledError: + logger.info("Prompt构建任务被取消,正在清理子任务") + # 取消所有未完成的子任务 + for name, task in tasks.items(): + if not task.done(): + task.cancel() + raise # 任务名称中英文映射 task_name_mapping = { @@ -1683,11 +1712,14 @@ class DefaultReplyer: ) # 并行执行2个构建任务 - (expression_habits_block, selected_expressions), relation_info = await asyncio.gather( - self.build_expression_habits(chat_talking_prompt_half, target), - self.build_relation_info(sender, target), - ) - + try: + expression_habits_block, relation_info = await asyncio.gather( + self.build_expression_habits(chat_talking_prompt_half, target), + self.build_relation_info(sender, target), + ) + except asyncio.CancelledError: + logger.info("表达式和关系信息构建被取消") + raise keywords_reaction_prompt = await self.build_keywords_reaction_prompt(target) @@ -2053,6 +2085,10 @@ class DefaultReplyer: logger.debug(f"已启动记忆存储任务,用户: {memory_user_display or memory_user_id}") + except asyncio.CancelledError: + logger.debug("记忆存储任务被取消") + # 这是正常情况,不需要清理子任务,因为是叶子节点 + raise except Exception as e: logger.error(f"存储聊天记忆失败: {e}")