From 99f71d1bcfb5d80ede7bc32531e44047320bdfac Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Tue, 4 Nov 2025 23:44:27 +0800 Subject: [PATCH] =?UTF-8?q?fix(scheduler):=20=E6=B7=BB=E5=8A=A0=E9=94=81?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=E4=BB=A5=E9=81=BF=E5=85=8D=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=88=9B=E5=BB=BA/=E5=88=A0=E9=99=A4=E7=9A=84=E7=AB=9E?= =?UTF-8?q?=E6=80=81=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message_manager/scheduler_dispatcher.py | 101 ++++++++++++++---- 1 file changed, 78 insertions(+), 23 deletions(-) diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py index dec8ee940..3b7ddf7c5 100644 --- a/src/chat/message_manager/scheduler_dispatcher.py +++ b/src/chat/message_manager/scheduler_dispatcher.py @@ -33,6 +33,9 @@ class SchedulerDispatcher: # 追踪每个流的 schedule_id self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id + # 用于保护 schedule 创建/删除的锁,避免竞态条件 + self.schedule_locks: dict[str, asyncio.Lock] = {} # stream_id -> Lock + # Chatter 管理器 self.chatter_manager: ChatterManager | None = None @@ -81,6 +84,12 @@ class SchedulerDispatcher: """设置 Chatter 管理器""" self.chatter_manager = chatter_manager logger.debug(f"设置 Chatter 管理器: {chatter_manager.__class__.__name__}") + + def _get_schedule_lock(self, stream_id: str) -> asyncio.Lock: + """获取流的 schedule 锁""" + if stream_id not in self.schedule_locks: + self.schedule_locks[stream_id] = asyncio.Lock() + return self.schedule_locks[stream_id] async def on_message_received(self, stream_id: str) -> None: """消息接收时的处理逻辑 @@ -100,21 +109,27 @@ class SchedulerDispatcher: return # 2. 检查是否有活跃的 schedule - has_active_schedule = stream_id in self.stream_schedules + async with self._get_schedule_lock(stream_id): + has_active_schedule = stream_id in self.stream_schedules + + if has_active_schedule: + # 释放锁后再做打断检查(避免长时间持有锁) + pass + else: + # 4. 创建新的 schedule(在锁内,避免重复创建) + await self._create_schedule(stream_id, context) + return + # 3. 检查打断判定(锁外执行,避免阻塞) if has_active_schedule: - # 3. 检查打断判定 should_interrupt = await self._check_interruption(stream_id, context) if should_interrupt: - # 移除旧 schedule 并创建新的 + # 移除旧 schedule 并创建新的(内部有锁保护) await self._cancel_and_recreate_schedule(stream_id, context) logger.debug(f"⚡ 打断成功: 流={stream_id[:8]}..., 已重新创建 schedule") else: logger.debug(f"打断判定失败,保持原有 schedule: 流={stream_id[:8]}...") - else: - # 4. 创建新的 schedule - await self._create_schedule(stream_id, context) except Exception as e: logger.error(f"处理消息接收事件失败 {stream_id}: {e}", exc_info=True) @@ -217,21 +232,27 @@ class SchedulerDispatcher: stream_id: 流ID context: 流上下文 """ - # 移除旧的 schedule - old_schedule_id = self.stream_schedules.get(stream_id) + # 使用锁保护,避免与 _on_schedule_triggered 冲突 + async with self._get_schedule_lock(stream_id): + # 移除旧的 schedule + old_schedule_id = self.stream_schedules.get(stream_id) if old_schedule_id: success = await unified_scheduler.remove_schedule(old_schedule_id) if success: - logger.debug(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...") + logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...") self.stats["total_schedules_cancelled"] += 1 + # 只有成功移除后才从追踪中删除 + del self.stream_schedules[stream_id] else: - logger.warning(f"移除旧 schedule 失败: {stream_id}") + logger.error( + f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., " + f"ID={old_schedule_id[:8]}..., 放弃创建新 schedule 避免重复" + ) + # 移除失败,不创建新 schedule,避免重复 + return - # 从追踪中删除 - del self.stream_schedules[stream_id] - - # 创建新的 schedule,使用即时处理模式(极短延迟) - await self._create_schedule(stream_id, context, immediate_mode=True) + # 创建新的 schedule,使用即时处理模式(极短延迟) + await self._create_schedule(stream_id, context, immediate_mode=True) async def _create_schedule(self, stream_id: str, context: StreamContext, immediate_mode: bool = False) -> None: """为聊天流创建新的 schedule @@ -242,6 +263,16 @@ class SchedulerDispatcher: immediate_mode: 是否使用即时处理模式(打断时使用极短延迟) """ try: + # 检查是否已有活跃的 schedule,如果有则先移除 + if stream_id in self.stream_schedules: + old_schedule_id = self.stream_schedules[stream_id] + logger.warning( + f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_schedule_id[:8]}..., " + f"这不应该发生,将先移除旧 schedule" + ) + await unified_scheduler.remove_schedule(old_schedule_id) + del self.stream_schedules[stream_id] + # 如果是即时处理模式(打断时),使用固定的1秒延迟立即重新处理 if immediate_mode: delay = 1.0 # 硬编码1秒延迟,确保打断后能快速重新处理 @@ -271,10 +302,19 @@ class SchedulerDispatcher: self.stats["total_schedules_created"] += 1 mode_indicator = "⚡打断" if immediate_mode else "📅常规" - logger.debug( + + # 获取调用栈信息,帮助追踪重复创建的问题 + import traceback + caller_info = "" + stack = traceback.extract_stack() + if len(stack) >= 2: + caller_frame = stack[-2] + caller_info = f", 调用自={caller_frame.name}" + + logger.info( f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., " f"延迟={delay:.3f}s, 未读={unread_count}, " - f"ID={schedule_id[:8]}..." + f"ID={schedule_id[:8]}...{caller_info}" ) except Exception as e: @@ -391,10 +431,16 @@ class SchedulerDispatcher: stream_id: 流ID """ try: - logger.info(f"⏰ Schedule 触发: 流={stream_id[:8]}..., 开始处理消息") + # 使用锁保护,避免与打断逻辑冲突 + async with self._get_schedule_lock(stream_id): + # 从追踪中移除(因为是一次性任务) + old_schedule_id = self.stream_schedules.pop(stream_id, None) - # 从追踪中移除(因为是一次性任务) - self.stream_schedules.pop(stream_id, None) + logger.info( + f"⏰ Schedule 触发: 流={stream_id[:8]}..., " + f"ID={old_schedule_id[:8] if old_schedule_id else 'None'}..., " + f"开始处理消息" + ) # 获取流上下文 context = await self._get_stream_context(stream_id) @@ -407,7 +453,7 @@ class SchedulerDispatcher: logger.debug(f"流 {stream_id} 没有未读消息,跳过处理") return - # 激活 chatter 处理 + # 激活 chatter 处理(不需要锁,允许并发处理) success = await self._process_stream(stream_id, context) # 更新统计 @@ -415,8 +461,17 @@ class SchedulerDispatcher: if not success: self.stats["total_failures"] += 1 - # 处理完成后,创建新的 schedule - await self._create_schedule(stream_id, context) + # 处理完成后,创建新的 schedule(用锁保护,避免与打断冲突) + async with self._get_schedule_lock(stream_id): + # 再次检查是否已有 schedule(可能在处理期间被打断创建了新的) + if stream_id in self.stream_schedules: + logger.info( + f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., " + f"可能是打断创建的,跳过创建新 schedule" + ) + return + + await self._create_schedule(stream_id, context) except Exception as e: logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)