feat(chat): 改进异步任务管理和取消处理

- 在 StreamLoopManager、MessageManager 和 DefaultReplyer 中增加子任务跟踪机制
- 统一处理 asyncio.CancelledError 异常,确保任务取消时能正确清理子任务
- 使用 child_tasks 集合管理子任务生命周期,防止任务泄漏
- 优化记忆存储等后台任务的创建方式,支持优雅取消
- 改进错误处理逻辑,确保异常情况下也能清理子任务资源
This commit is contained in:
Windpicker-owo
2025-10-23 19:31:23 +08:00
parent 5b29399d94
commit ca69b8affe
3 changed files with 92 additions and 30 deletions

View File

@@ -435,7 +435,7 @@ class MessageManager:
await self._trigger_reprocess(chat_stream)
async def _trigger_reprocess(self, chat_stream: ChatStream):
"""重新处理聊天流的核心逻辑"""
"""重新处理聊天流的核心逻辑 - 支持子任务管理"""
try:
stream_id = chat_stream.stream_id
@@ -455,38 +455,45 @@ class MessageManager:
logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
# 创建新的处理任务
# 创建处理任务并使用try-catch实现子任务管理
task = asyncio.create_task(
self.chatter_manager.process_stream_context(stream_id, context),
self._managed_reprocess_with_cleanup(stream_id, context),
name=f"reprocess_{stream_id}_{int(time.time())}"
)
# 设置处理任务
self.chatter_manager.set_processing_task(stream_id, task)
# 等待处理完成(使用超时防止无限等待)
try:
result = await asyncio.wait_for(task, timeout=30.0)
success = result.get("success", False)
actions_count = result.get("actions_count", 0)
if success:
logger.info(f"✅ 聊天流 {stream_id} 重新处理成功: 执行了 {actions_count} 个动作")
else:
logger.warning(f"❌ 聊天流 {stream_id} 重新处理失败")
except asyncio.TimeoutError:
logger.warning(f"⏰ 聊天流 {stream_id} 重新处理超时")
if not task.done():
task.cancel()
except Exception as e:
logger.error(f"💥 聊天流 {stream_id} 重新处理出错: {e}")
if not task.done():
task.cancel()
# 等待完成,让它异步执行
# 如果需要等待,调用者会等待 chatter_manager.process_stream_context
except Exception as e:
logger.error(f"🚨 触发重新处理时出错: {e}")
async def _managed_reprocess_with_cleanup(self, stream_id: str, context):
"""带清理功能的重新处理"""
child_tasks = set() # 跟踪子任务
try:
# 处理流上下文
result = await self.chatter_manager.process_stream_context(stream_id, context)
return result
except asyncio.CancelledError:
logger.info(f"重新处理任务被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
except Exception as e:
logger.error(f"重新处理任务执行出错: {stream_id} - {e}")
# 清理子任务
for child_task in child_tasks:
if not child_task.done():
child_task.cancel()
raise
async def clear_all_unread_messages(self, stream_id: str):
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
try: