From ba30a52e32979091b4ae1fd328602107f9ef752b Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Fri, 7 Nov 2025 17:38:50 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E6=9B=B4=E6=96=B0=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E8=87=B30.12.0=EF=BC=8C=E6=B7=BB=E5=8A=A0aio?= =?UTF-8?q?files=E4=BE=9D=E8=B5=96=EF=BC=9B=E4=BC=98=E5=8C=96=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E5=99=A8=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E9=81=BF=E5=85=8D=E9=87=8D=E5=A4=8D=E8=A7=A6?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pyproject.toml | 3 +- requirements.txt | 1 + .../message_manager/scheduler_dispatcher.py | 92 ++++----- src/schedule/unified_scheduler.py | 182 ++++++++++++++---- 4 files changed, 186 insertions(+), 92 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7b0a544a9..888fe1ad0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,12 @@ [project] name = "MoFox-Bot" -version = "0.8.1" +version = "0.12.0" description = "MoFox-Bot 是一个基于大语言模型的可交互智能体" requires-python = ">=3.11,<=3.13" dependencies = [ "aiohttp>=3.12.14", "aiohttp-cors>=0.8.1", + "aiofiles>=23.1.0", "apscheduler>=3.11.0", "asyncio>=4.0.0", "beautifulsoup4>=4.13.4", diff --git a/requirements.txt b/requirements.txt index a2dcb7b81..8c2c37c1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ sqlalchemy aiosqlite +aiofiles aiomysql APScheduler aiohttp diff --git a/src/chat/message_manager/scheduler_dispatcher.py b/src/chat/message_manager/scheduler_dispatcher.py index 5953c958d..392da2212 100644 --- a/src/chat/message_manager/scheduler_dispatcher.py +++ b/src/chat/message_manager/scheduler_dispatcher.py @@ -109,18 +109,14 @@ class SchedulerDispatcher: return # 2. 检查是否有活跃的 schedule - async with self._get_schedule_lock(stream_id): - has_active_schedule = stream_id in self.stream_schedules - - if has_active_schedule: - # 释放锁后再做打断检查(避免长时间持有锁) - pass - else: - # 4. 创建新的 schedule(在锁内,避免重复创建) - await self._create_schedule(stream_id, context) - return + has_active_schedule = stream_id in self.stream_schedules - # 3. 检查打断判定(锁外执行,避免阻塞) + if not has_active_schedule: + # 4. 创建新的 schedule(在锁内,避免重复创建) + await self._create_schedule(stream_id, context) + return + + # 3. 检查打断判定 if has_active_schedule: should_interrupt = await self._check_interruption(stream_id, context) @@ -232,17 +228,15 @@ class SchedulerDispatcher: stream_id: 流ID context: 流上下文 """ - # 使用锁保护,避免与 _on_schedule_triggered 冲突 - async with self._get_schedule_lock(stream_id): - # 移除旧的 schedule - old_schedule_id = self.stream_schedules.get(stream_id) + # 移除旧的 schedule + old_schedule_id = self.stream_schedules.get(stream_id) if old_schedule_id: success = await unified_scheduler.remove_schedule(old_schedule_id) if success: logger.info(f"🔄 已移除旧 schedule 并准备重建: 流={stream_id[:8]}..., ID={old_schedule_id[:8]}...") self.stats["total_schedules_cancelled"] += 1 # 只有成功移除后才从追踪中删除 - del self.stream_schedules[stream_id] + self.stream_schedules.pop(stream_id) else: logger.error( f"❌ 打断失败:无法移除旧 schedule: 流={stream_id[:8]}..., " @@ -303,18 +297,10 @@ class SchedulerDispatcher: mode_indicator = "⚡打断" if immediate_mode else "📅常规" - # 获取调用栈信息,帮助追踪重复创建的问题 - import traceback - caller_info = "" - stack = traceback.extract_stack() - if len(stack) >= 2: - caller_frame = stack[-2] - caller_info = f", 调用自={caller_frame.name}" - logger.info( f"{mode_indicator} 创建 schedule: 流={stream_id[:8]}..., " f"延迟={delay:.3f}s, 未读={unread_count}, " - f"ID={schedule_id[:8]}...{caller_info}" + f"ID={schedule_id[:8]}..." ) except Exception as e: @@ -431,10 +417,8 @@ class SchedulerDispatcher: stream_id: 流ID """ try: - # 使用锁保护,避免与打断逻辑冲突 - async with self._get_schedule_lock(stream_id): - # 从追踪中移除(因为是一次性任务) - old_schedule_id = self.stream_schedules.pop(stream_id, None) + # 从追踪中移除(因为是一次性任务) + old_schedule_id = self.stream_schedules.pop(stream_id, None) logger.info( f"⏰ Schedule 触发: 流={stream_id[:8]}..., " @@ -462,33 +446,31 @@ class SchedulerDispatcher: self.stats["total_failures"] += 1 # 处理完成后,检查是否需要创建新的 schedule - async with self._get_schedule_lock(stream_id): - # 检查是否已有 schedule(可能在处理期间被打断创建了新的) - if stream_id in self.stream_schedules: - logger.info( - f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., " - f"可能是打断创建的,跳过创建新 schedule" - ) - return + if stream_id in self.stream_schedules: + logger.info( + f"⚠️ 处理完成时发现已有新 schedule: 流={stream_id[:8]}..., " + f"可能是打断创建的,跳过创建新 schedule" + ) + return - # 检查缓存中是否有待处理的消息 - from src.chat.message_manager.message_manager import message_manager - - has_cached = message_manager.has_cached_messages(stream_id) - - if has_cached: - # 有缓存消息,立即创建新 schedule 继续处理 - logger.info( - f"🔁 处理完成但有缓存消息: 流={stream_id[:8]}..., " - f"立即创建新 schedule 继续处理" - ) - await self._create_schedule(stream_id, context) - else: - # 没有缓存消息,不创建 schedule,等待新消息到达 - logger.debug( - f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., " - f"等待新消息到达" - ) + # 检查缓存中是否有待处理的消息 + from src.chat.message_manager.message_manager import message_manager + + has_cached = message_manager.has_cached_messages(stream_id) + + if has_cached: + # 有缓存消息,立即创建新 schedule 继续处理 + logger.info( + f"🔁 处理完成但有缓存消息: 流={stream_id[:8]}..., " + f"立即创建新 schedule 继续处理" + ) + await self._create_schedule(stream_id, context) + else: + # 没有缓存消息,不创建 schedule,等待新消息到达 + logger.debug( + f"✅ 处理完成且无缓存消息: 流={stream_id[:8]}..., " + f"等待新消息到达" + ) except Exception as e: logger.error(f"Schedule 回调执行失败 {stream_id}: {e}", exc_info=True) diff --git a/src/schedule/unified_scheduler.py b/src/schedule/unified_scheduler.py index cae135d02..10b740f59 100644 --- a/src/schedule/unified_scheduler.py +++ b/src/schedule/unified_scheduler.py @@ -93,13 +93,23 @@ class UnifiedScheduler: """ # 获取订阅该事件的所有任务(快速复制,减少锁持有时间) async with self._lock: - event_tasks = [ - task - for task in self._tasks.values() - if task.trigger_type == TriggerType.EVENT - and task.trigger_config.get("event_name") == event_name - and task.is_active - ] + event_tasks = [] + for task in self._tasks.values(): + if (task.trigger_type == TriggerType.EVENT + and task.trigger_config.get("event_name") == event_name + and task.is_active): + + # 检查事件任务是否已经在执行中,防止重复触发 + if task.schedule_id in self._executing_tasks: + executing_task = self._executing_tasks[task.schedule_id] + if not executing_task.done(): + logger.debug(f"[调度器] 事件任务 {task.task_name} 仍在执行中,跳过本次触发") + continue + else: + # 任务已完成但未清理,先清理 + self._executing_tasks.pop(task.schedule_id, None) + + event_tasks.append(task) if not event_tasks: logger.debug(f"[调度器] 事件 '{event_name}' 没有对应的调度任务") @@ -107,33 +117,34 @@ class UnifiedScheduler: logger.debug(f"[调度器] 事件 '{event_name}' 触发,共有 {len(event_tasks)} 个调度任务") - tasks_to_remove = [] - - # 在锁外执行回调,避免死锁 + # 并发执行所有事件任务 + execution_tasks = [] for task in event_tasks: - try: - logger.debug(f"[调度器] 执行事件任务: {task.task_name}") + execution_task = asyncio.create_task( + self._execute_event_task_callback(task, event_params), + name=f"execute_event_{task.task_name}" + ) + execution_tasks.append(execution_task) - # 执行回调,传入事件参数 - if event_params: - if asyncio.iscoroutinefunction(task.callback): - await task.callback(**event_params) - else: - task.callback(**event_params) - else: - await self._execute_callback(task) + # 追踪正在执行的任务 + self._executing_tasks[task.schedule_id] = execution_task - task.last_triggered_at = datetime.now() - task.trigger_count += 1 + # 等待所有任务完成 + results = await asyncio.gather(*execution_tasks, return_exceptions=True) - # 如果不是循环任务,标记为删除 - if not task.is_recurring: - tasks_to_remove.append(task.schedule_id) + # 清理执行追踪 + for task in event_tasks: + self._executing_tasks.pop(task.schedule_id, None) - logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成") - - except Exception as e: - logger.error(f"[调度器] 执行事件 '{event_name}' 的任务 {task.task_name} 时出错: {e}", exc_info=True) + # 收集需要移除的任务 + tasks_to_remove = [] + for task, result in zip(event_tasks, results): + if isinstance(result, Exception): + logger.error(f"[调度器] 执行事件任务 {task.task_name} 时发生错误: {result}", exc_info=result) + elif result is True and not task.is_recurring: + # 成功执行且是一次性任务,标记为删除 + tasks_to_remove.append(task.schedule_id) + logger.debug(f"[调度器] 一次性事件任务 {task.task_name} 已完成,将被移除") # 移除已完成的一次性任务 if tasks_to_remove: @@ -204,7 +215,7 @@ class UnifiedScheduler: while self._running: try: await asyncio.sleep(1) - await self._check_and_trigger_tasks() + asyncio.create_task(self._check_and_trigger_tasks()) except asyncio.CancelledError: logger.debug("调度器检查循环被取消") break @@ -226,6 +237,16 @@ class UnifiedScheduler: if not task.is_active: continue + # 检查任务是否已经在执行中,防止重复触发 + if schedule_id in self._executing_tasks: + executing_task = self._executing_tasks[schedule_id] + if not executing_task.done(): + logger.debug(f"[调度器] 任务 {task.task_name} 仍在执行中,跳过本次触发") + continue + else: + # 任务已完成但未清理,先清理 + self._executing_tasks.pop(schedule_id, None) + try: should_trigger = await self._should_trigger_task(task, current_time) if should_trigger: @@ -298,6 +319,75 @@ class UnifiedScheduler: logger.error(f"[调度器] 执行任务 {task.task_name} 时发生错误: {e}", exc_info=True) return False + async def _execute_event_task_callback(self, task: ScheduleTask, event_params: dict[str, Any]) -> bool: + """执行单个事件任务的回调(用于并发执行) + + Args: + task: 要执行的任务 + event_params: 事件参数 + + Returns: + bool: 执行是否成功 + """ + try: + logger.debug(f"[调度器] 执行事件任务: {task.task_name}") + + current_time = datetime.now() + + # 执行回调,传入事件参数 + if event_params: + if asyncio.iscoroutinefunction(task.callback): + await task.callback(**event_params) + else: + task.callback(**event_params) + else: + await self._execute_callback(task) + + # 更新任务状态 + task.last_triggered_at = current_time + task.trigger_count += 1 + + logger.debug(f"[调度器] 事件任务 {task.task_name} 执行完成") + return True + + except Exception as e: + logger.error(f"[调度器] 执行事件任务 {task.task_name} 时发生错误: {e}", exc_info=True) + return False + + async def _execute_trigger_task_callback(self, task: ScheduleTask) -> bool: + """执行强制触发的任务回调 + + Args: + task: 要执行的任务 + + Returns: + bool: 执行是否成功 + """ + try: + logger.debug(f"[调度器] 强制触发任务: {task.task_name}") + + # 执行回调 + await self._execute_callback(task) + + # 更新任务状态 + current_time = datetime.now() + task.last_triggered_at = current_time + task.trigger_count += 1 + + logger.debug(f"[调度器] 强制触发任务 {task.task_name} 执行完成") + + # 如果不是循环任务,需要移除 + if not task.is_recurring: + async with self._lock: + await self._remove_task_internal(task.schedule_id) + logger.debug(f"[调度器] 一次性任务 {task.task_name} 已完成并移除") + + return True + + except Exception as e: + logger.error(f"[调度器] 强制触发任务 {task.task_name} 时发生错误: {e}", exc_info=True) + return False + async def _should_trigger_task(self, task: ScheduleTask, current_time: datetime) -> bool: """判断任务是否应该触发""" if task.trigger_type == TriggerType.TIME: @@ -459,14 +549,32 @@ class UnifiedScheduler: logger.warning(f"尝试触发已停用的任务: {task.task_name}") return False - await self._execute_callback(task) - task.last_triggered_at = datetime.now() - task.trigger_count += 1 + # 检查任务是否已经在执行中 + if schedule_id in self._executing_tasks: + executing_task = self._executing_tasks[schedule_id] + if not executing_task.done(): + logger.warning(f"任务 {task.task_name} 已在执行中,无法重复触发") + return False + else: + # 任务已完成但未清理,先清理 + self._executing_tasks.pop(schedule_id, None) - if not task.is_recurring: - await self._remove_task_internal(schedule_id) + # 释放锁,在锁外执行任务 + execution_task = asyncio.create_task( + self._execute_trigger_task_callback(task), + name=f"trigger_{task.task_name}" + ) - return True + # 追踪执行任务 + self._executing_tasks[schedule_id] = execution_task + + # 在锁外等待任务完成 + try: + result = await execution_task + return result + finally: + # 清理执行追踪 + self._executing_tasks.pop(schedule_id, None) async def pause_schedule(self, schedule_id: str) -> bool: """暂停任务(不删除)""" @@ -527,6 +635,7 @@ class UnifiedScheduler: total_tasks = len(self._tasks) active_tasks = sum(1 for task in self._tasks.values() if task.is_active) recurring_tasks = sum(1 for task in self._tasks.values() if task.is_recurring) + executing_tasks = sum(1 for task in self._executing_tasks.values() if not task.done()) tasks_by_type = { TriggerType.TIME.value: 0, @@ -544,6 +653,7 @@ class UnifiedScheduler: "paused_tasks": total_tasks - active_tasks, "recurring_tasks": recurring_tasks, "one_time_tasks": total_tasks - recurring_tasks, + "executing_tasks": executing_tasks, "tasks_by_type": tasks_by_type, "registered_events": list(self._event_subscriptions), }