feat(scheduler): 新增死锁检测器,改进任务取消机制,优化异步通知
This commit is contained in:
@@ -188,18 +188,24 @@ class AdaptiveBatchScheduler:
|
||||
future = asyncio.get_event_loop().create_future()
|
||||
operation.future = future
|
||||
|
||||
should_execute_immediately = False
|
||||
total_queued = 0
|
||||
|
||||
async with self._lock:
|
||||
# 检查队列是否已满
|
||||
total_queued = sum(len(q) for q in self.operation_queues.values())
|
||||
if total_queued >= self.max_queue_size:
|
||||
# 队列满,直接执行(阻塞模式)
|
||||
logger.warning(f"队列已满({total_queued}),直接执行操作")
|
||||
await self._execute_operations([operation])
|
||||
should_execute_immediately = True
|
||||
else:
|
||||
# 添加到优先级队列
|
||||
self.operation_queues[operation.priority].append(operation)
|
||||
self.stats.total_operations += 1
|
||||
|
||||
# 🔧 修复:在锁外执行操作,避免死锁
|
||||
if should_execute_immediately:
|
||||
logger.warning(f"队列已满({total_queued}),直接执行操作")
|
||||
await self._execute_operations([operation])
|
||||
|
||||
return future
|
||||
|
||||
async def _scheduler_loop(self) -> None:
|
||||
|
||||
Reference in New Issue
Block a user