fix(scheduler): 优化调度创建逻辑,处理完成后检查缓存消息以决定是否创建新调度
This commit is contained in:
@@ -461,9 +461,9 @@ class SchedulerDispatcher:
|
|||||||
if not success:
|
if not success:
|
||||||
self.stats["total_failures"] += 1
|
self.stats["total_failures"] += 1
|
||||||
|
|
||||||
# 处理完成后,创建新的 schedule(用锁保护,避免与打断冲突)
|
# 处理完成后,检查是否需要创建新的 schedule
|
||||||
async with self._get_schedule_lock(stream_id):
|
async with self._get_schedule_lock(stream_id):
|
||||||
# 再次检查是否已有 schedule(可能在处理期间被打断创建了新的)
|
# 检查是否已有 schedule(可能在处理期间被打断创建了新的)
|
||||||
if stream_id in self.stream_schedules:
|
if stream_id in self.stream_schedules:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., "
|
f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., "
|
||||||
@@ -471,7 +471,24 @@ class SchedulerDispatcher:
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# 检查缓存中是否有待处理的消息
|
||||||
|
from src.chat.message_manager.message_manager import message_manager
|
||||||
|
|
||||||
|
has_cached = message_manager.has_cached_messages(stream_id)
|
||||||
|
|
||||||
|
if has_cached:
|
||||||
|
# 有缓存消息,立即创建新 schedule 继续处理
|
||||||
|
logger.info(
|
||||||
|
f"🔁 处理完成但有缓存消息: 流={stream_id[:8]}..., "
|
||||||
|
f"立即创建新 schedule 继续处理"
|
||||||
|
)
|
||||||
await self._create_schedule(stream_id, context)
|
await self._create_schedule(stream_id, context)
|
||||||
|
else:
|
||||||
|
# 没有缓存消息,不创建 schedule,等待新消息到达
|
||||||
|
logger.debug(
|
||||||
|
f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., "
|
||||||
|
f"等待新消息到达"
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)
|
logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user