fix(chat): 确保在流取消时正确取消chatter处理任务
先前,当一个流循环(stream loop)被取消时,为其创建的 chatter 处理任务(`process_stream_context`)不会被一并取消。 这可能导致任务泄露,即“孤儿”任务在后台继续运行,消耗资源并可能引发意外行为。 本次修改引入了一个任务跟踪机制: - `ChatterManager`现在会记录每个流正在运行的处理任务。 - 当流循环捕获到 `CancelledError` 时,它会主动取消关联的 chatter 任务。 这确保了在流停止时,相关的计算资源能够被正确、及时地释放,提高了系统的健壮性。
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
import asyncio
|
||||||
from src.chat.planner_actions.action_manager import ChatterActionManager
|
from src.chat.planner_actions.action_manager import ChatterActionManager
|
||||||
from src.common.data_models.message_manager_data_model import StreamContext
|
from src.common.data_models.message_manager_data_model import StreamContext
|
||||||
from src.common.logger import get_logger
|
from src.common.logger import get_logger
|
||||||
@@ -15,6 +16,7 @@ class ChatterManager:
|
|||||||
self.action_manager = action_manager
|
self.action_manager = action_manager
|
||||||
self.chatter_classes: dict[ChatType, list[type]] = {}
|
self.chatter_classes: dict[ChatType, list[type]] = {}
|
||||||
self.instances: dict[str, BaseChatter] = {}
|
self.instances: dict[str, BaseChatter] = {}
|
||||||
|
self._processing_tasks: dict[str, asyncio.Task] = {}
|
||||||
|
|
||||||
# 管理器统计
|
# 管理器统计
|
||||||
self.stats = {
|
self.stats = {
|
||||||
@@ -155,3 +157,11 @@ class ChatterManager:
|
|||||||
"successful_executions": 0,
|
"successful_executions": 0,
|
||||||
"failed_executions": 0,
|
"failed_executions": 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def set_processing_task(self, stream_id: str, task: asyncio.Task):
|
||||||
|
"""设置流的处理任务"""
|
||||||
|
self._processing_tasks[stream_id] = task
|
||||||
|
|
||||||
|
def get_processing_task(self, stream_id: str) -> asyncio.Task | None:
|
||||||
|
"""获取流的处理任务"""
|
||||||
|
return self._processing_tasks.get(stream_id)
|
||||||
|
|||||||
@@ -301,6 +301,11 @@ class StreamLoopManager:
|
|||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info(f"流循环被取消: {stream_id}")
|
logger.info(f"流循环被取消: {stream_id}")
|
||||||
|
if self.chatter_manager:
|
||||||
|
task = self.chatter_manager.get_processing_task(stream_id)
|
||||||
|
if task and not task.done():
|
||||||
|
task.cancel()
|
||||||
|
logger.debug(f"已取消 chatter 处理任务: {stream_id}")
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
|
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
|
||||||
@@ -388,8 +393,9 @@ class StreamLoopManager:
|
|||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
# 直接调用chatter_manager处理流上下文
|
# 直接调用chatter_manager处理流上下文
|
||||||
context.processing_task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context))
|
task = asyncio.create_task(self.chatter_manager.process_stream_context(stream_id, context))
|
||||||
results = await context.processing_task
|
self.chatter_manager.set_processing_task(stream_id, task)
|
||||||
|
results = await task
|
||||||
success = results.get("success", False)
|
success = results.get("success", False)
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
|
|||||||
Reference in New Issue
Block a user