fix(scheduler): 添加锁机制以避免调度创建/删除的竞态条件

This commit is contained in:
Windpicker-owo
2025-11-04 23:44:27 +08:00
parent a349809c5d
commit 99f71d1bcf

View File

@@ -33,6 +33,9 @@ class SchedulerDispatcher:
# 追踪每个流的 schedule_id
self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id
# 用于保护 schedule 创建/删除的锁,避免竞态条件
self.schedule_locks: dict[str, asyncio.Lock] = {} # stream_id -> Lock
# Chatter 管理器
self.chatter_manager: ChatterManager | None = None
@@ -81,6 +84,12 @@ class SchedulerDispatcher:
"""设置 Chatter 管理器"""
self.chatter_manager = chatter_manager
logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}")
def _get_schedule_lock(self, stream_id: str) -> asyncio.Lock:
"""获取流的 schedule 锁"""
if stream_id not in self.schedule_locks:
self.schedule_locks[stream_id] = asyncio.Lock()
return self.schedule_locks[stream_id]
async def on_message_received(self, stream_id: str) -> None:
"""消息接收时的处理逻辑
@@ -100,21 +109,27 @@ class SchedulerDispatcher:
return
# 2. 检查是否有活跃的 schedule
has_active_schedule = stream_id in self.stream_schedules
async with self._get_schedule_lock(stream_id):
has_active_schedule = stream_id in self.stream_schedules
if has_active_schedule:
# 释放锁后再做打断检查(避免长时间持有锁)
pass
else:
# 4. 创建新的 schedule在锁内避免重复创建
await self._create_schedule(stream_id, context)
return
# 3. 检查打断判定(锁外执行,避免阻塞)
if has_active_schedule:
# 3. 检查打断判定
should_interrupt = await self._check_interruption(stream_id, context)
if should_interrupt:
# 移除旧 schedule 并创建新的
# 移除旧 schedule 并创建新的(内部有锁保护)
await self._cancel_and_recreate_schedule(stream_id, context)
logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule")
else:
logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...")
else:
# 4. 创建新的 schedule
await self._create_schedule(stream_id, context)
except Exception as e:
logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True)
@@ -217,21 +232,27 @@ class SchedulerDispatcher:
stream_id: 流ID
context: 流上下文
"""
# 移除旧的 schedule
old_schedule_id = self.stream_schedules.get(stream_id)
# 使用锁保护,避免与 _on_schedule_triggered 冲突
async with self._get_schedule_lock(stream_id):
# 移除旧的 schedule
old_schedule_id = self.stream_schedules.get(stream_id)
if old_schedule_id:
success = await unified_scheduler.remove_schedule(old_schedule_id)
if success:
logger.debug(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...")
logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...")
self.stats["total_schedules_cancelled"] += 1
# 只有成功移除后才从追踪中删除
del self.stream_schedules[stream_id]
else:
logger.warning(f"移除旧 schedule 失败: {stream_id}")
logger.error(
f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., "
f"ID={old_schedule_id[:8]}..., 放弃创建新 schedule 避免重复"
)
# 移除失败,不创建新 schedule避免重复
return
# 从追踪中删除
del self.stream_schedules[stream_id]
# 创建新的 schedule使用即时处理模式极短延迟
await self._create_schedule(stream_id, context, immediate_mode=True)
# 创建新的 schedule使用即时处理模式极短延迟
await self._create_schedule(stream_id, context, immediate_mode=True)
async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None:
"""为聊天流创建新的 schedule
@@ -242,6 +263,16 @@ class SchedulerDispatcher:
immediate_mode: 是否使用即时处理模式(打断时使用极短延迟)
"""
try:
# 检查是否已有活跃的 schedule如果有则先移除
if stream_id in self.stream_schedules:
old_schedule_id = self.stream_schedules[stream_id]
logger.warning(
f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_schedule_id[:8]}..., "
f"这不应该发生,将先移除旧 schedule"
)
await unified_scheduler.remove_schedule(old_schedule_id)
del self.stream_schedules[stream_id]
# 如果是即时处理模式打断时使用固定的1秒延迟立即重新处理
if immediate_mode:
delay = 1.0 # 硬编码1秒延迟确保打断后能快速重新处理
@@ -271,10 +302,19 @@ class SchedulerDispatcher:
self.stats["total_schedules_created"] += 1
mode_indicator = "⚡打断" if immediate_mode else "📅常规"
logger.debug(
# 获取调用栈信息,帮助追踪重复创建的问题
import traceback
caller_info = ""
stack = traceback.extract_stack()
if len(stack) >= 2:
caller_frame = stack[-2]
caller_info = f", 调用自={caller_frame.name}"
logger.info(
f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., "
f"延迟={delay:.3f}s, 未读={unread_count}, "
f"ID={schedule_id[:8]}..."
f"ID={schedule_id[:8]}...{caller_info}"
)
except Exception as e:
@@ -391,10 +431,16 @@ class SchedulerDispatcher:
stream_id: 流ID
"""
try:
logger.info(f"⏰ Schedule 触发: 流={stream_id[:8]}..., 开始处理消息")
# 使用锁保护,避免与打断逻辑冲突
async with self._get_schedule_lock(stream_id):
# 从追踪中移除(因为是一次性任务)
old_schedule_id = self.stream_schedules.pop(stream_id, None)
# 从追踪中移除(因为是一次性任务)
self.stream_schedules.pop(stream_id, None)
logger.info(
f"⏰ Schedule 触发: 流={stream_id[:8]}..., "
f"ID={old_schedule_id[:8] if old_schedule_id else 'None'}..., "
f"开始处理消息"
)
# 获取流上下文
context = await self._get_stream_context(stream_id)
@@ -407,7 +453,7 @@ class SchedulerDispatcher:
logger.debug(f"{stream_id} 没有未读消息,跳过处理")
return
# 激活 chatter 处理
# 激活 chatter 处理(不需要锁,允许并发处理)
success = await self._process_stream(stream_id, context)
# 更新统计
@@ -415,8 +461,17 @@ class SchedulerDispatcher:
if not success:
self.stats["total_failures"] += 1
# 处理完成后,创建新的 schedule
await self._create_schedule(stream_id, context)
# 处理完成后,创建新的 schedule(用锁保护,避免与打断冲突)
async with self._get_schedule_lock(stream_id):
# 再次检查是否已有 schedule可能在处理期间被打断创建了新的
if stream_id in self.stream_schedules:
logger.info(
f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., "
f"可能是打断创建的,跳过创建新 schedule"
)
return
await self._create_schedule(stream_id, context)
except Exception as e:
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)