diff --git a/bot.py b/bot.py index 7a4d1ed17..f38ca8ec0 100644 --- a/bot.py +++ b/bot.py @@ -109,12 +109,33 @@ async def graceful_shutdown(): # 停止所有异步任务 await async_task_manager.stop_and_wait_all_tasks() - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) + # 获取所有剩余任务,排除当前任务 + remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + if remaining_tasks: + logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...") + + # 取消所有剩余任务 + for task in remaining_tasks: + if not task.done(): + task.cancel() + + # 等待所有任务完成,设置超时 + try: + await asyncio.wait_for( + asyncio.gather(*remaining_tasks, return_exceptions=True), + timeout=15.0 + ) + logger.info("所有剩余任务已成功取消") + except asyncio.TimeoutError: + logger.warning("等待任务取消超时,强制继续关闭") + except Exception as e: + logger.error(f"等待任务取消时发生异常: {e}") + + logger.info("麦麦优雅关闭完成") + except Exception as e: - logger.error(f"麦麦关闭失败: {e}") + logger.error(f"麦麦关闭失败: {e}", exc_info=True) def check_eula(): diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index bd160135c..0fbe2f045 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -320,11 +320,11 @@ async def clear_temp_emoji() -> None: logger.info("[清理] 完成") -async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"]) -> None: +async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"],removed_count:int) -> int: """清理指定目录中未被 emoji_objects 追踪的表情包文件""" if not os.path.exists(emoji_dir): logger.warning(f"[清理] 目标目录不存在,跳过清理: {emoji_dir}") - return + return removed_count try: # 获取内存中所有有效表情包的完整路径集合 @@ -352,6 +352,8 @@ async def clean_unused_emojis(emoji_dir: str, emoji_objects: List["MaiEmoji"]) - logger.info(f"[清理] 在目录 {emoji_dir} 中清理了 {cleaned_count} 个破损表情包。") else: logger.info(f"[清理] 目录 {emoji_dir} 中没有需要清理的。") + + return removed_count + cleaned_count except Exception as e: logger.error(f"[错误] 清理未使用表情包文件时出错 ({emoji_dir}): {str(e)}") @@ -564,7 +566,7 @@ class EmojiManager: self.emoji_objects = [e for e in self.emoji_objects if e not in objects_to_remove] # 清理 EMOJI_REGISTED_DIR 目录中未被追踪的文件 - await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects) + removed_count = await clean_unused_emojis(EMOJI_REGISTED_DIR, self.emoji_objects,removed_count) # 输出清理结果 if removed_count > 0: diff --git a/src/chat/heart_flow/sub_heartflow.py b/src/chat/heart_flow/sub_heartflow.py index da7825c72..bb8e78581 100644 --- a/src/chat/heart_flow/sub_heartflow.py +++ b/src/chat/heart_flow/sub_heartflow.py @@ -77,10 +77,21 @@ class SubHeartflow: if self.normal_chat_instance: logger.info(f"{self.log_prefix} 离开normal模式") try: - await self.normal_chat_instance.stop_chat() # 调用 stop_chat + logger.debug(f"{self.log_prefix} 开始调用 stop_chat()") + # 添加超时保护,避免无限等待 + await asyncio.wait_for(self.normal_chat_instance.stop_chat(), timeout=10.0) + logger.debug(f"{self.log_prefix} stop_chat() 调用完成") + except asyncio.TimeoutError: + logger.warning(f"{self.log_prefix} 停止 NormalChat 超时,强制清理") + # 超时时强制清理 + self.normal_chat_instance = None except Exception as e: logger.error(f"{self.log_prefix} 停止 NormalChat 监控任务时出错: {e}") logger.error(traceback.format_exc()) + # 出错时也要清理实例 + self.normal_chat_instance = None + finally: + logger.debug(f"{self.log_prefix} _stop_normal_chat 完成") async def _start_normal_chat(self, rewind=False) -> bool: """ diff --git a/src/chat/normal_chat/normal_chat.py b/src/chat/normal_chat/normal_chat.py index 33824dcdb..5d271e714 100644 --- a/src/chat/normal_chat/normal_chat.py +++ b/src/chat/normal_chat/normal_chat.py @@ -161,58 +161,86 @@ class NormalChat: """ while True: try: - async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): - await asyncio.sleep(0.5) # 每秒检查一次 - # 检查任务是否已被取消 - if self._chat_task is None or self._chat_task.cancelled(): - logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") - break + # 检查任务是否已被取消 - 移动到try块最开始 + if self._chat_task is None or self._chat_task.cancelled(): + logger.info(f"[{self.stream_name}] 兴趣监控任务被取消或置空,退出") + break + + # 检查是否已停用 + if self._disabled: + logger.info(f"[{self.stream_name}] 已停用,退出兴趣监控") + break - items_to_process = list(self.interest_dict.items()) - if not items_to_process: - continue + await asyncio.sleep(0.5) # 每0.5秒检查一次 + + # 再次检查取消状态 + if self._chat_task is None or self._chat_task.cancelled() or self._disabled: + logger.info(f"[{self.stream_name}] 检测到停止信号,退出") + break - # 并行处理兴趣消息 - async def process_single_message(msg_id, message, interest_value, is_mentioned): - """处理单个兴趣消息""" - try: - # 处理消息 - if time.time() - self.start_time > 300: - self.adjust_reply_frequency(duration=300 / 60) - else: - self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) + items_to_process = list(self.interest_dict.items()) + if not items_to_process: + continue - # print(self.engaging_persons) + # 使用异步上下文管理器处理消息 + try: + async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()): + # 在上下文内部再次检查取消状态 + if self._chat_task is None or self._chat_task.cancelled() or self._disabled: + logger.info(f"[{self.stream_name}] 在处理上下文中检测到停止信号,退出") + break - await self.normal_response( - message=message, - is_mentioned=is_mentioned, - interested_rate=interest_value * self.willing_amplifier, - ) - except Exception as e: - logger.error( - f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}" - ) - finally: - self.interest_dict.pop(msg_id, None) + # 并行处理兴趣消息 + async def process_single_message(msg_id, message, interest_value, is_mentioned): + """处理单个兴趣消息""" + try: + # 在处理每个消息前检查停止状态 + if self._disabled or (self._chat_task and self._chat_task.cancelled()): + return - # 创建并行任务列表 - tasks = [] - for msg_id, (message, interest_value, is_mentioned) in items_to_process: - task = process_single_message(msg_id, message, interest_value, is_mentioned) - tasks.append(task) + # 处理消息 + if time.time() - self.start_time > 300: + self.adjust_reply_frequency(duration=300 / 60) + else: + self.adjust_reply_frequency(duration=(time.time() - self.start_time) / 60) - # 并行执行所有任务,限制并发数量避免资源过度消耗 - if tasks: - # 使用信号量控制并发数,最多同时处理5个消息 - semaphore = asyncio.Semaphore(5) + await self.normal_response( + message=message, + is_mentioned=is_mentioned, + interested_rate=interest_value * self.willing_amplifier, + ) + except Exception as e: + logger.error( + f"[{self.stream_name}] 处理兴趣消息{msg_id}时出错: {e}\n{traceback.format_exc()}" + ) + finally: + self.interest_dict.pop(msg_id, None) - async def limited_process(task, sem): - async with sem: - await task + # 创建并行任务列表 + tasks = [] + for msg_id, (message, interest_value, is_mentioned) in items_to_process: + task = process_single_message(msg_id, message, interest_value, is_mentioned) + tasks.append(task) + + # 并行执行所有任务,限制并发数量避免资源过度消耗 + if tasks: + # 使用信号量控制并发数,最多同时处理5个消息 + semaphore = asyncio.Semaphore(5) + + async def limited_process(task, sem): + async with sem: + await task + + limited_tasks = [limited_process(task, semaphore) for task in tasks] + await asyncio.gather(*limited_tasks, return_exceptions=True) + + except asyncio.CancelledError: + logger.info(f"[{self.stream_name}] 处理上下文时任务被取消") + break + except Exception as e: + logger.error(f"[{self.stream_name}] 处理上下文时出错: {e}") + await asyncio.sleep(1) - limited_tasks = [limited_process(task, semaphore) for task in tasks] - await asyncio.gather(*limited_tasks, return_exceptions=True) except asyncio.CancelledError: logger.info(f"[{self.stream_name}] 兴趣监控任务被取消") break diff --git a/src/chat/utils/prompt_builder.py b/src/chat/utils/prompt_builder.py index cc9ee3e64..ebd0c0500 100644 --- a/src/chat/utils/prompt_builder.py +++ b/src/chat/utils/prompt_builder.py @@ -35,14 +35,21 @@ class PromptContext: """创建一个异步的临时提示模板作用域""" # 保存当前上下文并设置新上下文 if context_id is not None: - async with self._context_lock: - if context_id not in self._context_prompts: - self._context_prompts[context_id] = {} + try: + # 添加超时保护,避免长时间等待锁 + async with asyncio.wait_for(self._context_lock.acquire(), timeout=5.0): + if context_id not in self._context_prompts: + self._context_prompts[context_id] = {} + self._context_lock.release() + except asyncio.TimeoutError: + logger.warning(f"获取上下文锁超时,context_id: {context_id}") + # 超时时直接进入,不设置上下文 + context_id = None # 保存当前协程的上下文值,不影响其他协程 previous_context = self._current_context # 设置当前协程的新上下文 - token = self._current_context_var.set(context_id) + token = self._current_context_var.set(context_id) if context_id else None else: # 如果没有提供新上下文,保持当前上下文不变 previous_context = self._current_context @@ -51,12 +58,17 @@ class PromptContext: try: yield self finally: - # 恢复之前的上下文 - if context_id is not None: - if token: + # 恢复之前的上下文,添加异常保护 + if context_id is not None and token is not None: + try: self._current_context_var.reset(token) - else: - self._current_context = previous_context + except Exception as e: + logger.warning(f"恢复上下文时出错: {e}") + # 如果reset失败,尝试直接设置 + try: + self._current_context = previous_context + except Exception: + pass # 静默忽略恢复失败 async def get_prompt_async(self, name: str) -> Optional["Prompt"]: """异步获取当前作用域中的提示模板""" diff --git a/src/manager/async_task_manager.py b/src/manager/async_task_manager.py index 0a6a5e45f..7f7e29e05 100644 --- a/src/manager/async_task_manager.py +++ b/src/manager/async_task_manager.py @@ -90,8 +90,19 @@ class AsyncTaskManager: async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁 if task.task_name in self.tasks: logger.warning(f"已存在名称为 '{task.task_name}' 的任务,正在尝试取消并替换") - self.tasks[task.task_name].cancel() # 取消已存在的任务 - await self.tasks[task.task_name] # 等待任务完成 + old_task = self.tasks[task.task_name] + old_task.cancel() # 取消已存在的任务 + + # 添加超时保护,避免无限等待 + try: + await asyncio.wait_for(old_task, timeout=5.0) + except asyncio.TimeoutError: + logger.warning(f"等待任务 '{task.task_name}' 完成超时") + except asyncio.CancelledError: + logger.info(f"任务 '{task.task_name}' 已成功取消") + except Exception as e: + logger.error(f"等待任务 '{task.task_name}' 完成时发生异常: {e}") + logger.info(f"成功结束任务 '{task.task_name}'") # 创建新任务 @@ -123,28 +134,65 @@ class AsyncTaskManager: async with self._lock: # 由于可能需要await等待任务完成,所以需要加异步锁 # 设置中止标志 self.abort_flag.set() + + # 首先收集所有任务的引用,避免在迭代过程中字典被修改 + task_items = list(self.tasks.items()) + # 取消所有任务 - for name, inst in self.tasks.items(): - try: - inst.cancel() - except asyncio.CancelledError: - logger.info(f"已取消任务 '{name}'") + for name, inst in task_items: + if not inst.done(): + try: + inst.cancel() + logger.debug(f"已请求取消任务 '{name}'") + except Exception as e: + logger.warning(f"取消任务 '{name}' 时发生异常: {e}") - # 等待所有任务完成 - for task_name, task_inst in self.tasks.items(): + # 等待所有任务完成,添加超时保护 + for task_name, task_inst in task_items: if not task_inst.done(): try: - await task_inst - except asyncio.CancelledError: # 此处再次捕获取消异常,防止stop_all_tasks()时延迟抛出异常 - logger.info(f"任务 {task_name} 已取消") + await asyncio.wait_for(task_inst, timeout=10.0) + logger.debug(f"任务 '{task_name}' 已完成") + except asyncio.TimeoutError: + logger.warning(f"等待任务 '{task_name}' 完成超时") + except asyncio.CancelledError: + logger.info(f"任务 '{task_name}' 已取消") except Exception as e: - logger.error(f"任务 {task_name} 执行时发生异常: {e}", ext_info=True) + logger.error(f"任务 '{task_name}' 执行时发生异常: {e}", exc_info=True) # 清空任务列表 self.tasks.clear() self.abort_flag.clear() logger.info("所有异步任务已停止") + def debug_task_status(self): + """ + 调试函数:打印所有任务的状态信息 + """ + logger.info("=== 异步任务状态调试信息 ===") + logger.info(f"当前管理的任务数量: {len(self.tasks)}") + logger.info(f"中止标志状态: {self.abort_flag.is_set()}") + + for task_name, task in self.tasks.items(): + status = [] + if task.done(): + status.append("已完成") + if task.cancelled(): + status.append("已取消") + elif task.exception(): + status.append(f"异常: {task.exception()}") + else: + status.append("正常完成") + else: + status.append("运行中") + + logger.info(f"任务 '{task_name}': {', '.join(status)}") + + # 检查所有asyncio任务 + all_tasks = asyncio.all_tasks() + logger.info(f"当前事件循环中的所有任务数量: {len(all_tasks)}") + logger.info("=== 调试信息结束 ===") + async_task_manager = AsyncTaskManager() """全局异步任务管理器实例"""