From 6042a604c00b5ce943bcdd4d0e59251caf67c075 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Fri, 7 Nov 2025 22:28:27 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=A7=BB=E9=99=A4=E9=94=81?= =?UTF-8?q?=E6=9C=BA=E5=88=B6=EF=BC=8C=E4=BC=98=E5=8C=96=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=99=A8=E7=9A=84=E5=B9=B6=E5=8F=91=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E8=AE=BE=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/schedule/unified_scheduler.py | 368 ++++++++++++++---------------- 1 file changed, 173 insertions(+), 195 deletions(-) diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index 1ae1bb61f..387f84484 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -79,38 +79,34 @@ class UnifiedScheduler: self._tasks: dict[str, ScheduleTask] = {} self._running = False self._check_task: asyncio.Task | None = None - self._lock = asyncio.Lock() self._event_subscriptions: set[str] = set() # 追踪已订阅的事件 self._executing_tasks: dict[str, asyncio.Task] = {} # 追踪正在执行的任务 - self._execution_lock = asyncio.Lock() # 专门用于保护执行任务的并发访问 + # 移除锁机制,使用无锁设计(基于 asyncio 单线程特性) async def _handle_event_trigger(self, event_name: str | EventType, event_params: dict[str, Any]) -> None: """处理来自 event_manager 的事件通知 此方法由 event_manager 在触发事件时直接调用 - - 注意:此方法不能在持有 self._lock 的情况下调用, - 否则会导致死锁(因为回调可能再次触发事件) + 无锁设计:基于 asyncio 单线程特性,避免死锁 """ - # 获取订阅该事件的所有任务(快速复制,减少锁持有时间) - async with self._lock: - event_tasks = [] - for task in self._tasks.values(): - if (task.trigger_type == TriggerType.EVENT - and task.trigger_config.get("event_name") == event_name - and task.is_active): + # 获取订阅该事件的所有任务 + event_tasks = [] + for task in self._tasks.values(): + if (task.trigger_type == TriggerType.EVENT + and task.trigger_config.get("event_name") == event_name + and task.is_active): - # 检查事件任务是否已经在执行中,防止重复触发 - if task.schedule_id in self._executing_tasks: - executing_task = self._executing_tasks[task.schedule_id] - if not executing_task.done(): - logger.debug(f"[调度器] 事件任务 {task.task_name} 仍在执行中,跳过本次触发") - continue - else: - # 任务已完成但未清理,先清理 - self._executing_tasks.pop(task.schedule_id, None) + # 检查事件任务是否已经在执行中,防止重复触发 + if task.schedule_id in self._executing_tasks: + executing_task = self._executing_tasks[task.schedule_id] + if not executing_task.done(): + logger.debug(f"[调度器] 事件任务 {task.task_name} 仍在执行中,跳过本次触发") + continue + else: + # 任务已完成但未清理,先清理 + self._executing_tasks.pop(task.schedule_id, None) - event_tasks.append(task) + event_tasks.append(task) if not event_tasks: logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务") @@ -118,26 +114,24 @@ class UnifiedScheduler: logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") - # 并发执行所有事件任务 - async with self._execution_lock: - execution_tasks = [] - for task in event_tasks: - execution_task = asyncio.create_task( - self._execute_event_task_callback(task, event_params), - name=f"execute_event_{task.task_name}" - ) - execution_tasks.append(execution_task) + # 并发执行所有事件任务(无锁设计) + execution_tasks = [] + for task in event_tasks: + execution_task = asyncio.create_task( + self._execute_event_task_callback(task, event_params), + name=f"execute_event_{task.task_name}" + ) + execution_tasks.append(execution_task) - # 追踪正在执行的任务 - self._executing_tasks[task.schedule_id] = execution_task + # 追踪正在执行的任务 + self._executing_tasks[task.schedule_id] = execution_task # 等待所有任务完成 results = await asyncio.gather(*execution_tasks, return_exceptions=True) # 清理执行追踪 - async with self._execution_lock: - for task in event_tasks: - self._executing_tasks.pop(task.schedule_id, None) + for task in event_tasks: + self._executing_tasks.pop(task.schedule_id, None) # 收集需要移除的任务 tasks_to_remove = [] @@ -149,11 +143,9 @@ class UnifiedScheduler: tasks_to_remove.append(task.schedule_id) logger.debug(f"[调度器] 一次性事件任务 {task.task_name} 已完成,将被移除") - # 移除已完成的一次性任务 - if tasks_to_remove: - async with self._lock: - for schedule_id in tasks_to_remove: - await self._remove_task_internal(schedule_id) + # 移除已完成的一次性任务(无锁设计) + for schedule_id in tasks_to_remove: + await self._remove_task_internal(schedule_id) async def start(self): """启动调度器""" @@ -197,14 +189,15 @@ class UnifiedScheduler: except ImportError: pass - # 取消所有正在执行的任务(避免在锁内进行阻塞操作) + # 取消所有正在执行的任务(无锁设计) executing_tasks = list(self._executing_tasks.values()) if executing_tasks: logger.debug(f"取消 {len(executing_tasks)} 个正在执行的任务") - # 在取消任务前先清空追踪,避免死锁 + + # 在取消任务前先清空追踪 self._executing_tasks.clear() - # 在锁外取消任务 + # 取消任务 for task in executing_tasks: if not task.done(): task.cancel() @@ -219,7 +212,7 @@ class UnifiedScheduler: logger.warning("部分任务取消超时,强制停止") logger.info("统一调度器已停止") - # 清空资源时不需要锁,因为已经停止运行 + # 清空所有资源 self._tasks.clear() self._event_subscriptions.clear() self._executing_tasks.clear() @@ -240,61 +233,58 @@ class UnifiedScheduler: async def _check_and_trigger_tasks(self): """检查并触发到期任务 - 注意:为了避免死锁和阻塞,回调执行必须在锁外并且并发进行 + 无锁设计:基于 asyncio 单线程特性,避免死锁和阻塞 """ current_time = datetime.now() - # 第一阶段:在锁内快速收集需要触发的任务 - async with self._lock: - tasks_to_trigger = [] + # 收集需要触发的任务 + tasks_to_trigger = [] - for schedule_id, task in list(self._tasks.items()): - if not task.is_active: + for schedule_id, task in list(self._tasks.items()): + if not task.is_active: + continue + + # 检查任务是否已经在执行中,防止重复触发 + if schedule_id in self._executing_tasks: + executing_task = self._executing_tasks[schedule_id] + if not executing_task.done(): + logger.debug(f"[调度器] 任务 {task.task_name} 仍在执行中,跳过本次触发") continue + else: + # 任务已完成但未清理,先清理 + self._executing_tasks.pop(schedule_id, None) - # 检查任务是否已经在执行中,防止重复触发 - if schedule_id in self._executing_tasks: - executing_task = self._executing_tasks[schedule_id] - if not executing_task.done(): - logger.debug(f"[调度器] 任务 {task.task_name} 仍在执行中,跳过本次触发") - continue - else: - # 任务已完成但未清理,先清理 - self._executing_tasks.pop(schedule_id, None) + try: + should_trigger = await self._should_trigger_task(task, current_time) + if should_trigger: + tasks_to_trigger.append(task) + except Exception as e: + logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True) - try: - should_trigger = await self._should_trigger_task(task, current_time) - if should_trigger: - tasks_to_trigger.append(task) - except Exception as e: - logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True) - - # 第二阶段:在锁外并发执行所有回调(避免死锁和阻塞) + # 第二阶段:并发执行所有回调(无锁设计) if not tasks_to_trigger: return # 为每个任务创建独立的异步任务,确保并发执行 - async with self._execution_lock: - execution_tasks = [] - for task in tasks_to_trigger: - execution_task = asyncio.create_task( - self._execute_task_callback(task, current_time), - name=f"execute_{task.task_name}" - ) - execution_tasks.append(execution_task) + execution_tasks = [] + for task in tasks_to_trigger: + execution_task = asyncio.create_task( + self._execute_task_callback(task, current_time), + name=f"execute_{task.task_name}" + ) + execution_tasks.append(execution_task) - # 追踪正在执行的任务,以便在 remove_schedule 时可以取消 - self._executing_tasks[task.schedule_id] = execution_task + # 追踪正在执行的任务,以便在 remove_schedule 时可以取消 + self._executing_tasks[task.schedule_id] = execution_task # 等待所有任务完成(使用 return_exceptions=True 避免单个任务失败影响其他任务) results = await asyncio.gather(*execution_tasks, return_exceptions=True) # 清理执行追踪 - async with self._execution_lock: - for task in tasks_to_trigger: - self._executing_tasks.pop(task.schedule_id, None) + for task in tasks_to_trigger: + self._executing_tasks.pop(task.schedule_id, None) - # 第三阶段:收集需要移除的任务并在锁内移除 + # 第三阶段:收集需要移除的任务并移除(无锁设计) tasks_to_remove = [] for task, result in zip(tasks_to_trigger, results): if isinstance(result, Exception): @@ -304,10 +294,9 @@ class UnifiedScheduler: tasks_to_remove.append(task.schedule_id) logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除") - if tasks_to_remove: - async with self._lock: - for schedule_id in tasks_to_remove: - await self._remove_task_internal(schedule_id) + # 移除已完成的一次性任务 + for schedule_id in tasks_to_remove: + await self._remove_task_internal(schedule_id) async def _execute_task_callback(self, task: ScheduleTask, current_time: datetime) -> bool: """执行单个任务的回调(用于并发执行) @@ -474,21 +463,20 @@ class UnifiedScheduler: logger.error(f"执行任务 {task.task_name} 的回调函数时出错: {e}", exc_info=True) async def _remove_task_internal(self, schedule_id: str): - """内部方法:移除任务(需要加锁保护)""" - async with self._lock: - task = self._tasks.pop(schedule_id, None) - if task: - if task.trigger_type == TriggerType.EVENT: - event_name = task.trigger_config.get("event_name") - if event_name: - has_other_subscribers = any( - t.trigger_type == TriggerType.EVENT and t.trigger_config.get("event_name") == event_name - for t in self._tasks.values() - ) - # 如果没有其他任务订阅此事件,从追踪集合中移除 - if not has_other_subscribers and event_name in self._event_subscriptions: - self._event_subscriptions.discard(event_name) - logger.debug(f"事件 '{event_name}' 已无订阅任务,从追踪中移除") + """内部方法:移除任务(无锁设计)""" + task = self._tasks.pop(schedule_id, None) + if task: + if task.trigger_type == TriggerType.EVENT: + event_name = task.trigger_config.get("event_name") + if event_name: + has_other_subscribers = any( + t.trigger_type == TriggerType.EVENT and t.trigger_config.get("event_name") == event_name + for t in self._tasks.values() + ) + # 如果没有其他任务订阅此事件,从追踪集合中移除 + if not has_other_subscribers and event_name in self._event_subscriptions: + self._event_subscriptions.discard(event_name) + logger.debug(f"事件 '{event_name}' 已无订阅任务,从追踪中移除") async def create_schedule( self, @@ -500,7 +488,7 @@ class UnifiedScheduler: callback_args: tuple | None = None, callback_kwargs: dict | None = None, ) -> str: - """创建调度任务(详细注释见文档)""" + """创建调度任务(无锁设计)""" schedule_id = str(uuid.uuid4()) task = ScheduleTask( @@ -514,37 +502,36 @@ class UnifiedScheduler: callback_kwargs=callback_kwargs, ) - async with self._lock: - self._tasks[schedule_id] = task + # 存储任务(无锁操作) + self._tasks[schedule_id] = task - if trigger_type == TriggerType.EVENT: - event_name = trigger_config.get("event_name") - if not event_name: - raise ValueError("事件触发类型必须提供 event_name") + if trigger_type == TriggerType.EVENT: + event_name = trigger_config.get("event_name") + if not event_name: + raise ValueError("事件触发类型必须提供 event_name") - # 添加到追踪集合 - if event_name not in self._event_subscriptions: - self._event_subscriptions.add(event_name) - logger.debug(f"开始追踪事件: {event_name}") + # 添加到追踪集合 + if event_name not in self._event_subscriptions: + self._event_subscriptions.add(event_name) + logger.debug(f"开始追踪事件: {event_name}") logger.debug(f"创建调度任务: {task.task_name}") return schedule_id async def remove_schedule(self, schedule_id: str) -> bool: - """移除调度任务 + """移除调度任务(无锁设计) 如果任务正在执行,会取消执行中的任务 """ - # 先获取任务信息和执行任务,避免长时间持有锁 - async with self._lock: - if schedule_id not in self._tasks: - logger.warning(f"尝试移除不存在的任务: {schedule_id}") - return False + # 获取任务信息 + if schedule_id not in self._tasks: + logger.warning(f"尝试移除不存在的任务: {schedule_id}") + return False - task = self._tasks[schedule_id] - executing_task = self._executing_tasks.get(schedule_id) + task = self._tasks[schedule_id] + executing_task = self._executing_tasks.get(schedule_id) - # 在锁外取消正在执行的任务,避免死锁 + # 取消正在执行的任务 if executing_task and not executing_task.done(): logger.debug(f"取消正在执行的任务: {task.task_name}") try: @@ -555,112 +542,103 @@ class UnifiedScheduler: except Exception as e: logger.warning(f"取消任务 {task.task_name} 时发生错误: {e}") - # 重新获取锁移除任务 - async with self._lock: - await self._remove_task_internal(schedule_id) + # 移除任务 + await self._remove_task_internal(schedule_id) - # 使用执行锁清理执行追踪 - async with self._execution_lock: - self._executing_tasks.pop(schedule_id, None) + # 清理执行追踪 + self._executing_tasks.pop(schedule_id, None) logger.debug(f"移除调度任务: {task.task_name}") return True async def trigger_schedule(self, schedule_id: str) -> bool: - """强制触发指定任务""" - # 先获取任务信息,减少锁持有时间 - async with self._lock: - task = self._tasks.get(schedule_id) - if not task: - logger.warning(f"尝试触发不存在的任务: {schedule_id}") - return False + """强制触发指定任务(无锁设计)""" + # 获取任务信息 + task = self._tasks.get(schedule_id) + if not task: + logger.warning(f"尝试触发不存在的任务: {schedule_id}") + return False - if not task.is_active: - logger.warning(f"尝试触发已停用的任务: {task.task_name}") - return False + if not task.is_active: + logger.warning(f"尝试触发已停用的任务: {task.task_name}") + return False - # 检查任务是否已经在执行中 - executing_task = self._executing_tasks.get(schedule_id) - if executing_task and not executing_task.done(): - logger.warning(f"任务 {task.task_name} 已在执行中,无法重复触发") - return False + # 检查任务是否已经在执行中 + executing_task = self._executing_tasks.get(schedule_id) + if executing_task and not executing_task.done(): + logger.warning(f"任务 {task.task_name} 已在执行中,无法重复触发") + return False - # 清理已完成的任务 - if executing_task and executing_task.done(): - self._executing_tasks.pop(schedule_id, None) + # 清理已完成的任务 + if executing_task and executing_task.done(): + self._executing_tasks.pop(schedule_id, None) - # 在锁外创建执行任务 - async with self._execution_lock: - execution_task = asyncio.create_task( - self._execute_trigger_task_callback(task), - name=f"trigger_{task.task_name}" - ) + # 创建执行任务 + execution_task = asyncio.create_task( + self._execute_trigger_task_callback(task), + name=f"trigger_{task.task_name}" + ) - # 追踪执行任务 - self._executing_tasks[schedule_id] = execution_task + # 追踪执行任务 + self._executing_tasks[schedule_id] = execution_task - # 在锁外等待任务完成 + # 等待任务完成 try: result = await execution_task return result finally: # 清理执行追踪 - async with self._execution_lock: - self._executing_tasks.pop(schedule_id, None) + self._executing_tasks.pop(schedule_id, None) async def pause_schedule(self, schedule_id: str) -> bool: """暂停任务(不删除)""" - async with self._lock: - task = self._tasks.get(schedule_id) - if not task: - logger.warning(f"尝试暂停不存在的任务: {schedule_id}") - return False + task = self._tasks.get(schedule_id) + if not task: + logger.warning(f"尝试暂停不存在的任务: {schedule_id}") + return False - task.is_active = False - logger.debug(f"暂停任务: {task.task_name}") - return True + task.is_active = False + logger.debug(f"暂停任务: {task.task_name}") + return True async def resume_schedule(self, schedule_id: str) -> bool: """恢复任务""" - async with self._lock: - task = self._tasks.get(schedule_id) - if not task: - logger.warning(f"尝试恢复不存在的任务: {schedule_id}") - return False + task = self._tasks.get(schedule_id) + if not task: + logger.warning(f"尝试恢复不存在的任务: {schedule_id}") + return False - task.is_active = True - logger.debug(f"恢复任务: {task.task_name}") - return True + task.is_active = True + logger.debug(f"恢复任务: {task.task_name}") + return True async def get_task_info(self, schedule_id: str) -> dict[str, Any] | None: """获取任务信息""" - async with self._lock: - task = self._tasks.get(schedule_id) - if not task: - return None + task = self._tasks.get(schedule_id) + if not task: + return None - return { - "schedule_id": task.schedule_id, - "task_name": task.task_name, - "trigger_type": task.trigger_type.value, - "is_recurring": task.is_recurring, - "is_active": task.is_active, - "created_at": task.created_at.isoformat(), - "last_triggered_at": task.last_triggered_at.isoformat() if task.last_triggered_at else None, - "trigger_count": task.trigger_count, - "trigger_config": task.trigger_config.copy(), - } + return { + "schedule_id": task.schedule_id, + "task_name": task.task_name, + "trigger_type": task.trigger_type.value, + "is_recurring": task.is_recurring, + "is_active": task.is_active, + "created_at": task.created_at.isoformat(), + "last_triggered_at": task.last_triggered_at.isoformat() if task.last_triggered_at else None, + "trigger_count": task.trigger_count, + "trigger_config": task.trigger_config.copy(), + } async def list_tasks(self, trigger_type: TriggerType | None = None) -> list[dict[str, Any]]: """列出所有任务或指定类型的任务""" - async with self._lock: - tasks = [] - for task in self._tasks.values(): - if trigger_type is None or task.trigger_type == trigger_type: - task_info = await self.get_task_info(task.schedule_id) - if task_info: - tasks.append(task_info) - return tasks + tasks = [] + for task in self._tasks.values(): + if trigger_type is None or task.trigger_type == trigger_type: + task_info = await self.get_task_info(task.schedule_id) + if task_info: + tasks.append(task_info) + return tasks def get_statistics(self) -> dict[str, Any]: """获取调度器统计信息"""