feat(chat): 改进异步任务管理和取消处理
- 在 StreamLoopManager、MessageManager 和 DefaultReplyer 中增加子任务跟踪机制 - 统一处理 asyncio.CancelledError 异常,确保任务取消时能正确清理子任务 - 使用 child_tasks 集合管理子任务生命周期,防止任务泄漏 - 优化记忆存储等后台任务的创建方式,支持优雅取消 - 改进错误处理逻辑,确保异常情况下也能清理子任务资源
This commit is contained in:
@@ -434,7 +434,7 @@ class MessageManager:
|
||||
await self._trigger_reprocess(chat_stream)
|
||||
|
||||
async def _trigger_reprocess(self, chat_stream: ChatStream):
|
||||
"""重新处理聊天流的核心逻辑"""
|
||||
"""重新处理聊天流的核心逻辑 - 支持子任务管理"""
|
||||
try:
|
||||
stream_id = chat_stream.stream_id
|
||||
|
||||
@@ -454,38 +454,45 @@ class MessageManager:
|
||||
|
||||
logger.info(f"💬 开始重新处理 {len(unread_messages)} 条未读消息: {stream_id}")
|
||||
|
||||
# 创建新的处理任务
|
||||
# 创建处理任务并使用try-catch实现子任务管理
|
||||
task = asyncio.create_task(
|
||||
self.chatter_manager.process_stream_context(stream_id, context),
|
||||
self._managed_reprocess_with_cleanup(stream_id, context),
|
||||
name=f"reprocess_{stream_id}_{int(time.time())}"
|
||||
)
|
||||
|
||||
# 设置处理任务
|
||||
self.chatter_manager.set_processing_task(stream_id, task)
|
||||
|
||||
# 等待处理完成(使用超时防止无限等待)
|
||||
try:
|
||||
result = await asyncio.wait_for(task, timeout=30.0)
|
||||
success = result.get("success", False)
|
||||
actions_count = result.get("actions_count", 0)
|
||||
|
||||
if success:
|
||||
logger.info(f"✅ 聊天流 {stream_id} 重新处理成功: 执行了 {actions_count} 个动作")
|
||||
else:
|
||||
logger.warning(f"❌ 聊天流 {stream_id} 重新处理失败")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"⏰ 聊天流 {stream_id} 重新处理超时")
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
except Exception as e:
|
||||
logger.error(f"💥 聊天流 {stream_id} 重新处理出错: {e}")
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
# 不等待完成,让它异步执行
|
||||
# 如果需要等待,调用者会等待 chatter_manager.process_stream_context
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"🚨 触发重新处理时出错: {e}")
|
||||
|
||||
async def _managed_reprocess_with_cleanup(self, stream_id: str, context):
|
||||
"""带清理功能的重新处理"""
|
||||
child_tasks = set() # 跟踪子任务
|
||||
|
||||
try:
|
||||
# 处理流上下文
|
||||
result = await self.chatter_manager.process_stream_context(stream_id, context)
|
||||
return result
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"重新处理任务被取消: {stream_id}")
|
||||
# 取消所有子任务
|
||||
for child_task in child_tasks:
|
||||
if not child_task.done():
|
||||
child_task.cancel()
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"重新处理任务执行出错: {stream_id} - {e}")
|
||||
# 清理子任务
|
||||
for child_task in child_tasks:
|
||||
if not child_task.done():
|
||||
child_task.cancel()
|
||||
raise
|
||||
|
||||
async def clear_all_unread_messages(self, stream_id: str):
|
||||
"""清除指定上下文中的所有未读消息,在消息处理完成后调用"""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user