feat(chat): 增强任务管理和取消机制
添加新的任务管理方法来更好地处理异步任务的生命周期: - 新增 cancel_processing_task、remove_processing_task 等方法 - 在流循环清理时自动取消关联的 chatter 处理任务 - 添加活跃任务统计信息到管理器统计中 - 改进 prompt 构建时的任务类型检查和错误处理
This commit is contained in:
@@ -83,6 +83,14 @@ class StreamLoopManager:
|
||||
return_exceptions=True
|
||||
)
|
||||
|
||||
# 取消所有活跃的 chatter 处理任务
|
||||
if self.chatter_manager:
|
||||
try:
|
||||
cancelled_count = await self.chatter_manager.cancel_all_processing_tasks()
|
||||
logger.info(f"已取消 {cancelled_count} 个活跃的 chatter 处理任务")
|
||||
except Exception as e:
|
||||
logger.error(f"取消 chatter 处理任务时出错: {e}")
|
||||
|
||||
self.stream_loops.clear()
|
||||
logger.info("所有流循环已清理")
|
||||
except Exception as e:
|
||||
@@ -183,6 +191,12 @@ class StreamLoopManager:
|
||||
except Exception as e:
|
||||
logger.error(f"等待流循环任务结束时出错: {stream_id} - {e}")
|
||||
|
||||
# 取消关联的 chatter 处理任务
|
||||
if self.chatter_manager:
|
||||
cancelled = self.chatter_manager.cancel_processing_task(stream_id)
|
||||
if cancelled:
|
||||
logger.info(f"已取消关联的 chatter 处理任务: {stream_id}")
|
||||
|
||||
del self.stream_loops[stream_id]
|
||||
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
|
||||
return True
|
||||
@@ -246,10 +260,12 @@ class StreamLoopManager:
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"流循环被取消: {stream_id}")
|
||||
if self.chatter_manager:
|
||||
task = self.chatter_manager.get_processing_task(stream_id)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
logger.debug(f"已取消 chatter 处理任务: {stream_id}")
|
||||
# 使用 ChatterManager 的新方法取消处理任务
|
||||
cancelled = self.chatter_manager.cancel_processing_task(stream_id)
|
||||
if cancelled:
|
||||
logger.info(f"成功取消 chatter 处理任务: {stream_id}")
|
||||
else:
|
||||
logger.debug(f"没有需要取消的 chatter 处理任务: {stream_id}")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user