From 74d022c489ecd2a087d0531f7795c5974d278eaa Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Fri, 3 Oct 2025 14:17:29 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat):=20=E5=A2=9E=E5=BC=BA=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=E5=92=8C=E5=8F=96=E6=B6=88=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加新的任务管理方法来更好地处理异步任务的生命周期: - 新增 cancel_processing_task、remove_processing_task 等方法 - 在流循环清理时自动取消关联的 chatter 处理任务 - 添加活跃任务统计信息到管理器统计中 - 改进 prompt 构建时的任务类型检查和错误处理 --- src/chat/chatter_manager.py | 67 ++++++++++++++ .../message_manager/distribution_manager.py | 90 ++++++++----------- src/chat/utils/prompt.py | 12 ++- 3 files changed, 113 insertions(+), 56 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index 8ccdfd84a..8a5f98ebf 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -141,12 +141,16 @@ class ChatterManager: self.stats["failed_executions"] += 1 logger.error(f"处理流 {stream_id} 时发生错误: {e}") raise + finally: + # 无论成功还是失败,都要清理处理任务记录 + self.remove_processing_task(stream_id) def get_stats(self) -> dict[str, Any]: """获取管理器统计信息""" stats = self.stats.copy() stats["active_instances"] = len(self.instances) stats["registered_chatter_types"] = len(self.chatter_classes) + stats["active_processing_tasks"] = len(self.get_active_processing_tasks()) return stats def reset_stats(self): @@ -165,3 +169,66 @@ class ChatterManager: def get_processing_task(self, stream_id: str) -> asyncio.Task | None: """获取流的处理任务""" return self._processing_tasks.get(stream_id) + + def cancel_processing_task(self, stream_id: str) -> bool: + """取消流的处理任务 + + Args: + stream_id: 流ID + + Returns: + bool: 是否成功取消了任务 + """ + task = self._processing_tasks.get(stream_id) + if task and not task.done(): + try: + task.cancel() + logger.info(f"已取消流 {stream_id} 的处理任务") + return True + except Exception as e: + logger.warning(f"取消流 {stream_id} 的处理任务时出错: {e}") + return False + return False + + def remove_processing_task(self, stream_id: str) -> None: + """移除流的处理任务记录 + + Args: + stream_id: 流ID + """ + if stream_id in self._processing_tasks: + del self._processing_tasks[stream_id] + logger.debug(f"已移除流 {stream_id} 的处理任务记录") + + def get_active_processing_tasks(self) -> dict[str, asyncio.Task]: + """获取所有活跃的处理任务 + + Returns: + Dict[str, asyncio.Task]: 流ID到处理任务的映射 + """ + # 过滤掉已完成的任务 + active_tasks = {} + for stream_id, task in self._processing_tasks.items(): + if not task.done(): + active_tasks[stream_id] = task + else: + logger.debug(f"清理已完成的处理任务: {stream_id}") + del self._processing_tasks[stream_id] + + return active_tasks + + async def cancel_all_processing_tasks(self) -> int: + """取消所有活跃的处理任务 + + Returns: + int: 成功取消的任务数量 + """ + active_tasks = self.get_active_processing_tasks() + cancelled_count = 0 + + for stream_id, task in active_tasks.items(): + if self.cancel_processing_task(stream_id): + cancelled_count += 1 + + logger.info(f"已取消 {cancelled_count} 个活跃处理任务") + return cancelled_count diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index edffe966a..75a0033e0 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -68,33 +68,31 @@ class StreamLoopManager: # 取消所有流循环 try: - # 使用带超时的锁获取,避免无限等待 - lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0) - if not lock_acquired: - logger.error("停止管理器时获取锁超时") - else: + # 创建任务列表以便并发取消 + cancel_tasks = [] + for stream_id, task in list(self.stream_loops.items()): + if not task.done(): + task.cancel() + cancel_tasks.append((stream_id, task)) + + # 并发等待所有任务取消 + if cancel_tasks: + logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") + await asyncio.gather( + *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], + return_exceptions=True + ) + + # 取消所有活跃的 chatter 处理任务 + if self.chatter_manager: try: - # 创建任务列表以便并发取消 - cancel_tasks = [] - for stream_id, task in list(self.stream_loops.items()): - if not task.done(): - task.cancel() - cancel_tasks.append((stream_id, task)) + cancelled_count = await self.chatter_manager.cancel_all_processing_tasks() + logger.info(f"已取消 {cancelled_count} 个活跃的 chatter 处理任务") + except Exception as e: + logger.error(f"取消 chatter 处理任务时出错: {e}") - # 并发等待所有任务取消 - if cancel_tasks: - logger.info(f"正在取消 {len(cancel_tasks)} 个流循环任务...") - await asyncio.gather( - *[self._wait_for_task_cancel(stream_id, task) for stream_id, task in cancel_tasks], - return_exceptions=True - ) - - self.stream_loops.clear() - logger.info("所有流循环已清理") - finally: - self.loop_lock.release() - except asyncio.TimeoutError: - logger.error("停止管理器时获取锁超时") + self.stream_loops.clear() + logger.info("所有流循环已清理") except Exception as e: logger.error(f"停止管理器时出错: {e}") @@ -193,31 +191,15 @@ class StreamLoopManager: except Exception as e: logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - try: - # 双重检查:在获取锁后再次检查流是否存在 - if stream_id not in self.stream_loops: - logger.debug(f"流 {stream_id} 循环不存在(双重检查)") - return False + # 取消关联的 chatter 处理任务 + if self.chatter_manager: + cancelled = self.chatter_manager.cancel_processing_task(stream_id) + if cancelled: + logger.info(f"已取消关联的 chatter 处理任务: {stream_id}") - task = self.stream_loops[stream_id] - if not task.done(): - task.cancel() - try: - # 设置取消超时,避免无限等待 - await asyncio.wait_for(task, timeout=5.0) - except asyncio.CancelledError: - logger.debug(f"流循环任务已取消: {stream_id}") - except asyncio.TimeoutError: - logger.warning(f"流循环任务取消超时: {stream_id}") - except Exception as e: - logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}") - - del self.stream_loops[stream_id] - logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") - return True - finally: - # 确保锁被释放 - self.loop_lock.release() + del self.stream_loops[stream_id] + logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})") + return True async def _stream_loop(self, stream_id: str) -> None: """单个流的无限循环 @@ -278,10 +260,12 @@ class StreamLoopManager: except asyncio.CancelledError: logger.info(f"流循环被取消: {stream_id}") if self.chatter_manager: - task = self.chatter_manager.get_processing_task(stream_id) - if task and not task.done(): - task.cancel() - logger.debug(f"已取消 chatter 处理任务: {stream_id}") + # 使用 ChatterManager 的新方法取消处理任务 + cancelled = self.chatter_manager.cancel_processing_task(stream_id) + if cancelled: + logger.info(f"成功取消 chatter 处理任务: {stream_id}") + else: + logger.debug(f"没有需要取消的 chatter 处理任务: {stream_id}") break except Exception as e: logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) diff --git a/src/chat/utils/prompt.py b/src/chat/utils/prompt.py index fa4b82eea..5e1d1798e 100644 --- a/src/chat/utils/prompt.py +++ b/src/chat/utils/prompt.py @@ -397,9 +397,15 @@ class Prompt: task_timeout = task_timeouts.get(task_name, 2.0) # 默认2秒 try: - result = await asyncio.wait_for(task, timeout=task_timeout) - results.append(result) - logger.debug(f"构建任务{task_name}完成 ({task_timeout}s)") + # 确保任务是一个协程对象 + if asyncio.iscoroutine(task): + result = await asyncio.wait_for(task, timeout=task_timeout) + results.append(result) + logger.debug(f"构建任务{task_name}完成 ({task_timeout}s)") + else: + logger.warning(f"任务{task_name}不是协程对象,类型: {type(task)},跳过处理") + default_result = self._get_default_result_for_task(task_name) + results.append(default_result) except asyncio.TimeoutError: logger.warning(f"构建任务{task_name}超时 ({task_timeout}s),使用默认值") # 为超时任务提供默认值