refactor(chat): 优化任务管理机制支持多重回复

重构聊天管理器的任务处理系统,将单一任务追踪改为支持多重回复的任务列表管理。

主要变更:
- 将 `_processing_tasks` 从单任务字典改为任务列表字典
- 新增 `add_processing_task` 和 `get_all_processing_tasks` 方法
- 增强 `cancel_all_stream_tasks` 方法支持批量取消
- 修复消息打断机制,确保取消所有相关任务
- 优化任务清理逻辑,自动移除已完成任务

这些改进使系统能够更好地处理并发回复场景,提高任务管理的灵活性和可靠性。
This commit is contained in:
Windpicker-owo
2025-10-07 17:10:26 +08:00
parent 38e4116c18
commit 11e50b6521
3 changed files with 107 additions and 41 deletions

View File

@@ -16,7 +16,8 @@ class ChatterManager:
self.action_manager = action_manager
self.chatter_classes: dict[ChatType, list[type]] = {}
self.instances: dict[str, BaseChatter] = {}
self._processing_tasks: dict[str, asyncio.Task] = {}
# 🌟 优化:统一任务追踪,支持多重回复
self._processing_tasks: dict[str, list[asyncio.Task]] = {}
# 管理器统计
self.stats = {
@@ -174,15 +175,71 @@ class ChatterManager:
}
def set_processing_task(self, stream_id: str, task: asyncio.Task):
"""设置流的处理任务"""
self._processing_tasks[stream_id] = task
"""设置流的主要处理任务"""
if stream_id not in self._processing_tasks:
self._processing_tasks[stream_id] = []
self._processing_tasks[stream_id].insert(0, task) # 主要任务放在第一位
logger.debug(f"设置流 {stream_id} 的主要处理任务")
def get_processing_task(self, stream_id: str) -> asyncio.Task | None:
"""获取流的处理任务"""
return self._processing_tasks.get(stream_id)
"""获取流的主要处理任务"""
tasks = self._processing_tasks.get(stream_id, [])
return tasks[0] if tasks and not tasks[0].done() else None
def add_processing_task(self, stream_id: str, task: asyncio.Task):
"""添加处理任务到流(支持多重回复)"""
if stream_id not in self._processing_tasks:
self._processing_tasks[stream_id] = []
self._processing_tasks[stream_id].append(task)
logger.debug(f"添加处理任务到流 {stream_id},当前任务数: {len(self._processing_tasks[stream_id])}")
def get_all_processing_tasks(self, stream_id: str) -> list[asyncio.Task]:
"""获取流的所有活跃处理任务"""
if stream_id not in self._processing_tasks:
return []
# 清理已完成的任务并返回活跃任务
active_tasks = [task for task in self._processing_tasks[stream_id] if not task.done()]
self._processing_tasks[stream_id] = active_tasks
if len(active_tasks) == 0:
del self._processing_tasks[stream_id]
return active_tasks
def cancel_all_stream_tasks(self, stream_id: str) -> int:
"""取消指定流的所有处理任务(包括多重回复)
Args:
stream_id: 流ID
Returns:
int: 成功取消的任务数量
"""
if stream_id not in self._processing_tasks:
return 0
tasks = self._processing_tasks[stream_id]
cancelled_count = 0
logger.info(f"开始取消流 {stream_id} 的所有处理任务,共 {len(tasks)}")
for task in tasks:
try:
if not task.done():
task.cancel()
cancelled_count += 1
logger.debug(f"成功取消任务 {task.get_name() if hasattr(task, 'get_name') else 'unnamed'}")
except Exception as e:
logger.warning(f"取消任务时出错: {e}")
# 清理任务记录
del self._processing_tasks[stream_id]
logger.info(f"{stream_id} 的任务取消完成,成功取消 {cancelled_count} 个任务")
return cancelled_count
def cancel_processing_task(self, stream_id: str) -> bool:
"""取消流的处理任务
"""取消流的主要处理任务
Args:
stream_id: 流ID
@@ -190,14 +247,14 @@ class ChatterManager:
Returns:
bool: 是否成功取消了任务
"""
task = self._processing_tasks.get(stream_id)
if task and not task.done():
main_task = self.get_processing_task(stream_id)
if main_task and not main_task.done():
try:
task.cancel()
logger.info(f"已取消流 {stream_id} 的处理任务")
main_task.cancel()
logger.info(f"已取消流 {stream_id}主要处理任务")
return True
except Exception as e:
logger.warning(f"取消流 {stream_id} 的处理任务时出错: {e}")
logger.warning(f"取消流 {stream_id}主要处理任务时出错: {e}")
return False
return False
@@ -209,22 +266,30 @@ class ChatterManager:
"""
if stream_id in self._processing_tasks:
del self._processing_tasks[stream_id]
logger.debug(f"已移除流 {stream_id} 的处理任务记录")
logger.debug(f"已移除流 {stream_id}所有处理任务记录")
def get_active_processing_tasks(self) -> dict[str, asyncio.Task]:
"""获取所有活跃的处理任务
"""获取所有活跃的主要处理任务
Returns:
Dict[str, asyncio.Task]: 流ID到处理任务的映射
Dict[str, asyncio.Task]: 流ID到主要处理任务的映射
"""
# 过滤掉已完成的任务
# 过滤掉已完成的任务,只返回主要任务
active_tasks = {}
for stream_id, task in self._processing_tasks.items():
if not task.done():
active_tasks[stream_id] = task
else:
logger.debug(f"清理已完成的处理任务: {stream_id}")
del self._processing_tasks[stream_id]
for stream_id, task_list in list(self._processing_tasks.items()):
if task_list:
main_task = task_list[0] # 获取主要任务
if not main_task.done():
active_tasks[stream_id] = main_task
else:
# 清理已完成的主要任务
task_list = [t for t in task_list if not t.done()]
if task_list:
self._processing_tasks[stream_id] = task_list
active_tasks[stream_id] = task_list[0] # 新的主要任务
else:
del self._processing_tasks[stream_id]
logger.debug(f"清理已完成的处理任务: {stream_id}")
return active_tasks