fix(stream): 修复 Chatter 处理标志的假死状态并增强并发保护

This commit is contained in:
Windpicker-owo
2025-12-02 23:11:29 +08:00
parent 9a5ae357b5
commit d5e6746a21

View File

@@ -321,6 +321,9 @@ class StreamLoopManager:
# 🔒 并发保护:如果 Chatter 正在处理中,跳过本轮 # 🔒 并发保护:如果 Chatter 正在处理中,跳过本轮
# 这可能发生在1) 打断后重启循环 2) 处理时间超过轮询间隔 # 这可能发生在1) 打断后重启循环 2) 处理时间超过轮询间隔
if context.is_chatter_processing: if context.is_chatter_processing:
if self._recover_stale_chatter_state(stream_id, context):
logger.warning(f"🔄 [流工作器] stream={stream_id[:8]}, 处理标志疑似残留,已尝试自动修复")
else:
logger.debug(f"🔒 [流工作器] stream={stream_id[:8]}, Chatter正在处理中跳过本轮") logger.debug(f"🔒 [流工作器] stream={stream_id[:8]}, Chatter正在处理中跳过本轮")
# 不打印"开始处理"日志,直接进入下一轮等待 # 不打印"开始处理"日志,直接进入下一轮等待
# 使用较短的等待时间,等待当前处理完成 # 使用较短的等待时间,等待当前处理完成
@@ -529,6 +532,21 @@ class StreamLoopManager:
name=f"chatter_process_{stream_id}" name=f"chatter_process_{stream_id}"
) )
# 记录任务句柄,便于后续检测/自愈
context.processing_task = chatter_task
def _cleanup_processing_flag(task: asyncio.Task) -> None:
try:
context.processing_task = None
if context.is_chatter_processing:
context.is_chatter_processing = False
self._set_stream_processing_status(stream_id, False)
logger.debug(f"🔄 [并发保护] stream={stream_id[:8]}, chatter任务结束自动清理处理标志")
except Exception as callback_error:
logger.debug(f"清理chatter处理标志失败: {callback_error}")
chatter_task.add_done_callback(_cleanup_processing_flag)
# 等待 chatter 任务完成 # 等待 chatter 任务完成
results = await chatter_task results = await chatter_task
success = results.get("success", False) success = results.get("success", False)
@@ -550,6 +568,7 @@ class StreamLoopManager:
finally: finally:
# 清除 Chatter 处理标志 # 清除 Chatter 处理标志
context.is_chatter_processing = False context.is_chatter_processing = False
context.processing_task = None
logger.debug(f"清除 Chatter 处理标志: {stream_id}") logger.debug(f"清除 Chatter 处理标志: {stream_id}")
# 无论成功或失败,都要设置处理状态为未处理 # 无论成功或失败,都要设置处理状态为未处理
@@ -759,6 +778,35 @@ class StreamLoopManager:
logger.debug(f"{stream_id} 使用默认间隔: {base_interval:.2f}s ({e})") logger.debug(f"{stream_id} 使用默认间隔: {base_interval:.2f}s ({e})")
return base_interval return base_interval
def _recover_stale_chatter_state(self, stream_id: str, context: "StreamContext") -> bool:
"""
检测并修复 Chatter 处理标志的假死状态。
返回 True 表示已发现并修复了异常状态False 表示未发现异常。
"""
try:
processing_task = getattr(context, "processing_task", None)
# 标志为 True 但没有任务句柄,直接修复
if processing_task is None:
context.is_chatter_processing = False
self._set_stream_processing_status(stream_id, False)
logger.warning(f"🛠️ [自愈] stream={stream_id[:8]}, 发现无任务但标志为真,已重置")
return True
# 标志为 True 但任务已经结束/被取消
if processing_task.done():
context.is_chatter_processing = False
context.processing_task = None
self._set_stream_processing_status(stream_id, False)
logger.warning(f"🛠️ [自愈] stream={stream_id[:8]}, 任务已结束但标志未清,已重置")
return True
return False
except Exception as e:
logger.debug(f"检测 Chatter 状态异常失败: stream={stream_id}, error={e}")
return False
def get_queue_status(self) -> dict[str, Any]: def get_queue_status(self) -> dict[str, Any]:
"""获取队列状态 """获取队列状态