refactor: 更新调度器以使用任务名称替代调度ID,增强任务管理和清理无效记录功能

This commit is contained in:
Windpicker-owo
2025-11-08 09:12:46 +08:00
parent 816bfdb8e0
commit 78a3a192bf
2 changed files with 187 additions and 25 deletions

View File

@@ -30,8 +30,8 @@ class SchedulerDispatcher:
""" """
def __init__(self): def __init__(self):
# 追踪每个流的 schedule_id # 追踪每个流的 task_name
self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id self.stream_schedules: dict[str, str] = {} # stream_id -> task_name
# Chatter 管理器 # Chatter 管理器
self.chatter_manager: ChatterManager | None = None self.chatter_manager: ChatterManager | None = None
@@ -67,12 +67,12 @@ class SchedulerDispatcher:
self.is_running = False self.is_running = False
# 取消所有活跃的 schedule # 取消所有活跃的 schedule
schedule_ids = list(self.stream_schedules.values()) task_names = list(self.stream_schedules.values())
for schedule_id in schedule_ids: for task_name in task_names:
try: try:
await unified_scheduler.remove_schedule(schedule_id) await unified_scheduler.remove_schedule_by_name(task_name)
except Exception as e: except Exception as e:
logger.error(f"移除 schedule {schedule_id} 失败: {e}") logger.error(f"移除 schedule {task_name} 失败: {e}")
self.stream_schedules.clear() self.stream_schedules.clear()
logger.info("基于 Scheduler 的消息分发器已停止") logger.info("基于 Scheduler 的消息分发器已停止")
@@ -100,7 +100,26 @@ class SchedulerDispatcher:
return return
# 2. 检查是否有活跃的 schedule # 2. 检查是否有活跃的 schedule
has_active_schedule = stream_id in self.stream_schedules task_name = self.stream_schedules.get(stream_id)
has_active_schedule = False
if task_name:
# 验证schedule是否真的还在scheduler中活跃
schedule_id = await unified_scheduler.find_schedule_by_name(task_name)
if schedule_id:
# 进一步检查任务是否正在执行或即将执行
task = await unified_scheduler.get_task_info(schedule_id)
if task and task['is_active']:
has_active_schedule = True
logger.debug(f"验证到活跃schedule: 流={stream_id[:8]}..., task={task_name}")
else:
logger.warning(f"发现不活跃的schedule记录将清理: 流={stream_id[:8]}..., task={task_name}")
# 清理无效记录
self.stream_schedules.pop(stream_id, None)
else:
logger.warning(f"发现无效的schedule记录将清理: 流={stream_id[:8]}..., task={task_name}")
# 清理无效记录
self.stream_schedules.pop(stream_id, None)
if not has_active_schedule: if not has_active_schedule:
# 4. 创建新的 schedule在锁内避免重复创建 # 4. 创建新的 schedule在锁内避免重复创建
@@ -220,18 +239,18 @@ class SchedulerDispatcher:
context: 流上下文 context: 流上下文
""" """
# 移除旧的 schedule # 移除旧的 schedule
old_schedule_id = self.stream_schedules.get(stream_id) old_task_name = self.stream_schedules.get(stream_id)
if old_schedule_id: if old_task_name:
success = await unified_scheduler.remove_schedule(old_schedule_id) success = await unified_scheduler.remove_schedule_by_name(old_task_name)
if success: if success:
logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...") logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., task={old_task_name}")
self.stats["total_schedules_cancelled"] += 1 self.stats["total_schedules_cancelled"] += 1
# 只有成功移除后才从追踪中删除 # 只有成功移除后才从追踪中删除
self.stream_schedules.pop(stream_id) self.stream_schedules.pop(stream_id)
else: else:
logger.error( logger.error(
f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., " f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., "
f"ID={old_schedule_id[:8]}..., 放弃创建新 schedule 避免重复" f"task={old_task_name}, 放弃创建新 schedule 避免重复"
) )
# 移除失败,不创建新 schedule避免重复 # 移除失败,不创建新 schedule避免重复
return return
@@ -250,12 +269,12 @@ class SchedulerDispatcher:
try: try:
# 检查是否已有活跃的 schedule如果有则先移除 # 检查是否已有活跃的 schedule如果有则先移除
if stream_id in self.stream_schedules: if stream_id in self.stream_schedules:
old_schedule_id = self.stream_schedules[stream_id] old_task_name = self.stream_schedules[stream_id]
logger.warning( logger.warning(
f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_schedule_id[:8]}..., " f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_task_name}, "
f"这不应该发生,将先移除旧 schedule" f"这不应该发生,将先移除旧 schedule"
) )
await unified_scheduler.remove_schedule(old_schedule_id) await unified_scheduler.remove_schedule_by_name(old_task_name)
del self.stream_schedules[stream_id] del self.stream_schedules[stream_id]
# 如果是即时处理模式打断时使用固定的1秒延迟立即重新处理 # 如果是即时处理模式打断时使用固定的1秒延迟立即重新处理
@@ -272,18 +291,22 @@ class SchedulerDispatcher:
# 获取未读消息数量用于日志 # 获取未读消息数量用于日志
unread_count = len(context.unread_messages) if context.unread_messages else 0 unread_count = len(context.unread_messages) if context.unread_messages else 0
# 创建 schedule # 生成任务名称 - 使用stream_id确保唯一性
task_name = f"dispatch_{stream_id}"
# 创建 schedule - 使用force_overwrite确保可以覆盖
schedule_id = await unified_scheduler.create_schedule( schedule_id = await unified_scheduler.create_schedule(
callback=self._on_schedule_triggered, callback=self._on_schedule_triggered,
trigger_type=TriggerType.TIME, trigger_type=TriggerType.TIME,
trigger_config={"delay_seconds": delay}, trigger_config={"delay_seconds": delay},
is_recurring=False, # 一次性任务,处理完后会创建新的 is_recurring=False, # 一次性任务,处理完后会创建新的
task_name=f"dispatch_{stream_id[:8]}", task_name=task_name,
callback_args=(stream_id,), callback_args=(stream_id,),
force_overwrite=True, # 允许覆盖同名任务
) )
# 追踪 schedule # 追踪 task_name
self.stream_schedules[stream_id] = schedule_id self.stream_schedules[stream_id] = task_name
self.stats["total_schedules_created"] += 1 self.stats["total_schedules_created"] += 1
mode_indicator = "⚡打断" if immediate_mode else "📅常规" mode_indicator = "⚡打断" if immediate_mode else "📅常规"
@@ -291,7 +314,7 @@ class SchedulerDispatcher:
logger.info( logger.info(
f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., " f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., "
f"延迟={delay:.3f}s, 未读={unread_count}, " f"延迟={delay:.3f}s, 未读={unread_count}, "
f"ID={schedule_id[:8]}..." f"task={task_name}, ID={schedule_id[:8]}..."
) )
except Exception as e: except Exception as e:
@@ -408,11 +431,11 @@ class SchedulerDispatcher:
stream_id: 流ID stream_id: 流ID
""" """
try: try:
old_schedule_id = self.stream_schedules.get(stream_id) task_name = self.stream_schedules.get(stream_id)
logger.info( logger.info(
f"⏰ Schedule 触发: 流={stream_id[:8]}..., " f"⏰ Schedule 触发: 流={stream_id[:8]}..., "
f"ID={old_schedule_id[:8] if old_schedule_id else 'None'}..., " f"task={task_name or 'None'}, "
f"开始处理消息" f"开始处理消息"
) )
@@ -435,7 +458,10 @@ class SchedulerDispatcher:
if not success: if not success:
self.stats["total_failures"] += 1 self.stats["total_failures"] += 1
self.stream_schedules.pop(stream_id, None) # 清理schedule记录 - 处理完成后总是清理本地记录
removed_task_name = self.stream_schedules.pop(stream_id, None)
if removed_task_name:
logger.debug(f"清理schedule记录: 流={stream_id[:8]}..., task={removed_task_name}")
# 检查缓存中是否有待处理的消息 # 检查缓存中是否有待处理的消息
from src.chat.message_manager.message_manager import message_manager from src.chat.message_manager.message_manager import message_manager
@@ -553,6 +579,36 @@ class SchedulerDispatcher:
except Exception as e: except Exception as e:
logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}")
async def cleanup_invalid_schedules(self) -> int:
"""清理无效的schedule记录
Returns:
int: 清理的记录数量
"""
cleaned_count = 0
invalid_streams = []
for stream_id, task_name in list(self.stream_schedules.items()):
# 验证schedule是否真的还在scheduler中活跃
schedule_id = await unified_scheduler.find_schedule_by_name(task_name)
if not schedule_id:
invalid_streams.append(stream_id)
continue
# 检查任务是否还在活跃状态
task = await unified_scheduler.get_task_info(schedule_id)
if not task or not task['is_active']:
invalid_streams.append(stream_id)
# 清理无效记录
for stream_id in invalid_streams:
task_name = self.stream_schedules.pop(stream_id, None)
if task_name:
logger.info(f"清理无效schedule记录: 流={stream_id[:8]}..., task={task_name}")
cleaned_count += 1
return cleaned_count
def get_statistics(self) -> dict[str, Any]: def get_statistics(self) -> dict[str, Any]:
"""获取统计信息""" """获取统计信息"""
uptime = time.time() - self.stats["start_time"] uptime = time.time() - self.stats["start_time"]

View File

@@ -487,8 +487,47 @@ class UnifiedScheduler:
task_name: str | None = None, task_name: str | None = None,
callback_args: tuple | None = None, callback_args: tuple | None = None,
callback_kwargs: dict | None = None, callback_kwargs: dict | None = None,
force_overwrite: bool = False,
) -> str: ) -> str:
"""创建调度任务(无锁设计)""" """创建调度任务(无锁设计)
Args:
callback: 回调函数
trigger_type: 触发类型
trigger_config: 触发配置
is_recurring: 是否循环任务
task_name: 任务名称,如果指定则检查是否已存在同名任务
callback_args: 回调函数位置参数
callback_kwargs: 回调函数关键字参数
force_overwrite: 如果同名任务已存在,是否强制覆盖
Returns:
str: 创建的schedule_id
Raises:
ValueError: 如果同名任务已存在且未启用强制覆盖
"""
# 检查任务名称是否已存在
if task_name is not None:
existing_task = None
existing_schedule_id = None
for sid, task in self._tasks.items():
if task.task_name == task_name and task.is_active:
existing_task = task
existing_schedule_id = sid
break
if existing_task is not None:
if force_overwrite:
logger.info(f"检测到同名活跃任务 '{task_name}',强制覆盖模式已启用,移除现有任务")
await self.remove_schedule(existing_schedule_id)
else:
raise ValueError(
f"任务名称 '{task_name}' 已存在活跃任务 (ID: {existing_schedule_id[:8]}...)。"
f"如需覆盖,请设置 force_overwrite=True"
)
schedule_id = str(uuid.uuid4()) schedule_id = str(uuid.uuid4())
task = ScheduleTask( task = ScheduleTask(
@@ -518,6 +557,34 @@ class UnifiedScheduler:
logger.debug(f"创建调度任务: {task.task_name}") logger.debug(f"创建调度任务: {task.task_name}")
return schedule_id return schedule_id
async def find_schedule_by_name(self, task_name: str) -> str | None:
"""根据任务名称查找schedule_id
Args:
task_name: 任务名称
Returns:
str | None: 找到的schedule_id如果不存在则返回None
"""
for schedule_id, task in self._tasks.items():
if task.task_name == task_name and task.is_active:
return schedule_id
return None
async def remove_schedule_by_name(self, task_name: str) -> bool:
"""根据任务名称移除调度任务
Args:
task_name: 任务名称
Returns:
bool: 是否成功移除
"""
schedule_id = await self.find_schedule_by_name(task_name)
if schedule_id:
return await self.remove_schedule(schedule_id)
return False
async def remove_schedule(self, schedule_id: str) -> bool: async def remove_schedule(self, schedule_id: str) -> bool:
"""移除调度任务(无锁设计) """移除调度任务(无锁设计)
@@ -551,6 +618,33 @@ class UnifiedScheduler:
logger.debug(f"移除调度任务: {task.task_name}") logger.debug(f"移除调度任务: {task.task_name}")
return True return True
def get_executing_task(self, schedule_id: str) -> asyncio.Task | None:
"""获取指定schedule_id的正在执行的任务
Args:
schedule_id: 调度任务ID
Returns:
asyncio.Task | None: 正在执行的任务如果不在执行中则返回None
"""
executing_task = self._executing_tasks.get(schedule_id)
if executing_task and not executing_task.done():
return executing_task
return None
def get_all_executing_tasks(self) -> dict[str, asyncio.Task]:
"""获取所有正在执行的任务
Returns:
dict[str, asyncio.Task]: schedule_id -> executing_task 的映射
"""
# 过滤出未完成的任务
return {
schedule_id: task
for schedule_id, task in self._executing_tasks.items()
if not task.done()
}
async def trigger_schedule(self, schedule_id: str) -> bool: async def trigger_schedule(self, schedule_id: str) -> bool:
"""强制触发指定任务(无锁设计)""" """强制触发指定任务(无锁设计)"""
# 获取任务信息 # 获取任务信息
@@ -656,6 +750,17 @@ class UnifiedScheduler:
for task in self._tasks.values(): for task in self._tasks.values():
tasks_by_type[task.trigger_type.value] += 1 tasks_by_type[task.trigger_type.value] += 1
# 获取正在执行的任务详细信息
executing_tasks_info = []
for schedule_id, executing_task in self._executing_tasks.items():
if not executing_task.done():
task = self._tasks.get(schedule_id)
executing_tasks_info.append({
"schedule_id": schedule_id[:8] + "...",
"task_name": task.task_name if task else "Unknown",
"task_obj_name": executing_task.get_name() if hasattr(executing_task, 'get_name') else str(executing_task),
})
return { return {
"is_running": self._running, "is_running": self._running,
"total_tasks": total_tasks, "total_tasks": total_tasks,
@@ -664,6 +769,7 @@ class UnifiedScheduler:
"recurring_tasks": recurring_tasks, "recurring_tasks": recurring_tasks,
"one_time_tasks": total_tasks - recurring_tasks, "one_time_tasks": total_tasks - recurring_tasks,
"executing_tasks": executing_tasks, "executing_tasks": executing_tasks,
"executing_tasks_info": executing_tasks_info,
"tasks_by_type": tasks_by_type, "tasks_by_type": tasks_by_type,
"registered_events": list(self._event_subscriptions), "registered_events": list(self._event_subscriptions),
} }