feat(scheduler): 新增死锁检测器,改进任务取消机制,优化异步通知
This commit is contained in:
@@ -188,18 +188,24 @@ class AdaptiveBatchScheduler:
|
|||||||
future = asyncio.get_event_loop().create_future()
|
future = asyncio.get_event_loop().create_future()
|
||||||
operation.future = future
|
operation.future = future
|
||||||
|
|
||||||
|
should_execute_immediately = False
|
||||||
|
total_queued = 0
|
||||||
|
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
# 检查队列是否已满
|
# 检查队列是否已满
|
||||||
total_queued = sum(len(q) for q in self.operation_queues.values())
|
total_queued = sum(len(q) for q in self.operation_queues.values())
|
||||||
if total_queued >= self.max_queue_size:
|
if total_queued >= self.max_queue_size:
|
||||||
# 队列满,直接执行(阻塞模式)
|
should_execute_immediately = True
|
||||||
logger.warning(f"队列已满({total_queued}),直接执行操作")
|
|
||||||
await self._execute_operations([operation])
|
|
||||||
else:
|
else:
|
||||||
# 添加到优先级队列
|
# 添加到优先级队列
|
||||||
self.operation_queues[operation.priority].append(operation)
|
self.operation_queues[operation.priority].append(operation)
|
||||||
self.stats.total_operations += 1
|
self.stats.total_operations += 1
|
||||||
|
|
||||||
|
# 🔧 修复:在锁外执行操作,避免死锁
|
||||||
|
if should_execute_immediately:
|
||||||
|
logger.warning(f"队列已满({total_queued}),直接执行操作")
|
||||||
|
await self._execute_operations([operation])
|
||||||
|
|
||||||
return future
|
return future
|
||||||
|
|
||||||
async def _scheduler_loop(self) -> None:
|
async def _scheduler_loop(self) -> None:
|
||||||
|
|||||||
@@ -320,10 +320,11 @@ class EventManager:
|
|||||||
logger.warning(f"插件 {permission_group} 没有权限触发事件 {event_name},已拒绝触发!")
|
logger.warning(f"插件 {permission_group} 没有权限触发事件 {event_name},已拒绝触发!")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# 通知 scheduler(如果已注册)
|
# 🔧 修复:异步通知 scheduler,避免阻塞当前事件流程
|
||||||
if hasattr(self, "_scheduler_callback") and self._scheduler_callback:
|
if hasattr(self, "_scheduler_callback") and self._scheduler_callback:
|
||||||
try:
|
try:
|
||||||
await self._scheduler_callback(event_name, params)
|
# 使用 create_task 异步执行,避免死锁
|
||||||
|
asyncio.create_task(self._scheduler_callback(event_name, params))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"调用 scheduler 回调时出错: {e}", exc_info=True)
|
logger.error(f"调用 scheduler 回调时出错: {e}", exc_info=True)
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@@ -16,6 +17,57 @@ from src.plugin_system.base.component_types import EventType
|
|||||||
logger = get_logger("unified_scheduler")
|
logger = get_logger("unified_scheduler")
|
||||||
|
|
||||||
|
|
||||||
|
class DeadlockDetector:
    """Track task start times and flag tasks that have run for too long.

    This is a watchdog based purely on elapsed time: a task is reported as a
    possible deadlock once it has been registered for longer than the
    configured timeout. It cannot tell a genuinely deadlocked task apart from
    one that is simply slow.
    """

    def __init__(self, deadlock_timeout: float = 300.0):
        """
        Args:
            deadlock_timeout: Seconds a task may stay registered before it is
                reported by check_for_deadlocks(). Defaults to 5 minutes.
        """
        # task_id -> start timestamp taken from time.monotonic()
        self._task_start_times: dict[str, float] = {}
        self._deadlock_timeout = deadlock_timeout

    def register_task_start(self, task_id: str) -> None:
        """Record "now" as the start time of task_id, replacing any earlier entry."""
        # A monotonic clock is used because only elapsed time matters here;
        # wall-clock adjustments must not trigger (or hide) a timeout.
        self._task_start_times[task_id] = time.monotonic()

    def unregister_task(self, task_id: str) -> None:
        """Stop tracking task_id; unknown ids are ignored."""
        self._task_start_times.pop(task_id, None)

    def check_for_deadlocks(self) -> list[str]:
        """Return the ids of all tasks running longer than the timeout.

        Returns:
            list[str]: Ids whose elapsed time strictly exceeds the timeout,
            in registration order. Empty when nothing is overdue.
        """
        current_time = time.monotonic()
        return [
            task_id
            for task_id, start_time in self._task_start_times.items()
            if current_time - start_time > self._deadlock_timeout
        ]

    def get_task_runtime(self, task_id: str) -> float:
        """Return how long task_id has been registered.

        Args:
            task_id: Id of the task to look up.

        Returns:
            float: Elapsed seconds since registration, or 0.0 when the task
            is not being tracked.
        """
        start_time = self._task_start_times.get(task_id)
        # Compare against None explicitly: a start time of 0.0 is a valid
        # timestamp and must not be mistaken for "not registered".
        if start_time is None:
            return 0.0
        return time.monotonic() - start_time
|
||||||
|
|
||||||
|
|
||||||
class TriggerType(Enum):
|
class TriggerType(Enum):
|
||||||
"""触发类型枚举"""
|
"""触发类型枚举"""
|
||||||
|
|
||||||
@@ -73,6 +125,7 @@ class UnifiedScheduler:
|
|||||||
- 支持循环和一次性任务
|
- 支持循环和一次性任务
|
||||||
- 提供任务管理API(创建、删除、强制触发等)
|
- 提供任务管理API(创建、删除、强制触发等)
|
||||||
- 与 event_manager 集成,统一事件管理
|
- 与 event_manager 集成,统一事件管理
|
||||||
|
- 内置死锁检测和恢复机制
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -81,6 +134,9 @@ class UnifiedScheduler:
|
|||||||
self._check_task: asyncio.Task | None = None
|
self._check_task: asyncio.Task | None = None
|
||||||
self._event_subscriptions: set[str] = set() # 追踪已订阅的事件
|
self._event_subscriptions: set[str] = set() # 追踪已订阅的事件
|
||||||
self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务
|
self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务
|
||||||
|
# 🔧 新增:死锁检测器
|
||||||
|
self._deadlock_detector = DeadlockDetector(deadlock_timeout=300.0)
|
||||||
|
self._deadlock_check_task: asyncio.Task | None = None
|
||||||
# 移除锁机制,使用无锁设计(基于 asyncio 单线程特性)
|
# 移除锁机制,使用无锁设计(基于 asyncio 单线程特性)
|
||||||
|
|
||||||
async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None:
|
async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None:
|
||||||
@@ -117,6 +173,9 @@ class UnifiedScheduler:
|
|||||||
# 并发执行所有事件任务(无锁设计)
|
# 并发执行所有事件任务(无锁设计)
|
||||||
execution_tasks = []
|
execution_tasks = []
|
||||||
for task in event_tasks:
|
for task in event_tasks:
|
||||||
|
# 🔧 新增:在死锁检测器中注册任务开始
|
||||||
|
self._deadlock_detector.register_task_start(task.schedule_id)
|
||||||
|
|
||||||
execution_task = asyncio.create_task(
|
execution_task = asyncio.create_task(
|
||||||
self._execute_event_task_callback(task, event_params),
|
self._execute_event_task_callback(task, event_params),
|
||||||
name=f"execute_event_{task.task_name}"
|
name=f"execute_event_{task.task_name}"
|
||||||
@@ -132,6 +191,8 @@ class UnifiedScheduler:
|
|||||||
# 清理执行追踪
|
# 清理执行追踪
|
||||||
for task in event_tasks:
|
for task in event_tasks:
|
||||||
self._executing_tasks.pop(task.schedule_id, None)
|
self._executing_tasks.pop(task.schedule_id, None)
|
||||||
|
# 🔧 新增:从死锁检测器中移除任务
|
||||||
|
self._deadlock_detector.unregister_task(task.schedule_id)
|
||||||
|
|
||||||
# 收集需要移除的任务
|
# 收集需要移除的任务
|
||||||
tasks_to_remove = []
|
tasks_to_remove = []
|
||||||
@@ -155,6 +216,8 @@ class UnifiedScheduler:
|
|||||||
|
|
||||||
self._running = True
|
self._running = True
|
||||||
self._check_task = asyncio.create_task(self._check_loop())
|
self._check_task = asyncio.create_task(self._check_loop())
|
||||||
|
# 🔧 新增:启动死锁检测任务
|
||||||
|
self._deadlock_check_task = asyncio.create_task(self._deadlock_check_loop())
|
||||||
|
|
||||||
# 注册回调到 event_manager
|
# 注册回调到 event_manager
|
||||||
try:
|
try:
|
||||||
@@ -173,6 +236,15 @@ class UnifiedScheduler:
|
|||||||
return
|
return
|
||||||
|
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
||||||
|
# 🔧 修复:停止死锁检测任务
|
||||||
|
if self._deadlock_check_task:
|
||||||
|
self._deadlock_check_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._deadlock_check_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
if self._check_task:
|
if self._check_task:
|
||||||
self._check_task.cancel()
|
self._check_task.cancel()
|
||||||
try:
|
try:
|
||||||
@@ -216,6 +288,9 @@ class UnifiedScheduler:
|
|||||||
self._tasks.clear()
|
self._tasks.clear()
|
||||||
self._event_subscriptions.clear()
|
self._event_subscriptions.clear()
|
||||||
self._executing_tasks.clear()
|
self._executing_tasks.clear()
|
||||||
|
# 🔧 新增:清理死锁检测器
|
||||||
|
if hasattr(self, '_deadlock_detector'):
|
||||||
|
self._deadlock_detector._task_start_times.clear()
|
||||||
|
|
||||||
async def _check_loop(self):
|
async def _check_loop(self):
|
||||||
"""主循环:每秒检查一次所有任务"""
|
"""主循环:每秒检查一次所有任务"""
|
||||||
@@ -230,6 +305,59 @@ class UnifiedScheduler:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"调度器检查循环发生错误: {e}", exc_info=True)
|
logger.error(f"调度器检查循环发生错误: {e}", exc_info=True)
|
||||||
|
|
||||||
|
async def _deadlock_check_loop(self):
    """Watchdog loop: every 30 seconds, look for tasks that may be deadlocked.

    Keeps running while self._running is true. Each cycle sleeps first, then
    asks the deadlock detector for overdue task ids and hands each one, in
    order, to _handle_deadlocked_task(). Cancelling this coroutine ends the
    loop; any other exception is logged and the loop carries on.
    """
    logger.debug("死锁检测循环已启动")
    while True:
        if not self._running:
            return
        try:
            await asyncio.sleep(30)
            deadlocked_tasks = self._deadlock_detector.check_for_deadlocks()
            if not deadlocked_tasks:
                # Nothing overdue this cycle.
                continue

            logger.warning(f"检测到 {len(deadlocked_tasks)} 个可能的死锁任务: {deadlocked_tasks}")

            # Attempt recovery one task at a time.
            for stuck_id in deadlocked_tasks:
                await self._handle_deadlocked_task(stuck_id)
        except asyncio.CancelledError:
            logger.debug("死锁检测循环被取消")
            return
        except Exception as e:
            logger.error(f"死锁检测循环发生错误: {e}", exc_info=True)
|
||||||
|
|
||||||
|
async def _handle_deadlocked_task(self, schedule_id: str) -> None:
    """Try to recover a task that the deadlock detector reported as overdue.

    If the schedule no longer exists, only the detector entry is dropped.
    Otherwise the in-flight asyncio task (if any) is cancelled and given up
    to 5 seconds to finish. Whatever the outcome, the execution-tracking
    entry and the detector entry for schedule_id are removed afterwards.

    Args:
        schedule_id: Id of the schedule whose execution looks stuck.
    """
    task = self._tasks.get(schedule_id)
    if not task:
        # The schedule is gone; just drop the stale detector record.
        self._deadlock_detector.unregister_task(schedule_id)
        return

    runtime = self._deadlock_detector.get_task_runtime(schedule_id)
    logger.warning(f"任务 {task.task_name} 已运行 {runtime:.1f} 秒,可能已死锁")

    executing_task = self._executing_tasks.get(schedule_id)
    try:
        if executing_task and not executing_task.done():
            logger.warning(f"强制取消死锁任务: {task.task_name}")
            executing_task.cancel()

            # Use asyncio.wait() rather than awaiting the task (directly or via
            # wait_for): awaiting a cancelled task raises CancelledError, which
            # is not an Exception subclass and would escape into the watchdog
            # loop and stop it. wait() also returns on timeout instead of
            # blocking until a task that ignores cancellation finally exits.
            _, pending = await asyncio.wait({executing_task}, timeout=5.0)

            if pending:
                logger.error(f"无法取消死锁任务 {task.task_name},可能需要重启系统")
            elif executing_task.cancelled():
                logger.info(f"死锁任务 {task.task_name} 已成功取消")
            else:
                # The task finished on its own during the grace period,
                # either normally or by raising.
                error = executing_task.exception()
                if error is None:
                    logger.info(f"死锁任务 {task.task_name} 已成功取消")
                else:
                    logger.error(f"取消死锁任务 {task.task_name} 时发生错误: {error}")
    finally:
        # Always clear tracking state, even if this coroutine itself is
        # cancelled while waiting.
        self._executing_tasks.pop(schedule_id, None)
        self._deadlock_detector.unregister_task(schedule_id)
|
||||||
|
|
||||||
async def _check_and_trigger_tasks(self):
|
async def _check_and_trigger_tasks(self):
|
||||||
"""检查并触发到期任务
|
"""检查并触发到期任务
|
||||||
|
|
||||||
@@ -268,6 +396,9 @@ class UnifiedScheduler:
|
|||||||
# 为每个任务创建独立的异步任务,确保并发执行
|
# 为每个任务创建独立的异步任务,确保并发执行
|
||||||
execution_tasks = []
|
execution_tasks = []
|
||||||
for task in tasks_to_trigger:
|
for task in tasks_to_trigger:
|
||||||
|
# 🔧 新增:在死锁检测器中注册任务开始
|
||||||
|
self._deadlock_detector.register_task_start(task.schedule_id)
|
||||||
|
|
||||||
execution_task = asyncio.create_task(
|
execution_task = asyncio.create_task(
|
||||||
self._execute_task_callback(task, current_time),
|
self._execute_task_callback(task, current_time),
|
||||||
name=f"execute_{task.task_name}"
|
name=f"execute_{task.task_name}"
|
||||||
@@ -283,6 +414,8 @@ class UnifiedScheduler:
|
|||||||
# 清理执行追踪
|
# 清理执行追踪
|
||||||
for task in tasks_to_trigger:
|
for task in tasks_to_trigger:
|
||||||
self._executing_tasks.pop(task.schedule_id, None)
|
self._executing_tasks.pop(task.schedule_id, None)
|
||||||
|
# 🔧 新增:从死锁检测器中移除任务
|
||||||
|
self._deadlock_detector.unregister_task(task.schedule_id)
|
||||||
|
|
||||||
# 第三阶段:收集需要移除的任务并移除(无锁设计)
|
# 第三阶段:收集需要移除的任务并移除(无锁设计)
|
||||||
tasks_to_remove = []
|
tasks_to_remove = []
|
||||||
@@ -586,7 +719,7 @@ class UnifiedScheduler:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
async def remove_schedule(self, schedule_id: str) -> bool:
|
async def remove_schedule(self, schedule_id: str) -> bool:
|
||||||
"""移除调度任务(无锁设计)
|
"""移除调度任务(改进的取消机制)
|
||||||
|
|
||||||
如果任务正在执行,会取消执行中的任务
|
如果任务正在执行,会取消执行中的任务
|
||||||
"""
|
"""
|
||||||
@@ -598,16 +731,20 @@ class UnifiedScheduler:
|
|||||||
task = self._tasks[schedule_id]
|
task = self._tasks[schedule_id]
|
||||||
executing_task = self._executing_tasks.get(schedule_id)
|
executing_task = self._executing_tasks.get(schedule_id)
|
||||||
|
|
||||||
# 取消正在执行的任务
|
# 🔧 修复:改进任务取消机制,避免死锁
|
||||||
if executing_task and not executing_task.done():
|
if executing_task and not executing_task.done():
|
||||||
logger.debug(f"取消正在执行的任务: {task.task_name}")
|
logger.debug(f"取消正在执行的任务: {task.task_name}")
|
||||||
try:
|
try:
|
||||||
executing_task.cancel()
|
executing_task.cancel()
|
||||||
await asyncio.wait_for(executing_task, 3)
|
# 使用更长的超时时间,并添加异常处理
|
||||||
|
await asyncio.wait_for(executing_task, timeout=10.0)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.warning(f"取消任务 {task.task_name} 超时,强制移除")
|
logger.warning(f"取消任务 {task.task_name} 超时,可能存在死锁风险")
|
||||||
|
# 不再强制移除,让任务自然完成
|
||||||
|
return False
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"取消任务 {task.task_name} 时发生错误: {e}")
|
logger.error(f"取消任务 {task.task_name} 时发生未预期的错误: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
# 移除任务
|
# 移除任务
|
||||||
await self._remove_task_internal(schedule_id)
|
await self._remove_task_internal(schedule_id)
|
||||||
@@ -666,6 +803,10 @@ class UnifiedScheduler:
|
|||||||
# 清理已完成的任务
|
# 清理已完成的任务
|
||||||
if executing_task and executing_task.done():
|
if executing_task and executing_task.done():
|
||||||
self._executing_tasks.pop(schedule_id, None)
|
self._executing_tasks.pop(schedule_id, None)
|
||||||
|
self._deadlock_detector.unregister_task(schedule_id)
|
||||||
|
|
||||||
|
# 🔧 新增:在死锁检测器中注册任务开始
|
||||||
|
self._deadlock_detector.register_task_start(schedule_id)
|
||||||
|
|
||||||
# 创建执行任务
|
# 创建执行任务
|
||||||
execution_task = asyncio.create_task(
|
execution_task = asyncio.create_task(
|
||||||
@@ -683,6 +824,8 @@ class UnifiedScheduler:
|
|||||||
finally:
|
finally:
|
||||||
# 清理执行追踪
|
# 清理执行追踪
|
||||||
self._executing_tasks.pop(schedule_id, None)
|
self._executing_tasks.pop(schedule_id, None)
|
||||||
|
# 🔧 新增:从死锁检测器中移除任务
|
||||||
|
self._deadlock_detector.unregister_task(schedule_id)
|
||||||
|
|
||||||
async def pause_schedule(self, schedule_id: str) -> bool:
|
async def pause_schedule(self, schedule_id: str) -> bool:
|
||||||
"""暂停任务(不删除)"""
|
"""暂停任务(不删除)"""
|
||||||
@@ -761,6 +904,12 @@ class UnifiedScheduler:
|
|||||||
"task_obj_name": executing_task.get_name() if hasattr(executing_task, 'get_name') else str(executing_task),
|
"task_obj_name": executing_task.get_name() if hasattr(executing_task, 'get_name') else str(executing_task),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# 🔧 新增:获取死锁检测统计
|
||||||
|
deadlock_stats = {
|
||||||
|
"monitored_tasks": len(self._deadlock_detector._task_start_times),
|
||||||
|
"deadlock_timeout": self._deadlock_detector._deadlock_timeout,
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"is_running": self._running,
|
"is_running": self._running,
|
||||||
"total_tasks": total_tasks,
|
"total_tasks": total_tasks,
|
||||||
@@ -772,6 +921,8 @@ class UnifiedScheduler:
|
|||||||
"executing_tasks_info": executing_tasks_info,
|
"executing_tasks_info": executing_tasks_info,
|
||||||
"tasks_by_type": tasks_by_type,
|
"tasks_by_type": tasks_by_type,
|
||||||
"registered_events": list(self._event_subscriptions),
|
"registered_events": list(self._event_subscriptions),
|
||||||
|
# 🔧 新增:死锁检测统计
|
||||||
|
"deadlock_detection": deadlock_stats,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user