Files
Mofox-Core/bot.py
ikun两年半 3b295938e0 更新 bot.py
修改彩蛋权重喵~主打一个众生平等喵
2025-08-19 11:59:15 +08:00

321 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import hashlib
import os
from dotenv import load_dotenv
if os.path.exists(".env"):
load_dotenv(".env", override=True)
print("成功加载环境变量配置")
else:
print("未找到.env文件请确保程序所需的环境变量被正确设置")
raise FileNotFoundError(".env 文件不存在,请创建并配置所需的环境变量")
import sys
import time
import platform
import traceback
from pathlib import Path
from rich.traceback import install
# maim_message imports for console input
# 最早期初始化日志系统,确保所有后续模块都使用正确的日志格式
from src.common.logger import initialize_logging, get_logger, shutdown_logging
initialize_logging()
from src.main import MainSystem #noqa
from src.manager.async_task_manager import async_task_manager #noqa
from colorama import init, Fore
import random
from typing import List, Sequence, Optional
logger = get_logger("main")
install(extra_lines=3)
# 设置工作目录为脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
logger.info(f"已设置工作目录为: {script_dir}")
confirm_logger = get_logger("confirm")
# 获取没有加载env时的环境变量
env_mask = {key: os.getenv(key) for key in os.environ}
uvicorn_server = None
driver = None
app = None
loop = None
async def request_shutdown() -> bool:
"""请求关闭程序"""
try:
if loop and not loop.is_closed():
try:
loop.run_until_complete(graceful_shutdown())
except Exception as ge: # 捕捉优雅关闭时可能发生的错误
logger.error(f"优雅关闭时发生错误: {ge}")
return False
return True
except Exception as e:
logger.error(f"请求关闭程序时发生错误: {e}")
return False
def weighted_choice(data: Sequence[str],
weights: Optional[List[float]] = None) -> str:
"""
从 data 中按权重随机返回一条。
若 weights 为 None则所有元素权重默认为 1。
"""
if weights is None:
weights = [1.0] * len(data)
if len(data) != len(weights):
raise ValueError("data 和 weights 长度必须相等")
# 计算累计权重区间
total = 0.0
acc = []
for w in weights:
total += w
acc.append(total)
if total <= 0:
raise ValueError("总权重必须大于 0")
# 随机落点
r = random.random() * total
# 二分查找落点所在的区间
left, right = 0, len(acc) - 1
while left < right:
mid = (left + right) // 2
if r < acc[mid]:
right = mid
else:
left = mid + 1
return data[left]
def easter_egg():
# 彩蛋
init()
items = ["多年以后面对AI行刑队张三将会回想起他2023年在会议上讨论人工智能的那个下午",
"你知道吗诺狐的耳朵很软很好rua",
"喵喵~你的麦麦被猫娘入侵了喵~"]
w = [1, 1, 1]
text = weighted_choice(items, w)
rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA]
rainbow_text = ""
for i, char in enumerate(text):
rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char
print(rainbow_text)
async def graceful_shutdown():
try:
logger.info("正在优雅关闭麦麦...")
# 停止所有异步任务
await async_task_manager.stop_and_wait_all_tasks()
# 获取所有剩余任务,排除当前任务
remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if remaining_tasks:
logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...")
# 取消所有剩余任务
for task in remaining_tasks:
if not task.done():
task.cancel()
# 等待所有任务完成,设置超时
try:
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=15.0)
logger.info("所有剩余任务已成功取消")
except asyncio.TimeoutError:
logger.warning("等待任务取消超时,强制继续关闭")
except Exception as e:
logger.error(f"等待任务取消时发生异常: {e}")
logger.info("麦麦优雅关闭完成")
# 关闭日志系统,释放文件句柄
shutdown_logging()
except Exception as e:
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
def _calculate_file_hash(file_path: Path, file_type: str) -> str:
"""计算文件的MD5哈希值"""
if not file_path.exists():
logger.error(f"{file_type} 文件不存在")
raise FileNotFoundError(f"{file_type} 文件不存在")
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
return hashlib.md5(content.encode("utf-8")).hexdigest()
def _check_agreement_status(file_hash: str, confirm_file: Path, env_var: str) -> tuple[bool, bool]:
"""检查协议确认状态
Returns:
tuple[bool, bool]: (已确认, 未更新)
"""
# 检查环境变量确认
if file_hash == os.getenv(env_var):
return True, False
# 检查确认文件
if confirm_file.exists():
with open(confirm_file, "r", encoding="utf-8") as f:
confirmed_content = f.read()
if file_hash == confirmed_content:
return True, False
return False, True
def _prompt_user_confirmation(eula_hash: str, privacy_hash: str) -> None:
"""提示用户确认协议"""
confirm_logger.critical("EULA或隐私条款内容已更新请在阅读后重新确认继续运行视为同意更新后的以上两款协议")
confirm_logger.critical(
f'输入"同意""confirmed"或设置环境变量"EULA_AGREE={eula_hash}""PRIVACY_AGREE={privacy_hash}"继续运行'
)
while True:
user_input = input().strip().lower()
if user_input in ["同意", "confirmed"]:
return
confirm_logger.critical('请输入"同意""confirmed"以继续运行')
def _save_confirmations(eula_updated: bool, privacy_updated: bool, eula_hash: str, privacy_hash: str) -> None:
"""保存用户确认结果"""
if eula_updated:
logger.info(f"更新EULA确认文件{eula_hash}")
Path("eula.confirmed").write_text(eula_hash, encoding="utf-8")
if privacy_updated:
logger.info(f"更新隐私条款确认文件{privacy_hash}")
Path("privacy.confirmed").write_text(privacy_hash, encoding="utf-8")
def check_eula():
"""检查EULA和隐私条款确认状态"""
# 计算文件哈希值
eula_hash = _calculate_file_hash(Path("EULA.md"), "EULA.md")
privacy_hash = _calculate_file_hash(Path("PRIVACY.md"), "PRIVACY.md")
# 检查确认状态
eula_confirmed, eula_updated = _check_agreement_status(eula_hash, Path("eula.confirmed"), "EULA_AGREE")
privacy_confirmed, privacy_updated = _check_agreement_status(
privacy_hash, Path("privacy.confirmed"), "PRIVACY_AGREE"
)
# 早期返回:如果都已确认且未更新
if eula_confirmed and privacy_confirmed:
return
# 如果有更新,需要重新确认
if eula_updated or privacy_updated:
_prompt_user_confirmation(eula_hash, privacy_hash)
_save_confirmations(eula_updated, privacy_updated, eula_hash, privacy_hash)
def raw_main():
# 利用 TZ 环境变量设定程序工作的时区
if platform.system().lower() != "windows":
time.tzset() # type: ignore
check_eula()
logger.info("检查EULA和隐私条款完成")
easter_egg()
# 在此处初始化数据库
from src.config.config import global_config
from src.common.database.database import initialize_sql_database
from src.common.database.sqlalchemy_models import initialize_database as init_db
from src.common.database.db_migration import check_and_migrate_database
logger.info("正在初始化数据库连接...")
try:
initialize_sql_database(global_config.database)
logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库")
except Exception as e:
logger.error(f"数据库连接初始化失败: {e}")
raise e
logger.info("正在初始化数据库表结构...")
try:
init_db()
logger.info("数据库表结构初始化完成")
except Exception as e:
logger.error(f"数据库表结构初始化失败: {e}")
raise e
# 执行数据库自动迁移检查
try:
check_and_migrate_database()
except Exception as e:
logger.error(f"数据库自动迁移失败: {e}")
raise e
# 返回MainSystem实例
return MainSystem()
if __name__ == "__main__":
exit_code = 0 # 用于记录程序最终的退出状态
try:
# 获取MainSystem实例
main_system = raw_main()
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 执行初始化和任务调度
loop.run_until_complete(main_system.initialize())
# Schedule tasks returns a future that runs forever.
# We can run console_input_loop concurrently.
main_tasks = loop.create_task(main_system.schedule_tasks())
loop.run_until_complete(main_tasks)
except KeyboardInterrupt:
# loop.run_until_complete(get_global_api().stop())
logger.warning("收到中断信号,正在优雅关闭...")
if loop and not loop.is_closed():
try:
loop.run_until_complete(graceful_shutdown())
except Exception as ge: # 捕捉优雅关闭时可能发生的错误
logger.error(f"优雅关闭时发生错误: {ge}")
# 新增:检测外部请求关闭
except Exception as e:
logger.error(f"主程序发生异常: {str(e)} {str(traceback.format_exc())}")
exit_code = 1 # 标记发生错误
finally:
# 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭)
if "loop" in locals() and loop and not loop.is_closed():
loop.close()
logger.info("事件循环已关闭")
# 关闭日志系统,释放文件句柄
try:
shutdown_logging()
except Exception as e:
print(f"关闭日志系统时出错: {e}")
# 在程序退出前暂停,让你有机会看到输出
# input("按 Enter 键退出...") # <--- 添加这行
sys.exit(exit_code) # <--- 使用记录的退出码