diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py index e8c93136b..ccc237324 100644 --- a/src/chat/message_manager/scheduler_dispatcher.py +++ b/src/chat/message_manager/scheduler_dispatcher.py @@ -30,9 +30,9 @@ class SchedulerDispatcher: """ def __init__(self): - # 追踪每个流的 schedule_id - self.stream_schedules: dict[str, str] = {} # stream_id -> schedule_id - + # 追踪每个流的 task_name + self.stream_schedules: dict[str, str] = {} # stream_id -> task_name + # Chatter 管理器 self.chatter_manager: ChatterManager | None = None @@ -67,12 +67,12 @@ class SchedulerDispatcher: self.is_running = False # 取消所有活跃的 schedule - schedule_ids = list(self.stream_schedules.values()) - for schedule_id in schedule_ids: + task_names = list(self.stream_schedules.values()) + for task_name in task_names: try: - await unified_scheduler.remove_schedule(schedule_id) + await unified_scheduler.remove_schedule_by_name(task_name) except Exception as e: - logger.error(f"移除 schedule {schedule_id} 失败: {e}") + logger.error(f"移除 schedule {task_name} 失败: {e}") self.stream_schedules.clear() logger.info("基于 Scheduler 的消息分发器已停止") @@ -100,7 +100,26 @@ class SchedulerDispatcher: return # 2. 检查是否有活跃的 schedule - has_active_schedule = stream_id in self.stream_schedules + task_name = self.stream_schedules.get(stream_id) + has_active_schedule = False + + if task_name: + # 验证schedule是否真的还在scheduler中活跃 + schedule_id = await unified_scheduler.find_schedule_by_name(task_name) + if schedule_id: + # 进一步检查任务是否正在执行或即将执行 + task = await unified_scheduler.get_task_info(schedule_id) + if task and task['is_active']: + has_active_schedule = True + logger.debug(f"验证到活跃schedule: 流={stream_id[:8]}..., task={task_name}") + else: + logger.warning(f"发现不活跃的schedule记录,将清理: 流={stream_id[:8]}..., task={task_name}") + # 清理无效记录 + self.stream_schedules.pop(stream_id, None) + else: + logger.warning(f"发现无效的schedule记录,将清理: 流={stream_id[:8]}..., task={task_name}") + # 清理无效记录 + self.stream_schedules.pop(stream_id, None) if not has_active_schedule: # 4. 创建新的 schedule(在锁内,避免重复创建) @@ -220,18 +239,18 @@ class SchedulerDispatcher: context: 流上下文 """ # 移除旧的 schedule - old_schedule_id = self.stream_schedules.get(stream_id) - if old_schedule_id: - success = await unified_scheduler.remove_schedule(old_schedule_id) + old_task_name = self.stream_schedules.get(stream_id) + if old_task_name: + success = await unified_scheduler.remove_schedule_by_name(old_task_name) if success: - logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...") + logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., task={old_task_name}") self.stats["total_schedules_cancelled"] += 1 # 只有成功移除后才从追踪中删除 self.stream_schedules.pop(stream_id) else: logger.error( f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., " - f"ID={old_schedule_id[:8]}..., 放弃创建新 schedule 避免重复" + f"task={old_task_name}, 放弃创建新 schedule 避免重复" ) # 移除失败,不创建新 schedule,避免重复 return @@ -250,12 +269,12 @@ class SchedulerDispatcher: try: # 检查是否已有活跃的 schedule,如果有则先移除 if stream_id in self.stream_schedules: - old_schedule_id = self.stream_schedules[stream_id] + old_task_name = self.stream_schedules[stream_id] logger.warning( - f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_schedule_id[:8]}..., " + f"⚠️ 流 {stream_id[:8]}... 已有活跃 schedule {old_task_name}, " f"这不应该发生,将先移除旧 schedule" ) - await unified_scheduler.remove_schedule(old_schedule_id) + await unified_scheduler.remove_schedule_by_name(old_task_name) del self.stream_schedules[stream_id] # 如果是即时处理模式(打断时),使用固定的1秒延迟立即重新处理 @@ -272,18 +291,22 @@ class SchedulerDispatcher: # 获取未读消息数量用于日志 unread_count = len(context.unread_messages) if context.unread_messages else 0 - # 创建 schedule + # 生成任务名称 - 使用stream_id确保唯一性 + task_name = f"dispatch_{stream_id}" + + # 创建 schedule - 使用force_overwrite确保可以覆盖 schedule_id = await unified_scheduler.create_schedule( callback=self._on_schedule_triggered, trigger_type=TriggerType.TIME, trigger_config={"delay_seconds": delay}, is_recurring=False, # 一次性任务,处理完后会创建新的 - task_name=f"dispatch_{stream_id[:8]}", + task_name=task_name, callback_args=(stream_id,), + force_overwrite=True, # 允许覆盖同名任务 ) - # 追踪 schedule - self.stream_schedules[stream_id] = schedule_id + # 追踪 task_name + self.stream_schedules[stream_id] = task_name self.stats["total_schedules_created"] += 1 mode_indicator = "⚡打断" if immediate_mode else "📅常规" @@ -291,7 +314,7 @@ class SchedulerDispatcher: logger.info( f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., " f"延迟={delay:.3f}s, 未读={unread_count}, " - f"ID={schedule_id[:8]}..." + f"task={task_name}, ID={schedule_id[:8]}..." ) except Exception as e: @@ -408,11 +431,11 @@ class SchedulerDispatcher: stream_id: 流ID """ try: - old_schedule_id = self.stream_schedules.get(stream_id) + task_name = self.stream_schedules.get(stream_id) logger.info( f"⏰ Schedule 触发: 流={stream_id[:8]}..., " - f"ID={old_schedule_id[:8] if old_schedule_id else 'None'}..., " + f"task={task_name or 'None'}, " f"开始处理消息" ) @@ -435,7 +458,10 @@ class SchedulerDispatcher: if not success: self.stats["total_failures"] += 1 - self.stream_schedules.pop(stream_id, None) + # 清理schedule记录 - 处理完成后总是清理本地记录 + removed_task_name = self.stream_schedules.pop(stream_id, None) + if removed_task_name: + logger.debug(f"清理schedule记录: 流={stream_id[:8]}..., task={removed_task_name}") # 检查缓存中是否有待处理的消息 from src.chat.message_manager.message_manager import message_manager @@ -553,6 +579,36 @@ class SchedulerDispatcher: except Exception as e: logger.warning(f"刷新聊天流 {stream_id} 能量失败: {e}") + async def cleanup_invalid_schedules(self) -> int: + """清理无效的schedule记录 + + Returns: + int: 清理的记录数量 + """ + cleaned_count = 0 + invalid_streams = [] + + for stream_id, task_name in list(self.stream_schedules.items()): + # 验证schedule是否真的还在scheduler中活跃 + schedule_id = await unified_scheduler.find_schedule_by_name(task_name) + if not schedule_id: + invalid_streams.append(stream_id) + continue + + # 检查任务是否还在活跃状态 + task = await unified_scheduler.get_task_info(schedule_id) + if not task or not task['is_active']: + invalid_streams.append(stream_id) + + # 清理无效记录 + for stream_id in invalid_streams: + task_name = self.stream_schedules.pop(stream_id, None) + if task_name: + logger.info(f"清理无效schedule记录: 流={stream_id[:8]}..., task={task_name}") + cleaned_count += 1 + + return cleaned_count + def get_statistics(self) -> dict[str, Any]: """获取统计信息""" uptime = time.time() - self.stats["start_time"] diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index 387f84484..517538ae3 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -487,8 +487,47 @@ class UnifiedScheduler: task_name: str | None = None, callback_args: tuple | None = None, callback_kwargs: dict | None = None, + force_overwrite: bool = False, ) -> str: - """创建调度任务(无锁设计)""" + """创建调度任务(无锁设计) + + Args: + callback: 回调函数 + trigger_type: 触发类型 + trigger_config: 触发配置 + is_recurring: 是否循环任务 + task_name: 任务名称,如果指定则检查是否已存在同名任务 + callback_args: 回调函数位置参数 + callback_kwargs: 回调函数关键字参数 + force_overwrite: 如果同名任务已存在,是否强制覆盖 + + Returns: + str: 创建的schedule_id + + Raises: + ValueError: 如果同名任务已存在且未启用强制覆盖 + """ + # 检查任务名称是否已存在 + if task_name is not None: + existing_task = None + existing_schedule_id = None + + for sid, task in self._tasks.items(): + if task.task_name == task_name and task.is_active: + existing_task = task + existing_schedule_id = sid + break + + if existing_task is not None: + if force_overwrite: + logger.info(f"检测到同名活跃任务 '{task_name}',强制覆盖模式已启用,移除现有任务") + await self.remove_schedule(existing_schedule_id) + else: + raise ValueError( + f"任务名称 '{task_name}' 已存在活跃任务 (ID: {existing_schedule_id[:8]}...)。" + f"如需覆盖,请设置 force_overwrite=True" + ) + schedule_id = str(uuid.uuid4()) task = ScheduleTask( @@ -518,6 +557,34 @@ class UnifiedScheduler: logger.debug(f"创建调度任务: {task.task_name}") return schedule_id + async def find_schedule_by_name(self, task_name: str) -> str | None: + """根据任务名称查找schedule_id + + Args: + task_name: 任务名称 + + Returns: + str | None: 找到的schedule_id,如果不存在则返回None + """ + for schedule_id, task in self._tasks.items(): + if task.task_name == task_name and task.is_active: + return schedule_id + return None + + async def remove_schedule_by_name(self, task_name: str) -> bool: + """根据任务名称移除调度任务 + + Args: + task_name: 任务名称 + + Returns: + bool: 是否成功移除 + """ + schedule_id = await self.find_schedule_by_name(task_name) + if schedule_id: + return await self.remove_schedule(schedule_id) + return False + async def remove_schedule(self, schedule_id: str) -> bool: """移除调度任务(无锁设计) @@ -551,6 +618,33 @@ class UnifiedScheduler: logger.debug(f"移除调度任务: {task.task_name}") return True + def get_executing_task(self, schedule_id: str) -> asyncio.Task | None: + """获取指定schedule_id的正在执行的任务 + + Args: + schedule_id: 调度任务ID + + Returns: + asyncio.Task | None: 正在执行的任务,如果不在执行中则返回None + """ + executing_task = self._executing_tasks.get(schedule_id) + if executing_task and not executing_task.done(): + return executing_task + return None + + def get_all_executing_tasks(self) -> dict[str, asyncio.Task]: + """获取所有正在执行的任务 + + Returns: + dict[str, asyncio.Task]: schedule_id -> executing_task 的映射 + """ + # 过滤出未完成的任务 + return { + schedule_id: task + for schedule_id, task in self._executing_tasks.items() + if not task.done() + } + async def trigger_schedule(self, schedule_id: str) -> bool: """强制触发指定任务(无锁设计)""" # 获取任务信息 @@ -656,6 +750,17 @@ class UnifiedScheduler: for task in self._tasks.values(): tasks_by_type[task.trigger_type.value] += 1 + # 获取正在执行的任务详细信息 + executing_tasks_info = [] + for schedule_id, executing_task in self._executing_tasks.items(): + if not executing_task.done(): + task = self._tasks.get(schedule_id) + executing_tasks_info.append({ + "schedule_id": schedule_id[:8] + "...", + "task_name": task.task_name if task else "Unknown", + "task_obj_name": executing_task.get_name() if hasattr(executing_task, 'get_name') else str(executing_task), + }) + return { "is_running": self._running, "total_tasks": total_tasks, @@ -664,6 +769,7 @@ class UnifiedScheduler: "recurring_tasks": recurring_tasks, "one_time_tasks": total_tasks - recurring_tasks, "executing_tasks": executing_tasks, + "executing_tasks_info": executing_tasks_info, "tasks_by_type": tasks_by_type, "registered_events": list(self._event_subscriptions), }