From a273be76d39c5c5b55bb667986c5636e7116195e Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sat, 4 Oct 2025 16:47:55 +0800 Subject: [PATCH] =?UTF-8?q?refactor(core):=20=E7=BB=9F=E4=B8=80=E5=92=8C?= =?UTF-8?q?=E6=94=B9=E8=BF=9B=E7=A8=8B=E5=BA=8F=E4=BC=98=E9=9B=85=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将分散的关闭逻辑集中到`MainSystem`中,并由`bot.py`中的顶层异常处理块统一调用。这简化了关闭流程,提高了系统的健壮性和可维护性。 - 将信号处理逻辑从`MainSystem`移除,改由`bot.py`中的`try...finally`块处理,以捕获更广泛的退出场景(如`KeyboardInterrupt`)。 - `graceful_shutdown`函数现在接收`main_system`实例,直接调用其`shutdown`方法,实现了责任的单一化。 - 为`EmojiManager`和`VectorMemoryStorage`添加了`shutdown`/`cleanup`方法,确保其后台任务和资源能被正确清理。 - 调整了`MemorySystem`中对`unified_storage.cleanup()`的调用,使其与接口保持一致。 --- bot.py | 43 ++--- src/chat/emoji_system/emoji_manager.py | 6 + src/chat/memory_system/memory_system.py | 12 +- .../memory_system/vector_memory_storage_v2.py | 5 + src/main.py | 171 +----------------- 5 files changed, 32 insertions(+), 205 deletions(-) diff --git a/bot.py b/bot.py index cde011249..4d5e12a49 100644 --- a/bot.py +++ b/bot.py @@ -76,7 +76,7 @@ async def request_shutdown() -> bool: try: if loop and not loop.is_closed(): try: - loop.run_until_complete(graceful_shutdown()) + loop.run_until_complete(graceful_shutdown(maibot.main_system)) except Exception as ge: # 捕捉优雅关闭时可能发生的错误 logger.error(f"优雅关闭时发生错误: {ge}") return False @@ -97,18 +97,15 @@ def easter_egg(): logger.info(rainbow_text) -async def graceful_shutdown(): +async def graceful_shutdown(main_system_instance): + """优雅地关闭所有系统组件""" try: logger.info("正在优雅关闭麦麦...") - # 首先停止服务器组件,避免网络连接被强制关闭 - try: - global server - if server and hasattr(server, 'shutdown'): - logger.info("正在关闭服务器...") - await server.shutdown() - except Exception as e: - logger.warning(f"关闭服务器时出错: {e}") + # 停止MainSystem中的组件,它会处理服务器等 + if main_system_instance and hasattr(main_system_instance, 'shutdown'): + logger.info("正在关闭MainSystem...") + await main_system_instance.shutdown() # 停止聊天管理器 try: @@ -138,14 +135,6 @@ async def graceful_shutdown(): except Exception as e: logger.warning(f"停止记忆系统时出错: {e}") - # 停止MainSystem - try: - global main_system - if main_system and hasattr(main_system, 'shutdown'): - logger.info("正在停止MainSystem...") - await main_system.shutdown() - except Exception as e: - logger.warning(f"停止MainSystem时出错: {e}") # 停止所有异步任务 try: @@ -178,6 +167,15 @@ async def graceful_shutdown(): # 关闭日志系统,释放文件句柄 shutdown_logging() + # 尝试停止事件循环 + try: + loop = asyncio.get_running_loop() + if loop.is_running(): + loop.stop() + logger.info("事件循环已请求停止") + except RuntimeError: + pass # 没有正在运行的事件循环 + except Exception as e: logger.error(f"麦麦关闭失败: {e}", exc_info=True) @@ -305,18 +303,13 @@ if __name__ == "__main__": if "loop" in locals() and loop and not loop.is_closed(): logger.info("开始执行最终关闭流程...") try: - loop.run_until_complete(graceful_shutdown()) + # 传递main_system实例 + loop.run_until_complete(graceful_shutdown(maibot.main_system)) except Exception as ge: logger.error(f"优雅关闭时发生错误: {ge}") loop.close() logger.info("事件循环已关闭") - # 关闭日志系统,释放文件句柄 - try: - shutdown_logging() - except Exception as e: - print(f"关闭日志系统时出错: {e}") - # 在程序退出前暂停,让你有机会看到输出 # input("按 Enter 键退出...") # <--- 添加这行 sys.exit(exit_code) # <--- 使用记录的退出码 diff --git a/src/chat/emoji_system/emoji_manager.py b/src/chat/emoji_system/emoji_manager.py index caefebff3..261f050cf 100644 --- a/src/chat/emoji_system/emoji_manager.py +++ b/src/chat/emoji_system/emoji_manager.py @@ -402,6 +402,12 @@ class EmojiManager: logger.info("启动表情包管理器") + def shutdown(self) -> None: + """关闭EmojiManager,取消正在运行的任务""" + if self._scan_task and not self._scan_task.done(): + self._scan_task.cancel() + logger.info("表情包扫描任务已取消") + def initialize(self) -> None: """初始化数据库连接和表情目录""" diff --git a/src/chat/memory_system/memory_system.py b/src/chat/memory_system/memory_system.py index dadfdb37b..6c00a84fe 100644 --- a/src/chat/memory_system/memory_system.py +++ b/src/chat/memory_system/memory_system.py @@ -1425,16 +1425,6 @@ class MemorySystem: def _fingerprint_key(user_id: str, fingerprint: str) -> str: return f"{user_id!s}:{fingerprint}" - def get_system_stats(self) -> dict[str, Any]: - """获取系统统计信息""" - return { - "status": self.status.value, - "total_memories": self.total_memories, - "last_build_time": self.last_build_time, - "last_retrieval_time": self.last_retrieval_time, - "config": asdict(self.config), - } - def _compute_memory_score(self, query_text: str, memory: MemoryChunk, context: dict[str, Any]) -> float: """根据查询和上下文为记忆计算匹配分数""" tokens_query = self._tokenize_text(query_text) @@ -1542,7 +1532,7 @@ class MemorySystem: # 保存统一存储数据 if self.unified_storage: - await self.unified_storage.cleanup() + self.unified_storage.cleanup() logger.info("✅ 简化记忆系统已关闭") diff --git a/src/chat/memory_system/vector_memory_storage_v2.py b/src/chat/memory_system/vector_memory_storage_v2.py index 5c349b584..fd5ca144f 100644 --- a/src/chat/memory_system/vector_memory_storage_v2.py +++ b/src/chat/memory_system/vector_memory_storage_v2.py @@ -964,6 +964,11 @@ class VectorMemoryStorage: logger.info("Vector记忆存储系统已停止") + def cleanup(self): + """清理资源,兼容旧接口""" + logger.info("正在清理VectorMemoryStorage资源...") + self.stop() + # 全局实例(可选) _global_vector_storage = None diff --git a/src/main.py b/src/main.py index 8c1f9cb52..e1eb6a9f1 100644 --- a/src/main.py +++ b/src/main.py @@ -68,175 +68,8 @@ class MainSystem: self.app: MessageServer = get_global_api() self.server: Server = get_global_server() - # 设置信号处理器用于优雅退出 - self._setup_signal_handlers() - - def _setup_signal_handlers(self): - """设置信号处理器""" - - def signal_handler(signum, frame): - logger.info("收到退出信号,正在优雅关闭系统...") - - import asyncio - - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # 如果事件循环正在运行,创建任务并设置回调 - async def cleanup_and_exit(): - await self._async_cleanup() - sys.exit(0) - - task = asyncio.create_task(cleanup_and_exit()) - # 添加任务完成回调,确保程序退出 - task.add_done_callback(lambda t: None) - else: - # 如果事件循环未运行,使用同步清理 - self._cleanup() - sys.exit(0) - except Exception as e: - logger.error(f"信号处理失败: {e}") - sys.exit(1) - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - async def _initialize_interest_calculator(self): - """初始化兴趣值计算组件 - 通过插件系统自动发现和加载""" - try: - logger.info("开始自动发现兴趣值计算组件...") - - # 使用组件注册表自动发现兴趣计算器组件 - interest_calculators = {} - try: - from src.plugin_system.apis.component_manage_api import get_components_info_by_type - from src.plugin_system.base.component_types import ComponentType - interest_calculators = get_components_info_by_type(ComponentType.INTEREST_CALCULATOR) - logger.info(f"通过组件注册表发现 {len(interest_calculators)} 个兴趣计算器组件") - except Exception as e: - logger.error(f"从组件注册表获取兴趣计算器失败: {e}") - - if not interest_calculators: - logger.warning("未发现任何兴趣计算器组件") - return - - logger.info(f"发现的兴趣计算器组件:") - for calc_name, calc_info in interest_calculators.items(): - enabled = getattr(calc_info, 'enabled', True) - default_enabled = getattr(calc_info, 'enabled_by_default', True) - logger.info(f" - {calc_name}: 启用: {enabled}, 默认启用: {default_enabled}") - - # 初始化兴趣度管理器 - from src.chat.interest_system.interest_manager import get_interest_manager - interest_manager = get_interest_manager() - await interest_manager.initialize() - - # 尝试注册计算器(单例模式,只注册第一个可用的) - registered_calculator = None - - # 使用组件注册表获取组件类并注册 - for calc_name, calc_info in interest_calculators.items(): - enabled = getattr(calc_info, 'enabled', True) - default_enabled = getattr(calc_info, 'enabled_by_default', True) - - if not enabled or not default_enabled: - logger.info(f"兴趣计算器 {calc_name} 未启用,跳过") - continue - - try: - from src.plugin_system.core.component_registry import component_registry - component_class = component_registry.get_component_class(calc_name, ComponentType.INTEREST_CALCULATOR) - - if component_class: - logger.info(f"成功获取 {calc_name} 的组件类: {component_class.__name__}") - - # 创建组件实例 - calculator_instance = component_class() - logger.info(f"成功创建兴趣计算器实例: {calc_name}") - - # 初始化组件 - if await calculator_instance.initialize(): - # 注册到兴趣管理器 - success = await interest_manager.register_calculator(calculator_instance) - if success: - registered_calculator = calculator_instance - logger.info(f"成功注册兴趣计算器: {calc_name}") - break # 只注册一个成功的计算器 - else: - logger.error(f"兴趣计算器 {calc_name} 注册失败") - else: - logger.error(f"兴趣计算器 {calc_name} 初始化失败") - else: - logger.warning(f"无法找到 {calc_name} 的组件类") - - except Exception as e: - logger.error(f"处理兴趣计算器 {calc_name} 时出错: {e}", exc_info=True) - - if registered_calculator: - logger.info(f"当前活跃的兴趣度计算器: {registered_calculator.component_name} v{registered_calculator.component_version}") - else: - logger.error("未能成功注册任何兴趣计算器") - - except Exception as e: - logger.error(f"初始化兴趣度计算器失败: {e}", exc_info=True) - - async def _async_cleanup(self): - """异步清理资源""" - try: - - # 停止数据库服务 - try: - from src.common.database.database import stop_database - await stop_database() - logger.info("🛑 数据库服务已停止") - except Exception as e: - logger.error(f"停止数据库服务时出错: {e}") - - # 停止消息管理器 - try: - from src.chat.message_manager import message_manager - await message_manager.stop() - logger.info("🛑 消息管理器已停止") - except Exception as e: - logger.error(f"停止消息管理器时出错: {e}") - - # 停止消息重组器 - try: - from src.plugin_system import EventType - from src.plugin_system.core.event_manager import event_manager - from src.utils.message_chunker import reassembler - - await event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM") - await reassembler.stop_cleanup_task() - logger.info("🛑 消息重组器已停止") - except Exception as e: - logger.error(f"停止消息重组器时出错: {e}") - - # 停止增强记忆系统 - try: - if global_config.memory.enable_memory: - await self.memory_manager.shutdown() - logger.info("🛑 增强记忆系统已停止") - except Exception as e: - logger.error(f"停止增强记忆系统时出错: {e}") - - except Exception as e: - logger.error(f"异步清理资源时出错: {e}") - - def _cleanup(self): - """同步清理资源(向后兼容)""" - import asyncio - - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # 如果循环正在运行,创建异步清理任务 - asyncio.create_task(self._async_cleanup()) - else: - # 如果循环未运行,直接运行异步清理 - loop.run_until_complete(self._async_cleanup()) - except Exception as e: - logger.error(f"同步清理资源时出错: {e}") + # 信号处理现在由bot.py的KeyboardInterrupt处理 + pass async def _message_process_wrapper(self, message_data: dict[str, Any]): """并行处理消息的包装器"""