refactor(event): 优化AFTER_SEND事件的异步触发逻辑并增强日志记录
This commit is contained in:
@@ -85,8 +85,11 @@ class UnifiedScheduler:
|
||||
"""处理来自 event_manager 的事件通知
|
||||
|
||||
此方法由 event_manager 在触发事件时直接调用
|
||||
|
||||
注意:此方法不能在持有 self._lock 的情况下调用,
|
||||
否则会导致死锁(因为回调可能再次触发事件)
|
||||
"""
|
||||
# 获取订阅该事件的所有任务
|
||||
# 获取订阅该事件的所有任务(快速复制,减少锁持有时间)
|
||||
async with self._lock:
|
||||
event_tasks = [
|
||||
task
|
||||
@@ -97,15 +100,18 @@ class UnifiedScheduler:
|
||||
]
|
||||
|
||||
if not event_tasks:
|
||||
logger.debug(f"事件 '{event_name}' 没有对应的调度任务")
|
||||
logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务")
|
||||
return
|
||||
|
||||
logger.info(f"事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
|
||||
logger.info(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务")
|
||||
|
||||
tasks_to_remove = []
|
||||
|
||||
# 在锁外执行回调,避免死锁
|
||||
for task in event_tasks:
|
||||
try:
|
||||
logger.debug(f"[调度器] 执行事件任务: {task.task_name}")
|
||||
|
||||
# 执行回调,传入事件参数
|
||||
if event_params:
|
||||
if asyncio.iscoroutinefunction(task.callback):
|
||||
@@ -121,14 +127,17 @@ class UnifiedScheduler:
|
||||
# 如果不是循环任务,标记为删除
|
||||
if not task.is_recurring:
|
||||
tasks_to_remove.append(task.schedule_id)
|
||||
|
||||
logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True)
|
||||
logger.error(f"[调度器] 执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True)
|
||||
|
||||
# 移除已完成的一次性任务
|
||||
async with self._lock:
|
||||
for schedule_id in tasks_to_remove:
|
||||
await self._remove_task_internal(schedule_id)
|
||||
if tasks_to_remove:
|
||||
async with self._lock:
|
||||
for schedule_id in tasks_to_remove:
|
||||
await self._remove_task_internal(schedule_id)
|
||||
|
||||
async def start(self):
|
||||
"""启动调度器"""
|
||||
@@ -190,37 +199,54 @@ class UnifiedScheduler:
|
||||
logger.error(f"调度器检查循环发生错误: {e}", exc_info=True)
|
||||
|
||||
async def _check_and_trigger_tasks(self):
|
||||
"""检查并触发到期任务"""
|
||||
"""检查并触发到期任务
|
||||
|
||||
注意:为了避免死锁,回调执行必须在锁外进行
|
||||
"""
|
||||
current_time = datetime.now()
|
||||
|
||||
# 第一阶段:在锁内快速收集需要触发的任务
|
||||
async with self._lock:
|
||||
tasks_to_remove = []
|
||||
current_time = datetime.now()
|
||||
|
||||
tasks_to_trigger = []
|
||||
|
||||
for schedule_id, task in list(self._tasks.items()):
|
||||
if not task.is_active:
|
||||
continue
|
||||
|
||||
try:
|
||||
should_trigger = await self._should_trigger_task(task, current_time)
|
||||
|
||||
if should_trigger:
|
||||
# 执行回调
|
||||
await self._execute_callback(task)
|
||||
|
||||
# 更新任务状态
|
||||
task.last_triggered_at = current_time
|
||||
task.trigger_count += 1
|
||||
|
||||
# 如果不是循环任务,标记为删除
|
||||
if not task.is_recurring:
|
||||
tasks_to_remove.append(schedule_id)
|
||||
logger.info(f"一次性任务 {task.task_name} 已完成,将被移除")
|
||||
|
||||
tasks_to_trigger.append(task)
|
||||
except Exception as e:
|
||||
logger.error(f"检查任务 {task.task_name} 时发生错误: {e}", exc_info=True)
|
||||
|
||||
# 第二阶段:在锁外执行回调(避免死锁)
|
||||
tasks_to_remove = []
|
||||
|
||||
for task in tasks_to_trigger:
|
||||
try:
|
||||
logger.debug(f"[调度器] 触发定时任务: {task.task_name}")
|
||||
|
||||
# 执行回调
|
||||
await self._execute_callback(task)
|
||||
|
||||
# 移除已完成的一次性任务
|
||||
for schedule_id in tasks_to_remove:
|
||||
await self._remove_task_internal(schedule_id)
|
||||
# 更新任务状态
|
||||
task.last_triggered_at = current_time
|
||||
task.trigger_count += 1
|
||||
|
||||
# 如果不是循环任务,标记为删除
|
||||
if not task.is_recurring:
|
||||
tasks_to_remove.append(task.schedule_id)
|
||||
logger.info(f"[调度器] 一次性任务 {task.task_name} 已完成,将被移除")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True)
|
||||
|
||||
# 第三阶段:在锁内移除已完成的任务
|
||||
if tasks_to_remove:
|
||||
async with self._lock:
|
||||
for schedule_id in tasks_to_remove:
|
||||
await self._remove_task_internal(schedule_id)
|
||||
|
||||
async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool:
|
||||
"""判断任务是否应该触发"""
|
||||
|
||||
Reference in New Issue
Block a user