fix(scheduler): 添加锁机制以避免调度创建/删除的竞态条件
This commit is contained in:
@@ -33,6 +33,9 @@ class SchedulerDispatcher:
|
|||||||
# 追踪每个流的 schedule_id
|
# 追踪每个流的 schedule_id
|
||||||
self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id
|
self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id
|
||||||
|
|
||||||
|
# 用于保护 schedule 创建/删除的锁,避免竞态条件
|
||||||
|
self.schedule_locks: dict[str, asyncio.Lock] = {} # stream_id -> Lock
|
||||||
|
|
||||||
# Chatter 管理器
|
# Chatter 管理器
|
||||||
self.chatter_manager: ChatterManager | None = None
|
self.chatter_manager: ChatterManager | None = None
|
||||||
|
|
||||||
@@ -81,6 +84,12 @@ class SchedulerDispatcher:
|
|||||||
"""设置 Chatter 管理器"""
|
"""设置 Chatter 管理器"""
|
||||||
self.chatter_manager = chatter_manager
|
self.chatter_manager = chatter_manager
|
||||||
logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}")
|
logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}")
|
||||||
|
|
||||||
|
def _get_schedule_lock(self, stream_id: str) -> asyncio.Lock:
    """Return the lock that guards schedule changes for ``stream_id``.

    A lock is created and stored in ``self.schedule_locks`` the first time
    a stream is seen; later calls with the same ``stream_id`` return that
    same stored object.

    Args:
        stream_id: Key used to look up the lock in ``self.schedule_locks``.

    Returns:
        The ``asyncio.Lock`` stored under ``stream_id``.
    """
    try:
        return self.schedule_locks[stream_id]
    except KeyError:
        # First request for this stream: create the lock and remember it.
        created = self.schedule_locks[stream_id] = asyncio.Lock()
        return created
|
||||||
|
|
||||||
async def on_message_received(self, stream_id: str) -> None:
|
async def on_message_received(self, stream_id: str) -> None:
|
||||||
"""消息接收时的处理逻辑
|
"""消息接收时的处理逻辑
|
||||||
@@ -100,21 +109,27 @@ class SchedulerDispatcher:
|
|||||||
return
|
return
|
||||||
|
|
||||||
# 2. 检查是否有活跃的 schedule
|
# 2. 检查是否有活跃的 schedule
|
||||||
has_active_schedule = stream_id in self.stream_schedules
|
async with self._get_schedule_lock(stream_id):
|
||||||
|
has_active_schedule = stream_id in self.stream_schedules
|
||||||
|
|
||||||
|
if has_active_schedule:
|
||||||
|
# 释放锁后再做打断检查(避免长时间持有锁)
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# 4. 创建新的 schedule(在锁内,避免重复创建)
|
||||||
|
await self._create_schedule(stream_id, context)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 3. 检查打断判定(锁外执行,避免阻塞)
|
||||||
if has_active_schedule:
|
if has_active_schedule:
|
||||||
# 3. 检查打断判定
|
|
||||||
should_interrupt = await self._check_interruption(stream_id, context)
|
should_interrupt = await self._check_interruption(stream_id, context)
|
||||||
|
|
||||||
if should_interrupt:
|
if should_interrupt:
|
||||||
# 移除旧 schedule 并创建新的
|
# 移除旧 schedule 并创建新的(内部有锁保护)
|
||||||
await self._cancel_and_recreate_schedule(stream_id, context)
|
await self._cancel_and_recreate_schedule(stream_id, context)
|
||||||
logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule")
|
logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule")
|
||||||
else:
|
else:
|
||||||
logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...")
|
logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...")
|
||||||
else:
|
|
||||||
# 4. 创建新的 schedule
|
|
||||||
await self._create_schedule(stream_id, context)
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True)
|
logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True)
|
||||||
@@ -217,21 +232,27 @@ class SchedulerDispatcher:
|
|||||||
stream_id: 流ID
|
stream_id: 流ID
|
||||||
context: 流上下文
|
context: 流上下文
|
||||||
"""
|
"""
|
||||||
# 移除旧的 schedule
|
# 使用锁保护,避免与 _on_schedule_triggered 冲突
|
||||||
old_schedule_id = self.stream_schedules.get(stream_id)
|
async with self._get_schedule_lock(stream_id):
|
||||||
|
# 移除旧的 schedule
|
||||||
|
old_schedule_id = self.stream_schedules.get(stream_id)
|
||||||
if old_schedule_id:
|
if old_schedule_id:
|
||||||
success = await unified_scheduler.remove_schedule(old_schedule_id)
|
success = await unified_scheduler.remove_schedule(old_schedule_id)
|
||||||
if success:
|
if success:
|
||||||
logger.debug(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...")
|
logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...")
|
||||||
self.stats["total_schedules_cancelled"] += 1
|
self.stats["total_schedules_cancelled"] += 1
|
||||||
|
# 只有成功移除后才从追踪中删除
|
||||||
|
del self.stream_schedules[stream_id]
|
||||||
else:
|
else:
|
||||||
logger.warning(f"移除旧 schedule 失败: {stream_id}")
|
logger.error(
|
||||||
|
f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., "
|
||||||
|
f"ID={old_schedule_id[:8]}..., 放弃创建新 schedule 避免重复"
|
||||||
|
)
|
||||||
|
# 移除失败,不创建新 schedule,避免重复
|
||||||
|
return
|
||||||
|
|
||||||
# 从追踪中删除
|
# 创建新的 schedule,使用即时处理模式(极短延迟)
|
||||||
del self.stream_schedules[stream_id]
|
await self._create_schedule(stream_id, context, immediate_mode=True)
|
||||||
|
|
||||||
# 创建新的 schedule,使用即时处理模式(极短延迟)
|
|
||||||
await self._create_schedule(stream_id, context, immediate_mode=True)
|
|
||||||
|
|
||||||
async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None:
|
async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None:
|
||||||
"""为聊天流创建新的 schedule
|
"""为聊天流创建新的 schedule
|
||||||
@@ -242,6 +263,16 @@ class SchedulerDispatcher:
|
|||||||
immediate_mode: 是否使用即时处理模式(打断时使用极短延迟)
|
immediate_mode: 是否使用即时处理模式(打断时使用极短延迟)
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
|
# 检查是否已有活跃的 schedule,如果有则先移除
|
||||||
|
if stream_id in self.stream_schedules:
|
||||||
|
old_schedule_id = self.stream_schedules[stream_id]
|
||||||
|
logger.warning(
|
||||||
|
f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_schedule_id[:8]}..., "
|
||||||
|
f"这不应该发生,将先移除旧 schedule"
|
||||||
|
)
|
||||||
|
await unified_scheduler.remove_schedule(old_schedule_id)
|
||||||
|
del self.stream_schedules[stream_id]
|
||||||
|
|
||||||
# 如果是即时处理模式(打断时),使用固定的1秒延迟立即重新处理
|
# 如果是即时处理模式(打断时),使用固定的1秒延迟立即重新处理
|
||||||
if immediate_mode:
|
if immediate_mode:
|
||||||
delay = 1.0 # 硬编码1秒延迟,确保打断后能快速重新处理
|
delay = 1.0 # 硬编码1秒延迟,确保打断后能快速重新处理
|
||||||
@@ -271,10 +302,19 @@ class SchedulerDispatcher:
|
|||||||
self.stats["total_schedules_created"] += 1
|
self.stats["total_schedules_created"] += 1
|
||||||
|
|
||||||
mode_indicator = "⚡打断" if immediate_mode else "📅常规"
|
mode_indicator = "⚡打断" if immediate_mode else "📅常规"
|
||||||
logger.debug(
|
|
||||||
|
# 获取调用栈信息,帮助追踪重复创建的问题
|
||||||
|
import traceback
|
||||||
|
caller_info = ""
|
||||||
|
stack = traceback.extract_stack()
|
||||||
|
if len(stack) >= 2:
|
||||||
|
caller_frame = stack[-2]
|
||||||
|
caller_info = f", 调用自={caller_frame.name}"
|
||||||
|
|
||||||
|
logger.info(
|
||||||
f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., "
|
f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., "
|
||||||
f"延迟={delay:.3f}s, 未读={unread_count}, "
|
f"延迟={delay:.3f}s, 未读={unread_count}, "
|
||||||
f"ID={schedule_id[:8]}..."
|
f"ID={schedule_id[:8]}...{caller_info}"
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -391,10 +431,16 @@ class SchedulerDispatcher:
|
|||||||
stream_id: 流ID
|
stream_id: 流ID
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"⏰ Schedule 触发: 流={stream_id[:8]}..., 开始处理消息")
|
# 使用锁保护,避免与打断逻辑冲突
|
||||||
|
async with self._get_schedule_lock(stream_id):
|
||||||
|
# 从追踪中移除(因为是一次性任务)
|
||||||
|
old_schedule_id = self.stream_schedules.pop(stream_id, None)
|
||||||
|
|
||||||
# 从追踪中移除(因为是一次性任务)
|
logger.info(
|
||||||
self.stream_schedules.pop(stream_id, None)
|
f"⏰ Schedule 触发: 流={stream_id[:8]}..., "
|
||||||
|
f"ID={old_schedule_id[:8] if old_schedule_id else 'None'}..., "
|
||||||
|
f"开始处理消息"
|
||||||
|
)
|
||||||
|
|
||||||
# 获取流上下文
|
# 获取流上下文
|
||||||
context = await self._get_stream_context(stream_id)
|
context = await self._get_stream_context(stream_id)
|
||||||
@@ -407,7 +453,7 @@ class SchedulerDispatcher:
|
|||||||
logger.debug(f"流 {stream_id} 没有未读消息,跳过处理")
|
logger.debug(f"流 {stream_id} 没有未读消息,跳过处理")
|
||||||
return
|
return
|
||||||
|
|
||||||
# 激活 chatter 处理
|
# 激活 chatter 处理(不需要锁,允许并发处理)
|
||||||
success = await self._process_stream(stream_id, context)
|
success = await self._process_stream(stream_id, context)
|
||||||
|
|
||||||
# 更新统计
|
# 更新统计
|
||||||
@@ -415,8 +461,17 @@ class SchedulerDispatcher:
|
|||||||
if not success:
|
if not success:
|
||||||
self.stats["total_failures"] += 1
|
self.stats["total_failures"] += 1
|
||||||
|
|
||||||
# 处理完成后,创建新的 schedule
|
# 处理完成后,创建新的 schedule(用锁保护,避免与打断冲突)
|
||||||
await self._create_schedule(stream_id, context)
|
async with self._get_schedule_lock(stream_id):
|
||||||
|
# 再次检查是否已有 schedule(可能在处理期间被打断创建了新的)
|
||||||
|
if stream_id in self.stream_schedules:
|
||||||
|
logger.info(
|
||||||
|
f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., "
|
||||||
|
f"可能是打断创建的,跳过创建新 schedule"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
await self._create_schedule(stream_id, context)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)
|
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user