From 98bfa05b96d083a33a095fc6c84ca9ebde358118 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Sat, 8 Nov 2025 18:20:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E6=96=B0=E5=A2=9E=E6=AD=BB?= =?UTF-8?q?=E9=94=81=E6=A3=80=E6=B5=8B=E5=99=A8=EF=BC=8C=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=8F=96=E6=B6=88=E6=9C=BA=E5=88=B6=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BC=82=E6=AD=A5=E9=80=9A=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../database/optimization/batch_scheduler.py | 12 +- src/plugin_system/core/event_manager.py | 5 +- src/schedule/unified_scheduler.py | 161 +++++++++++++++++- 3 files changed, 168 insertions(+), 10 deletions(-) diff --git a/src/common/database/optimization/batch_scheduler.py b/src/common/database/optimization/batch_scheduler.py index be095b46b..8f523f2b4 100644 --- a/src/common/database/optimization/batch_scheduler.py +++ b/src/common/database/optimization/batch_scheduler.py @@ -188,18 +188,24 @@ class AdaptiveBatchScheduler: future = asyncio.get_event_loop().create_future() operation.future = future + should_execute_immediately = False + total_queued = 0 + async with self._lock: # 检查队列是否已满 total_queued = sum(len(q) for q in self.operation_queues.values()) if total_queued >= self.max_queue_size: - # 队列满,直接执行(阻塞模式) - logger.warning(f"队列已满({total_queued}),直接执行操作") - await self._execute_operations([operation]) + should_execute_immediately = True else: # 添加到优先级队列 self.operation_queues[operation.priority].append(operation) self.stats.total_operations += 1 + # 🔧 修复:在锁外执行操作,避免死锁 + if should_execute_immediately: + logger.warning(f"队列已满({total_queued}),直接执行操作") + await self._execute_operations([operation]) + return future async def _scheduler_loop(self) -> None: diff --git a/src/plugin_system/core/event_manager.py b/src/plugin_system/core/event_manager.py index 4ad4bbdc9..1b1765ab9 100644 --- a/src/plugin_system/core/event_manager.py +++ b/src/plugin_system/core/event_manager.py @@ -320,10 +320,11 @@ class EventManager: logger.warning(f"插件 {permission_group} 没有权限触发事件 {event_name},已拒绝触发!") return None - # 通知 scheduler(如果已注册) + # 🔧 修复:异步通知 scheduler,避免阻塞当前事件流程 if hasattr(self, "_scheduler_callback") and self._scheduler_callback: try: - await self._scheduler_callback(event_name, params) + # 使用 create_task 异步执行,避免死锁 + asyncio.create_task(self._scheduler_callback(event_name, params)) except Exception as e: logger.error(f"调用 scheduler 回调时出错: {e}", exc_info=True) diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index 517538ae3..d24c67d9a 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -4,6 +4,7 @@ """ import asyncio +import time import uuid from collections.abc import Awaitable, Callable from datetime import datetime @@ -16,6 +17,57 @@ from src.plugin_system.base.component_types import EventType logger = get_logger("unified_scheduler") +class DeadlockDetector: + """死锁检测器 + + 用于检测长时间运行的任务,防止死锁 + """ + def __init__(self, deadlock_timeout: float = 300.0): + """ + Args: + deadlock_timeout: 死锁超时时间(秒),默认5分钟 + """ + self._task_start_times: dict[str, float] = {} + self._deadlock_timeout = deadlock_timeout + + def register_task_start(self, task_id: str) -> None: + """注册任务开始时间""" + self._task_start_times[task_id] = time.time() + + def unregister_task(self, task_id: str) -> None: + """取消注册任务""" + self._task_start_times.pop(task_id, None) + + def check_for_deadlocks(self) -> list[str]: + """检查可能的死锁任务 + + Returns: + List[str]: 可能死锁的任务ID列表 + """ + current_time = time.time() + deadlocked_tasks = [] + + for task_id, start_time in self._task_start_times.items(): + if current_time - start_time > self._deadlock_timeout: + deadlocked_tasks.append(task_id) + + return deadlocked_tasks + + def get_task_runtime(self, task_id: str) -> float: + """获取任务运行时间 + + Args: + task_id: 任务ID + + Returns: + float: 运行时间(秒),如果任务不存在返回0 + """ + start_time = self._task_start_times.get(task_id) + if start_time: + return time.time() - start_time + return 0.0 + + class TriggerType(Enum): """触发类型枚举""" @@ -73,6 +125,7 @@ class UnifiedScheduler: - 支持循环和一次性任务 - 提供任务管理API(创建、删除、强制触发等) - 与 event_manager 集成,统一事件管理 + - 内置死锁检测和恢复机制 """ def __init__(self): @@ -81,6 +134,9 @@ class UnifiedScheduler: self._check_task: asyncio.Task | None = None self._event_subscriptions: set[str] = set() # 追踪已订阅的事件 self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务 + # 🔧 新增:死锁检测器 + self._deadlock_detector = DeadlockDetector(deadlock_timeout=300.0) + self._deadlock_check_task: asyncio.Task | None = None # 移除锁机制,使用无锁设计(基于 asyncio 单线程特性) async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None: @@ -117,6 +173,9 @@ class UnifiedScheduler: # 并发执行所有事件任务(无锁设计) execution_tasks = [] for task in event_tasks: + # 🔧 新增:在死锁检测器中注册任务开始 + self._deadlock_detector.register_task_start(task.schedule_id) + execution_task = asyncio.create_task( self._execute_event_task_callback(task, event_params), name=f"execute_event_{task.task_name}" @@ -132,6 +191,8 @@ class UnifiedScheduler: # 清理执行追踪 for task in event_tasks: self._executing_tasks.pop(task.schedule_id, None) + # 🔧 新增:从死锁检测器中移除任务 + self._deadlock_detector.unregister_task(task.schedule_id) # 收集需要移除的任务 tasks_to_remove = [] @@ -155,6 +216,8 @@ class UnifiedScheduler: self._running = True self._check_task = asyncio.create_task(self._check_loop()) + # 🔧 新增:启动死锁检测任务 + self._deadlock_check_task = asyncio.create_task(self._deadlock_check_loop()) # 注册回调到 event_manager try: @@ -173,6 +236,15 @@ class UnifiedScheduler: return self._running = False + + # 🔧 修复:停止死锁检测任务 + if self._deadlock_check_task: + self._deadlock_check_task.cancel() + try: + await self._deadlock_check_task + except asyncio.CancelledError: + pass + if self._check_task: self._check_task.cancel() try: @@ -216,6 +288,9 @@ class UnifiedScheduler: self._tasks.clear() self._event_subscriptions.clear() self._executing_tasks.clear() + # 🔧 新增:清理死锁检测器 + if hasattr(self, '_deadlock_detector'): + self._deadlock_detector._task_start_times.clear() async def _check_loop(self): """主循环:每秒检查一次所有任务""" @@ -230,6 +305,59 @@ class UnifiedScheduler: except Exception as e: logger.error(f"调度器检查循环发生错误: {e}", exc_info=True) + async def _deadlock_check_loop(self): + """死锁检测循环:每30秒检查一次是否有死锁任务""" + logger.debug("死锁检测循环已启动") + while self._running: + try: + await asyncio.sleep(30) + deadlocked_tasks = self._deadlock_detector.check_for_deadlocks() + + if deadlocked_tasks: + logger.warning(f"检测到 {len(deadlocked_tasks)} 个可能的死锁任务: {deadlocked_tasks}") + + # 尝试恢复死锁任务 + for schedule_id in deadlocked_tasks: + await self._handle_deadlocked_task(schedule_id) + + except asyncio.CancelledError: + logger.debug("死锁检测循环被取消") + break + except Exception as e: + logger.error(f"死锁检测循环发生错误: {e}", exc_info=True) + + async def _handle_deadlocked_task(self, schedule_id: str) -> None: + """处理死锁任务""" + task = self._tasks.get(schedule_id) + if not task: + # 任务不存在,清理检测器中的记录 + self._deadlock_detector.unregister_task(schedule_id) + return + + runtime = self._deadlock_detector.get_task_runtime(schedule_id) + logger.warning(f"任务 {task.task_name} 已运行 {runtime:.1f} 秒,可能已死锁") + + # 获取执行中的任务 + executing_task = self._executing_tasks.get(schedule_id) + if executing_task and not executing_task.done(): + # 强制取消任务 + logger.warning(f"强制取消死锁任务: {task.task_name}") + try: + executing_task.cancel() + # 等待任务取消,但使用较短的超时 + await asyncio.wait_for(executing_task, timeout=5.0) + logger.info(f"死锁任务 {task.task_name} 已成功取消") + except asyncio.TimeoutError: + logger.error(f"无法取消死锁任务 {task.task_name},可能需要重启系统") + except Exception as e: + logger.error(f"取消死锁任务 {task.task_name} 时发生错误: {e}") + + # 清理执行追踪 + self._executing_tasks.pop(schedule_id, None) + + # 从检测器中移除记录 + self._deadlock_detector.unregister_task(schedule_id) + async def _check_and_trigger_tasks(self): """检查并触发到期任务 @@ -268,6 +396,9 @@ class UnifiedScheduler: # 为每个任务创建独立的异步任务,确保并发执行 execution_tasks = [] for task in tasks_to_trigger: + # 🔧 新增:在死锁检测器中注册任务开始 + self._deadlock_detector.register_task_start(task.schedule_id) + execution_task = asyncio.create_task( self._execute_task_callback(task, current_time), name=f"execute_{task.task_name}" @@ -283,6 +414,8 @@ class UnifiedScheduler: # 清理执行追踪 for task in tasks_to_trigger: self._executing_tasks.pop(task.schedule_id, None) + # 🔧 新增:从死锁检测器中移除任务 + self._deadlock_detector.unregister_task(task.schedule_id) # 第三阶段:收集需要移除的任务并移除(无锁设计) tasks_to_remove = [] @@ -586,7 +719,7 @@ class UnifiedScheduler: return False async def remove_schedule(self, schedule_id: str) -> bool: - """移除调度任务(无锁设计) + """移除调度任务(改进的取消机制) 如果任务正在执行,会取消执行中的任务 """ @@ -598,16 +731,20 @@ class UnifiedScheduler: task = self._tasks[schedule_id] executing_task = self._executing_tasks.get(schedule_id) - # 取消正在执行的任务 + # 🔧 修复:改进任务取消机制,避免死锁 if executing_task and not executing_task.done(): logger.debug(f"取消正在执行的任务: {task.task_name}") try: executing_task.cancel() - await asyncio.wait_for(executing_task, 3) + # 使用更长的超时时间,并添加异常处理 + await asyncio.wait_for(executing_task, timeout=10.0) except asyncio.TimeoutError: - logger.warning(f"取消任务 {task.task_name} 超时,强制移除") + logger.warning(f"取消任务 {task.task_name} 超时,可能存在死锁风险") + # 不再强制移除,让任务自然完成 + return False except Exception as e: - logger.warning(f"取消任务 {task.task_name} 时发生错误: {e}") + logger.error(f"取消任务 {task.task_name} 时发生未预期的错误: {e}") + return False # 移除任务 await self._remove_task_internal(schedule_id) @@ -666,6 +803,10 @@ class UnifiedScheduler: # 清理已完成的任务 if executing_task and executing_task.done(): self._executing_tasks.pop(schedule_id, None) + self._deadlock_detector.unregister_task(schedule_id) + + # 🔧 新增:在死锁检测器中注册任务开始 + self._deadlock_detector.register_task_start(schedule_id) # 创建执行任务 execution_task = asyncio.create_task( @@ -683,6 +824,8 @@ class UnifiedScheduler: finally: # 清理执行追踪 self._executing_tasks.pop(schedule_id, None) + # 🔧 新增:从死锁检测器中移除任务 + self._deadlock_detector.unregister_task(schedule_id) async def pause_schedule(self, schedule_id: str) -> bool: """暂停任务(不删除)""" @@ -761,6 +904,12 @@ class UnifiedScheduler: "task_obj_name": executing_task.get_name() if hasattr(executing_task, 'get_name') else str(executing_task), }) + # 🔧 新增:获取死锁检测统计 + deadlock_stats = { + "monitored_tasks": len(self._deadlock_detector._task_start_times), + "deadlock_timeout": self._deadlock_detector._deadlock_timeout, + } + return { "is_running": self._running, "total_tasks": total_tasks, @@ -772,6 +921,8 @@ class UnifiedScheduler: "executing_tasks_info": executing_tasks_info, "tasks_by_type": tasks_by_type, "registered_events": list(self._event_subscriptions), + # 🔧 新增:死锁检测统计 + "deadlock_detection": deadlock_stats, }