refactor(core): 统一和改进程序优雅关闭逻辑
将分散的关闭逻辑集中到`MainSystem`中,并由`bot.py`中的顶层异常处理块统一调用。这简化了关闭流程,提高了系统的健壮性和可维护性。 - 将信号处理逻辑从`MainSystem`移除,改由`bot.py`中的`try...finally`块处理,以捕获更广泛的退出场景(如`KeyboardInterrupt`)。 - `graceful_shutdown`函数现在接收`main_system`实例,直接调用其`shutdown`方法,实现了责任的单一化。 - 为`EmojiManager`和`VectorMemoryStorage`添加了`shutdown`/`cleanup`方法,确保其后台任务和资源能被正确清理。 - 调整了`MemorySystem`中对`unified_storage.cleanup()`的调用,使其与接口保持一致。
This commit is contained in:
43
bot.py
43
bot.py
@@ -76,7 +76,7 @@ async def request_shutdown() -> bool:
|
|||||||
try:
|
try:
|
||||||
if loop and not loop.is_closed():
|
if loop and not loop.is_closed():
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(graceful_shutdown())
|
loop.run_until_complete(graceful_shutdown(maibot.main_system))
|
||||||
except Exception as ge: # 捕捉优雅关闭时可能发生的错误
|
except Exception as ge: # 捕捉优雅关闭时可能发生的错误
|
||||||
logger.error(f"优雅关闭时发生错误: {ge}")
|
logger.error(f"优雅关闭时发生错误: {ge}")
|
||||||
return False
|
return False
|
||||||
@@ -97,18 +97,15 @@ def easter_egg():
|
|||||||
logger.info(rainbow_text)
|
logger.info(rainbow_text)
|
||||||
|
|
||||||
|
|
||||||
async def graceful_shutdown():
|
async def graceful_shutdown(main_system_instance):
|
||||||
|
"""优雅地关闭所有系统组件"""
|
||||||
try:
|
try:
|
||||||
logger.info("正在优雅关闭麦麦...")
|
logger.info("正在优雅关闭麦麦...")
|
||||||
|
|
||||||
# 首先停止服务器组件,避免网络连接被强制关闭
|
# 停止MainSystem中的组件,它会处理服务器等
|
||||||
try:
|
if main_system_instance and hasattr(main_system_instance, 'shutdown'):
|
||||||
global server
|
logger.info("正在关闭MainSystem...")
|
||||||
if server and hasattr(server, 'shutdown'):
|
await main_system_instance.shutdown()
|
||||||
logger.info("正在关闭服务器...")
|
|
||||||
await server.shutdown()
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"关闭服务器时出错: {e}")
|
|
||||||
|
|
||||||
# 停止聊天管理器
|
# 停止聊天管理器
|
||||||
try:
|
try:
|
||||||
@@ -138,14 +135,6 @@ async def graceful_shutdown():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"停止记忆系统时出错: {e}")
|
logger.warning(f"停止记忆系统时出错: {e}")
|
||||||
|
|
||||||
# 停止MainSystem
|
|
||||||
try:
|
|
||||||
global main_system
|
|
||||||
if main_system and hasattr(main_system, 'shutdown'):
|
|
||||||
logger.info("正在停止MainSystem...")
|
|
||||||
await main_system.shutdown()
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"停止MainSystem时出错: {e}")
|
|
||||||
|
|
||||||
# 停止所有异步任务
|
# 停止所有异步任务
|
||||||
try:
|
try:
|
||||||
@@ -178,6 +167,15 @@ async def graceful_shutdown():
|
|||||||
# 关闭日志系统,释放文件句柄
|
# 关闭日志系统,释放文件句柄
|
||||||
shutdown_logging()
|
shutdown_logging()
|
||||||
|
|
||||||
|
# 尝试停止事件循环
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
if loop.is_running():
|
||||||
|
loop.stop()
|
||||||
|
logger.info("事件循环已请求停止")
|
||||||
|
except RuntimeError:
|
||||||
|
pass # 没有正在运行的事件循环
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
|
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
|
||||||
|
|
||||||
@@ -305,18 +303,13 @@ if __name__ == "__main__":
|
|||||||
if "loop" in locals() and loop and not loop.is_closed():
|
if "loop" in locals() and loop and not loop.is_closed():
|
||||||
logger.info("开始执行最终关闭流程...")
|
logger.info("开始执行最终关闭流程...")
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(graceful_shutdown())
|
# 传递main_system实例
|
||||||
|
loop.run_until_complete(graceful_shutdown(maibot.main_system))
|
||||||
except Exception as ge:
|
except Exception as ge:
|
||||||
logger.error(f"优雅关闭时发生错误: {ge}")
|
logger.error(f"优雅关闭时发生错误: {ge}")
|
||||||
loop.close()
|
loop.close()
|
||||||
logger.info("事件循环已关闭")
|
logger.info("事件循环已关闭")
|
||||||
|
|
||||||
# 关闭日志系统,释放文件句柄
|
|
||||||
try:
|
|
||||||
shutdown_logging()
|
|
||||||
except Exception as e:
|
|
||||||
print(f"关闭日志系统时出错: {e}")
|
|
||||||
|
|
||||||
# 在程序退出前暂停,让你有机会看到输出
|
# 在程序退出前暂停,让你有机会看到输出
|
||||||
# input("按 Enter 键退出...") # <--- 添加这行
|
# input("按 Enter 键退出...") # <--- 添加这行
|
||||||
sys.exit(exit_code) # <--- 使用记录的退出码
|
sys.exit(exit_code) # <--- 使用记录的退出码
|
||||||
|
|||||||
@@ -402,6 +402,12 @@ class EmojiManager:
|
|||||||
|
|
||||||
logger.info("启动表情包管理器")
|
logger.info("启动表情包管理器")
|
||||||
|
|
||||||
|
def shutdown(self) -> None:
|
||||||
|
"""关闭EmojiManager,取消正在运行的任务"""
|
||||||
|
if self._scan_task and not self._scan_task.done():
|
||||||
|
self._scan_task.cancel()
|
||||||
|
logger.info("表情包扫描任务已取消")
|
||||||
|
|
||||||
def initialize(self) -> None:
|
def initialize(self) -> None:
|
||||||
"""初始化数据库连接和表情目录"""
|
"""初始化数据库连接和表情目录"""
|
||||||
|
|
||||||
|
|||||||
@@ -1425,16 +1425,6 @@ class MemorySystem:
|
|||||||
def _fingerprint_key(user_id: str, fingerprint: str) -> str:
|
def _fingerprint_key(user_id: str, fingerprint: str) -> str:
|
||||||
return f"{user_id!s}:{fingerprint}"
|
return f"{user_id!s}:{fingerprint}"
|
||||||
|
|
||||||
def get_system_stats(self) -> dict[str, Any]:
|
|
||||||
"""获取系统统计信息"""
|
|
||||||
return {
|
|
||||||
"status": self.status.value,
|
|
||||||
"total_memories": self.total_memories,
|
|
||||||
"last_build_time": self.last_build_time,
|
|
||||||
"last_retrieval_time": self.last_retrieval_time,
|
|
||||||
"config": asdict(self.config),
|
|
||||||
}
|
|
||||||
|
|
||||||
def _compute_memory_score(self, query_text: str, memory: MemoryChunk, context: dict[str, Any]) -> float:
|
def _compute_memory_score(self, query_text: str, memory: MemoryChunk, context: dict[str, Any]) -> float:
|
||||||
"""根据查询和上下文为记忆计算匹配分数"""
|
"""根据查询和上下文为记忆计算匹配分数"""
|
||||||
tokens_query = self._tokenize_text(query_text)
|
tokens_query = self._tokenize_text(query_text)
|
||||||
@@ -1542,7 +1532,7 @@ class MemorySystem:
|
|||||||
|
|
||||||
# 保存统一存储数据
|
# 保存统一存储数据
|
||||||
if self.unified_storage:
|
if self.unified_storage:
|
||||||
await self.unified_storage.cleanup()
|
self.unified_storage.cleanup()
|
||||||
|
|
||||||
logger.info("✅ 简化记忆系统已关闭")
|
logger.info("✅ 简化记忆系统已关闭")
|
||||||
|
|
||||||
|
|||||||
@@ -964,6 +964,11 @@ class VectorMemoryStorage:
|
|||||||
|
|
||||||
logger.info("Vector记忆存储系统已停止")
|
logger.info("Vector记忆存储系统已停止")
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""清理资源,兼容旧接口"""
|
||||||
|
logger.info("正在清理VectorMemoryStorage资源...")
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
|
||||||
# 全局实例(可选)
|
# 全局实例(可选)
|
||||||
_global_vector_storage = None
|
_global_vector_storage = None
|
||||||
|
|||||||
92
src/main.py
92
src/main.py
@@ -68,96 +68,8 @@ class MainSystem:
|
|||||||
self.app: MessageServer = get_global_api()
|
self.app: MessageServer = get_global_api()
|
||||||
self.server: Server = get_global_server()
|
self.server: Server = get_global_server()
|
||||||
|
|
||||||
# 设置信号处理器用于优雅退出
|
# 信号处理现在由bot.py的KeyboardInterrupt处理
|
||||||
self._setup_signal_handlers()
|
pass
|
||||||
|
|
||||||
def _setup_signal_handlers(self):
|
|
||||||
"""设置信号处理器"""
|
|
||||||
|
|
||||||
def signal_handler(signum, frame):
|
|
||||||
logger.info("收到退出信号,正在优雅关闭系统...")
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
try:
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
if loop.is_running():
|
|
||||||
# 如果事件循环正在运行,创建任务并设置回调
|
|
||||||
async def cleanup_and_exit():
|
|
||||||
await self._async_cleanup()
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
task = asyncio.create_task(cleanup_and_exit())
|
|
||||||
# 添加任务完成回调,确保程序退出
|
|
||||||
task.add_done_callback(lambda t: None)
|
|
||||||
else:
|
|
||||||
# 如果事件循环未运行,使用同步清理
|
|
||||||
self._cleanup()
|
|
||||||
sys.exit(0)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"信号处理失败: {e}")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, signal_handler)
|
|
||||||
signal.signal(signal.SIGTERM, signal_handler)
|
|
||||||
|
|
||||||
async def _async_cleanup(self):
|
|
||||||
"""异步清理资源"""
|
|
||||||
try:
|
|
||||||
|
|
||||||
# 停止数据库服务
|
|
||||||
try:
|
|
||||||
from src.common.database.database import stop_database
|
|
||||||
await stop_database()
|
|
||||||
logger.info("🛑 数据库服务已停止")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"停止数据库服务时出错: {e}")
|
|
||||||
|
|
||||||
# 停止消息管理器
|
|
||||||
try:
|
|
||||||
from src.chat.message_manager import message_manager
|
|
||||||
await message_manager.stop()
|
|
||||||
logger.info("🛑 消息管理器已停止")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"停止消息管理器时出错: {e}")
|
|
||||||
|
|
||||||
# 停止消息重组器
|
|
||||||
try:
|
|
||||||
from src.plugin_system import EventType
|
|
||||||
from src.plugin_system.core.event_manager import event_manager
|
|
||||||
from src.utils.message_chunker import reassembler
|
|
||||||
|
|
||||||
await event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM")
|
|
||||||
await reassembler.stop_cleanup_task()
|
|
||||||
logger.info("🛑 消息重组器已停止")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"停止消息重组器时出错: {e}")
|
|
||||||
|
|
||||||
# 停止增强记忆系统
|
|
||||||
try:
|
|
||||||
if global_config.memory.enable_memory:
|
|
||||||
await self.memory_manager.shutdown()
|
|
||||||
logger.info("🛑 增强记忆系统已停止")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"停止增强记忆系统时出错: {e}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"异步清理资源时出错: {e}")
|
|
||||||
|
|
||||||
def _cleanup(self):
|
|
||||||
"""同步清理资源(向后兼容)"""
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
try:
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
if loop.is_running():
|
|
||||||
# 如果循环正在运行,创建异步清理任务
|
|
||||||
asyncio.create_task(self._async_cleanup())
|
|
||||||
else:
|
|
||||||
# 如果循环未运行,直接运行异步清理
|
|
||||||
loop.run_until_complete(self._async_cleanup())
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"同步清理资源时出错: {e}")
|
|
||||||
|
|
||||||
async def _message_process_wrapper(self, message_data: dict[str, Any]):
|
async def _message_process_wrapper(self, message_data: dict[str, Any]):
|
||||||
"""并行处理消息的包装器"""
|
"""并行处理消息的包装器"""
|
||||||
|
|||||||
Reference in New Issue
Block a user