调整部分结构

This commit is contained in:
LuiKlee
2025-10-05 18:42:28 +08:00
committed by GitHub
parent fbe35d1d01
commit 34495c07cd

675
bot.py
View File

@@ -1,315 +1,504 @@
# import asyncio # import asyncio
import asyncio import asyncio
import os import os
import platform
import sys import sys
import time import time
import platform
import traceback import traceback
from pathlib import Path from pathlib import Path
from contextlib import asynccontextmanager
import hashlib
from typing import Optional, Dict, Any
from colorama import Fore, init # 初始化基础工具
from dotenv import load_dotenv # 处理.env文件 from colorama import init, Fore
from dotenv import load_dotenv
from rich.traceback import install from rich.traceback import install
# maim_message imports for console input # 初始化日志系统
# 最早期初始化日志系统,确保所有后续模块都使用正确的日志格式 from src.common.logger import initialize_logging, get_logger, shutdown_logging
from src.common.logger import get_logger, initialize_logging, shutdown_logging
# UI日志适配器 # 初始化日志和错误显示
initialize_logging() initialize_logging()
from src.main import MainSystem # noqa
from src import BaseMain
from src.manager.async_task_manager import async_task_manager
from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge
from src.config.config import global_config
from src.common.database.database import initialize_sql_database
from src.common.database.sqlalchemy_models import initialize_database as init_db
logger = get_logger("main") logger = get_logger("main")
install(extra_lines=3) install(extra_lines=3)
# 常量定义
SUPPORTED_DATABASES = ['sqlite', 'mysql', 'postgresql']
SHUTDOWN_TIMEOUT = 10.0
EULA_CHECK_INTERVAL = 2
MAX_EULA_CHECK_ATTEMPTS = 30
MAX_ENV_FILE_SIZE = 1024 * 1024 # 1MB限制
# 设置工作目录为脚本所在目录 # 设置工作目录为脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir) os.chdir(script_dir)
logger.info(f"已设置工作目录为: {script_dir}") logger.info("工作目录已设置")
class ConfigManager:
"""配置管理器"""
@staticmethod
def ensure_env_file():
"""确保.env文件存在如果不存在则从模板创建"""
env_file = Path(".env")
template_env = Path("template/template.env")
if not env_file.exists():
if template_env.exists():
logger.info("未找到.env文件正在从模板创建...")
try:
env_file.write_text(template_env.read_text(encoding='utf-8'), encoding='utf-8')
logger.info("已从template/template.env创建.env文件")
logger.warning("请编辑.env文件将EULA_CONFIRMED设置为true并配置其他必要参数")
except Exception as e:
logger.error(f"创建.env文件失败: {e}")
sys.exit(1)
else:
logger.error("未找到.env文件和template.env模板文件")
sys.exit(1)
# 检查并创建.env文件 @staticmethod
def ensure_env_file(): def verify_env_file_integrity():
"""确保.env文件存在,如果不存在则从模板创建""" """验证.env文件完整性"""
env_file = Path(".env") env_file = Path(".env")
template_env = Path("template/template.env") if not env_file.exists():
return False
# 检查文件大小
file_size = env_file.stat().st_size
if file_size == 0 or file_size > MAX_ENV_FILE_SIZE:
logger.error(f".env文件大小异常: {file_size}字节")
return False
# 检查文件内容是否包含必要字段
try:
content = env_file.read_text(encoding='utf-8')
if 'EULA_CONFIRMED' not in content:
logger.error(".env文件缺少EULA_CONFIRMED字段")
return False
except Exception as e:
logger.error(f"读取.env文件失败: {e}")
return False
return True
if not env_file.exists(): @staticmethod
if template_env.exists(): def safe_load_dotenv():
logger.info("未找到.env文件正在从模板创建...") """安全加载环境变量"""
import shutil try:
if not ConfigManager.verify_env_file_integrity():
logger.error(".env文件完整性验证失败")
return False
load_dotenv()
logger.info("环境变量加载成功")
return True
except Exception as e:
logger.error(f"加载环境变量失败: {e}")
return False
shutil.copy(template_env, env_file) class EULAManager:
logger.info("已从template/template.env创建.env文件") """EULA管理类"""
logger.warning("请编辑.env文件将EULA_CONFIRMED设置为true并配置其他必要参数")
else: @staticmethod
logger.error("未找到.env文件和template.env模板文件") async def check_eula():
"""检查EULA和隐私条款确认状态"""
confirm_logger = get_logger("confirm")
if not ConfigManager.safe_load_dotenv():
confirm_logger.error("无法加载环境变量EULA检查失败")
sys.exit(1) sys.exit(1)
eula_confirmed = os.getenv('EULA_CONFIRMED', '').lower()
if eula_confirmed == 'true':
logger.info("EULA已通过环境变量确认")
return
# 提示用户确认EULA
confirm_logger.critical("您需要同意EULA和隐私条款才能使用MoFox_Bot")
confirm_logger.critical("请阅读以下文件:")
confirm_logger.critical(" - EULA.md (用户许可协议)")
confirm_logger.critical(" - PRIVACY.md (隐私条款)")
confirm_logger.critical("然后编辑 .env 文件,将 'EULA_CONFIRMED=false' 改为 'EULA_CONFIRMED=true'")
attempts = 0
while attempts < MAX_EULA_CHECK_ATTEMPTS:
try:
await asyncio.sleep(EULA_CHECK_INTERVAL)
attempts += 1
# 重新加载环境变量
ConfigManager.safe_load_dotenv()
eula_confirmed = os.getenv('EULA_CONFIRMED', '').lower()
if eula_confirmed == 'true':
confirm_logger.info("EULA确认成功感谢您的同意")
return
if attempts % 5 == 0:
confirm_logger.critical(f"请修改 .env 文件中的 EULA_CONFIRMED=true (尝试 {attempts}/{MAX_EULA_CHECK_ATTEMPTS})")
except KeyboardInterrupt:
confirm_logger.info("用户取消,程序退出")
sys.exit(0)
except Exception as e:
confirm_logger.error(f"检查EULA状态失败: {e}")
if attempts >= MAX_EULA_CHECK_ATTEMPTS:
confirm_logger.error("达到最大检查次数,程序退出")
sys.exit(1)
confirm_logger.error("EULA确认超时程序退出")
sys.exit(1)
class TaskManager:
"""任务管理器"""
@staticmethod
async def cancel_pending_tasks(loop, timeout=SHUTDOWN_TIMEOUT):
"""取消所有待处理的任务"""
remaining_tasks = [
t for t in asyncio.all_tasks(loop)
if t is not asyncio.current_task(loop) and not t.done()
]
if not remaining_tasks:
logger.info("没有待取消的任务")
return True
logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...")
# 取消任务
for task in remaining_tasks:
task.cancel()
# 等待任务完成
try:
results = await asyncio.wait_for(
asyncio.gather(*remaining_tasks, return_exceptions=True),
timeout=timeout
)
# 检查任务结果
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.warning(f"任务 {i} 取消时发生异常: {result}")
logger.info("所有剩余任务已成功取消")
return True
except asyncio.TimeoutError:
logger.warning("等待任务取消超时,强制继续关闭")
return False
except Exception as e:
logger.error(f"等待任务取消时发生异常: {e}")
return False
# 确保环境文件存在 @staticmethod
ensure_env_file() async def stop_async_tasks():
"""停止所有异步任务"""
try:
from src.manager.async_task_manager import async_task_manager
await async_task_manager.stop_and_wait_all_tasks()
return True
except ImportError:
logger.warning("异步任务管理器不可用,跳过任务停止")
return False
except Exception as e:
logger.error(f"停止异步任务失败: {e}")
return False
# 加载环境变量 class ShutdownManager:
load_dotenv() """关闭管理器"""
@staticmethod
async def graceful_shutdown(loop=None):
"""优雅关闭程序"""
try:
logger.info("正在优雅关闭麦麦...")
start_time = time.time()
# 停止异步任务
tasks_stopped = await TaskManager.stop_async_tasks()
# 取消待处理任务
tasks_cancelled = True
if loop and not loop.is_closed():
tasks_cancelled = await TaskManager.cancel_pending_tasks(loop)
shutdown_time = time.time() - start_time
success = tasks_stopped and tasks_cancelled
if success:
logger.info(f"麦麦优雅关闭完成,耗时: {shutdown_time:.2f}")
else:
logger.warning(f"麦麦关闭完成,但部分操作未成功,耗时: {shutdown_time:.2f}")
return success
confirm_logger = get_logger("confirm") except Exception as e:
# 获取没有加载env时的环境变量 logger.error(f"麦麦关闭失败: {e}", exc_info=True)
return False
uvicorn_server = None @asynccontextmanager
driver = None async def create_event_loop_context():
app = None """创建事件循环的上下文管理器"""
loop = None loop = None
main_system = None
async def request_shutdown() -> bool:
"""请求关闭程序"""
try: try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
except Exception as e:
logger.error(f"创建事件循环失败: {e}")
raise
finally:
if loop and not loop.is_closed(): if loop and not loop.is_closed():
try: try:
loop.run_until_complete(graceful_shutdown(maibot.main_system)) await ShutdownManager.graceful_shutdown(loop)
except Exception as ge: # 捕捉优雅关闭时可能发生的错误 except Exception as e:
logger.error(f"优雅关闭时发生错误: {ge}") logger.error(f"关闭事件循环时出错: {e}")
return False finally:
return True try:
except Exception as e: loop.close()
logger.error(f"请求关闭程序时发生错误: {e}") logger.info("事件循环已关闭")
except Exception as e:
logger.error(f"关闭事件循环失败: {e}")
class DatabaseManager:
"""数据库连接管理器"""
def __init__(self):
self._connection = None
async def __aenter__(self):
"""异步上下文管理器入口"""
try:
from src.common.database.database import initialize_sql_database
from src.config.config import global_config
logger.info("正在初始化数据库连接...")
start_time = time.time()
# 使用线程执行器运行潜在的阻塞操作
await asyncio.to_thread(initialize_sql_database, global_config.database)
elapsed_time = time.time() - start_time
logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库,耗时: {elapsed_time:.2f}")
return self
except Exception as e:
logger.error(f"数据库连接初始化失败: {e}")
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if exc_type:
logger.error(f"数据库操作发生异常: {exc_val}")
return False return False
class ConfigurationValidator:
def easter_egg(): """配置验证器"""
# 彩蛋
init() @staticmethod
text = "多年以后面对AI行刑队张三将会回想起他2023年在会议上讨论人工智能的那个下午" def validate_configuration():
rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA] """验证关键配置"""
rainbow_text = ""
for i, char in enumerate(text):
rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char
logger.info(rainbow_text)
async def graceful_shutdown(main_system_instance):
"""优雅地关闭所有系统组件"""
try:
logger.info("正在优雅关闭麦麦...")
# 停止MainSystem中的组件它会处理服务器等
if main_system_instance and hasattr(main_system_instance, "shutdown"):
logger.info("正在关闭MainSystem...")
await main_system_instance.shutdown()
# 停止聊天管理器
try: try:
from src.chat.message_receive.chat_stream import get_chat_manager from src.config.config import global_config
chat_manager = get_chat_manager()
if hasattr(chat_manager, "_stop_auto_save"): # 检查必要的配置节
logger.info("正在停止聊天管理器...") required_sections = ['database', 'bot']
chat_manager._stop_auto_save() for section in required_sections:
if not hasattr(global_config, section):
logger.error(f"配置中缺少{section}配置节")
return False
# 验证数据库配置
db_config = global_config.database
if not hasattr(db_config, 'database_type') or not db_config.database_type:
logger.error("数据库配置缺少database_type字段")
return False
if db_config.database_type not in SUPPORTED_DATABASES:
logger.error(f"不支持的数据库类型: {db_config.database_type}")
logger.info(f"支持的数据库类型: {', '.join(SUPPORTED_DATABASES)}")
return False
logger.info("配置验证通过")
return True
except ImportError:
logger.error("无法导入全局配置模块")
return False
except Exception as e: except Exception as e:
logger.warning(f"停止聊天管理器时出错: {e}") logger.error(f"配置验证失败: {e}")
return False
# 停止情绪管理器 class EasterEgg:
try: """彩蛋功能"""
from src.mood.mood_manager import mood_manager
if hasattr(mood_manager, "stop"): _initialized = False
logger.info("正在停止情绪管理器...")
await mood_manager.stop() @classmethod
except Exception as e: def show(cls):
logger.warning(f"停止情绪管理器时出错: {e}") """显示彩色文本"""
if not cls._initialized:
init()
cls._initialized = True
text = "多年以后面对AI行刑队张三将会回想起他2023年在会议上讨论人工智能的那个下午"
rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA]
rainbow_text = ""
for i, char in enumerate(text):
rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char
logger.info(rainbow_text)
# 停止记忆系统 class MaiBotMain:
try:
from src.chat.memory_system.memory_manager import memory_manager
if hasattr(memory_manager, "shutdown"):
logger.info("正在停止记忆系统...")
await memory_manager.shutdown()
except Exception as e:
logger.warning(f"停止记忆系统时出错: {e}")
# 停止所有异步任务
try:
await async_task_manager.stop_and_wait_all_tasks()
except Exception as e:
logger.warning(f"停止异步任务管理器时出错: {e}")
# 获取所有剩余任务,排除当前任务
remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if remaining_tasks:
logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...")
# 取消所有剩余任务
for task in remaining_tasks:
if not task.done():
task.cancel()
# 等待所有任务完成,设置超时
try:
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=15.0)
logger.info("所有剩余任务已成功取消")
except asyncio.TimeoutError:
logger.warning("等待任务取消超时,强制继续关闭")
except Exception as e:
logger.error(f"等待任务取消时发生异常: {e}")
logger.info("麦麦优雅关闭完成")
# 关闭日志系统,释放文件句柄
shutdown_logging()
# 尝试停止事件循环
try:
loop = asyncio.get_running_loop()
if loop.is_running():
loop.stop()
logger.info("事件循环已请求停止")
except RuntimeError:
pass # 没有正在运行的事件循环
except Exception as e:
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
def check_eula():
"""检查EULA和隐私条款确认状态 - 环境变量版类似Minecraft"""
# 检查环境变量中的EULA确认
eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower()
if eula_confirmed == "true":
logger.info("EULA已通过环境变量确认")
return
# 如果没有确认,提示用户
confirm_logger.critical("您需要同意EULA和隐私条款才能使用MoFox_Bot")
confirm_logger.critical("请阅读以下文件:")
confirm_logger.critical(" - EULA.md (用户许可协议)")
confirm_logger.critical(" - PRIVACY.md (隐私条款)")
confirm_logger.critical("然后编辑 .env 文件,将 'EULA_CONFIRMED=false' 改为 'EULA_CONFIRMED=true'")
# 等待用户确认
while True:
try:
load_dotenv(override=True) # 重新加载.env文件
eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower()
if eula_confirmed == "true":
confirm_logger.info("EULA确认成功感谢您的同意")
return
confirm_logger.critical("请修改 .env 文件中的 EULA_CONFIRMED=true 后重新启动程序")
input("按Enter键检查.env文件状态...")
except KeyboardInterrupt:
confirm_logger.info("用户取消,程序退出")
sys.exit(0)
except Exception as e:
confirm_logger.error(f"检查EULA状态失败: {e}")
sys.exit(1)
class MaiBotMain(BaseMain):
"""麦麦机器人主程序类""" """麦麦机器人主程序类"""
def __init__(self): def __init__(self):
super().__init__()
self.main_system = None self.main_system = None
def setup_timezone(self): def setup_timezone(self):
"""设置时区""" """设置时区"""
if platform.system().lower() != "windows": try:
time.tzset() # type: ignore if platform.system().lower() != "windows":
time.tzset()
def check_and_confirm_eula(self): logger.info("时区设置完成")
"""检查并确认EULA和隐私条款""" else:
check_eula() logger.info("Windows系统跳过时区设置")
logger.info("检查EULA和隐私条款完成") except Exception as e:
logger.warning(f"时区设置失败: {e}")
async def initialize_database(self): async def initialize_database(self):
"""初始化数据库""" """初始化数据库连接"""
async with DatabaseManager():
logger.info("正在初始化数据库连接...") pass
try:
await initialize_sql_database(global_config.database)
logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库")
except Exception as e:
logger.error(f"数据库连接初始化失败: {e}")
raise e
async def initialize_database_async(self): async def initialize_database_async(self):
"""异步初始化数据库表结构""" """异步初始化数据库表结构"""
logger.info("正在初始化数据库表结构...") logger.info("正在初始化数据库表结构...")
try: try:
start_time = time.time()
from src.common.database.sqlalchemy_models import initialize_database as init_db
await init_db() await init_db()
logger.info("数据库表结构初始化完成") elapsed_time = time.time() - start_time
logger.info(f"数据库表结构初始化完成,耗时: {elapsed_time:.2f}")
except Exception as e: except Exception as e:
logger.error(f"数据库表结构初始化失败: {e}") logger.error(f"数据库表结构初始化失败: {e}")
raise e raise
def create_main_system(self): def create_main_system(self):
"""创建MainSystem实例""" """创建MainSystem实例"""
from src.main import MainSystem
self.main_system = MainSystem() self.main_system = MainSystem()
return self.main_system return self.main_system
async def run(self): async def run_sync_init(self):
"""运行主程序""" """执行同步初始化步骤"""
self.setup_timezone() self.setup_timezone()
self.check_and_confirm_eula() await EULAManager.check_eula()
await self.initialize_database()
if not ConfigurationValidator.validate_configuration():
raise RuntimeError("配置验证失败,请检查配置文件")
return self.create_main_system() return self.create_main_system()
async def run_async_init(self, main_system):
"""执行异步初始化步骤"""
# 初始化数据库连接
await self.initialize_database()
# 初始化数据库表结构
await self.initialize_database_async()
# 初始化主系统
await main_system.initialize()
# 初始化知识库
from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge
initialize_lpmm_knowledge()
# 显示彩蛋
EasterEgg.show()
if __name__ == "__main__": async def wait_for_user_input():
exit_code = 0 # 用于记录程序最终的退出状态 """等待用户输入(异步方式)"""
try: try:
# 创建MaiBotMain实例并获取MainSystem # 在非生产环境下,使用异步方式等待输入
maibot = MaiBotMain() if os.getenv('ENVIRONMENT') != 'production':
logger.info("程序执行完成,按 Ctrl+C 退出...")
# 创建事件循环 # 简单的异步等待,避免阻塞事件循环
loop = asyncio.new_event_loop() while True:
asyncio.set_event_loop(loop) await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("用户中断程序")
return True
except Exception as e:
logger.error(f"等待用户输入时发生错误: {e}")
return False
async def main_async():
"""主异步函数"""
exit_code = 0
main_task = None
async with create_event_loop_context() as loop:
try: try:
# 异步初始化数据库和表结构 # 确保环境文件存在
main_system = loop.run_until_complete(maibot.run()) ConfigManager.ensure_env_file()
loop.run_until_complete(maibot.initialize_database_async())
# 执行初始化和任务调度 # 创建主程序实例并执行初始化
loop.run_until_complete(main_system.initialize()) maibot = MaiBotMain()
initialize_lpmm_knowledge() main_system = await maibot.run_sync_init()
# Schedule tasks returns a future that runs forever. await maibot.run_async_init(main_system)
# We can run console_input_loop concurrently.
main_tasks = loop.create_task(main_system.schedule_tasks()) # 运行主任务
loop.run_until_complete(main_tasks) main_task = asyncio.create_task(main_system.schedule_tasks())
logger.info("麦麦机器人启动完成,开始运行主任务...")
# 同时运行主任务和用户输入等待
user_input_done = asyncio.create_task(wait_for_user_input())
# 使用wait等待任意一个任务完成
done, pending = await asyncio.wait(
[main_task, user_input_done],
return_when=asyncio.FIRST_COMPLETED
)
# 如果用户输入任务完成用户按了Ctrl+C取消主任务
if user_input_done in done and main_task not in done:
logger.info("用户请求退出,正在取消主任务...")
main_task.cancel()
try:
await main_task
except asyncio.CancelledError:
logger.info("主任务已取消")
except Exception as e:
logger.error(f"主任务取消时发生错误: {e}")
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning("收到中断信号,正在优雅关闭...") logger.warning("收到中断信号,正在优雅关闭...")
# The actual shutdown logic is now in the finally block. if main_task and not main_task.done():
main_task.cancel()
except Exception as e:
logger.error(f"主程序发生异常: {e}")
logger.debug(f"异常详情: {traceback.format_exc()}")
exit_code = 1
return exit_code
if __name__ == "__main__":
exit_code = 0
try:
exit_code = asyncio.run(main_async())
except KeyboardInterrupt:
logger.info("程序被用户中断")
exit_code = 130
except Exception as e: except Exception as e:
logger.error(f"程序发生异常: {e!s} {traceback.format_exc()!s}") logger.error(f"程序启动失败: {e}")
exit_code = 1 # 标记发生错误 exit_code = 1
finally: finally:
# 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭) # 确保日志系统正确关闭
if "loop" in locals() and loop and not loop.is_closed(): try:
logger.info("开始执行最终关闭流程...") shutdown_logging()
try: except Exception as e:
# 传递main_system实例 print(f"关闭日志系统时出错: {e}")
loop.run_until_complete(graceful_shutdown(maibot.main_system))
except Exception as ge: sys.exit(exit_code)
logger.error(f"优雅关闭时发生错误: {ge}")
loop.close()
logger.info("事件循环已关闭")
# 在程序退出前暂停,让你有机会看到输出
# input("按 Enter 键退出...") # <--- 添加这行
sys.exit(exit_code) # <--- 使用记录的退出码