feat(stream_loop_manager): 优化流循环管理,增强日志记录和错误处理,添加并发保护机制

This commit is contained in:
Windpicker-owo
2025-11-10 20:03:01 +08:00
parent 80819ad150
commit c46df81bca

View File

@@ -124,24 +124,31 @@ class StreamLoopManager:
# 快速路径:如果流已存在且不是强制启动,无需处理
if not force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"{stream_id} 循环已在运行")
logger.debug(f"🔄 [流循环] stream={stream_id[:8]}, 循环已在运行,跳过启动")
return True
# 如果是强制启动且任务仍在运行,先取消旧任务
if force and context.stream_loop_task and not context.stream_loop_task.done():
logger.debug(f"强制启动模式:先取消现有流循环任务: {stream_id}")
logger.warning(f"⚠️ [流循环] stream={stream_id[:8]}, 强制启动模式:先取消现有任务")
old_task = context.stream_loop_task
old_task.cancel()
try:
await asyncio.wait_for(old_task, timeout=2.0)
logger.debug(f"旧流循环任务已结束: {stream_id}")
logger.debug(f"✅ [流循环] stream={stream_id[:8]}, 旧任务已结束")
except (asyncio.TimeoutError, asyncio.CancelledError):
logger.debug(f"旧流循环任务已取消或超时: {stream_id}")
logger.debug(f"⏱️ [流循环] stream={stream_id[:8]}, 旧任务已取消或超时")
except Exception as e:
logger.warning(f"等待旧任务结束时出错: {e}")
logger.warning(f"❌ [流循环] stream={stream_id[:8]}, 等待旧任务结束时出错: {e}")
# 创建流循环任务
try:
# 检查是否有旧任务残留
if context.stream_loop_task and not context.stream_loop_task.done():
logger.error(f"🚨 [流循环] stream={stream_id[:8]}, 错误:旧任务仍在运行!这不应该发生!")
# 紧急取消
context.stream_loop_task.cancel()
await asyncio.sleep(0.1)
loop_task = asyncio.create_task(self._stream_loop_worker(stream_id), name=f"stream_loop_{stream_id}")
# 将任务记录到 StreamContext 中
@@ -151,11 +158,11 @@ class StreamLoopManager:
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
logger.debug(f"启动流循环任务: {stream_id}")
logger.info(f"🚀 [流循环] stream={stream_id[:8]}, 启动新的流循环任务任务ID: {id(loop_task)}")
return True
except Exception as e:
logger.error(f"启动流循环任务失败 {stream_id}: {e}")
logger.error(f"❌ [流循环] stream={stream_id[:8]}, 启动失败: {e}")
return False
async def stop_stream_loop(self, stream_id: str) -> bool:
@@ -203,7 +210,8 @@ class StreamLoopManager:
Args:
stream_id: 流ID
"""
logger.debug(f"流循环工作器启动: {stream_id}")
task_id = id(asyncio.current_task())
logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动")
try:
while self.is_running:
@@ -211,7 +219,7 @@ class StreamLoopManager:
# 1. 获取流上下文
context = await self._get_stream_context(stream_id)
if not context:
logger.warning(f"无法获取流上下文: {stream_id}")
logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文")
await asyncio.sleep(10.0)
continue
@@ -224,7 +232,9 @@ class StreamLoopManager:
if has_messages:
if force_dispatch:
logger.info("%s 未读消息 %d 条,触发强制分发", stream_id, unread_count)
logger.info(f"⚡ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 未读消息 {unread_count} 条,触发强制分发")
else:
logger.info(f"📨 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 开始处理消息")
# 3. 在处理前更新能量值(用于下次间隔计算)
try:
@@ -238,10 +248,10 @@ class StreamLoopManager:
# 更新统计
self.stats["total_process_cycles"] += 1
if success:
logger.debug(f"流处理成功: {stream_id}")
logger.info(f"✅ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理成功")
else:
self.stats["total_failures"] += 1
logger.warning(f"流处理失败: {stream_id}")
logger.warning(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败")
# 5. 计算下次检查间隔
interval = await self._calculate_interval(stream_id, has_messages)
@@ -255,10 +265,10 @@ class StreamLoopManager:
await asyncio.sleep(interval)
except asyncio.CancelledError:
logger.debug(f"流循环被取消: {stream_id}")
logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消")
break
except Exception as e:
logger.error(f"流循环出错 {stream_id}: {e}", exc_info=True)
logger.error(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 出错: {e}", exc_info=True)
self.stats["total_failures"] += 1
await asyncio.sleep(5.0) # 错误时等待5秒再重试
@@ -268,14 +278,14 @@ class StreamLoopManager:
context = await self._get_stream_context(stream_id)
if context and context.stream_loop_task:
context.stream_loop_task = None
logger.debug(f"清理 StreamContext 中的流循环任务: {stream_id}")
logger.info(f"🧹 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 清理任务记录")
except Exception as e:
logger.debug(f"清理 StreamContext 任务记录失败: {e}")
# 清理间隔记录
self._last_intervals.pop(stream_id, None)
logger.debug(f"流循环结束: {stream_id}")
logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束")
async def _get_stream_context(self, stream_id: str) -> StreamContext | None:
"""获取流上下文
@@ -329,6 +339,11 @@ class StreamLoopManager:
logger.warning(f"Chatter管理器未设置: {stream_id}")
return False
# 🔒 防止并发处理:如果已经在处理中,直接返回
if context.is_chatter_processing:
logger.warning(f"🔒 [并发保护] stream={stream_id[:8]}, Chatter 正在处理中,跳过本次处理请求")
return False
# 设置处理状态为正在处理
self._set_stream_processing_status(stream_id, True)