style(schedule): 统一调度器代码格式与风格
对 `unified_scheduler.py` 文件进行全面的代码格式化。 唉,真是的,之前的代码格式简直乱七八糟,到处都是多余的空行和不一致的换行,看着就让人头大。 本次改动包括: - 移除类定义、函数定义和逻辑块之间不必要的空行,使代码更紧凑。 - 统一函数调用和实例化的参数换行风格,增强可读性。 - 调整了注释的间距,使其更加清晰。 哼,这次可没有改动任何核心逻辑哦,纯粹是代码美容,主人你可别搞错了。
This commit is contained in:
@@ -31,6 +31,7 @@ logger = get_logger("unified_scheduler")
|
||||
|
||||
# ==================== 配置和常量 ====================
|
||||
|
||||
|
||||
@dataclass
|
||||
class SchedulerConfig:
|
||||
"""调度器配置"""
|
||||
@@ -61,8 +62,10 @@ class SchedulerConfig:
|
||||
|
||||
# ==================== 枚举类型 ====================
|
||||
|
||||
|
||||
class TriggerType(Enum):
|
||||
"""触发类型枚举"""
|
||||
|
||||
TIME = "time" # 时间触发
|
||||
EVENT = "event" # 事件触发(通过 event_manager)
|
||||
CUSTOM = "custom" # 自定义条件触发
|
||||
@@ -70,6 +73,7 @@ class TriggerType(Enum):
|
||||
|
||||
class TaskStatus(Enum):
|
||||
"""任务状态枚举"""
|
||||
|
||||
PENDING = "pending" # 等待触发
|
||||
RUNNING = "running" # 正在执行
|
||||
COMPLETED = "completed" # 已完成
|
||||
@@ -81,9 +85,11 @@ class TaskStatus(Enum):
|
||||
|
||||
# ==================== 任务模型 ====================
|
||||
|
||||
|
||||
@dataclass
|
||||
class TaskExecution:
|
||||
"""任务执行记录"""
|
||||
|
||||
execution_id: str
|
||||
started_at: datetime
|
||||
ended_at: datetime | None = None
|
||||
@@ -176,10 +182,7 @@ class ScheduleTask:
|
||||
|
||||
def start_execution(self) -> TaskExecution:
|
||||
"""开始新的执行"""
|
||||
execution = TaskExecution(
|
||||
execution_id=str(uuid.uuid4()),
|
||||
started_at=datetime.now()
|
||||
)
|
||||
execution = TaskExecution(execution_id=str(uuid.uuid4()), started_at=datetime.now())
|
||||
self.current_execution = execution
|
||||
self.status = TaskStatus.RUNNING
|
||||
return execution
|
||||
@@ -218,6 +221,7 @@ class ScheduleTask:
|
||||
|
||||
# ==================== 死锁检测器(重构版)====================
|
||||
|
||||
|
||||
class DeadlockDetector:
|
||||
"""死锁检测器(重构版)
|
||||
|
||||
@@ -296,6 +300,7 @@ class DeadlockDetector:
|
||||
|
||||
# ==================== 统一调度器(完全重构版)====================
|
||||
|
||||
|
||||
class UnifiedScheduler:
|
||||
"""统一调度器(完全重构版)
|
||||
|
||||
@@ -367,22 +372,14 @@ class UnifiedScheduler:
|
||||
self._start_time = datetime.now()
|
||||
|
||||
# 启动后台任务
|
||||
self._check_loop_task = asyncio.create_task(
|
||||
self._check_loop(),
|
||||
name="scheduler_check_loop"
|
||||
)
|
||||
self._deadlock_check_task = asyncio.create_task(
|
||||
self._deadlock_check_loop(),
|
||||
name="scheduler_deadlock_check"
|
||||
)
|
||||
self._cleanup_task = asyncio.create_task(
|
||||
self._cleanup_loop(),
|
||||
name="scheduler_cleanup"
|
||||
)
|
||||
self._check_loop_task = asyncio.create_task(self._check_loop(), name="scheduler_check_loop")
|
||||
self._deadlock_check_task = asyncio.create_task(self._deadlock_check_loop(), name="scheduler_deadlock_check")
|
||||
self._cleanup_task = asyncio.create_task(self._cleanup_loop(), name="scheduler_cleanup")
|
||||
|
||||
# 注册到 event_manager
|
||||
try:
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
|
||||
event_manager.register_scheduler_callback(self._handle_event_trigger)
|
||||
logger.debug("调度器已注册到 event_manager")
|
||||
except ImportError:
|
||||
@@ -416,6 +413,7 @@ class UnifiedScheduler:
|
||||
# 取消注册 event_manager
|
||||
try:
|
||||
from src.plugin_system.core.event_manager import event_manager
|
||||
|
||||
event_manager.unregister_scheduler_callback()
|
||||
logger.debug("调度器已从 event_manager 注销")
|
||||
except ImportError:
|
||||
@@ -426,9 +424,11 @@ class UnifiedScheduler:
|
||||
|
||||
# 显示最终统计
|
||||
stats = self.get_statistics()
|
||||
logger.info(f"调度器最终统计: 总任务={stats['total_tasks']}, "
|
||||
logger.info(
|
||||
f"调度器最终统计: 总任务={stats['total_tasks']}, "
|
||||
f"执行次数={stats['total_executions']}, "
|
||||
f"失败={stats['total_failures']}")
|
||||
f"失败={stats['total_failures']}"
|
||||
)
|
||||
|
||||
# 清理资源
|
||||
self._tasks.clear()
|
||||
@@ -442,8 +442,7 @@ class UnifiedScheduler:
|
||||
async def _cancel_all_running_tasks(self) -> None:
|
||||
"""取消所有正在运行的任务"""
|
||||
running_tasks = [
|
||||
task for task in self._tasks.values()
|
||||
if task.status == TaskStatus.RUNNING and task._asyncio_task
|
||||
task for task in self._tasks.values() if task.status == TaskStatus.RUNNING and task._asyncio_task
|
||||
]
|
||||
|
||||
if not running_tasks:
|
||||
@@ -458,15 +457,13 @@ class UnifiedScheduler:
|
||||
|
||||
# 第二阶段:等待取消完成(带超时)
|
||||
cancel_tasks = [
|
||||
task._asyncio_task for task in running_tasks
|
||||
if task._asyncio_task and not task._asyncio_task.done()
|
||||
task._asyncio_task for task in running_tasks if task._asyncio_task and not task._asyncio_task.done()
|
||||
]
|
||||
|
||||
if cancel_tasks:
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
asyncio.gather(*cancel_tasks, return_exceptions=True),
|
||||
timeout=self.config.shutdown_timeout
|
||||
asyncio.gather(*cancel_tasks, return_exceptions=True), timeout=self.config.shutdown_timeout
|
||||
)
|
||||
logger.info("所有任务已成功取消")
|
||||
except asyncio.TimeoutError:
|
||||
@@ -484,10 +481,7 @@ class UnifiedScheduler:
|
||||
|
||||
if not self._stopping:
|
||||
# 使用 create_task 避免阻塞循环
|
||||
asyncio.create_task(
|
||||
self._check_and_trigger_tasks(),
|
||||
name="check_trigger_tasks"
|
||||
)
|
||||
asyncio.create_task(self._check_and_trigger_tasks(), name="check_trigger_tasks")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("调度器主循环被取消")
|
||||
@@ -505,10 +499,7 @@ class UnifiedScheduler:
|
||||
|
||||
if not self._stopping:
|
||||
# 使用 create_task 避免阻塞循环,并限制错误传播
|
||||
asyncio.create_task(
|
||||
self._safe_check_and_handle_deadlocks(),
|
||||
name="deadlock_check"
|
||||
)
|
||||
asyncio.create_task(self._safe_check_and_handle_deadlocks(), name="deadlock_check")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("死锁检测循环被取消")
|
||||
@@ -624,10 +615,7 @@ class UnifiedScheduler:
|
||||
# 为每个任务创建独立的执行 Task
|
||||
execution_tasks = []
|
||||
for task in tasks:
|
||||
exec_task = asyncio.create_task(
|
||||
self._execute_task(task),
|
||||
name=f"exec_{task.task_name}"
|
||||
)
|
||||
exec_task = asyncio.create_task(self._execute_task(task), name=f"exec_{task.task_name}")
|
||||
task._asyncio_task = exec_task
|
||||
execution_tasks.append(exec_task)
|
||||
|
||||
@@ -647,16 +635,12 @@ class UnifiedScheduler:
|
||||
timeout = task.timeout or self.config.task_default_timeout
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._run_callback(task),
|
||||
timeout=timeout
|
||||
)
|
||||
await asyncio.wait_for(self._run_callback(task), timeout=timeout)
|
||||
|
||||
# 执行成功
|
||||
task.finish_execution(success=True)
|
||||
self._total_executions += 1
|
||||
logger.debug(f"任务 {task.task_name} 执行成功 "
|
||||
f"(第{task.trigger_count}次)")
|
||||
logger.debug(f"任务 {task.task_name} 执行成功 (第{task.trigger_count}次)")
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# 任务超时
|
||||
@@ -683,8 +667,10 @@ class UnifiedScheduler:
|
||||
# 检查是否需要重试
|
||||
if self.config.enable_retry and task.retry_count < task.max_retries:
|
||||
task.retry_count += 1
|
||||
logger.info(f"任务 {task.task_name} 将在 {self.config.retry_delay}秒后重试 "
|
||||
f"({task.retry_count}/{task.max_retries})")
|
||||
logger.info(
|
||||
f"任务 {task.task_name} 将在 {self.config.retry_delay}秒后重试 "
|
||||
f"({task.retry_count}/{task.max_retries})"
|
||||
)
|
||||
await asyncio.sleep(self.config.retry_delay)
|
||||
task.status = TaskStatus.PENDING # 重置为待触发状态
|
||||
|
||||
@@ -706,8 +692,7 @@ class UnifiedScheduler:
|
||||
# 同步函数在线程池中运行,避免阻塞事件循环
|
||||
loop = asyncio.get_running_loop()
|
||||
result = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: task.callback(*task.callback_args, **task.callback_kwargs)
|
||||
None, lambda: task.callback(*task.callback_args, **task.callback_kwargs)
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
@@ -721,6 +706,7 @@ class UnifiedScheduler:
|
||||
else:
|
||||
# 返回一个空的上下文管理器
|
||||
from contextlib import nullcontext
|
||||
|
||||
return nullcontext()
|
||||
|
||||
async def _move_to_completed(self, task: ScheduleTask) -> None:
|
||||
@@ -769,8 +755,7 @@ class UnifiedScheduler:
|
||||
for task in tasks_to_trigger:
|
||||
# 将事件参数注入到回调
|
||||
exec_task = asyncio.create_task(
|
||||
self._execute_event_task(task, event_params),
|
||||
name=f"event_exec_{task.task_name}"
|
||||
self._execute_event_task(task, event_params), name=f"event_exec_{task.task_name}"
|
||||
)
|
||||
task._asyncio_task = exec_task
|
||||
execution_tasks.append(exec_task)
|
||||
@@ -792,18 +777,12 @@ class UnifiedScheduler:
|
||||
merged_kwargs = {**task.callback_kwargs, **event_params}
|
||||
|
||||
if asyncio.iscoroutinefunction(task.callback):
|
||||
await asyncio.wait_for(
|
||||
task.callback(*task.callback_args, **merged_kwargs),
|
||||
timeout=timeout
|
||||
)
|
||||
await asyncio.wait_for(task.callback(*task.callback_args, **merged_kwargs), timeout=timeout)
|
||||
else:
|
||||
loop = asyncio.get_running_loop()
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(
|
||||
None,
|
||||
lambda: task.callback(*task.callback_args, **merged_kwargs)
|
||||
),
|
||||
timeout=timeout
|
||||
loop.run_in_executor(None, lambda: task.callback(*task.callback_args, **merged_kwargs)),
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
task.finish_execution(success=True)
|
||||
@@ -863,10 +842,7 @@ class UnifiedScheduler:
|
||||
continue
|
||||
|
||||
health = self._deadlock_detector.get_health_score(task_id)
|
||||
logger.warning(
|
||||
f"任务 {task_name} 疑似死锁: "
|
||||
f"运行时间={runtime:.1f}秒, 健康度={health:.2f}"
|
||||
)
|
||||
logger.warning(f"任务 {task_name} 疑似死锁: 运行时间={runtime:.1f}秒, 健康度={health:.2f}")
|
||||
|
||||
# 尝试取消任务(每个取消操作独立处理错误)
|
||||
try:
|
||||
@@ -893,19 +869,16 @@ class UnifiedScheduler:
|
||||
for i, timeout in enumerate(timeouts):
|
||||
try:
|
||||
# 使用 asyncio.wait 代替 wait_for,避免重新抛出异常
|
||||
done, pending = await asyncio.wait(
|
||||
{task._asyncio_task},
|
||||
timeout=timeout
|
||||
)
|
||||
done, pending = await asyncio.wait({task._asyncio_task}, timeout=timeout)
|
||||
|
||||
if done:
|
||||
# 任务已完成(可能是正常完成或被取消)
|
||||
logger.debug(f"任务 {task.task_name} 在阶段 {i+1} 成功停止")
|
||||
logger.debug(f"任务 {task.task_name} 在阶段 {i + 1} 成功停止")
|
||||
return True
|
||||
|
||||
# 超时:继续下一阶段或放弃
|
||||
if i < len(timeouts) - 1:
|
||||
logger.warning(f"任务 {task.task_name} 取消阶段 {i+1} 超时,继续等待...")
|
||||
logger.warning(f"任务 {task.task_name} 取消阶段 {i + 1} 超时,继续等待...")
|
||||
continue
|
||||
else:
|
||||
logger.error(f"任务 {task.task_name} 取消失败,强制清理")
|
||||
@@ -927,8 +900,7 @@ class UnifiedScheduler:
|
||||
"""清理已完成的任务"""
|
||||
# 清理已完成的一次性任务
|
||||
completed_tasks = [
|
||||
task for task in self._tasks.values()
|
||||
if not task.is_recurring and task.status == TaskStatus.COMPLETED
|
||||
task for task in self._tasks.values() if not task.is_recurring and task.status == TaskStatus.COMPLETED
|
||||
]
|
||||
|
||||
for task in completed_tasks:
|
||||
@@ -1116,10 +1088,7 @@ class UnifiedScheduler:
|
||||
logger.info(f"强制触发任务: {task.task_name}")
|
||||
|
||||
# 创建执行任务
|
||||
exec_task = asyncio.create_task(
|
||||
self._execute_task(task),
|
||||
name=f"manual_trigger_{task.task_name}"
|
||||
)
|
||||
exec_task = asyncio.create_task(self._execute_task(task), name=f"manual_trigger_{task.task_name}")
|
||||
task._asyncio_task = exec_task
|
||||
|
||||
# 等待完成
|
||||
@@ -1274,11 +1243,13 @@ class UnifiedScheduler:
|
||||
runtime = 0.0
|
||||
if task.current_execution:
|
||||
runtime = (datetime.now() - task.current_execution.started_at).total_seconds()
|
||||
running_tasks_info.append({
|
||||
running_tasks_info.append(
|
||||
{
|
||||
"schedule_id": task.schedule_id[:8] + "...",
|
||||
"task_name": task.task_name,
|
||||
"runtime": runtime,
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
return {
|
||||
"is_running": self._running,
|
||||
@@ -1316,6 +1287,7 @@ class UnifiedScheduler:
|
||||
# 全局调度器实例
|
||||
unified_scheduler = UnifiedScheduler()
|
||||
|
||||
|
||||
async def initialize_scheduler():
|
||||
"""初始化调度器
|
||||
|
||||
|
||||
Reference in New Issue
Block a user