refactor: 移除锁机制,优化统一调度器的并发执行设计

This commit is contained in:
Windpicker-owo
2025-11-07 22:28:27 +08:00
parent 09d7f1e7d0
commit 6042a604c0

View File

@@ -79,38 +79,34 @@ class UnifiedScheduler:
self._tasks: dict[str, ScheduleTask] = {} self._tasks: dict[str, ScheduleTask] = {}
self._running = False self._running = False
self._check_task: asyncio.Task | None = None self._check_task: asyncio.Task | None = None
self._lock = asyncio.Lock()
self._event_subscriptions: set[str] = set() # 追踪已订阅的事件 self._event_subscriptions: set[str] = set() # 追踪已订阅的事件
self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务 self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务
self._execution_lock = asyncio.Lock() # 专门用于保护执行任务的并发访问 # 移除锁机制,使用无锁设计(基于 asyncio 单线程特性)
async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None: async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None:
"""处理来自 event_manager 的事件通知 """处理来自 event_manager 的事件通知
此方法由 event_manager 在触发事件时直接调用 此方法由 event_manager 在触发事件时直接调用
无锁设计:基于 asyncio 单线程特性,避免死锁
注意:此方法不能在持有 self._lock 的情况下调用,
否则会导致死锁(因为回调可能再次触发事件)
""" """
# 获取订阅该事件的所有任务(快速复制,减少锁持有时间) # 获取订阅该事件的所有任务
async with self._lock: event_tasks = []
event_tasks = [] for task in self._tasks.values():
for task in self._tasks.values(): if (task.trigger_type == TriggerType.EVENT
if (task.trigger_type == TriggerType.EVENT and task.trigger_config.get("event_name") == event_name
and task.trigger_config.get("event_name") == event_name and task.is_active):
and task.is_active):
# 检查事件任务是否已经在执行中,防止重复触发 # 检查事件任务是否已经在执行中,防止重复触发
if task.schedule_id in self._executing_tasks: if task.schedule_id in self._executing_tasks:
executing_task = self._executing_tasks[task.schedule_id] executing_task = self._executing_tasks[task.schedule_id]
if not executing_task.done(): if not executing_task.done():
logger.debug(f"[调度器] 事件任务 {task.task_name} 仍在执行中,跳过本次触发") logger.debug(f"[调度器] 事件任务 {task.task_name} 仍在执行中,跳过本次触发")
continue continue
else: else:
# 任务已完成但未清理,先清理 # 任务已完成但未清理,先清理
self._executing_tasks.pop(task.schedule_id, None) self._executing_tasks.pop(task.schedule_id, None)
event_tasks.append(task) event_tasks.append(task)
if not event_tasks: if not event_tasks:
logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务") logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务")
@@ -118,26 +114,24 @@ class UnifiedScheduler:
logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
# 并发执行所有事件任务 # 并发执行所有事件任务(无锁设计)
async with self._execution_lock: execution_tasks = []
execution_tasks = [] for task in event_tasks:
for task in event_tasks: execution_task = asyncio.create_task(
execution_task = asyncio.create_task( self._execute_event_task_callback(task, event_params),
self._execute_event_task_callback(task, event_params), name=f"execute_event_{task.task_name}"
name=f"execute_event_{task.task_name}" )
) execution_tasks.append(execution_task)
execution_tasks.append(execution_task)
# 追踪正在执行的任务 # 追踪正在执行的任务
self._executing_tasks[task.schedule_id] = execution_task self._executing_tasks[task.schedule_id] = execution_task
# 等待所有任务完成 # 等待所有任务完成
results = await asyncio.gather(*execution_tasks, return_exceptions=True) results = await asyncio.gather(*execution_tasks, return_exceptions=True)
# 清理执行追踪 # 清理执行追踪
async with self._execution_lock: for task in event_tasks:
for task in event_tasks: self._executing_tasks.pop(task.schedule_id, None)
self._executing_tasks.pop(task.schedule_id, None)
# 收集需要移除的任务 # 收集需要移除的任务
tasks_to_remove = [] tasks_to_remove = []
@@ -149,11 +143,9 @@ class UnifiedScheduler:
tasks_to_remove.append(task.schedule_id) tasks_to_remove.append(task.schedule_id)
logger.debug(f"[调度器] 一次性事件任务 {task.task_name} 已完成,将被移除") logger.debug(f"[调度器] 一次性事件任务 {task.task_name} 已完成,将被移除")
# 移除已完成的一次性任务 # 移除已完成的一次性任务(无锁设计)
if tasks_to_remove: for schedule_id in tasks_to_remove:
async with self._lock: await self._remove_task_internal(schedule_id)
for schedule_id in tasks_to_remove:
await self._remove_task_internal(schedule_id)
async def start(self): async def start(self):
"""启动调度器""" """启动调度器"""
@@ -197,14 +189,15 @@ class UnifiedScheduler:
except ImportError: except ImportError:
pass pass
# 取消所有正在执行的任务(避免在锁内进行阻塞操作 # 取消所有正在执行的任务(无锁设计
executing_tasks = list(self._executing_tasks.values()) executing_tasks = list(self._executing_tasks.values())
if executing_tasks: if executing_tasks:
logger.debug(f"取消 {len(executing_tasks)} 个正在执行的任务") logger.debug(f"取消 {len(executing_tasks)} 个正在执行的任务")
# 在取消任务前先清空追踪,避免死锁
# 在取消任务前先清空追踪
self._executing_tasks.clear() self._executing_tasks.clear()
# 在锁外取消任务 # 取消任务
for task in executing_tasks: for task in executing_tasks:
if not task.done(): if not task.done():
task.cancel() task.cancel()
@@ -219,7 +212,7 @@ class UnifiedScheduler:
logger.warning("部分任务取消超时,强制停止") logger.warning("部分任务取消超时,强制停止")
logger.info("统一调度器已停止") logger.info("统一调度器已停止")
# 清空资源时不需要锁,因为已经停止运行 # 清空所有资源
self._tasks.clear() self._tasks.clear()
self._event_subscriptions.clear() self._event_subscriptions.clear()
self._executing_tasks.clear() self._executing_tasks.clear()
@@ -240,61 +233,58 @@ class UnifiedScheduler:
async def _check_and_trigger_tasks(self): async def _check_and_trigger_tasks(self):
"""检查并触发到期任务 """检查并触发到期任务
注意:为了避免死锁和阻塞,回调执行必须在锁外并且并发进行 无锁设计:基于 asyncio 单线程特性,避免死锁和阻塞
""" """
current_time = datetime.now() current_time = datetime.now()
# 第一阶段:在锁内快速收集需要触发的任务 # 收集需要触发的任务
async with self._lock: tasks_to_trigger = []
tasks_to_trigger = []
for schedule_id, task in list(self._tasks.items()): for schedule_id, task in list(self._tasks.items()):
if not task.is_active: if not task.is_active:
continue
# 检查任务是否已经在执行中,防止重复触发
if schedule_id in self._executing_tasks:
executing_task = self._executing_tasks[schedule_id]
if not executing_task.done():
logger.debug(f"[调度器] 任务 {task.task_name} 仍在执行中,跳过本次触发")
continue continue
else:
# 任务已完成但未清理,先清理
self._executing_tasks.pop(schedule_id, None)
# 检查任务是否已经在执行中,防止重复触发 try:
if schedule_id in self._executing_tasks: should_trigger = await self._should_trigger_task(task, current_time)
executing_task = self._executing_tasks[schedule_id] if should_trigger:
if not executing_task.done(): tasks_to_trigger.append(task)
logger.debug(f"[调度器] 任务 {task.task_name} 仍在执行中,跳过本次触发") except Exception as e:
continue logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True)
else:
# 任务已完成但未清理,先清理
self._executing_tasks.pop(schedule_id, None)
try: # 第二阶段:并发执行所有回调(无锁设计)
should_trigger = await self._should_trigger_task(task, current_time)
if should_trigger:
tasks_to_trigger.append(task)
except Exception as e:
logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True)
# 第二阶段:在锁外并发执行所有回调(避免死锁和阻塞)
if not tasks_to_trigger: if not tasks_to_trigger:
return return
# 为每个任务创建独立的异步任务,确保并发执行 # 为每个任务创建独立的异步任务,确保并发执行
async with self._execution_lock: execution_tasks = []
execution_tasks = [] for task in tasks_to_trigger:
for task in tasks_to_trigger: execution_task = asyncio.create_task(
execution_task = asyncio.create_task( self._execute_task_callback(task, current_time),
self._execute_task_callback(task, current_time), name=f"execute_{task.task_name}"
name=f"execute_{task.task_name}" )
) execution_tasks.append(execution_task)
execution_tasks.append(execution_task)
# 追踪正在执行的任务,以便在 remove_schedule 时可以取消 # 追踪正在执行的任务,以便在 remove_schedule 时可以取消
self._executing_tasks[task.schedule_id] = execution_task self._executing_tasks[task.schedule_id] = execution_task
# 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务) # 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务)
results = await asyncio.gather(*execution_tasks, return_exceptions=True) results = await asyncio.gather(*execution_tasks, return_exceptions=True)
# 清理执行追踪 # 清理执行追踪
async with self._execution_lock: for task in tasks_to_trigger:
for task in tasks_to_trigger: self._executing_tasks.pop(task.schedule_id, None)
self._executing_tasks.pop(task.schedule_id, None)
# 第三阶段:收集需要移除的任务并在锁内移除 # 第三阶段:收集需要移除的任务并移除(无锁设计)
tasks_to_remove = [] tasks_to_remove = []
for task, result in zip(tasks_to_trigger, results): for task, result in zip(tasks_to_trigger, results):
if isinstance(result, Exception): if isinstance(result, Exception):
@@ -304,10 +294,9 @@ class UnifiedScheduler:
tasks_to_remove.append(task.schedule_id) tasks_to_remove.append(task.schedule_id)
logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
if tasks_to_remove: # 移除已完成的一次性任务
async with self._lock: for schedule_id in tasks_to_remove:
for schedule_id in tasks_to_remove: await self._remove_task_internal(schedule_id)
await self._remove_task_internal(schedule_id)
async def _execute_task_callback(self, task: ScheduleTask, current_time: datetime) -> bool: async def _execute_task_callback(self, task: ScheduleTask, current_time: datetime) -> bool:
"""执行单个任务的回调(用于并发执行) """执行单个任务的回调(用于并发执行)
@@ -474,21 +463,20 @@ class UnifiedScheduler:
logger.error(f"执行任务 {task.task_name} 的回调函数时出错: {e}", exc_info=True) logger.error(f"执行任务 {task.task_name} 的回调函数时出错: {e}", exc_info=True)
async def _remove_task_internal(self, schedule_id: str): async def _remove_task_internal(self, schedule_id: str):
"""内部方法:移除任务(需要加锁保护""" """内部方法:移除任务(无锁设计"""
async with self._lock: task = self._tasks.pop(schedule_id, None)
task = self._tasks.pop(schedule_id, None) if task:
if task: if task.trigger_type == TriggerType.EVENT:
if task.trigger_type == TriggerType.EVENT: event_name = task.trigger_config.get("event_name")
event_name = task.trigger_config.get("event_name") if event_name:
if event_name: has_other_subscribers = any(
has_other_subscribers = any( t.trigger_type == TriggerType.EVENT and t.trigger_config.get("event_name") == event_name
t.trigger_type == TriggerType.EVENT and t.trigger_config.get("event_name") == event_name for t in self._tasks.values()
for t in self._tasks.values() )
) # 如果没有其他任务订阅此事件,从追踪集合中移除
# 如果没有其他任务订阅此事件,从追踪集合中移除 if not has_other_subscribers and event_name in self._event_subscriptions:
if not has_other_subscribers and event_name in self._event_subscriptions: self._event_subscriptions.discard(event_name)
self._event_subscriptions.discard(event_name) logger.debug(f"事件 '{event_name}' 已无订阅任务,从追踪中移除")
logger.debug(f"事件 '{event_name}' 已无订阅任务,从追踪中移除")
async def create_schedule( async def create_schedule(
self, self,
@@ -500,7 +488,7 @@ class UnifiedScheduler:
callback_args: tuple | None = None, callback_args: tuple | None = None,
callback_kwargs: dict | None = None, callback_kwargs: dict | None = None,
) -> str: ) -> str:
"""创建调度任务(详细注释见文档""" """创建调度任务(无锁设计"""
schedule_id = str(uuid.uuid4()) schedule_id = str(uuid.uuid4())
task = ScheduleTask( task = ScheduleTask(
@@ -514,37 +502,36 @@ class UnifiedScheduler:
callback_kwargs=callback_kwargs, callback_kwargs=callback_kwargs,
) )
async with self._lock: # 存储任务(无锁操作)
self._tasks[schedule_id] = task self._tasks[schedule_id] = task
if trigger_type == TriggerType.EVENT: if trigger_type == TriggerType.EVENT:
event_name = trigger_config.get("event_name") event_name = trigger_config.get("event_name")
if not event_name: if not event_name:
raise ValueError("事件触发类型必须提供 event_name") raise ValueError("事件触发类型必须提供 event_name")
# 添加到追踪集合 # 添加到追踪集合
if event_name not in self._event_subscriptions: if event_name not in self._event_subscriptions:
self._event_subscriptions.add(event_name) self._event_subscriptions.add(event_name)
logger.debug(f"开始追踪事件: {event_name}") logger.debug(f"开始追踪事件: {event_name}")
logger.debug(f"创建调度任务: {task.task_name}") logger.debug(f"创建调度任务: {task.task_name}")
return schedule_id return schedule_id
async def remove_schedule(self, schedule_id: str) -> bool: async def remove_schedule(self, schedule_id: str) -> bool:
"""移除调度任务 """移除调度任务(无锁设计)
如果任务正在执行,会取消执行中的任务 如果任务正在执行,会取消执行中的任务
""" """
# 获取任务信息和执行任务,避免长时间持有锁 # 获取任务信息
async with self._lock: if schedule_id not in self._tasks:
if schedule_id not in self._tasks: logger.warning(f"尝试移除不存在的任务: {schedule_id}")
logger.warning(f"尝试移除不存在的任务: {schedule_id}") return False
return False
task = self._tasks[schedule_id] task = self._tasks[schedule_id]
executing_task = self._executing_tasks.get(schedule_id) executing_task = self._executing_tasks.get(schedule_id)
# 在锁外取消正在执行的任务,避免死锁 # 取消正在执行的任务
if executing_task and not executing_task.done(): if executing_task and not executing_task.done():
logger.debug(f"取消正在执行的任务: {task.task_name}") logger.debug(f"取消正在执行的任务: {task.task_name}")
try: try:
@@ -555,112 +542,103 @@ class UnifiedScheduler:
except Exception as e: except Exception as e:
logger.warning(f"取消任务 {task.task_name} 时发生错误: {e}") logger.warning(f"取消任务 {task.task_name} 时发生错误: {e}")
# 重新获取锁移除任务 # 移除任务
async with self._lock: await self._remove_task_internal(schedule_id)
await self._remove_task_internal(schedule_id)
# 使用执行锁清理执行追踪 # 清理执行追踪
async with self._execution_lock: self._executing_tasks.pop(schedule_id, None)
self._executing_tasks.pop(schedule_id, None)
logger.debug(f"移除调度任务: {task.task_name}") logger.debug(f"移除调度任务: {task.task_name}")
return True return True
async def trigger_schedule(self, schedule_id: str) -> bool: async def trigger_schedule(self, schedule_id: str) -> bool:
"""强制触发指定任务""" """强制触发指定任务(无锁设计)"""
# 获取任务信息,减少锁持有时间 # 获取任务信息
async with self._lock: task = self._tasks.get(schedule_id)
task = self._tasks.get(schedule_id) if not task:
if not task: logger.warning(f"尝试触发不存在的任务: {schedule_id}")
logger.warning(f"尝试触发不存在的任务: {schedule_id}") return False
return False
if not task.is_active: if not task.is_active:
logger.warning(f"尝试触发已停用的任务: {task.task_name}") logger.warning(f"尝试触发已停用的任务: {task.task_name}")
return False return False
# 检查任务是否已经在执行中 # 检查任务是否已经在执行中
executing_task = self._executing_tasks.get(schedule_id) executing_task = self._executing_tasks.get(schedule_id)
if executing_task and not executing_task.done(): if executing_task and not executing_task.done():
logger.warning(f"任务 {task.task_name} 已在执行中,无法重复触发") logger.warning(f"任务 {task.task_name} 已在执行中,无法重复触发")
return False return False
# 清理已完成的任务 # 清理已完成的任务
if executing_task and executing_task.done(): if executing_task and executing_task.done():
self._executing_tasks.pop(schedule_id, None) self._executing_tasks.pop(schedule_id, None)
# 在锁外创建执行任务 # 创建执行任务
async with self._execution_lock: execution_task = asyncio.create_task(
execution_task = asyncio.create_task( self._execute_trigger_task_callback(task),
self._execute_trigger_task_callback(task), name=f"trigger_{task.task_name}"
name=f"trigger_{task.task_name}" )
)
# 追踪执行任务 # 追踪执行任务
self._executing_tasks[schedule_id] = execution_task self._executing_tasks[schedule_id] = execution_task
# 在锁外等待任务完成 # 等待任务完成
try: try:
result = await execution_task result = await execution_task
return result return result
finally: finally:
# 清理执行追踪 # 清理执行追踪
async with self._execution_lock: self._executing_tasks.pop(schedule_id, None)
self._executing_tasks.pop(schedule_id, None)
async def pause_schedule(self, schedule_id: str) -> bool: async def pause_schedule(self, schedule_id: str) -> bool:
"""暂停任务(不删除)""" """暂停任务(不删除)"""
async with self._lock: task = self._tasks.get(schedule_id)
task = self._tasks.get(schedule_id) if not task:
if not task: logger.warning(f"尝试暂停不存在的任务: {schedule_id}")
logger.warning(f"尝试暂停不存在的任务: {schedule_id}") return False
return False
task.is_active = False task.is_active = False
logger.debug(f"暂停任务: {task.task_name}") logger.debug(f"暂停任务: {task.task_name}")
return True return True
async def resume_schedule(self, schedule_id: str) -> bool: async def resume_schedule(self, schedule_id: str) -> bool:
"""恢复任务""" """恢复任务"""
async with self._lock: task = self._tasks.get(schedule_id)
task = self._tasks.get(schedule_id) if not task:
if not task: logger.warning(f"尝试恢复不存在的任务: {schedule_id}")
logger.warning(f"尝试恢复不存在的任务: {schedule_id}") return False
return False
task.is_active = True task.is_active = True
logger.debug(f"恢复任务: {task.task_name}") logger.debug(f"恢复任务: {task.task_name}")
return True return True
async def get_task_info(self, schedule_id: str) -> dict[str, Any] | None: async def get_task_info(self, schedule_id: str) -> dict[str, Any] | None:
"""获取任务信息""" """获取任务信息"""
async with self._lock: task = self._tasks.get(schedule_id)
task = self._tasks.get(schedule_id) if not task:
if not task: return None
return None
return { return {
"schedule_id": task.schedule_id, "schedule_id": task.schedule_id,
"task_name": task.task_name, "task_name": task.task_name,
"trigger_type": task.trigger_type.value, "trigger_type": task.trigger_type.value,
"is_recurring": task.is_recurring, "is_recurring": task.is_recurring,
"is_active": task.is_active, "is_active": task.is_active,
"created_at": task.created_at.isoformat(), "created_at": task.created_at.isoformat(),
"last_triggered_at": task.last_triggered_at.isoformat() if task.last_triggered_at else None, "last_triggered_at": task.last_triggered_at.isoformat() if task.last_triggered_at else None,
"trigger_count": task.trigger_count, "trigger_count": task.trigger_count,
"trigger_config": task.trigger_config.copy(), "trigger_config": task.trigger_config.copy(),
} }
async def list_tasks(self, trigger_type: TriggerType | None = None) -> list[dict[str, Any]]: async def list_tasks(self, trigger_type: TriggerType | None = None) -> list[dict[str, Any]]:
"""列出所有任务或指定类型的任务""" """列出所有任务或指定类型的任务"""
async with self._lock: tasks = []
tasks = [] for task in self._tasks.values():
for task in self._tasks.values(): if trigger_type is None or task.trigger_type == trigger_type:
if trigger_type is None or task.trigger_type == trigger_type: task_info = await self.get_task_info(task.schedule_id)
task_info = await self.get_task_info(task.schedule_id) if task_info:
if task_info: tasks.append(task_info)
tasks.append(task_info) return tasks
return tasks
def get_statistics(self) -> dict[str, Any]: def get_statistics(self) -> dict[str, Any]:
"""获取调度器统计信息""" """获取调度器统计信息"""