diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index 2c085bccf..30eaf61b1 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -43,6 +43,7 @@ class SchedulerConfig: task_default_timeout: float = 300.0 # 默认任务超时(5分钟) task_cancel_timeout: float = 10.0 # 任务取消超时(10秒) shutdown_timeout: float = 30.0 # 关闭超时(30秒) + deadlock_threshold: float = 600.0 # 死锁检测阈值(10分钟,超过此时间视为死锁) # 并发控制 max_concurrent_tasks: int = 100 # 最大并发任务数 @@ -257,7 +258,8 @@ class DeadlockDetector: for task_id, (start_time, task_name) in list(self._monitored_tasks.items()): runtime = current_time - start_time - if runtime > self.config.task_default_timeout: + # 使用死锁阈值而不是默认超时 + if runtime > self.config.deadlock_threshold: deadlocked.append((task_id, runtime, task_name)) return deadlocked @@ -502,13 +504,18 @@ class UnifiedScheduler: await asyncio.sleep(self.config.deadlock_check_interval) if not self._stopping: - await self._check_and_handle_deadlocks() + # 使用 create_task 避免阻塞循环,并限制错误传播 + asyncio.create_task( + self._safe_check_and_handle_deadlocks(), + name="deadlock_check" + ) except asyncio.CancelledError: logger.debug("死锁检测循环被取消") break except Exception as e: logger.error(f"死锁检测循环发生错误: {e}", exc_info=True) + # 继续运行,不因单次错误停止 async def _cleanup_loop(self) -> None: """清理循环:定期清理已完成的任务""" @@ -831,6 +838,15 @@ class UnifiedScheduler: # ==================== 死锁检测和处理 ==================== + async def _safe_check_and_handle_deadlocks(self) -> None: + """安全地检查并处理死锁任务(带错误隔离)""" + try: + await self._check_and_handle_deadlocks() + except RecursionError: + logger.error("死锁检测发生递归错误,跳过本轮检测") + except Exception as e: + logger.error(f"死锁检测处理失败: {e}", exc_info=True) + async def _check_and_handle_deadlocks(self) -> None: """检查并处理死锁任务""" deadlocked = self._deadlock_detector.check_deadlocks() @@ -852,8 +868,15 @@ class UnifiedScheduler: f"运行时间={runtime:.1f}秒, 健康度={health:.2f}" ) - # 尝试取消任务 - await self._cancel_task(task, reason="deadlock detected") + # 尝试取消任务(每个取消操作独立处理错误) + try: + await self._cancel_task(task, reason="deadlock detected") + except Exception as e: + logger.error(f"取消任务 {task_name} 时出错: {e}", exc_info=True) + # 强制清理 + task._asyncio_task = None + task.status = TaskStatus.CANCELLED + self._deadlock_detector.unregister_task(task_id) async def _cancel_task(self, task: ScheduleTask, reason: str = "manual") -> bool: """取消正在运行的任务(多级超时机制)""" @@ -865,25 +888,31 @@ class UnifiedScheduler: # 第一阶段:发送取消信号 task._asyncio_task.cancel() - # 第二阶段:渐进式等待 + # 第二阶段:渐进式等待(使用 asyncio.wait 避免递归) timeouts = [1.0, 3.0, 5.0, 10.0] for i, timeout in enumerate(timeouts): try: - await asyncio.wait_for(task._asyncio_task, timeout=timeout) - logger.debug(f"任务 {task.task_name} 在阶段 {i+1} 成功取消") - return True - except asyncio.TimeoutError: + # 使用 asyncio.wait 代替 wait_for,避免重新抛出异常 + done, pending = await asyncio.wait( + {task._asyncio_task}, + timeout=timeout + ) + + if done: + # 任务已完成(可能是正常完成或被取消) + logger.debug(f"任务 {task.task_name} 在阶段 {i+1} 成功停止") + return True + + # 超时:继续下一阶段或放弃 if i < len(timeouts) - 1: logger.warning(f"任务 {task.task_name} 取消阶段 {i+1} 超时,继续等待...") continue else: logger.error(f"任务 {task.task_name} 取消失败,强制清理") break - except asyncio.CancelledError: - logger.debug(f"任务 {task.task_name} 已被取消") - return True + except Exception as e: - logger.error(f"取消任务 {task.task_name} 时发生异常: {e}") + logger.error(f"取消任务 {task.task_name} 时发生异常: {e}", exc_info=True) return False # 第三阶段:强制清理