diff --git a/bot.py b/bot.py index d2b9f4b3e..b549f121b 100644 --- a/bot.py +++ b/bot.py @@ -5,311 +5,498 @@ import platform import sys import time import traceback +from contextlib import asynccontextmanager from pathlib import Path +# 初始化基础工具 from colorama import Fore, init -from dotenv import load_dotenv # 处理.env文件 +from dotenv import load_dotenv from rich.traceback import install -# maim_message imports for console input -# 最早期初始化日志系统,确保所有后续模块都使用正确的日志格式 +# 初始化日志系统 from src.common.logger import get_logger, initialize_logging, shutdown_logging -# UI日志适配器 +# 初始化日志和错误显示 initialize_logging() - -from src.main import MainSystem # noqa -from src import BaseMain -from src.manager.async_task_manager import async_task_manager -from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge -from src.config.config import global_config -from src.common.database.database import initialize_sql_database -from src.common.database.sqlalchemy_models import initialize_database as init_db - logger = get_logger("main") - install(extra_lines=3) +# 常量定义 +SUPPORTED_DATABASES = ["sqlite", "mysql", "postgresql"] +SHUTDOWN_TIMEOUT = 10.0 +EULA_CHECK_INTERVAL = 2 +MAX_EULA_CHECK_ATTEMPTS = 30 +MAX_ENV_FILE_SIZE = 1024 * 1024 # 1MB限制 + # 设置工作目录为脚本所在目录 script_dir = os.path.dirname(os.path.abspath(__file__)) os.chdir(script_dir) -logger.info(f"已设置工作目录为: {script_dir}") +logger.info("工作目录已设置") +class ConfigManager: + """配置管理器""" -# 检查并创建.env文件 -def ensure_env_file(): - """确保.env文件存在,如果不存在则从模板创建""" - env_file = Path(".env") - template_env = Path("template/template.env") + @staticmethod + def ensure_env_file(): + """确保.env文件存在,如果不存在则从模板创建""" + env_file = Path(".env") + template_env = Path("template/template.env") - if not env_file.exists(): - if template_env.exists(): - logger.info("未找到.env文件,正在从模板创建...") - import shutil + if not env_file.exists(): + if template_env.exists(): + logger.info("未找到.env文件,正在从模板创建...") + try: + env_file.write_text(template_env.read_text(encoding="utf-8"), encoding="utf-8") + logger.info("已从template/template.env创建.env文件") + logger.warning("请编辑.env文件,将EULA_CONFIRMED设置为true并配置其他必要参数") + except Exception as e: + logger.error(f"创建.env文件失败: {e}") + sys.exit(1) + else: + logger.error("未找到.env文件和template.env模板文件") + sys.exit(1) - shutil.copy(template_env, env_file) - logger.info("已从template/template.env创建.env文件") - logger.warning("请编辑.env文件,将EULA_CONFIRMED设置为true并配置其他必要参数") - else: - logger.error("未找到.env文件和template.env模板文件") + @staticmethod + def verify_env_file_integrity(): + """验证.env文件完整性""" + env_file = Path(".env") + if not env_file.exists(): + return False + + # 检查文件大小 + file_size = env_file.stat().st_size + if file_size == 0 or file_size > MAX_ENV_FILE_SIZE: + logger.error(f".env文件大小异常: {file_size}字节") + return False + + # 检查文件内容是否包含必要字段 + try: + content = env_file.read_text(encoding="utf-8") + if "EULA_CONFIRMED" not in content: + logger.error(".env文件缺少EULA_CONFIRMED字段") + return False + except Exception as e: + logger.error(f"读取.env文件失败: {e}") + return False + + return True + + @staticmethod + def safe_load_dotenv(): + """安全加载环境变量""" + try: + if not ConfigManager.verify_env_file_integrity(): + logger.error(".env文件完整性验证失败") + return False + + load_dotenv() + logger.info("环境变量加载成功") + return True + except Exception as e: + logger.error(f"加载环境变量失败: {e}") + return False + +class EULAManager: + """EULA管理类""" + + @staticmethod + async def check_eula(): + """检查EULA和隐私条款确认状态""" + confirm_logger = get_logger("confirm") + + if not ConfigManager.safe_load_dotenv(): + confirm_logger.error("无法加载环境变量,EULA检查失败") sys.exit(1) + eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower() + if eula_confirmed == "true": + logger.info("EULA已通过环境变量确认") + return -# 确保环境文件存在 -ensure_env_file() + # 提示用户确认EULA + confirm_logger.critical("您需要同意EULA和隐私条款才能使用MoFox_Bot") + confirm_logger.critical("请阅读以下文件:") + confirm_logger.critical(" - EULA.md (用户许可协议)") + confirm_logger.critical(" - PRIVACY.md (隐私条款)") + confirm_logger.critical("然后编辑 .env 文件,将 'EULA_CONFIRMED=false' 改为 'EULA_CONFIRMED=true'") -# 加载环境变量 -load_dotenv() + attempts = 0 + while attempts < MAX_EULA_CHECK_ATTEMPTS: + try: + await asyncio.sleep(EULA_CHECK_INTERVAL) + attempts += 1 -confirm_logger = get_logger("confirm") -# 获取没有加载env时的环境变量 + # 重新加载环境变量 + ConfigManager.safe_load_dotenv() + eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower() + if eula_confirmed == "true": + confirm_logger.info("EULA确认成功,感谢您的同意") + return -uvicorn_server = None -driver = None -app = None -loop = None -main_system = None + if attempts % 5 == 0: + confirm_logger.critical(f"请修改 .env 文件中的 EULA_CONFIRMED=true (尝试 {attempts}/{MAX_EULA_CHECK_ATTEMPTS})") + except KeyboardInterrupt: + confirm_logger.info("用户取消,程序退出") + sys.exit(0) + except Exception as e: + confirm_logger.error(f"检查EULA状态失败: {e}") + if attempts >= MAX_EULA_CHECK_ATTEMPTS: + confirm_logger.error("达到最大检查次数,程序退出") + sys.exit(1) -async def request_shutdown() -> bool: - """请求关闭程序""" + confirm_logger.error("EULA确认超时,程序退出") + sys.exit(1) + +class TaskManager: + """任务管理器""" + + @staticmethod + async def cancel_pending_tasks(loop, timeout=SHUTDOWN_TIMEOUT): + """取消所有待处理的任务""" + remaining_tasks = [ + t for t in asyncio.all_tasks(loop) + if t is not asyncio.current_task(loop) and not t.done() + ] + + if not remaining_tasks: + logger.info("没有待取消的任务") + return True + + logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...") + + # 取消任务 + for task in remaining_tasks: + task.cancel() + + # 等待任务完成 + try: + results = await asyncio.wait_for( + asyncio.gather(*remaining_tasks, return_exceptions=True), + timeout=timeout + ) + + # 检查任务结果 + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.warning(f"任务 {i} 取消时发生异常: {result}") + + logger.info("所有剩余任务已成功取消") + return True + except asyncio.TimeoutError: + logger.warning("等待任务取消超时,强制继续关闭") + return False + except Exception as e: + logger.error(f"等待任务取消时发生异常: {e}") + return False + + @staticmethod + async def stop_async_tasks(): + """停止所有异步任务""" + try: + from src.manager.async_task_manager import async_task_manager + await async_task_manager.stop_and_wait_all_tasks() + return True + except ImportError: + logger.warning("异步任务管理器不可用,跳过任务停止") + return False + except Exception as e: + logger.error(f"停止异步任务失败: {e}") + return False + +class ShutdownManager: + """关闭管理器""" + + @staticmethod + async def graceful_shutdown(loop=None): + """优雅关闭程序""" + try: + logger.info("正在优雅关闭麦麦...") + start_time = time.time() + + # 停止异步任务 + tasks_stopped = await TaskManager.stop_async_tasks() + + # 取消待处理任务 + tasks_cancelled = True + if loop and not loop.is_closed(): + tasks_cancelled = await TaskManager.cancel_pending_tasks(loop) + + shutdown_time = time.time() - start_time + success = tasks_stopped and tasks_cancelled + + if success: + logger.info(f"麦麦优雅关闭完成,耗时: {shutdown_time:.2f}秒") + else: + logger.warning(f"麦麦关闭完成,但部分操作未成功,耗时: {shutdown_time:.2f}秒") + + return success + + except Exception as e: + logger.error(f"麦麦关闭失败: {e}", exc_info=True) + return False + +@asynccontextmanager +async def create_event_loop_context(): + """创建事件循环的上下文管理器""" + loop = None try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + except Exception as e: + logger.error(f"创建事件循环失败: {e}") + raise + finally: if loop and not loop.is_closed(): try: - loop.run_until_complete(graceful_shutdown(maibot.main_system)) - except Exception as ge: # 捕捉优雅关闭时可能发生的错误 - logger.error(f"优雅关闭时发生错误: {ge}") - return False - return True - except Exception as e: - logger.error(f"请求关闭程序时发生错误: {e}") + await ShutdownManager.graceful_shutdown(loop) + except Exception as e: + logger.error(f"关闭事件循环时出错: {e}") + finally: + try: + loop.close() + logger.info("事件循环已关闭") + except Exception as e: + logger.error(f"关闭事件循环失败: {e}") + +class DatabaseManager: + """数据库连接管理器""" + + def __init__(self): + self._connection = None + + async def __aenter__(self): + """异步上下文管理器入口""" + try: + from src.common.database.database import initialize_sql_database + from src.config.config import global_config + + logger.info("正在初始化数据库连接...") + start_time = time.time() + + # 使用线程执行器运行潜在的阻塞操作 + await asyncio.to_thread(initialize_sql_database, global_config.database) + elapsed_time = time.time() - start_time + logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库,耗时: {elapsed_time:.2f}秒") + + return self + except Exception as e: + logger.error(f"数据库连接初始化失败: {e}") + raise + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """异步上下文管理器出口""" + if exc_type: + logger.error(f"数据库操作发生异常: {exc_val}") return False +class ConfigurationValidator: + """配置验证器""" -def easter_egg(): - # 彩蛋 - init() - text = "多年以后,面对AI行刑队,张三将会回想起他2023年在会议上讨论人工智能的那个下午" - rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA] - rainbow_text = "" - for i, char in enumerate(text): - rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char - logger.info(rainbow_text) - - -async def graceful_shutdown(main_system_instance): - """优雅地关闭所有系统组件""" - try: - logger.info("正在优雅关闭麦麦...") - - # 停止MainSystem中的组件,它会处理服务器等 - if main_system_instance and hasattr(main_system_instance, "shutdown"): - logger.info("正在关闭MainSystem...") - await main_system_instance.shutdown() - - # 停止聊天管理器 + @staticmethod + def validate_configuration(): + """验证关键配置""" try: - from src.chat.message_receive.chat_stream import get_chat_manager - chat_manager = get_chat_manager() - if hasattr(chat_manager, "_stop_auto_save"): - logger.info("正在停止聊天管理器...") - chat_manager._stop_auto_save() + from src.config.config import global_config + + # 检查必要的配置节 + required_sections = ["database", "bot"] + for section in required_sections: + if not hasattr(global_config, section): + logger.error(f"配置中缺少{section}配置节") + return False + + # 验证数据库配置 + db_config = global_config.database + if not hasattr(db_config, "database_type") or not db_config.database_type: + logger.error("数据库配置缺少database_type字段") + return False + + if db_config.database_type not in SUPPORTED_DATABASES: + logger.error(f"不支持的数据库类型: {db_config.database_type}") + logger.info(f"支持的数据库类型: {', '.join(SUPPORTED_DATABASES)}") + return False + + logger.info("配置验证通过") + return True + + except ImportError: + logger.error("无法导入全局配置模块") + return False except Exception as e: - logger.warning(f"停止聊天管理器时出错: {e}") + logger.error(f"配置验证失败: {e}") + return False - # 停止情绪管理器 - try: - from src.mood.mood_manager import mood_manager - if hasattr(mood_manager, "stop"): - logger.info("正在停止情绪管理器...") - await mood_manager.stop() - except Exception as e: - logger.warning(f"停止情绪管理器时出错: {e}") +class EasterEgg: + """彩蛋功能""" - # 停止记忆系统 - try: - from src.chat.memory_system.memory_manager import memory_manager - if hasattr(memory_manager, "shutdown"): - logger.info("正在停止记忆系统...") - await memory_manager.shutdown() - except Exception as e: - logger.warning(f"停止记忆系统时出错: {e}") + _initialized = False + @classmethod + def show(cls): + """显示彩色文本""" + if not cls._initialized: + init() + cls._initialized = True - # 停止所有异步任务 - try: - await async_task_manager.stop_and_wait_all_tasks() - except Exception as e: - logger.warning(f"停止异步任务管理器时出错: {e}") + text = "多年以后,面对AI行刑队,张三将会回想起他2023年在会议上讨论人工智能的那个下午" + rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA] + rainbow_text = "" + for i, char in enumerate(text): + rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char + logger.info(rainbow_text) - # 获取所有剩余任务,排除当前任务 - remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - - if remaining_tasks: - logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...") - - # 取消所有剩余任务 - for task in remaining_tasks: - if not task.done(): - task.cancel() - - # 等待所有任务完成,设置超时 - try: - await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=15.0) - logger.info("所有剩余任务已成功取消") - except asyncio.TimeoutError: - logger.warning("等待任务取消超时,强制继续关闭") - except Exception as e: - logger.error(f"等待任务取消时发生异常: {e}") - - logger.info("麦麦优雅关闭完成") - - # 关闭日志系统,释放文件句柄 - shutdown_logging() - - # 尝试停止事件循环 - try: - loop = asyncio.get_running_loop() - if loop.is_running(): - loop.stop() - logger.info("事件循环已请求停止") - except RuntimeError: - pass # 没有正在运行的事件循环 - - except Exception as e: - logger.error(f"麦麦关闭失败: {e}", exc_info=True) - - -def check_eula(): - """检查EULA和隐私条款确认状态 - 环境变量版(类似Minecraft)""" - # 检查环境变量中的EULA确认 - eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower() - - if eula_confirmed == "true": - logger.info("EULA已通过环境变量确认") - return - - # 如果没有确认,提示用户 - confirm_logger.critical("您需要同意EULA和隐私条款才能使用MoFox_Bot") - confirm_logger.critical("请阅读以下文件:") - confirm_logger.critical(" - EULA.md (用户许可协议)") - confirm_logger.critical(" - PRIVACY.md (隐私条款)") - confirm_logger.critical("然后编辑 .env 文件,将 'EULA_CONFIRMED=false' 改为 'EULA_CONFIRMED=true'") - - # 等待用户确认 - while True: - try: - load_dotenv(override=True) # 重新加载.env文件 - - eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower() - if eula_confirmed == "true": - confirm_logger.info("EULA确认成功,感谢您的同意") - return - - confirm_logger.critical("请修改 .env 文件中的 EULA_CONFIRMED=true 后重新启动程序") - input("按Enter键检查.env文件状态...") - - except KeyboardInterrupt: - confirm_logger.info("用户取消,程序退出") - sys.exit(0) - except Exception as e: - confirm_logger.error(f"检查EULA状态失败: {e}") - sys.exit(1) - - -class MaiBotMain(BaseMain): +class MaiBotMain: """麦麦机器人主程序类""" def __init__(self): - super().__init__() self.main_system = None def setup_timezone(self): """设置时区""" - if platform.system().lower() != "windows": - time.tzset() # type: ignore - - def check_and_confirm_eula(self): - """检查并确认EULA和隐私条款""" - check_eula() - logger.info("检查EULA和隐私条款完成") + try: + if platform.system().lower() != "windows": + time.tzset() + logger.info("时区设置完成") + else: + logger.info("Windows系统,跳过时区设置") + except Exception as e: + logger.warning(f"时区设置失败: {e}") async def initialize_database(self): - """初始化数据库""" - - logger.info("正在初始化数据库连接...") - try: - await initialize_sql_database(global_config.database) - logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库") - except Exception as e: - logger.error(f"数据库连接初始化失败: {e}") - raise e + """初始化数据库连接""" + async with DatabaseManager(): + pass async def initialize_database_async(self): """异步初始化数据库表结构""" logger.info("正在初始化数据库表结构...") try: + start_time = time.time() + from src.common.database.sqlalchemy_models import initialize_database as init_db await init_db() - logger.info("数据库表结构初始化完成") + elapsed_time = time.time() - start_time + logger.info(f"数据库表结构初始化完成,耗时: {elapsed_time:.2f}秒") except Exception as e: logger.error(f"数据库表结构初始化失败: {e}") - raise e + raise def create_main_system(self): """创建MainSystem实例""" + from src.main import MainSystem self.main_system = MainSystem() return self.main_system - async def run(self): - """运行主程序""" + async def run_sync_init(self): + """执行同步初始化步骤""" self.setup_timezone() - self.check_and_confirm_eula() - await self.initialize_database() + await EULAManager.check_eula() + + if not ConfigurationValidator.validate_configuration(): + raise RuntimeError("配置验证失败,请检查配置文件") return self.create_main_system() + async def run_async_init(self, main_system): + """执行异步初始化步骤""" + # 初始化数据库连接 + await self.initialize_database() -if __name__ == "__main__": - exit_code = 0 # 用于记录程序最终的退出状态 + # 初始化数据库表结构 + await self.initialize_database_async() + + # 初始化主系统 + await main_system.initialize() + + # 初始化知识库 + from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge + initialize_lpmm_knowledge() + + # 显示彩蛋 + EasterEgg.show() + +async def wait_for_user_input(): + """等待用户输入(异步方式)""" try: - # 创建MaiBotMain实例并获取MainSystem - maibot = MaiBotMain() + # 在非生产环境下,使用异步方式等待输入 + if os.getenv("ENVIRONMENT") != "production": + logger.info("程序执行完成,按 Ctrl+C 退出...") + # 简单的异步等待,避免阻塞事件循环 + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("用户中断程序") + return True + except Exception as e: + logger.error(f"等待用户输入时发生错误: {e}") + return False - # 创建事件循环 - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) +async def main_async(): + """主异步函数""" + exit_code = 0 + main_task = None + async with create_event_loop_context() as loop: try: - # 异步初始化数据库和表结构 - main_system = loop.run_until_complete(maibot.run()) - loop.run_until_complete(maibot.initialize_database_async()) - # 执行初始化和任务调度 - loop.run_until_complete(main_system.initialize()) - initialize_lpmm_knowledge() - # Schedule tasks returns a future that runs forever. - # We can run console_input_loop concurrently. - main_tasks = loop.create_task(main_system.schedule_tasks()) - loop.run_until_complete(main_tasks) + # 确保环境文件存在 + ConfigManager.ensure_env_file() + + # 创建主程序实例并执行初始化 + maibot = MaiBotMain() + main_system = await maibot.run_sync_init() + await maibot.run_async_init(main_system) + + # 运行主任务 + main_task = asyncio.create_task(main_system.schedule_tasks()) + logger.info("麦麦机器人启动完成,开始运行主任务...") + + # 同时运行主任务和用户输入等待 + user_input_done = asyncio.create_task(wait_for_user_input()) + + # 使用wait等待任意一个任务完成 + done, pending = await asyncio.wait( + [main_task, user_input_done], + return_when=asyncio.FIRST_COMPLETED + ) + + # 如果用户输入任务完成(用户按了Ctrl+C),取消主任务 + if user_input_done in done and main_task not in done: + logger.info("用户请求退出,正在取消主任务...") + main_task.cancel() + try: + await main_task + except asyncio.CancelledError: + logger.info("主任务已取消") + except Exception as e: + logger.error(f"主任务取消时发生错误: {e}") except KeyboardInterrupt: logger.warning("收到中断信号,正在优雅关闭...") - # The actual shutdown logic is now in the finally block. + if main_task and not main_task.done(): + main_task.cancel() + except Exception as e: + logger.error(f"主程序发生异常: {e}") + logger.debug(f"异常详情: {traceback.format_exc()}") + exit_code = 1 + return exit_code + +if __name__ == "__main__": + exit_code = 0 + try: + exit_code = asyncio.run(main_async()) + except KeyboardInterrupt: + logger.info("程序被用户中断") + exit_code = 130 except Exception as e: - logger.error(f"主程序发生异常: {e!s} {traceback.format_exc()!s}") - exit_code = 1 # 标记发生错误 + logger.error(f"程序启动失败: {e}") + exit_code = 1 finally: - # 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭) - if "loop" in locals() and loop and not loop.is_closed(): - logger.info("开始执行最终关闭流程...") - try: - # 传递main_system实例 - loop.run_until_complete(graceful_shutdown(maibot.main_system)) - except Exception as ge: - logger.error(f"优雅关闭时发生错误: {ge}") - loop.close() - logger.info("事件循环已关闭") + # 确保日志系统正确关闭 + try: + shutdown_logging() + except Exception as e: + print(f"关闭日志系统时出错: {e}") - # 在程序退出前暂停,让你有机会看到输出 - # input("按 Enter 键退出...") # <--- 添加这行 - sys.exit(exit_code) # <--- 使用记录的退出码 + sys.exit(exit_code) diff --git a/src/chat/memory_system/__init__.py b/src/chat/memory_system/__init__.py index 962389b15..94d11c6ef 100644 --- a/src/chat/memory_system/__init__.py +++ b/src/chat/memory_system/__init__.py @@ -21,6 +21,7 @@ from .memory_chunk import MemoryChunk as Memory # 遗忘引擎 from .memory_forgetting_engine import ForgettingConfig, MemoryForgettingEngine, get_memory_forgetting_engine +from .memory_formatter import format_memories_bracket_style # 记忆管理器 from .memory_manager import MemoryManager, MemoryResult, memory_manager @@ -30,7 +31,6 @@ from .memory_system import MemorySystem, MemorySystemConfig, get_memory_system, # Vector DB存储系统 from .vector_memory_storage_v2 import VectorMemoryStorage, VectorStorageConfig, get_vector_memory_storage -from .memory_formatter import format_memories_bracket_style __all__ = [ # 核心数据结构 diff --git a/src/chat/memory_system/memory_formatter.py b/src/chat/memory_system/memory_formatter.py index 5e5f100f7..ecf7992c8 100644 --- a/src/chat/memory_system/memory_formatter.py +++ b/src/chat/memory_system/memory_formatter.py @@ -17,8 +17,9 @@ """ from __future__ import annotations -from typing import Any, Iterable import time +from collections.abc import Iterable +from typing import Any def _format_timestamp(ts: Any) -> str: diff --git a/src/chat/memory_system/memory_metadata_index.py b/src/chat/memory_system/memory_metadata_index.py index eff666b2c..4b92c410a 100644 --- a/src/chat/memory_system/memory_metadata_index.py +++ b/src/chat/memory_system/memory_metadata_index.py @@ -2,9 +2,8 @@ 记忆元数据索引。 """ -from dataclasses import dataclass, asdict +from dataclasses import asdict, dataclass from typing import Any -from time import time from src.common.logger import get_logger @@ -12,6 +11,7 @@ logger = get_logger(__name__) from inkfox.memory import PyMetadataIndex as _RustIndex # type: ignore + @dataclass class MemoryMetadataIndexEntry: memory_id: str @@ -51,7 +51,7 @@ class MemoryMetadataIndex: if payload: try: self._rust.batch_add(payload) - except Exception as ex: # noqa: BLE001 + except Exception as ex: logger.error(f"Rust 元数据批量添加失败: {ex}") def add_or_update(self, entry: MemoryMetadataIndexEntry): @@ -88,7 +88,7 @@ class MemoryMetadataIndex: if flexible_mode: return list(self._rust.search_flexible(params)) return list(self._rust.search_strict(params)) - except Exception as ex: # noqa: BLE001 + except Exception as ex: logger.error(f"Rust 搜索失败返回空: {ex}") return [] @@ -105,18 +105,18 @@ class MemoryMetadataIndex: "keywords_count": raw.get("keywords_indexed", 0), "tags_count": raw.get("tags_indexed", 0), } - except Exception as ex: # noqa: BLE001 + except Exception as ex: logger.warning(f"读取 Rust stats 失败: {ex}") return {"total_memories": 0} def save(self): # 仅调用 rust save try: self._rust.save() - except Exception as ex: # noqa: BLE001 + except Exception as ex: logger.warning(f"Rust save 失败: {ex}") __all__ = [ - "MemoryMetadataIndexEntry", "MemoryMetadataIndex", + "MemoryMetadataIndexEntry", ] diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py index 86c32ea94..068c39f0d 100644 --- a/src/chat/message_receive/message.py +++ b/src/chat/message_receive/message.py @@ -263,7 +263,7 @@ class MessageRecv(Message): logger.warning("视频消息中没有base64数据") return "[收到视频消息,但数据异常]" except Exception as e: - logger.error(f"视频处理失败: {str(e)}") + logger.error(f"视频处理失败: {e!s}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") @@ -277,7 +277,7 @@ class MessageRecv(Message): logger.info("未启用视频识别") return "[视频]" except Exception as e: - logger.error(f"处理消息段失败: {str(e)}, 类型: {segment.type}, 数据: {segment.data}") + logger.error(f"处理消息段失败: {e!s}, 类型: {segment.type}, 数据: {segment.data}") return f"[处理失败的{segment.type}消息]" diff --git a/src/chat/utils/utils_video.py b/src/chat/utils/utils_video.py index fe14e54c5..3e989c8ab 100644 --- a/src/chat/utils/utils_video.py +++ b/src/chat/utils/utils_video.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- """纯 inkfox 视频关键帧分析工具 仅依赖 `inkfox.video` 提供的 Rust 扩展能力: @@ -14,25 +13,25 @@ from __future__ import annotations -import os -import io import asyncio import base64 -import tempfile -from pathlib import Path -from typing import List, Tuple, Optional, Dict, Any import hashlib +import io +import os +import tempfile import time +from pathlib import Path +from typing import Any from PIL import Image +from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore from src.common.logger import get_logger from src.config.config import global_config, model_config from src.llm_models.utils_model import LLMRequest -from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore # 简易并发控制:同一 hash 只处理一次 -_video_locks: Dict[str, asyncio.Lock] = {} +_video_locks: dict[str, asyncio.Lock] = {} _locks_guard = asyncio.Lock() logger = get_logger("utils_video") @@ -90,7 +89,7 @@ class VideoAnalyzer: logger.debug(f"获取系统信息失败: {e}") # ---- 关键帧提取 ---- - async def extract_keyframes(self, video_path: str) -> List[Tuple[str, float]]: + async def extract_keyframes(self, video_path: str) -> list[tuple[str, float]]: """提取关键帧并返回 (base64, timestamp_seconds) 列表""" with tempfile.TemporaryDirectory() as tmp: result = video.extract_keyframes_from_video( # type: ignore[attr-defined] @@ -105,7 +104,7 @@ class VideoAnalyzer: ) files = sorted(Path(tmp).glob("keyframe_*.jpg"))[: self.max_frames] total_ms = getattr(result, "total_time_ms", 0) - frames: List[Tuple[str, float]] = [] + frames: list[tuple[str, float]] = [] for i, f in enumerate(files): img = Image.open(f).convert("RGB") if max(img.size) > self.max_image_size: @@ -119,7 +118,7 @@ class VideoAnalyzer: return frames # ---- 批量分析 ---- - async def _analyze_batch(self, frames: List[Tuple[str, float]], question: Optional[str]) -> str: + async def _analyze_batch(self, frames: list[tuple[str, float]], question: str | None) -> str: from src.llm_models.payload_content.message import MessageBuilder, RoleType from src.llm_models.utils_model import RequestType prompt = self.batch_analysis_prompt.format( @@ -149,8 +148,8 @@ class VideoAnalyzer: return resp.content or "❌ 未获得响应" # ---- 逐帧分析 ---- - async def _analyze_sequential(self, frames: List[Tuple[str, float]], question: Optional[str]) -> str: - results: List[str] = [] + async def _analyze_sequential(self, frames: list[tuple[str, float]], question: str | None) -> str: + results: list[str] = [] for i, (b64, ts) in enumerate(frames): prompt = f"分析第{i+1}帧" + (f" (时间: {ts:.2f}s)" if self.enable_frame_timing else "") if question: @@ -174,7 +173,7 @@ class VideoAnalyzer: return "\n".join(results) # ---- 主入口 ---- - async def analyze_video(self, video_path: str, question: Optional[str] = None) -> Tuple[bool, str]: + async def analyze_video(self, video_path: str, question: str | None = None) -> tuple[bool, str]: if not os.path.exists(video_path): return False, "❌ 文件不存在" frames = await self.extract_keyframes(video_path) @@ -189,10 +188,10 @@ class VideoAnalyzer: async def analyze_video_from_bytes( self, video_bytes: bytes, - filename: Optional[str] = None, - prompt: Optional[str] = None, - question: Optional[str] = None, - ) -> Dict[str, str]: + filename: str | None = None, + prompt: str | None = None, + question: str | None = None, + ) -> dict[str, str]: """从内存字节分析视频,兼容旧调用 (prompt / question 二选一) 返回 {"summary": str}.""" if not video_bytes: return {"summary": "❌ 空视频数据"} @@ -271,7 +270,7 @@ class VideoAnalyzer: # ---- 外部接口 ---- -_INSTANCE: Optional[VideoAnalyzer] = None +_INSTANCE: VideoAnalyzer | None = None def get_video_analyzer() -> VideoAnalyzer: @@ -285,7 +284,7 @@ def is_video_analysis_available() -> bool: return True -def get_video_analysis_status() -> Dict[str, Any]: +def get_video_analysis_status() -> dict[str, Any]: try: info = video.get_system_info() # type: ignore[attr-defined] except Exception as e: # pragma: no cover @@ -297,4 +296,4 @@ def get_video_analysis_status() -> Dict[str, Any]: "modes": ["auto", "batch", "sequential"], "max_frames_default": inst.max_frames, "implementation": "inkfox", - } \ No newline at end of file + } diff --git a/src/common/data_models/message_manager_data_model.py b/src/common/data_models/message_manager_data_model.py index fd0c19055..164ce4d2d 100644 --- a/src/common/data_models/message_manager_data_model.py +++ b/src/common/data_models/message_manager_data_model.py @@ -53,8 +53,8 @@ class StreamContext(BaseDataModel): priority_mode: str | None = None priority_info: dict | None = None - - + + def add_action_to_message(self, message_id: str, action: str): """ 向指定消息添加执行的动作 diff --git a/src/llm_models/model_client/mcp_sse_client.py b/src/llm_models/model_client/mcp_sse_client.py index ec4502dbb..91e58cde2 100644 --- a/src/llm_models/model_client/mcp_sse_client.py +++ b/src/llm_models/model_client/mcp_sse_client.py @@ -5,7 +5,6 @@ MCP (Model Context Protocol) SSE (Server-Sent Events) 客户端实现 import asyncio import io -import json from collections.abc import Callable from typing import Any @@ -20,7 +19,6 @@ from ..exceptions import ( NetworkConnectionError, ReqAbortException, RespNotOkException, - RespParseException, ) from ..payload_content.message import Message, RoleType from ..payload_content.resp_format import RespFormat diff --git a/src/main.py b/src/main.py index fae1ce5f4..506dfc84c 100644 --- a/src/main.py +++ b/src/main.py @@ -5,21 +5,18 @@ import sys import time import traceback from functools import partial +from random import choices from typing import Any from maim_message import MessageServer from rich.traceback import install from src.chat.emoji_system.emoji_manager import get_emoji_manager - -# 导入增强记忆系统管理器 from src.chat.memory_system.memory_manager import memory_manager from src.chat.message_receive.bot import chat_bot from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask from src.common.logger import get_logger - -# 导入消息API和traceback模块 from src.common.message import get_global_api from src.common.remote import TelemetryHeartBeatTask from src.common.server import Server, get_global_server @@ -29,21 +26,34 @@ from src.manager.async_task_manager import async_task_manager from src.mood.mood_manager import mood_manager from src.plugin_system.base.component_types import EventType from src.plugin_system.core.event_manager import event_manager - -# from src.api.main import start_api_server -# 导入新的插件管理器 from src.plugin_system.core.plugin_manager import plugin_manager from src.schedule.monthly_plan_manager import monthly_plan_manager from src.schedule.schedule_manager import schedule_manager # 插件系统现在使用统一的插件加载器 - install(extra_lines=3) logger = get_logger("main") +# 预定义彩蛋短语,避免在每次初始化时重新创建 +EGG_PHRASES: list[tuple[str, int]] = [ + ("我们的代码里真的没有bug,只有'特性'。", 10), + ("你知道吗?阿范喜欢被切成臊子😡", 10), + ("你知道吗,雅诺狐的耳朵其实很好摸", 5), + ("你群最高技术力————言柒姐姐!", 20), + ("初墨小姐宇宙第一(不是)", 10), + ("world.execute(me);", 10), + ("正在尝试连接到MaiBot的服务器...连接失败...,正在转接到maimaiDX", 10), + ("你的bug就像星星一样多,而我的代码像太阳一样,一出来就看不见了。", 10), + ("温馨提示:请不要在代码中留下任何魔法数字,除非你知道它的含义。", 10), + ("世界上只有10种人:懂二进制的和不懂的。", 10), + ("喵喵~你的麦麦被猫娘入侵了喵~", 15), + ("恭喜你触发了稀有彩蛋喵:诺狐嗷呜~ ~", 1), + ("恭喜你!!!你的开发者模式已成功开启,快来加入我们吧!(๑•̀ㅂ•́)و✧ (小声bb:其实是当黑奴)", 10), +] -def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float): + +def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float) -> None: """后台任务完成时的回调函数""" end_time = time.time() duration = end_time - start_time @@ -58,10 +68,11 @@ def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float): class MainSystem: - def __init__(self): + """主系统类,负责协调所有组件""" + + def __init__(self) -> None: # 使用增强记忆系统 self.memory_manager = memory_manager - self.individuality: Individuality = get_individuality() # 使用消息API替代直接的FastAPI实例 @@ -69,15 +80,21 @@ class MainSystem: self.server: Server = get_global_server() # 设置信号处理器用于优雅退出 + self._shutting_down = False self._setup_signal_handlers() - def _setup_signal_handlers(self): + # 存储清理任务的引用 + self._cleanup_tasks: list[asyncio.Task] = [] + + def _setup_signal_handlers(self) -> None: """设置信号处理器""" - def signal_handler(signum, frame): - logger.info("收到退出信号,正在优雅关闭系统...") + if self._shutting_down: + logger.warning("系统已经在关闭过程中,忽略重复信号") + return - import asyncio + self._shutting_down = True + logger.info("收到退出信号,正在优雅关闭系统...") try: loop = asyncio.get_event_loop() @@ -85,11 +102,15 @@ class MainSystem: # 如果事件循环正在运行,创建任务并设置回调 async def cleanup_and_exit(): await self._async_cleanup() + # 给日志系统一点时间刷新 + await asyncio.sleep(0.1) sys.exit(0) task = asyncio.create_task(cleanup_and_exit()) + # 存储清理任务引用 + self._cleanup_tasks.append(task) # 添加任务完成回调,确保程序退出 - task.add_done_callback(lambda t: None) + task.add_done_callback(lambda t: sys.exit(0) if not t.cancelled() else None) else: # 如果事件循环未运行,使用同步清理 self._cleanup() @@ -101,7 +122,7 @@ class MainSystem: signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - async def _initialize_interest_calculator(self): + async def _initialize_interest_calculator(self) -> None: """初始化兴趣值计算组件 - 通过插件系统自动发现和加载""" try: logger.info("开始自动发现兴趣值计算组件...") @@ -120,21 +141,14 @@ class MainSystem: logger.warning("未发现任何兴趣计算器组件") return - logger.info("发现的兴趣计算器组件:") - for calc_name, calc_info in interest_calculators.items(): - enabled = getattr(calc_info, "enabled", True) - default_enabled = getattr(calc_info, "enabled_by_default", True) - logger.info(f" - {calc_name}: 启用: {enabled}, 默认启用: {default_enabled}") - # 初始化兴趣度管理器 from src.chat.interest_system.interest_manager import get_interest_manager interest_manager = get_interest_manager() await interest_manager.initialize() - # 尝试注册计算器(单例模式,只注册第一个可用的) - registered_calculator = None + # 尝试注册所有可用的计算器 + registered_calculators = [] - # 使用组件注册表获取组件类并注册 for calc_name, calc_info in interest_calculators.items(): enabled = getattr(calc_info, "enabled", True) default_enabled = getattr(calc_info, "enabled_by_default", True) @@ -147,140 +161,188 @@ class MainSystem: from src.plugin_system.core.component_registry import component_registry component_class = component_registry.get_component_class(calc_name, ComponentType.INTEREST_CALCULATOR) - if component_class: - logger.info(f"成功获取 {calc_name} 的组件类: {component_class.__name__}") - - # 创建组件实例 - calculator_instance = component_class() - logger.info(f"成功创建兴趣计算器实例: {calc_name}") - - # 初始化组件 - if await calculator_instance.initialize(): - # 注册到兴趣管理器 - success = await interest_manager.register_calculator(calculator_instance) - if success: - registered_calculator = calculator_instance - logger.info(f"成功注册兴趣计算器: {calc_name}") - break # 只注册一个成功的计算器 - else: - logger.error(f"兴趣计算器 {calc_name} 注册失败") - else: - logger.error(f"兴趣计算器 {calc_name} 初始化失败") - else: + if not component_class: logger.warning(f"无法找到 {calc_name} 的组件类") + continue + + logger.info(f"成功获取 {calc_name} 的组件类: {component_class.__name__}") + + # 创建组件实例 + calculator_instance = component_class() + + # 初始化组件 + if not await calculator_instance.initialize(): + logger.error(f"兴趣计算器 {calc_name} 初始化失败") + continue + + # 注册到兴趣管理器 + if await interest_manager.register_calculator(calculator_instance): + registered_calculators.append(calculator_instance) + logger.info(f"成功注册兴趣计算器: {calc_name}") + else: + logger.error(f"兴趣计算器 {calc_name} 注册失败") except Exception as e: logger.error(f"处理兴趣计算器 {calc_name} 时出错: {e}", exc_info=True) - if registered_calculator: - logger.info(f"当前活跃的兴趣度计算器: {registered_calculator.component_name} v{registered_calculator.component_version}") + if registered_calculators: + logger.info(f"成功注册了 {len(registered_calculators)} 个兴趣计算器") + for calc in registered_calculators: + logger.info(f" - {calc.component_name} v{calc.component_version}") else: logger.error("未能成功注册任何兴趣计算器") except Exception as e: logger.error(f"初始化兴趣度计算器失败: {e}", exc_info=True) - async def _async_cleanup(self): + async def _async_cleanup(self) -> None: """异步清理资源""" + if self._shutting_down: + return + + self._shutting_down = True + logger.info("开始系统清理流程...") + + cleanup_tasks = [] + + # 停止数据库服务 try: - - # 停止数据库服务 - try: - from src.common.database.database import stop_database - await stop_database() - logger.info("🛑 数据库服务已停止") - except Exception as e: - logger.error(f"停止数据库服务时出错: {e}") - - # 停止消息管理器 - try: - from src.chat.message_manager import message_manager - await message_manager.stop() - logger.info("🛑 消息管理器已停止") - except Exception as e: - logger.error(f"停止消息管理器时出错: {e}") - - # 停止消息重组器 - try: - from src.plugin_system import EventType - from src.plugin_system.core.event_manager import event_manager - from src.utils.message_chunker import reassembler - - await event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM") - await reassembler.stop_cleanup_task() - logger.info("🛑 消息重组器已停止") - except Exception as e: - logger.error(f"停止消息重组器时出错: {e}") - - # 停止增强记忆系统 - try: - if global_config.memory.enable_memory: - await self.memory_manager.shutdown() - logger.info("🛑 增强记忆系统已停止") - except Exception as e: - logger.error(f"停止增强记忆系统时出错: {e}") - + from src.common.database.database import stop_database + cleanup_tasks.append(("数据库服务", stop_database())) except Exception as e: - logger.error(f"异步清理资源时出错: {e}") + logger.error(f"准备停止数据库服务时出错: {e}") - def _cleanup(self): + # 停止消息管理器 + try: + from src.chat.message_manager import message_manager + cleanup_tasks.append(("消息管理器", message_manager.stop())) + except Exception as e: + logger.error(f"准备停止消息管理器时出错: {e}") + + # 停止消息重组器 + try: + from src.utils.message_chunker import reassembler + cleanup_tasks.append(("消息重组器", reassembler.stop_cleanup_task())) + except Exception as e: + logger.error(f"准备停止消息重组器时出错: {e}") + + # 停止增强记忆系统 + try: + if global_config.memory.enable_memory: + cleanup_tasks.append(("增强记忆系统", self.memory_manager.shutdown())) + except Exception as e: + logger.error(f"准备停止增强记忆系统时出错: {e}") + + # 触发停止事件 + try: + from src.plugin_system.core.event_manager import event_manager + cleanup_tasks.append(("插件系统停止事件", + event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM"))) + except Exception as e: + logger.error(f"准备触发停止事件时出错: {e}") + + # 停止表情管理器 + try: + cleanup_tasks.append(("表情管理器", + asyncio.get_event_loop().run_in_executor(None, get_emoji_manager().shutdown))) + except Exception as e: + logger.error(f"准备停止表情管理器时出错: {e}") + + # 停止服务器 + try: + if self.server: + cleanup_tasks.append(("服务器", self.server.shutdown())) + except Exception as e: + logger.error(f"准备停止服务器时出错: {e}") + + # 停止应用 + try: + if self.app: + if hasattr(self.app, "shutdown"): + cleanup_tasks.append(("应用", self.app.shutdown())) + elif hasattr(self.app, "stop"): + cleanup_tasks.append(("应用", self.app.stop())) + except Exception as e: + logger.error(f"准备停止应用时出错: {e}") + + # 并行执行所有清理任务 + if cleanup_tasks: + logger.info(f"开始并行执行 {len(cleanup_tasks)} 个清理任务...") + tasks = [task for _, task in cleanup_tasks] + task_names = [name for name, _ in cleanup_tasks] + + # 使用asyncio.gather并行执行,设置超时防止卡死 + try: + results = await asyncio.wait_for( + asyncio.gather(*tasks, return_exceptions=True), + timeout=30.0 # 30秒超时 + ) + + # 记录结果 + for i, (name, result) in enumerate(zip(task_names, results)): + if isinstance(result, Exception): + logger.error(f"停止 {name} 时出错: {result}") + else: + logger.info(f"🛑 {name} 已停止") + + except asyncio.TimeoutError: + logger.error("清理任务超时,强制退出") + except Exception as e: + logger.error(f"执行清理任务时发生错误: {e}") + else: + logger.warning("没有需要清理的任务") + + def _cleanup(self) -> None: """同步清理资源(向后兼容)""" - import asyncio - try: loop = asyncio.get_event_loop() if loop.is_running(): # 如果循环正在运行,创建异步清理任务 - asyncio.create_task(self._async_cleanup()) + task = asyncio.create_task(self._async_cleanup()) + self._cleanup_tasks.append(task) else: # 如果循环未运行,直接运行异步清理 loop.run_until_complete(self._async_cleanup()) except Exception as e: logger.error(f"同步清理资源时出错: {e}") - async def _message_process_wrapper(self, message_data: dict[str, Any]): + async def _message_process_wrapper(self, message_data: dict[str, Any]) -> None: """并行处理消息的包装器""" try: start_time = time.time() message_id = message_data.get("message_info", {}).get("message_id", "UNKNOWN") + + # 检查系统是否正在关闭 + if self._shutting_down: + logger.warning(f"系统正在关闭,拒绝处理消息 {message_id}") + return + # 创建后台任务 task = asyncio.create_task(chat_bot.message_process(message_data)) logger.debug(f"已为消息 {message_id} 创建后台处理任务 (ID: {id(task)})") + # 添加一个回调函数,当任务完成时,它会被调用 task.add_done_callback(partial(_task_done_callback, message_id=message_id, start_time=start_time)) except Exception: logger.error("在创建消息处理任务时发生严重错误:") logger.error(traceback.format_exc()) - async def initialize(self): + async def initialize(self) -> None: """初始化系统组件""" + # 检查必要的配置 + if not hasattr(global_config, "bot") or not hasattr(global_config.bot, "nickname"): + logger.error("缺少必要的bot配置") + raise ValueError("Bot配置不完整") + logger.info(f"正在唤醒{global_config.bot.nickname}......") - # 其他初始化任务 - await asyncio.gather(self._init_components()) - phrases = [ - ("我们的代码里真的没有bug,只有‘特性’.", 10), - ("你知道吗?阿范喜欢被切成臊子😡", 10), # 你加的提示出语法问题来了😡😡😡😡😡😡😡 - ("你知道吗,雅诺狐的耳朵其实很好摸", 5), - ("你群最高技术力————言柒姐姐!", 20), - ("初墨小姐宇宙第一(不是)", 10), # 15 - ("world.execute(me);", 10), - ("正在尝试连接到MaiBot的服务器...连接失败...,正在转接到maimaiDX", 10), - ("你的bug就像星星一样多,而我的代码像太阳一样,一出来就看不见了。", 10), - ("温馨提示:请不要在代码中留下任何魔法数字,除非你知道它的含义。", 10), - ("世界上只有10种人:懂二进制的和不懂的。", 10), - ("喵喵~你的麦麦被猫娘入侵了喵~", 15), - ("恭喜你触发了稀有彩蛋喵:诺狐嗷呜~ ~", 1), - ("恭喜你!!!你的开发者模式已成功开启,快来加入我们吧!(๑•̀ㅂ•́)و✧ (小声bb:其实是当黑奴)", 10), - ] - from random import choices + # 初始化组件 + await self._init_components() - # 分离彩蛋和权重 - egg_texts, weights = zip(*phrases, strict=True) + # 随机选择彩蛋 + egg_texts, weights = zip(*EGG_PHRASES) + selected_egg = choices(egg_texts, weights=weights, k=1)[0] - # 使用choices进行带权重的随机选择 - selected_egg = choices(egg_texts, weights=weights, k=1) - eggs = selected_egg[0] logger.info(f""" 全部系统初始化完成,{global_config.bot.nickname}已成功唤醒 ========================================================= @@ -292,54 +354,52 @@ MoFox_Bot(第三方修改版) ========================================================= 这是基于原版MMC的社区改版,包含增强功能和优化(同时也有更多的'特性') ========================================================= -小贴士:{eggs} +小贴士:{selected_egg} """) - async def _init_components(self): + async def _init_components(self) -> None: """初始化其他组件""" init_start_time = time.time() - # 添加在线时间统计任务 - await async_task_manager.add_task(OnlineTimeRecordTask()) + # 并行初始化基础组件 + base_init_tasks = [ + async_task_manager.add_task(OnlineTimeRecordTask()), + async_task_manager.add_task(StatisticOutputTask()), + async_task_manager.add_task(TelemetryHeartBeatTask()), + ] - # 添加统计信息输出任务 - await async_task_manager.add_task(StatisticOutputTask()) - - # 添加遥测心跳任务 - await async_task_manager.add_task(TelemetryHeartBeatTask()) + await asyncio.gather(*base_init_tasks, return_exceptions=True) + logger.info("基础定时任务初始化成功") # 注册默认事件 event_manager.init_default_events() # 初始化权限管理器 - from src.plugin_system.apis.permission_api import permission_api - from src.plugin_system.core.permission_manager import PermissionManager + try: + from src.plugin_system.apis.permission_api import permission_api + from src.plugin_system.core.permission_manager import PermissionManager - permission_manager = PermissionManager() - await permission_manager.initialize() - permission_api.set_permission_manager(permission_manager) - logger.info("权限管理器初始化成功") - - # 启动API服务器 - # start_api_server() - # logger.info("API服务器启动成功") + permission_manager = PermissionManager() + await permission_manager.initialize() + permission_api.set_permission_manager(permission_manager) + logger.info("权限管理器初始化成功") + except Exception as e: + logger.error(f"权限管理器初始化失败: {e}") # 注册API路由 try: from src.api.message_router import router as message_router self.server.register_router(message_router, prefix="/api") logger.info("API路由注册成功") - except ImportError as e: - logger.error(f"导入API路由失败: {e}") except Exception as e: - logger.error(f"注册API路由时发生错误: {e}") + logger.error(f"注册API路由失败: {e}") - # 加载所有actions,包括默认的和插件的 + # 加载所有插件 plugin_manager.load_all_plugins() # 处理所有缓存的事件订阅(插件加载完成后) event_manager.process_all_pending_subscriptions() - + # 初始化MCP工具提供器 try: mcp_config = global_config.get("mcp_servers", []) @@ -350,74 +410,83 @@ MoFox_Bot(第三方修改版) except Exception as e: logger.info(f"MCP工具提供器未配置或初始化失败: {e}") - # 初始化表情管理器 - get_emoji_manager().initialize() - logger.info("表情包管理器初始化成功") + # 并行初始化其他管理器 + manager_init_tasks = [] - """ - # 初始化回复后关系追踪系统 - try: - from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system - from src.plugins.built_in.affinity_flow_chatter.relationship_tracker import ChatterRelationshipTracker + # 表情管理器 + manager_init_tasks.append(self._safe_init("表情包管理器", get_emoji_manager().initialize)) - relationship_tracker = ChatterRelationshipTracker(interest_scoring_system=chatter_interest_scoring_system) - chatter_interest_scoring_system.relationship_tracker = relationship_tracker - logger.info("回复后关系追踪系统初始化成功") - except Exception as e: - logger.error(f"回复后关系追踪系统初始化失败: {e}") - relationship_tracker = None - """ + # 情绪管理器 + manager_init_tasks.append(self._safe_init("情绪管理器", mood_manager.start)) - # 启动情绪管理器 - await mood_manager.start() - logger.info("情绪管理器初始化成功") + # 聊天管理器 + manager_init_tasks.append(self._safe_init("聊天管理器", get_chat_manager()._initialize)) - # 初始化聊天管理器 - await get_chat_manager()._initialize() + # 等待所有管理器初始化完成 + results = await asyncio.gather(*manager_init_tasks, return_exceptions=True) + + # 检查初始化结果 + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.error(f"组件初始化失败: {result}") + + # 启动聊天管理器的自动保存任务 asyncio.create_task(get_chat_manager()._auto_save_task()) - logger.info("聊天管理器初始化成功") # 初始化增强记忆系统 - await self.memory_manager.initialize() - logger.info("增强记忆系统初始化成功") - - # 老记忆系统已完全删除 + if global_config.memory.enable_memory: + await self._safe_init("增强记忆系统", self.memory_manager.initialize)() + else: + logger.info("记忆系统已禁用,跳过初始化") # 初始化消息兴趣值计算组件 await self._initialize_interest_calculator() # 初始化LPMM知识库 - from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge + try: + from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge + initialize_lpmm_knowledge() + logger.info("LPMM知识库初始化成功") + except Exception as e: + logger.error(f"LPMM知识库初始化失败: {e}") - initialize_lpmm_knowledge() - logger.info("LPMM知识库初始化成功") - - # 异步记忆管理器已禁用,增强记忆系统有内置的优化机制 - logger.info("异步记忆管理器已禁用 - 使用增强记忆系统内置优化") - - # await asyncio.sleep(0.5) #防止logger输出飞了 - - # 将bot.py中的chat_bot.message_process消息处理函数注册到api.py的消息处理基类中 + # 将消息处理函数注册到API self.app.register_message_handler(self._message_process_wrapper) - # 启动消息重组器的清理任务 - from src.utils.message_chunker import reassembler - - await reassembler.start_cleanup_task() - logger.info("消息重组器已启动") + # 启动消息重组器 + try: + from src.utils.message_chunker import reassembler + await reassembler.start_cleanup_task() + logger.info("消息重组器已启动") + except Exception as e: + logger.error(f"启动消息重组器失败: {e}") # 启动消息管理器 - from src.chat.message_manager import message_manager - - await message_manager.start() - logger.info("消息管理器已启动") + try: + from src.chat.message_manager import message_manager + await message_manager.start() + logger.info("消息管理器已启动") + except Exception as e: + logger.error(f"启动消息管理器失败: {e}") # 初始化个体特征 - await self.individuality.initialize() + await self._safe_init("个体特征", self.individuality.initialize)() + # 初始化计划相关组件 + await self._init_planning_components() + + # 触发启动事件 + try: + await event_manager.trigger_event(EventType.ON_START, permission_group="SYSTEM") + init_time = int(1000 * (time.time() - init_start_time)) + logger.info(f"初始化完成,神经元放电{init_time}次") + except Exception as e: + logger.error(f"启动事件触发失败: {e}") + + async def _init_planning_components(self) -> None: + """初始化计划相关组件""" # 初始化月度计划管理器 if global_config.planning_system.monthly_plan_enable: - logger.info("正在初始化月度计划管理器...") try: await monthly_plan_manager.start_monthly_plan_generation() logger.info("月度计划管理器初始化成功") @@ -426,23 +495,31 @@ MoFox_Bot(第三方修改版) # 初始化日程管理器 if global_config.planning_system.schedule_enable: - logger.info("日程表功能已启用,正在初始化管理器...") - await schedule_manager.load_or_generate_today_schedule() - await schedule_manager.start_daily_schedule_generation() - logger.info("日程表管理器初始化成功。") + try: + await schedule_manager.load_or_generate_today_schedule() + await schedule_manager.start_daily_schedule_generation() + logger.info("日程表管理器初始化成功") + except Exception as e: + logger.error(f"日程表管理器初始化失败: {e}") - try: - await event_manager.trigger_event(EventType.ON_START, permission_group="SYSTEM") - init_time = int(1000 * (time.time() - init_start_time)) - logger.info(f"初始化完成,神经元放电{init_time}次") - except Exception as e: - logger.error(f"启动大脑和外部世界失败: {e}") - raise + async def _safe_init(self, component_name: str, init_func) -> callable: + """安全初始化组件,捕获异常""" + async def wrapper(): + try: + result = init_func() + if asyncio.iscoroutine(result): + await result + logger.info(f"{component_name}初始化成功") + return True + except Exception as e: + logger.error(f"{component_name}初始化失败: {e}") + return False + return wrapper - async def schedule_tasks(self): + async def schedule_tasks(self) -> None: """调度定时任务""" try: - while True: + while not self._shutting_down: try: tasks = [ get_emoji_manager().start_periodic_check_register(), @@ -450,23 +527,25 @@ MoFox_Bot(第三方修改版) self.server.run(), ] - # 增强记忆系统不需要定时任务,已禁用原有记忆系统的定时任务 # 使用 return_exceptions=True 防止单个任务失败导致整个程序崩溃 await asyncio.gather(*tasks, return_exceptions=True) except (ConnectionResetError, OSError) as e: + if self._shutting_down: + break logger.warning(f"网络连接发生错误,尝试重新启动任务: {e}") - await asyncio.sleep(1) # 短暂等待后重新开始 - continue + await asyncio.sleep(1) except asyncio.InvalidStateError as e: + if self._shutting_down: + break logger.error(f"异步任务状态无效,重新初始化: {e}") - await asyncio.sleep(2) # 等待更长时间让系统稳定 - continue + await asyncio.sleep(2) except Exception as e: + if self._shutting_down: + break logger.error(f"调度任务发生未预期异常: {e}") logger.error(traceback.format_exc()) - await asyncio.sleep(5) # 发生其他错误时等待更长时间 - continue + await asyncio.sleep(5) except asyncio.CancelledError: logger.info("调度任务被取消,正在退出...") @@ -475,52 +554,37 @@ MoFox_Bot(第三方修改版) logger.error(traceback.format_exc()) raise - async def shutdown(self): + async def shutdown(self) -> None: """关闭系统组件""" + if self._shutting_down: + return + logger.info("正在关闭MainSystem...") - - # 关闭表情管理器 - try: - get_emoji_manager().shutdown() - logger.info("表情管理器已关闭") - except Exception as e: - logger.warning(f"关闭表情管理器时出错: {e}") - - # 关闭服务器 - try: - if self.server: - await self.server.shutdown() - logger.info("服务器已关闭") - except Exception as e: - logger.warning(f"关闭服务器时出错: {e}") - - # 关闭应用 (MessageServer可能没有shutdown方法) - try: - if self.app: - if hasattr(self.app, "shutdown"): - await self.app.shutdown() - logger.info("应用已关闭") - elif hasattr(self.app, "stop"): - await self.app.stop() - logger.info("应用已停止") - else: - logger.info("应用没有shutdown方法,跳过关闭") - except Exception as e: - logger.warning(f"关闭应用时出错: {e}") - + await self._async_cleanup() logger.info("MainSystem关闭完成") - # 老记忆系统的定时任务已删除 - 增强记忆系统使用内置的维护机制 - -async def main(): +async def main() -> None: """主函数""" system = MainSystem() - await asyncio.gather( - system.initialize(), - system.schedule_tasks(), - ) + try: + await system.initialize() + await system.schedule_tasks() + except KeyboardInterrupt: + logger.info("收到键盘中断信号") + except Exception as e: + logger.error(f"主函数执行失败: {e}") + logger.error(traceback.format_exc()) + finally: + await system.shutdown() if __name__ == "__main__": - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("程序被用户中断") + except Exception as e: + logger.error(f"程序执行失败: {e}") + logger.error(traceback.format_exc()) + sys.exit(1) diff --git a/src/plugin_system/apis/generator_api.py b/src/plugin_system/apis/generator_api.py index 803a2d739..a76fc6e74 100644 --- a/src/plugin_system/apis/generator_api.py +++ b/src/plugin_system/apis/generator_api.py @@ -19,7 +19,7 @@ from src.common.logger import get_logger from src.plugin_system.base.component_types import ActionInfo if TYPE_CHECKING: - from src.chat.replyer.default_generator import DefaultReplyer + pass install(extra_lines=3) diff --git a/src/plugin_system/apis/tool_api.py b/src/plugin_system/apis/tool_api.py index 662a3693b..d64d366ba 100644 --- a/src/plugin_system/apis/tool_api.py +++ b/src/plugin_system/apis/tool_api.py @@ -19,7 +19,7 @@ def get_tool_instance(tool_name: str) -> BaseTool | None: tool_class: type[BaseTool] = component_registry.get_component_class(tool_name, ComponentType.TOOL) # type: ignore if tool_class: return tool_class(plugin_config) - + # 如果不是常规工具,检查是否是MCP工具 # MCP工具不需要返回实例,会在execute_tool_call中特殊处理 return None @@ -35,7 +35,7 @@ def get_llm_available_tool_definitions(): llm_available_tools = component_registry.get_llm_available_tools() tool_definitions = [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()] - + # 添加MCP工具 try: from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider @@ -45,5 +45,5 @@ def get_llm_available_tool_definitions(): logger.debug(f"已添加 {len(mcp_tools)} 个MCP工具到可用工具列表") except Exception as e: logger.debug(f"获取MCP工具失败(可能未配置): {e}") - + return tool_definitions diff --git a/src/plugin_system/core/tool_use.py b/src/plugin_system/core/tool_use.py index 62b138d8f..ab53919b6 100644 --- a/src/plugin_system/core/tool_use.py +++ b/src/plugin_system/core/tool_use.py @@ -279,7 +279,7 @@ class ToolExecutor: logger.info( f"{self.log_prefix} 正在执行工具: [bold green]{function_name}[/bold green] | 参数: {function_args}" ) - + # 检查是否是MCP工具 try: from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider @@ -295,7 +295,7 @@ class ToolExecutor: } except Exception as e: logger.debug(f"检查MCP工具时出错: {e}") - + function_args["llm_called"] = True # 标记为LLM调用 # 检查是否是二步工具的第二步调用 diff --git a/src/plugin_system/utils/mcp_connector.py b/src/plugin_system/utils/mcp_connector.py index 2cb065918..66b205f1e 100644 --- a/src/plugin_system/utils/mcp_connector.py +++ b/src/plugin_system/utils/mcp_connector.py @@ -3,11 +3,9 @@ MCP (Model Context Protocol) 连接器 负责连接MCP服务器,获取和执行工具 """ -import asyncio from typing import Any import aiohttp -import orjson from src.common.logger import get_logger diff --git a/src/plugin_system/utils/mcp_tool_provider.py b/src/plugin_system/utils/mcp_tool_provider.py index ad306ee68..90c921712 100644 --- a/src/plugin_system/utils/mcp_tool_provider.py +++ b/src/plugin_system/utils/mcp_tool_provider.py @@ -3,7 +3,6 @@ MCP工具提供器 - 简化版 直接集成到工具系统,无需复杂的插件架构 """ -import asyncio from typing import Any from src.common.logger import get_logger diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_interest_calculator.py b/src/plugins/built_in/affinity_flow_chatter/affinity_interest_calculator.py index 1eb17a543..420209903 100644 --- a/src/plugins/built_in/affinity_flow_chatter/affinity_interest_calculator.py +++ b/src/plugins/built_in/affinity_flow_chatter/affinity_interest_calculator.py @@ -4,9 +4,10 @@ """ import time -import orjson from typing import TYPE_CHECKING +import orjson + from src.chat.interest_system import bot_interest_manager from src.common.logger import get_logger from src.config.config import global_config diff --git a/src/plugins/built_in/affinity_flow_chatter/plan_executor.py b/src/plugins/built_in/affinity_flow_chatter/plan_executor.py index 91ea6ccc7..e68876aaf 100644 --- a/src/plugins/built_in/affinity_flow_chatter/plan_executor.py +++ b/src/plugins/built_in/affinity_flow_chatter/plan_executor.py @@ -230,11 +230,11 @@ class ChatterPlanExecutor: except Exception as e: error_message = str(e) logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}") - ''' + """ # 记录用户关系追踪 if success and action_info.action_message: await self._track_user_interaction(action_info, plan, reply_content) - ''' + """ execution_time = time.time() - start_time self.execution_stats["execution_times"].append(execution_time) diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index 52a55cbe4..0d243d964 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -10,10 +10,10 @@ from typing import TYPE_CHECKING, Any from src.common.logger import get_logger from src.config.config import global_config from src.mood.mood_manager import mood_manager +from src.plugin_system.base.component_types import ChatMode from src.plugins.built_in.affinity_flow_chatter.plan_executor import ChatterPlanExecutor from src.plugins.built_in.affinity_flow_chatter.plan_filter import ChatterPlanFilter from src.plugins.built_in.affinity_flow_chatter.plan_generator import ChatterPlanGenerator -from src.plugin_system.base.component_types import ChatMode if TYPE_CHECKING: from src.chat.planner_actions.action_manager import ChatterActionManager diff --git a/src/plugins/built_in/web_search_tool/engines/searxng_engine.py b/src/plugins/built_in/web_search_tool/engines/searxng_engine.py index e539b9227..75f9373bb 100644 --- a/src/plugins/built_in/web_search_tool/engines/searxng_engine.py +++ b/src/plugins/built_in/web_search_tool/engines/searxng_engine.py @@ -6,9 +6,7 @@ SearXNG search engine implementation from __future__ import annotations -import asyncio -import functools -from typing import Any, List +from typing import Any import httpx @@ -39,13 +37,13 @@ class SearXNGSearchEngine(BaseSearchEngine): instances = config_api.get_global_config("web_search.searxng_instances", None) if isinstance(instances, list): # 过滤空值 - self.instances: List[str] = [u.rstrip("/") for u in instances if isinstance(u, str) and u.strip()] + self.instances: list[str] = [u.rstrip("/") for u in instances if isinstance(u, str) and u.strip()] else: self.instances = [] api_keys = config_api.get_global_config("web_search.searxng_api_keys", None) if isinstance(api_keys, list): - self.api_keys: List[str | None] = [k.strip() if isinstance(k, str) and k.strip() else None for k in api_keys] + self.api_keys: list[str | None] = [k.strip() if isinstance(k, str) and k.strip() else None for k in api_keys] else: self.api_keys = [] @@ -85,7 +83,7 @@ class SearXNGSearchEngine(BaseSearchEngine): results.extend(instance_results) if len(results) >= num_results: break - except Exception as e: # noqa: BLE001 + except Exception as e: logger.warning(f"SearXNG 实例 {base_url} 调用失败: {e}") continue @@ -116,12 +114,12 @@ class SearXNGSearchEngine(BaseSearchEngine): try: resp = await self._client.get(url, params=params, headers=headers) resp.raise_for_status() - except Exception as e: # noqa: BLE001 + except Exception as e: raise RuntimeError(f"请求失败: {e}") from e try: data = resp.json() - except Exception as e: # noqa: BLE001 + except Exception as e: raise RuntimeError(f"解析 JSON 失败: {e}") from e raw_results = data.get("results", []) if isinstance(data, dict) else [] @@ -141,5 +139,5 @@ class SearXNGSearchEngine(BaseSearchEngine): async def __aenter__(self): return self - async def __aexit__(self, exc_type, exc, tb): # noqa: D401 + async def __aexit__(self, exc_type, exc, tb): await self._client.aclose() diff --git a/src/plugins/built_in/web_search_tool/plugin.py b/src/plugins/built_in/web_search_tool/plugin.py index f8a8c785d..681e829f4 100644 --- a/src/plugins/built_in/web_search_tool/plugin.py +++ b/src/plugins/built_in/web_search_tool/plugin.py @@ -41,8 +41,8 @@ class WEBSEARCHPLUGIN(BasePlugin): from .engines.bing_engine import BingSearchEngine from .engines.ddg_engine import DDGSearchEngine from .engines.exa_engine import ExaSearchEngine - from .engines.tavily_engine import TavilySearchEngine from .engines.searxng_engine import SearXNGSearchEngine + from .engines.tavily_engine import TavilySearchEngine # 实例化所有搜索引擎,这会触发API密钥管理器的初始化 exa_engine = ExaSearchEngine() diff --git a/src/plugins/built_in/web_search_tool/tools/web_search.py b/src/plugins/built_in/web_search_tool/tools/web_search.py index 47fd7946c..637349534 100644 --- a/src/plugins/built_in/web_search_tool/tools/web_search.py +++ b/src/plugins/built_in/web_search_tool/tools/web_search.py @@ -13,8 +13,8 @@ from src.plugin_system.apis import config_api from ..engines.bing_engine import BingSearchEngine from ..engines.ddg_engine import DDGSearchEngine from ..engines.exa_engine import ExaSearchEngine -from ..engines.tavily_engine import TavilySearchEngine from ..engines.searxng_engine import SearXNGSearchEngine +from ..engines.tavily_engine import TavilySearchEngine from ..utils.formatters import deduplicate_results, format_search_results logger = get_logger("web_search_tool")