feat(deadlock): 增加死锁检测阈值,优化死锁处理逻辑
This commit is contained in:
@@ -43,6 +43,7 @@ class SchedulerConfig:
|
|||||||
task_default_timeout: float = 300.0 # 默认任务超时(5分钟)
|
task_default_timeout: float = 300.0 # 默认任务超时(5分钟)
|
||||||
task_cancel_timeout: float = 10.0 # 任务取消超时(10秒)
|
task_cancel_timeout: float = 10.0 # 任务取消超时(10秒)
|
||||||
shutdown_timeout: float = 30.0 # 关闭超时(30秒)
|
shutdown_timeout: float = 30.0 # 关闭超时(30秒)
|
||||||
|
deadlock_threshold: float = 600.0 # 死锁检测阈值(10分钟,超过此时间视为死锁)
|
||||||
|
|
||||||
# 并发控制
|
# 并发控制
|
||||||
max_concurrent_tasks: int = 100 # 最大并发任务数
|
max_concurrent_tasks: int = 100 # 最大并发任务数
|
||||||
@@ -257,7 +258,8 @@ class DeadlockDetector:
|
|||||||
|
|
||||||
for task_id, (start_time, task_name) in list(self._monitored_tasks.items()):
|
for task_id, (start_time, task_name) in list(self._monitored_tasks.items()):
|
||||||
runtime = current_time - start_time
|
runtime = current_time - start_time
|
||||||
if runtime > self.config.task_default_timeout:
|
# 使用死锁阈值而不是默认超时
|
||||||
|
if runtime > self.config.deadlock_threshold:
|
||||||
deadlocked.append((task_id, runtime, task_name))
|
deadlocked.append((task_id, runtime, task_name))
|
||||||
|
|
||||||
return deadlocked
|
return deadlocked
|
||||||
@@ -502,13 +504,18 @@ class UnifiedScheduler:
|
|||||||
await asyncio.sleep(self.config.deadlock_check_interval)
|
await asyncio.sleep(self.config.deadlock_check_interval)
|
||||||
|
|
||||||
if not self._stopping:
|
if not self._stopping:
|
||||||
await self._check_and_handle_deadlocks()
|
# 使用 create_task 避免阻塞循环,并限制错误传播
|
||||||
|
asyncio.create_task(
|
||||||
|
self._safe_check_and_handle_deadlocks(),
|
||||||
|
name="deadlock_check"
|
||||||
|
)
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.debug("死锁检测循环被取消")
|
logger.debug("死锁检测循环被取消")
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"死锁检测循环发生错误: {e}", exc_info=True)
|
logger.error(f"死锁检测循环发生错误: {e}", exc_info=True)
|
||||||
|
# 继续运行,不因单次错误停止
|
||||||
|
|
||||||
async def _cleanup_loop(self) -> None:
|
async def _cleanup_loop(self) -> None:
|
||||||
"""清理循环:定期清理已完成的任务"""
|
"""清理循环:定期清理已完成的任务"""
|
||||||
@@ -831,6 +838,15 @@ class UnifiedScheduler:
|
|||||||
|
|
||||||
# ==================== 死锁检测和处理 ====================
|
# ==================== 死锁检测和处理 ====================
|
||||||
|
|
||||||
|
async def _safe_check_and_handle_deadlocks(self) -> None:
    """Run one deadlock check pass without letting its failures escape.

    Awaits ``self._check_and_handle_deadlocks()`` and converts any
    ``Exception`` it raises into an error log entry, so a failing pass
    does not propagate to whoever awaits or schedules this coroutine.
    Exceptions that do not derive from ``Exception`` are not caught here
    and still propagate.

    Returns:
        None. Failures are reported through ``logger`` only.
    """
    try:
        await self._check_and_handle_deadlocks()
    except RecursionError:
        # Handled before the generic clause (RecursionError is itself an
        # Exception) and logged without exc_info, i.e. no traceback.
        logger.error("死锁检测发生递归错误,跳过本轮检测")
    except Exception as exc:
        # Any other failure: log the message together with the traceback.
        failure_message = f"死锁检测处理失败: {exc}"
        logger.error(failure_message, exc_info=True)
|
||||||
|
|
||||||
async def _check_and_handle_deadlocks(self) -> None:
|
async def _check_and_handle_deadlocks(self) -> None:
|
||||||
"""检查并处理死锁任务"""
|
"""检查并处理死锁任务"""
|
||||||
deadlocked = self._deadlock_detector.check_deadlocks()
|
deadlocked = self._deadlock_detector.check_deadlocks()
|
||||||
@@ -852,8 +868,15 @@ class UnifiedScheduler:
|
|||||||
f"运行时间={runtime:.1f}秒, 健康度={health:.2f}"
|
f"运行时间={runtime:.1f}秒, 健康度={health:.2f}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# 尝试取消任务
|
# 尝试取消任务(每个取消操作独立处理错误)
|
||||||
await self._cancel_task(task, reason="deadlock detected")
|
try:
|
||||||
|
await self._cancel_task(task, reason="deadlock detected")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"取消任务 {task_name} 时出错: {e}", exc_info=True)
|
||||||
|
# 强制清理
|
||||||
|
task._asyncio_task = None
|
||||||
|
task.status = TaskStatus.CANCELLED
|
||||||
|
self._deadlock_detector.unregister_task(task_id)
|
||||||
|
|
||||||
async def _cancel_task(self, task: ScheduleTask, reason: str = "manual") -> bool:
|
async def _cancel_task(self, task: ScheduleTask, reason: str = "manual") -> bool:
|
||||||
"""取消正在运行的任务(多级超时机制)"""
|
"""取消正在运行的任务(多级超时机制)"""
|
||||||
@@ -865,25 +888,31 @@ class UnifiedScheduler:
|
|||||||
# 第一阶段:发送取消信号
|
# 第一阶段:发送取消信号
|
||||||
task._asyncio_task.cancel()
|
task._asyncio_task.cancel()
|
||||||
|
|
||||||
# 第二阶段:渐进式等待
|
# 第二阶段:渐进式等待(使用 asyncio.wait 避免递归)
|
||||||
timeouts = [1.0, 3.0, 5.0, 10.0]
|
timeouts = [1.0, 3.0, 5.0, 10.0]
|
||||||
for i, timeout in enumerate(timeouts):
|
for i, timeout in enumerate(timeouts):
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(task._asyncio_task, timeout=timeout)
|
# 使用 asyncio.wait 代替 wait_for,避免重新抛出异常
|
||||||
logger.debug(f"任务 {task.task_name} 在阶段 {i+1} 成功取消")
|
done, pending = await asyncio.wait(
|
||||||
return True
|
{task._asyncio_task},
|
||||||
except asyncio.TimeoutError:
|
timeout=timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
if done:
|
||||||
|
# 任务已完成(可能是正常完成或被取消)
|
||||||
|
logger.debug(f"任务 {task.task_name} 在阶段 {i+1} 成功停止")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 超时:继续下一阶段或放弃
|
||||||
if i < len(timeouts) - 1:
|
if i < len(timeouts) - 1:
|
||||||
logger.warning(f"任务 {task.task_name} 取消阶段 {i+1} 超时,继续等待...")
|
logger.warning(f"任务 {task.task_name} 取消阶段 {i+1} 超时,继续等待...")
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
logger.error(f"任务 {task.task_name} 取消失败,强制清理")
|
logger.error(f"任务 {task.task_name} 取消失败,强制清理")
|
||||||
break
|
break
|
||||||
except asyncio.CancelledError:
|
|
||||||
logger.debug(f"任务 {task.task_name} 已被取消")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"取消任务 {task.task_name} 时发生异常: {e}")
|
logger.error(f"取消任务 {task.task_name} 时发生异常: {e}", exc_info=True)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 第三阶段:强制清理
|
# 第三阶段:强制清理
|
||||||
|
|||||||
Reference in New Issue
Block a user