From b24eaba7ff1544fbb6bc2a20d995e7148708ebaf Mon Sep 17 00:00:00 2001 From: tt-P607 <68868379+tt-P607@users.noreply.github.com> Date: Fri, 3 Oct 2025 10:15:24 +0800 Subject: [PATCH] =?UTF-8?q?fix(chat):=20=E7=A1=AE=E4=BF=9D=E5=9C=A8?= =?UTF-8?q?=E6=B5=81=E5=8F=96=E6=B6=88=E6=97=B6=E6=AD=A3=E7=A1=AE=E5=8F=96?= =?UTF-8?q?=E6=B6=88chatter=E5=A4=84=E7=90=86=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 先前,当一个流循环(stream loop)被取消时,为其创建的 chatter 处理任务(`process_stream_context`)不会被一并取消。 这可能导致任务泄露,即“孤儿”任务在后台继续运行,消耗资源并可能引发意外行为。 本次修改引入了一个任务跟踪机制: - `ChatterManager`现在会记录每个流正在运行的处理任务。 - 当流循环捕获到 `CancelledError` 时,它会主动取消关联的 chatter 任务。 这确保了在流停止时,相关的计算资源能够被正确、及时地释放,提高了系统的健壮性。 --- src/chat/chatter_manager.py | 10 ++++++++++ src/chat/message_manager/distribution_manager.py | 10 ++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/chat/chatter_manager.py b/src/chat/chatter_manager.py index d8eda9baa..cf9d3b039 100644 --- a/src/chat/chatter_manager.py +++ b/src/chat/chatter_manager.py @@ -1,6 +1,7 @@ import time from typing import Any +import asyncio from src.chat.planner_actions.action_manager import ChatterActionManager from src.common.data_models.message_manager_data_model import StreamContext from src.common.logger import get_logger @@ -15,6 +16,7 @@ class ChatterManager: self.action_manager = action_manager self.chatter_classes: dict[ChatType, list[type]] = {} self.instances: dict[str, BaseChatter] = {} + self._processing_tasks: dict[str, asyncio.Task] = {} # 管理器统计 self.stats = { @@ -155,3 +157,11 @@ class ChatterManager: "successful_executions": 0, "failed_executions": 0, } + + def set_processing_task(self, stream_id: str, task: asyncio.Task): + """设置流的处理任务""" + self._processing_tasks[stream_id] = task + + def get_processing_task(self, stream_id: str) -> asyncio.Task | None: + """获取流的处理任务""" + return self._processing_tasks.get(stream_id) diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index fa6bcea0d..516691bae 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -301,6 +301,11 @@ class StreamLoopManager: except asyncio.CancelledError: logger.info(f"流循环被取消: {stream_id}") + if self.chatter_manager: + task = self.chatter_manager.get_processing_task(stream_id) + if task and not task.done(): + task.cancel() + logger.debug(f"已取消 chatter 处理任务: {stream_id}") break except Exception as e: logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True) @@ -388,8 +393,9 @@ class StreamLoopManager: start_time = time.time() # 直接调用chatter_manager处理流上下文 - context.processing_task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) - results = await context.processing_task + task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context)) + self.chatter_manager.set_processing_task(stream_id, task) + results = await task success = results.get("success", False) if success: