feat(chat): 增强任务管理和取消机制
添加新的任务管理方法来更好地处理异步任务的生命周期: - 新增 cancel_processing_task、remove_processing_task 等方法 - 在流循环清理时自动取消关联的 chatter 处理任务 - 添加活跃任务统计信息到管理器统计中 - 改进 prompt 构建时的任务类型检查和错误处理
This commit is contained in:
@@ -141,12 +141,16 @@ class ChatterManager:
|
|||||||
self.stats["failed_executions"] += 1
|
self.stats["failed_executions"] += 1
|
||||||
logger.error(f"处理流 {stream_id} 时发生错误: {e}")
|
logger.error(f"处理流 {stream_id} 时发生错误: {e}")
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# 无论成功还是失败,都要清理处理任务记录
|
||||||
|
self.remove_processing_task(stream_id)
|
||||||
|
|
||||||
def get_stats(self) -> dict[str, Any]:
|
def get_stats(self) -> dict[str, Any]:
|
||||||
"""获取管理器统计信息"""
|
"""获取管理器统计信息"""
|
||||||
stats = self.stats.copy()
|
stats = self.stats.copy()
|
||||||
stats["active_instances"] = len(self.instances)
|
stats["active_instances"] = len(self.instances)
|
||||||
stats["registered_chatter_types"] = len(self.chatter_classes)
|
stats["registered_chatter_types"] = len(self.chatter_classes)
|
||||||
|
stats["active_processing_tasks"] = len(self.get_active_processing_tasks())
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
def reset_stats(self):
|
def reset_stats(self):
|
||||||
@@ -165,3 +169,66 @@ class ChatterManager:
|
|||||||
def get_processing_task(self, stream_id: str) -> asyncio.Task | None:
|
def get_processing_task(self, stream_id: str) -> asyncio.Task | None:
|
||||||
"""获取流的处理任务"""
|
"""获取流的处理任务"""
|
||||||
return self._processing_tasks.get(stream_id)
|
return self._processing_tasks.get(stream_id)
|
||||||
|
|
||||||
|
def cancel_processing_task(self, stream_id: str) -> bool:
|
||||||
|
"""取消流的处理任务
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stream_id: 流ID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: 是否成功取消了任务
|
||||||
|
"""
|
||||||
|
task = self._processing_tasks.get(stream_id)
|
||||||
|
if task and not task.done():
|
||||||
|
try:
|
||||||
|
task.cancel()
|
||||||
|
logger.info(f"已取消流 {stream_id} 的处理任务")
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"取消流 {stream_id} 的处理任务时出错: {e}")
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def remove_processing_task(self, stream_id: str) -> None:
|
||||||
|
"""移除流的处理任务记录
|
||||||
|
|
||||||
|
Args:
|
||||||
|
stream_id: 流ID
|
||||||
|
"""
|
||||||
|
if stream_id in self._processing_tasks:
|
||||||
|
del self._processing_tasks[stream_id]
|
||||||
|
logger.debug(f"已移除流 {stream_id} 的处理任务记录")
|
||||||
|
|
||||||
|
def get_active_processing_tasks(self) -> dict[str, asyncio.Task]:
|
||||||
|
"""获取所有活跃的处理任务
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict[str, asyncio.Task]: 流ID到处理任务的映射
|
||||||
|
"""
|
||||||
|
# 过滤掉已完成的任务
|
||||||
|
active_tasks = {}
|
||||||
|
for stream_id, task in self._processing_tasks.items():
|
||||||
|
if not task.done():
|
||||||
|
active_tasks[stream_id] = task
|
||||||
|
else:
|
||||||
|
logger.debug(f"清理已完成的处理任务: {stream_id}")
|
||||||
|
del self._processing_tasks[stream_id]
|
||||||
|
|
||||||
|
return active_tasks
|
||||||
|
|
||||||
|
async def cancel_all_processing_tasks(self) -> int:
|
||||||
|
"""取消所有活跃的处理任务
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 成功取消的任务数量
|
||||||
|
"""
|
||||||
|
active_tasks = self.get_active_processing_tasks()
|
||||||
|
cancelled_count = 0
|
||||||
|
|
||||||
|
for stream_id, task in active_tasks.items():
|
||||||
|
if self.cancel_processing_task(stream_id):
|
||||||
|
cancelled_count += 1
|
||||||
|
|
||||||
|
logger.info(f"已取消 {cancelled_count} 个活跃处理任务")
|
||||||
|
return cancelled_count
|
||||||
|
|||||||
@@ -67,12 +67,6 @@ class StreamLoopManager:
|
|||||||
self.is_running = False
|
self.is_running = False
|
||||||
|
|
||||||
# 取消所有流循环
|
# 取消所有流循环
|
||||||
try:
|
|
||||||
# 使用带超时的锁获取,避免无限等待
|
|
||||||
lock_acquired = await asyncio.wait_for(self.loop_lock.acquire(), timeout=10.0)
|
|
||||||
if not lock_acquired:
|
|
||||||
logger.error("停止管理器时获取锁超时")
|
|
||||||
else:
|
|
||||||
try:
|
try:
|
||||||
# 创建任务列表以便并发取消
|
# 创建任务列表以便并发取消
|
||||||
cancel_tasks = []
|
cancel_tasks = []
|
||||||
@@ -89,12 +83,16 @@ class StreamLoopManager:
|
|||||||
return_exceptions=True
|
return_exceptions=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# 取消所有活跃的 chatter 处理任务
|
||||||
|
if self.chatter_manager:
|
||||||
|
try:
|
||||||
|
cancelled_count = await self.chatter_manager.cancel_all_processing_tasks()
|
||||||
|
logger.info(f"已取消 {cancelled_count} 个活跃的 chatter 处理任务")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"取消 chatter 处理任务时出错: {e}")
|
||||||
|
|
||||||
self.stream_loops.clear()
|
self.stream_loops.clear()
|
||||||
logger.info("所有流循环已清理")
|
logger.info("所有流循环已清理")
|
||||||
finally:
|
|
||||||
self.loop_lock.release()
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.error("停止管理器时获取锁超时")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"停止管理器时出错: {e}")
|
logger.error(f"停止管理器时出错: {e}")
|
||||||
|
|
||||||
@@ -193,31 +191,15 @@ class StreamLoopManager:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
|
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
|
||||||
|
|
||||||
try:
|
# 取消关联的 chatter 处理任务
|
||||||
# 双重检查:在获取锁后再次检查流是否存在
|
if self.chatter_manager:
|
||||||
if stream_id not in self.stream_loops:
|
cancelled = self.chatter_manager.cancel_processing_task(stream_id)
|
||||||
logger.debug(f"流 {stream_id} 循环不存在(双重检查)")
|
if cancelled:
|
||||||
return False
|
logger.info(f"已取消关联的 chatter 处理任务: {stream_id}")
|
||||||
|
|
||||||
task = self.stream_loops[stream_id]
|
|
||||||
if not task.done():
|
|
||||||
task.cancel()
|
|
||||||
try:
|
|
||||||
# 设置取消超时,避免无限等待
|
|
||||||
await asyncio.wait_for(task, timeout=5.0)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logger.debug(f"流循环任务已取消: {stream_id}")
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.warning(f"流循环任务取消超时: {stream_id}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
|
|
||||||
|
|
||||||
del self.stream_loops[stream_id]
|
del self.stream_loops[stream_id]
|
||||||
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
|
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
|
||||||
return True
|
return True
|
||||||
finally:
|
|
||||||
# 确保锁被释放
|
|
||||||
self.loop_lock.release()
|
|
||||||
|
|
||||||
async def _stream_loop(self, stream_id: str) -> None:
|
async def _stream_loop(self, stream_id: str) -> None:
|
||||||
"""单个流的无限循环
|
"""单个流的无限循环
|
||||||
@@ -278,10 +260,12 @@ class StreamLoopManager:
|
|||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info(f"流循环被取消: {stream_id}")
|
logger.info(f"流循环被取消: {stream_id}")
|
||||||
if self.chatter_manager:
|
if self.chatter_manager:
|
||||||
task = self.chatter_manager.get_processing_task(stream_id)
|
# 使用 ChatterManager 的新方法取消处理任务
|
||||||
if task and not task.done():
|
cancelled = self.chatter_manager.cancel_processing_task(stream_id)
|
||||||
task.cancel()
|
if cancelled:
|
||||||
logger.debug(f"已取消 chatter 处理任务: {stream_id}")
|
logger.info(f"成功取消 chatter 处理任务: {stream_id}")
|
||||||
|
else:
|
||||||
|
logger.debug(f"没有需要取消的 chatter 处理任务: {stream_id}")
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
|
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
|
||||||
|
|||||||
@@ -397,9 +397,15 @@ class Prompt:
|
|||||||
task_timeout = task_timeouts.get(task_name, 2.0) # 默认2秒
|
task_timeout = task_timeouts.get(task_name, 2.0) # 默认2秒
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# 确保任务是一个协程对象
|
||||||
|
if asyncio.iscoroutine(task):
|
||||||
result = await asyncio.wait_for(task, timeout=task_timeout)
|
result = await asyncio.wait_for(task, timeout=task_timeout)
|
||||||
results.append(result)
|
results.append(result)
|
||||||
logger.debug(f"构建任务{task_name}完成 ({task_timeout}s)")
|
logger.debug(f"构建任务{task_name}完成 ({task_timeout}s)")
|
||||||
|
else:
|
||||||
|
logger.warning(f"任务{task_name}不是协程对象,类型: {type(task)},跳过处理")
|
||||||
|
default_result = self._get_default_result_for_task(task_name)
|
||||||
|
results.append(default_result)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.warning(f"构建任务{task_name}超时 ({task_timeout}s),使用默认值")
|
logger.warning(f"构建任务{task_name}超时 ({task_timeout}s),使用默认值")
|
||||||
# 为超时任务提供默认值
|
# 为超时任务提供默认值
|
||||||
|
|||||||
Reference in New Issue
Block a user