chore: 更新项目版本至0.12.0,添加aiofiles依赖;优化调度器任务执行逻辑,避免重复触发

This commit is contained in:
Windpicker-owo
2025-11-07 17:38:50 +08:00
parent de8d1db35f
commit ba30a52e32
4 changed files with 186 additions and 92 deletions

View File

@@ -93,13 +93,23 @@ class UnifiedScheduler:
"""
# 获取订阅该事件的所有任务(快速复制,减少锁持有时间)
async with self._lock:
event_tasks = [
task
for task in self._tasks.values()
if task.trigger_type == TriggerType.EVENT
and task.trigger_config.get("event_name") == event_name
and task.is_active
]
event_tasks = []
for task in self._tasks.values():
if (task.trigger_type == TriggerType.EVENT
and task.trigger_config.get("event_name") == event_name
and task.is_active):
# 检查事件任务是否已经在执行中,防止重复触发
if task.schedule_id in self._executing_tasks:
executing_task = self._executing_tasks[task.schedule_id]
if not executing_task.done():
logger.debug(f"[调度器] 事件任务 {task.task_name} 仍在执行中,跳过本次触发")
continue
else:
# 任务已完成但未清理,先清理
self._executing_tasks.pop(task.schedule_id, None)
event_tasks.append(task)
if not event_tasks:
logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务")
@@ -107,33 +117,34 @@ class UnifiedScheduler:
logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
tasks_to_remove = []
# 在锁外执行回调,避免死锁
# 并发执行所有事件任务
execution_tasks = []
for task in event_tasks:
try:
logger.debug(f"[调度器] 执行事件任务: {task.task_name}")
execution_task = asyncio.create_task(
self._execute_event_task_callback(task, event_params),
name=f"execute_event_{task.task_name}"
)
execution_tasks.append(execution_task)
# 执行回调,传入事件参数
if event_params:
if asyncio.iscoroutinefunction(task.callback):
await task.callback(**event_params)
else:
task.callback(**event_params)
else:
await self._execute_callback(task)
# 追踪正在执行的任务
self._executing_tasks[task.schedule_id] = execution_task
task.last_triggered_at = datetime.now()
task.trigger_count += 1
# 等待所有任务完成
results = await asyncio.gather(*execution_tasks, return_exceptions=True)
# 如果不是循环任务,标记为删除
if not task.is_recurring:
tasks_to_remove.append(task.schedule_id)
# 清理执行追踪
for task in event_tasks:
self._executing_tasks.pop(task.schedule_id, None)
logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成")
except Exception as e:
logger.error(f"[调度器] 执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True)
# 收集需要移除的任务
tasks_to_remove = []
for task, result in zip(event_tasks, results):
if isinstance(result, Exception):
logger.error(f"[调度器] 执行事件任务 {task.task_name} 时发生错误: {result}", exc_info=result)
elif result is True and not task.is_recurring:
# 成功执行且是一次性任务,标记为删除
tasks_to_remove.append(task.schedule_id)
logger.debug(f"[调度器] 一次性事件任务 {task.task_name} 已完成,将被移除")
# 移除已完成的一次性任务
if tasks_to_remove:
@@ -204,7 +215,7 @@ class UnifiedScheduler:
while self._running:
try:
await asyncio.sleep(1)
await self._check_and_trigger_tasks()
asyncio.create_task(self._check_and_trigger_tasks())
except asyncio.CancelledError:
logger.debug("调度器检查循环被取消")
break
@@ -226,6 +237,16 @@ class UnifiedScheduler:
if not task.is_active:
continue
# 检查任务是否已经在执行中,防止重复触发
if schedule_id in self._executing_tasks:
executing_task = self._executing_tasks[schedule_id]
if not executing_task.done():
logger.debug(f"[调度器] 任务 {task.task_name} 仍在执行中,跳过本次触发")
continue
else:
# 任务已完成但未清理,先清理
self._executing_tasks.pop(schedule_id, None)
try:
should_trigger = await self._should_trigger_task(task, current_time)
if should_trigger:
@@ -298,6 +319,75 @@ class UnifiedScheduler:
logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True)
return False
async def _execute_event_task_callback(self, task: ScheduleTask, event_params: dict[str, Any]) -> bool:
"""执行单个事件任务的回调(用于并发执行)
Args:
task: 要执行的任务
event_params: 事件参数
Returns:
bool: 执行是否成功
"""
try:
logger.debug(f"[调度器] 执行事件任务: {task.task_name}")
current_time = datetime.now()
# 执行回调,传入事件参数
if event_params:
if asyncio.iscoroutinefunction(task.callback):
await task.callback(**event_params)
else:
task.callback(**event_params)
else:
await self._execute_callback(task)
# 更新任务状态
task.last_triggered_at = current_time
task.trigger_count += 1
logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成")
return True
except Exception as e:
logger.error(f"[调度器] 执行事件任务 {task.task_name} 时发生错误: {e}", exc_info=True)
return False
async def _execute_trigger_task_callback(self, task: ScheduleTask) -> bool:
"""执行强制触发的任务回调
Args:
task: 要执行的任务
Returns:
bool: 执行是否成功
"""
try:
logger.debug(f"[调度器] 强制触发任务: {task.task_name}")
# 执行回调
await self._execute_callback(task)
# 更新任务状态
current_time = datetime.now()
task.last_triggered_at = current_time
task.trigger_count += 1
logger.debug(f"[调度器] 强制触发任务 {task.task_name} 执行完成")
# 如果不是循环任务,需要移除
if not task.is_recurring:
async with self._lock:
await self._remove_task_internal(task.schedule_id)
logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成并移除")
return True
except Exception as e:
logger.error(f"[调度器] 强制触发任务 {task.task_name} 时发生错误: {e}", exc_info=True)
return False
async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool:
"""判断任务是否应该触发"""
if task.trigger_type == TriggerType.TIME:
@@ -459,14 +549,32 @@ class UnifiedScheduler:
logger.warning(f"尝试触发已停用的任务: {task.task_name}")
return False
await self._execute_callback(task)
task.last_triggered_at = datetime.now()
task.trigger_count += 1
# 检查任务是否已经在执行中
if schedule_id in self._executing_tasks:
executing_task = self._executing_tasks[schedule_id]
if not executing_task.done():
logger.warning(f"任务 {task.task_name} 已在执行中,无法重复触发")
return False
else:
# 任务已完成但未清理,先清理
self._executing_tasks.pop(schedule_id, None)
if not task.is_recurring:
await self._remove_task_internal(schedule_id)
# 释放锁,在锁外执行任务
execution_task = asyncio.create_task(
self._execute_trigger_task_callback(task),
name=f"trigger_{task.task_name}"
)
return True
# 追踪执行任务
self._executing_tasks[schedule_id] = execution_task
# 在锁外等待任务完成
try:
result = await execution_task
return result
finally:
# 清理执行追踪
self._executing_tasks.pop(schedule_id, None)
async def pause_schedule(self, schedule_id: str) -> bool:
"""暂停任务(不删除)"""
@@ -527,6 +635,7 @@ class UnifiedScheduler:
total_tasks = len(self._tasks)
active_tasks = sum(1 for task in self._tasks.values() if task.is_active)
recurring_tasks = sum(1 for task in self._tasks.values() if task.is_recurring)
executing_tasks = sum(1 for task in self._executing_tasks.values() if not task.done())
tasks_by_type = {
TriggerType.TIME.value: 0,
@@ -544,6 +653,7 @@ class UnifiedScheduler:
"paused_tasks": total_tasks - active_tasks,
"recurring_tasks": recurring_tasks,
"one_time_tasks": total_tasks - recurring_tasks,
"executing_tasks": executing_tasks,
"tasks_by_type": tasks_by_type,
"registered_events": list(self._event_subscriptions),
}