From c6fe8636a0203034a314529e1bb158947b63bc9c Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Wed, 5 Nov 2025 00:00:38 +0800 Subject: [PATCH] =?UTF-8?q?fix(scheduler):=20=E4=BC=98=E5=8C=96=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E5=88=9B=E5=BB=BA=E9=80=BB=E8=BE=91=EF=BC=8C=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=AE=8C=E6=88=90=E5=90=8E=E6=A3=80=E6=9F=A5=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=B6=88=E6=81=AF=E4=BB=A5=E5=86=B3=E5=AE=9A=E6=98=AF?= =?UTF-8?q?=E5=90=A6=E5=88=9B=E5=BB=BA=E6=96=B0=E8=B0=83=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message_manager/scheduler_dispatcher.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py index 3b7ddf7c5..5953c958d 100644 --- a/src/chat/message_manager/scheduler_dispatcher.py +++ b/src/chat/message_manager/scheduler_dispatcher.py @@ -461,9 +461,9 @@ class SchedulerDispatcher: if not success: self.stats["total_failures"] += 1 - # 处理完成后,创建新的 schedule(用锁保护,避免与打断冲突) + # 处理完成后,检查是否需要创建新的 schedule async with self._get_schedule_lock(stream_id): - # 再次检查是否已有 schedule(可能在处理期间被打断创建了新的) + # 检查是否已有 schedule(可能在处理期间被打断创建了新的) if stream_id in self.stream_schedules: logger.info( f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., " @@ -471,7 +471,24 @@ class SchedulerDispatcher: ) return - await self._create_schedule(stream_id, context) + # 检查缓存中是否有待处理的消息 + from src.chat.message_manager.message_manager import message_manager + + has_cached = message_manager.has_cached_messages(stream_id) + + if has_cached: + # 有缓存消息,立即创建新 schedule 继续处理 + logger.info( + f"🔁 处理完成但有缓存消息: 流={stream_id[:8]}..., " + f"立即创建新 schedule 继续处理" + ) + await self._create_schedule(stream_id, context) + else: + # 没有缓存消息,不创建 schedule,等待新消息到达 + logger.debug( + f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., " + f"等待新消息到达" + ) except Exception as e: logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)