feat(chatter_manager): 清理 processing_message_id 以防止重复回复检测失效

feat(distribution_manager): 添加子任务跟踪和取消逻辑,优化流处理
feat(default_generator): 优化 respond 和 reply 模式下的消息处理逻辑
feat(affinity_chatter): 处理取消异常时清理 processing_message_id
feat(planner): 确保在规划流程取消时清理 processing_message_id
This commit is contained in:
Windpicker-owo
2025-11-10 17:12:20 +08:00
parent 861cc18e7d
commit aab3f19f10
5 changed files with 124 additions and 23 deletions

View File

@@ -332,8 +332,9 @@ class StreamLoopManager:
# 设置处理状态为正在处理
self._set_stream_processing_status(stream_id, True)
# 子任务跟踪
# 子任务跟踪 - 存储到 context 中以便打断时可以访问
child_tasks = set()
context._active_process_tasks = child_tasks # 临时存储子任务集合
try:
start_time = time.time()
@@ -362,8 +363,16 @@ class StreamLoopManager:
context.is_chatter_processing = True
logger.debug(f"设置 Chatter 处理标志: {stream_id}")
# 直接调用chatter_manager处理流上下文
results = await self.chatter_manager.process_stream_context(stream_id, context)
# 创建 chatter 处理任务,以便可以在打断时取消
chatter_task = asyncio.create_task(
self.chatter_manager.process_stream_context(stream_id, context),
name=f"chatter_process_{stream_id}"
)
child_tasks.add(chatter_task)
context._chatter_task = chatter_task # 保存主 chatter 任务引用
# 等待 chatter 任务完成
results = await chatter_task
success = results.get("success", False)
if success:
@@ -380,11 +389,25 @@ class StreamLoopManager:
return success
except asyncio.CancelledError:
logger.debug(f"流处理被取消: {stream_id}")
# 取消所有子任务
for child_task in child_tasks:
logger.info(f"流处理被取消,正在清理所有子任务: {stream_id}")
# 取消所有子任务(包括 chatter 任务和其后台任务)
cancel_count = 0
for child_task in list(child_tasks): # 使用 list() 创建副本避免迭代时修改
if not child_task.done():
child_task.cancel()
cancel_count += 1
if cancel_count > 0:
logger.info(f"已取消 {cancel_count} 个子任务: {stream_id}")
# 等待所有子任务真正结束(设置短超时)
try:
await asyncio.wait_for(
asyncio.gather(*child_tasks, return_exceptions=True),
timeout=1.0
)
except asyncio.TimeoutError:
logger.warning(f"等待子任务取消超时: {stream_id}")
raise
except Exception as e:
logger.error(f"流处理异常: {stream_id} - {e}", exc_info=True)
@@ -398,6 +421,12 @@ class StreamLoopManager:
context.is_chatter_processing = False
logger.debug(f"清除 Chatter 处理标志: {stream_id}")
# 清理临时存储的任务引用
if hasattr(context, '_active_process_tasks'):
delattr(context, '_active_process_tasks')
if hasattr(context, '_chatter_task'):
delattr(context, '_chatter_task')
# 无论成功或失败,都要设置处理状态为未处理
self._set_stream_processing_status(stream_id, False)