diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py index 3b7ddf7c5..5953c958d 100644 --- a/src/chat/message_manager/scheduler_dispatcher.py +++ b/src/chat/message_manager/scheduler_dispatcher.py @@ -461,9 +461,9 @@ class SchedulerDispatcher: if not success: self.stats["total_failures"] += 1 - # 处理完成后,创建新的 schedule(用锁保护,避免与打断冲突) + # 处理完成后,检查是否需要创建新的 schedule async with self._get_schedule_lock(stream_id): - # 再次检查是否已有 schedule(可能在处理期间被打断创建了新的) + # 检查是否已有 schedule(可能在处理期间被打断创建了新的) if stream_id in self.stream_schedules: logger.info( f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., " @@ -471,7 +471,24 @@ class SchedulerDispatcher: ) return - await self._create_schedule(stream_id, context) + # 检查缓存中是否有待处理的消息 + from src.chat.message_manager.message_manager import message_manager + + has_cached = message_manager.has_cached_messages(stream_id) + + if has_cached: + # 有缓存消息,立即创建新 schedule 继续处理 + logger.info( + f"🔁 处理完成但有缓存消息: 流={stream_id[:8]}..., " + f"立即创建新 schedule 继续处理" + ) + await self._create_schedule(stream_id, context) + else: + # 没有缓存消息,不创建 schedule,等待新消息到达 + logger.debug( + f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., " + f"等待新消息到达" + ) except Exception as e: logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)