Merge branch 'feature/kfc' of https://github.com/MoFox-Studio/MoFox-Core into feature/kfc
This commit is contained in:
@@ -52,6 +52,11 @@ class StreamLoopManager:
|
||||
|
||||
# 流循环启动锁:防止并发启动同一个流的多个循环任务
|
||||
self._stream_start_locks: dict[str, asyncio.Lock] = {}
|
||||
|
||||
# 死锁检测:记录每个流的最后活动时间
|
||||
self._stream_last_activity: dict[str, float] = {}
|
||||
self._deadlock_detector_task: asyncio.Task | None = None
|
||||
self._deadlock_threshold_seconds: float = 120.0 # 2分钟无活动视为可能死锁
|
||||
|
||||
logger.info(f"流循环管理器初始化完成 (最大并发流数: {self.max_concurrent_streams})")
|
||||
|
||||
@@ -62,6 +67,60 @@ class StreamLoopManager:
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
|
||||
# 启动死锁检测器
|
||||
self._deadlock_detector_task = asyncio.create_task(
|
||||
self._deadlock_detector_loop(),
|
||||
name="deadlock_detector"
|
||||
)
|
||||
logger.info("死锁检测器已启动")
|
||||
|
||||
async def _deadlock_detector_loop(self) -> None:
|
||||
"""死锁检测循环 - 定期检查所有流的活动状态"""
|
||||
while self.is_running:
|
||||
try:
|
||||
await asyncio.sleep(30.0) # 每30秒检查一次
|
||||
|
||||
current_time = time.time()
|
||||
suspected_deadlocks = []
|
||||
|
||||
# 检查所有活跃流的最后活动时间
|
||||
for stream_id, last_activity in list(self._stream_last_activity.items()):
|
||||
inactive_seconds = current_time - last_activity
|
||||
if inactive_seconds > self._deadlock_threshold_seconds:
|
||||
suspected_deadlocks.append((stream_id, inactive_seconds))
|
||||
|
||||
if suspected_deadlocks:
|
||||
logger.warning(
|
||||
f"🔴 [死锁检测] 发现 {len(suspected_deadlocks)} 个可能卡住的流:\n" +
|
||||
"\n".join([
|
||||
f" - stream={sid[:8]}, 无活动时间={inactive:.1f}s"
|
||||
for sid, inactive in suspected_deadlocks
|
||||
])
|
||||
)
|
||||
|
||||
# 打印当前所有 asyncio 任务的状态
|
||||
all_tasks = asyncio.all_tasks()
|
||||
stream_loop_tasks = [t for t in all_tasks if t.get_name().startswith("stream_loop_")]
|
||||
logger.warning(
|
||||
f"🔴 [死锁检测] 当前流循环任务状态:\n" +
|
||||
"\n".join([
|
||||
f" - {t.get_name()}: done={t.done()}, cancelled={t.cancelled()}"
|
||||
for t in stream_loop_tasks
|
||||
])
|
||||
)
|
||||
else:
|
||||
# 每5分钟报告一次正常状态
|
||||
if int(current_time) % 300 < 30:
|
||||
active_count = len(self._stream_last_activity)
|
||||
if active_count > 0:
|
||||
logger.info(f"🟢 [死锁检测] 所有 {active_count} 个流正常运行中")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("死锁检测器被取消")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"死锁检测器出错: {e}")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""停止流循环管理器"""
|
||||
@@ -69,6 +128,15 @@ class StreamLoopManager:
|
||||
return
|
||||
|
||||
self.is_running = False
|
||||
|
||||
# 停止死锁检测器
|
||||
if self._deadlock_detector_task and not self._deadlock_detector_task.done():
|
||||
self._deadlock_detector_task.cancel()
|
||||
try:
|
||||
await self._deadlock_detector_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("死锁检测器已停止")
|
||||
|
||||
# 取消所有流循环
|
||||
try:
|
||||
@@ -214,11 +282,24 @@ class StreamLoopManager:
|
||||
"""
|
||||
task_id = id(asyncio.current_task())
|
||||
logger.info(f"🔄 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 启动")
|
||||
|
||||
# 死锁检测:记录循环次数和上次活动时间
|
||||
loop_count = 0
|
||||
|
||||
# 注册到活动跟踪
|
||||
self._stream_last_activity[stream_id] = time.time()
|
||||
|
||||
try:
|
||||
while self.is_running:
|
||||
loop_count += 1
|
||||
loop_start_time = time.time()
|
||||
|
||||
# 更新活动时间(死锁检测用)
|
||||
self._stream_last_activity[stream_id] = loop_start_time
|
||||
|
||||
try:
|
||||
# 1. 获取流上下文
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 获取上下文...")
|
||||
context = await self._get_stream_context(stream_id)
|
||||
if not context:
|
||||
logger.warning(f"⚠️ [流工作器] stream={stream_id[:8]}, 无法获取流上下文")
|
||||
@@ -226,6 +307,7 @@ class StreamLoopManager:
|
||||
continue
|
||||
|
||||
# 2. 检查是否有消息需要处理
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 刷新缓存消息...")
|
||||
await self._flush_cached_messages_to_unread(stream_id)
|
||||
unread_count = self._get_unread_count(context)
|
||||
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
|
||||
@@ -245,11 +327,36 @@ class StreamLoopManager:
|
||||
logger.debug(f"更新流能量失败 {stream_id}: {e}")
|
||||
|
||||
# 4. 激活chatter处理
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 开始chatter处理...")
|
||||
try:
|
||||
success = await asyncio.wait_for(self._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout)
|
||||
# 在长时间处理期间定期更新活动时间,避免死锁检测误报
|
||||
async def process_with_activity_update():
|
||||
process_task = asyncio.create_task(
|
||||
self._process_stream_messages(stream_id, context)
|
||||
)
|
||||
activity_update_interval = 30.0 # 每30秒更新一次
|
||||
while not process_task.done():
|
||||
try:
|
||||
# 等待任务完成或超时
|
||||
await asyncio.wait_for(
|
||||
asyncio.shield(process_task),
|
||||
timeout=activity_update_interval
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
# 任务仍在运行,更新活动时间
|
||||
self._stream_last_activity[stream_id] = time.time()
|
||||
logger.debug(f"🔄 [流工作器] stream={stream_id[:8]}, 处理中,更新活动时间")
|
||||
return await process_task
|
||||
|
||||
success = await asyncio.wait_for(
|
||||
process_with_activity_update(),
|
||||
global_config.chat.thinking_timeout
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"⏱️ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理超时")
|
||||
success = False
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, chatter处理完成, success={success}")
|
||||
|
||||
# 更新统计
|
||||
self.stats["total_process_cycles"] += 1
|
||||
if success:
|
||||
@@ -263,6 +370,7 @@ class StreamLoopManager:
|
||||
logger.warning(f"❌ [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 处理失败")
|
||||
|
||||
# 5. 计算下次检查间隔
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count}, 计算间隔...")
|
||||
interval = await self._calculate_interval(stream_id, has_messages)
|
||||
|
||||
# 6. sleep等待下次检查
|
||||
@@ -271,7 +379,22 @@ class StreamLoopManager:
|
||||
if last_interval is None or abs(interval - last_interval) > 0.01:
|
||||
logger.info(f"流 {stream_id} 等待周期变化: {interval:.2f}s")
|
||||
self._last_intervals[stream_id] = interval
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
loop_duration = time.time() - loop_start_time
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} 完成, 耗时={loop_duration:.2f}s, 即将sleep {interval:.2f}s")
|
||||
|
||||
# 使用分段sleep,每隔一段时间更新活动时间,避免死锁检测误报
|
||||
# 当间隔较长时(如等待用户回复),分段更新活动时间
|
||||
remaining_sleep = interval
|
||||
activity_update_interval = 30.0 # 每30秒更新一次活动时间
|
||||
while remaining_sleep > 0:
|
||||
sleep_chunk = min(remaining_sleep, activity_update_interval)
|
||||
await asyncio.sleep(sleep_chunk)
|
||||
remaining_sleep -= sleep_chunk
|
||||
# 更新活动时间,表明流仍在正常运行(只是在等待)
|
||||
self._stream_last_activity[stream_id] = time.time()
|
||||
|
||||
logger.debug(f"🔍 [流工作器] stream={stream_id[:8]}, 循环#{loop_count} sleep结束, 开始下一循环")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info(f"🛑 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 被取消")
|
||||
@@ -293,6 +416,9 @@ class StreamLoopManager:
|
||||
|
||||
# 清理间隔记录
|
||||
self._last_intervals.pop(stream_id, None)
|
||||
|
||||
# 清理活动跟踪
|
||||
self._stream_last_activity.pop(stream_id, None)
|
||||
|
||||
logger.info(f"🏁 [流工作器] stream={stream_id[:8]}, 任务ID={task_id}, 循环结束")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user