This commit is contained in:
subiz
2025-10-05 20:48:54 +08:00
20 changed files with 780 additions and 535 deletions

647
bot.py
View File

@@ -5,311 +5,498 @@ import platform
import sys import sys
import time import time
import traceback import traceback
from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
# 初始化基础工具
from colorama import Fore, init from colorama import Fore, init
from dotenv import load_dotenv # 处理.env文件 from dotenv import load_dotenv
from rich.traceback import install from rich.traceback import install
# maim_message imports for console input # 初始化日志系统
# 最早期初始化日志系统,确保所有后续模块都使用正确的日志格式
from src.common.logger import get_logger, initialize_logging, shutdown_logging from src.common.logger import get_logger, initialize_logging, shutdown_logging
# UI日志适配器 # 初始化日志和错误显示
initialize_logging() initialize_logging()
from src.main import MainSystem # noqa
from src import BaseMain
from src.manager.async_task_manager import async_task_manager
from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge
from src.config.config import global_config
from src.common.database.database import initialize_sql_database
from src.common.database.sqlalchemy_models import initialize_database as init_db
logger = get_logger("main") logger = get_logger("main")
install(extra_lines=3) install(extra_lines=3)
# 常量定义
SUPPORTED_DATABASES = ["sqlite", "mysql", "postgresql"]
SHUTDOWN_TIMEOUT = 10.0
EULA_CHECK_INTERVAL = 2
MAX_EULA_CHECK_ATTEMPTS = 30
MAX_ENV_FILE_SIZE = 1024 * 1024 # 1MB限制
# 设置工作目录为脚本所在目录 # 设置工作目录为脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__)) script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir) os.chdir(script_dir)
logger.info(f"已设置工作目录为: {script_dir}") logger.info("工作目录已设置")
class ConfigManager:
"""配置管理器"""
# 检查并创建.env文件 @staticmethod
def ensure_env_file(): def ensure_env_file():
"""确保.env文件存在如果不存在则从模板创建""" """确保.env文件存在如果不存在则从模板创建"""
env_file = Path(".env") env_file = Path(".env")
template_env = Path("template/template.env") template_env = Path("template/template.env")
if not env_file.exists(): if not env_file.exists():
if template_env.exists(): if template_env.exists():
logger.info("未找到.env文件正在从模板创建...") logger.info("未找到.env文件正在从模板创建...")
import shutil try:
env_file.write_text(template_env.read_text(encoding="utf-8"), encoding="utf-8")
logger.info("已从template/template.env创建.env文件")
logger.warning("请编辑.env文件将EULA_CONFIRMED设置为true并配置其他必要参数")
except Exception as e:
logger.error(f"创建.env文件失败: {e}")
sys.exit(1)
else:
logger.error("未找到.env文件和template.env模板文件")
sys.exit(1)
shutil.copy(template_env, env_file) @staticmethod
logger.info("已从template/template.env创建.env文件") def verify_env_file_integrity():
logger.warning("请编辑.env文件将EULA_CONFIRMED设置为true并配置其他必要参数") """验证.env文件完整性"""
else: env_file = Path(".env")
logger.error("未找到.env文件和template.env模板文件") if not env_file.exists():
return False
# 检查文件大小
file_size = env_file.stat().st_size
if file_size == 0 or file_size > MAX_ENV_FILE_SIZE:
logger.error(f".env文件大小异常: {file_size}字节")
return False
# 检查文件内容是否包含必要字段
try:
content = env_file.read_text(encoding="utf-8")
if "EULA_CONFIRMED" not in content:
logger.error(".env文件缺少EULA_CONFIRMED字段")
return False
except Exception as e:
logger.error(f"读取.env文件失败: {e}")
return False
return True
@staticmethod
def safe_load_dotenv():
"""安全加载环境变量"""
try:
if not ConfigManager.verify_env_file_integrity():
logger.error(".env文件完整性验证失败")
return False
load_dotenv()
logger.info("环境变量加载成功")
return True
except Exception as e:
logger.error(f"加载环境变量失败: {e}")
return False
class EULAManager:
"""EULA管理类"""
@staticmethod
async def check_eula():
"""检查EULA和隐私条款确认状态"""
confirm_logger = get_logger("confirm")
if not ConfigManager.safe_load_dotenv():
confirm_logger.error("无法加载环境变量EULA检查失败")
sys.exit(1) sys.exit(1)
eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower()
if eula_confirmed == "true":
logger.info("EULA已通过环境变量确认")
return
# 确保环境文件存在 # 提示用户确认EULA
ensure_env_file() confirm_logger.critical("您需要同意EULA和隐私条款才能使用MoFox_Bot")
confirm_logger.critical("请阅读以下文件:")
confirm_logger.critical(" - EULA.md (用户许可协议)")
confirm_logger.critical(" - PRIVACY.md (隐私条款)")
confirm_logger.critical("然后编辑 .env 文件,将 'EULA_CONFIRMED=false' 改为 'EULA_CONFIRMED=true'")
# 加载环境变量 attempts = 0
load_dotenv() while attempts < MAX_EULA_CHECK_ATTEMPTS:
try:
await asyncio.sleep(EULA_CHECK_INTERVAL)
attempts += 1
confirm_logger = get_logger("confirm") # 重新加载环境变量
# 获取没有加载env时的环境变量 ConfigManager.safe_load_dotenv()
eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower()
if eula_confirmed == "true":
confirm_logger.info("EULA确认成功感谢您的同意")
return
uvicorn_server = None if attempts % 5 == 0:
driver = None confirm_logger.critical(f"请修改 .env 文件中的 EULA_CONFIRMED=true (尝试 {attempts}/{MAX_EULA_CHECK_ATTEMPTS})")
app = None
loop = None
main_system = None
except KeyboardInterrupt:
confirm_logger.info("用户取消,程序退出")
sys.exit(0)
except Exception as e:
confirm_logger.error(f"检查EULA状态失败: {e}")
if attempts >= MAX_EULA_CHECK_ATTEMPTS:
confirm_logger.error("达到最大检查次数,程序退出")
sys.exit(1)
async def request_shutdown() -> bool: confirm_logger.error("EULA确认超时程序退出")
"""请求关闭程序""" sys.exit(1)
class TaskManager:
"""任务管理器"""
@staticmethod
async def cancel_pending_tasks(loop, timeout=SHUTDOWN_TIMEOUT):
"""取消所有待处理的任务"""
remaining_tasks = [
t for t in asyncio.all_tasks(loop)
if t is not asyncio.current_task(loop) and not t.done()
]
if not remaining_tasks:
logger.info("没有待取消的任务")
return True
logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...")
# 取消任务
for task in remaining_tasks:
task.cancel()
# 等待任务完成
try:
results = await asyncio.wait_for(
asyncio.gather(*remaining_tasks, return_exceptions=True),
timeout=timeout
)
# 检查任务结果
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.warning(f"任务 {i} 取消时发生异常: {result}")
logger.info("所有剩余任务已成功取消")
return True
except asyncio.TimeoutError:
logger.warning("等待任务取消超时,强制继续关闭")
return False
except Exception as e:
logger.error(f"等待任务取消时发生异常: {e}")
return False
@staticmethod
async def stop_async_tasks():
"""停止所有异步任务"""
try:
from src.manager.async_task_manager import async_task_manager
await async_task_manager.stop_and_wait_all_tasks()
return True
except ImportError:
logger.warning("异步任务管理器不可用,跳过任务停止")
return False
except Exception as e:
logger.error(f"停止异步任务失败: {e}")
return False
class ShutdownManager:
"""关闭管理器"""
@staticmethod
async def graceful_shutdown(loop=None):
"""优雅关闭程序"""
try:
logger.info("正在优雅关闭麦麦...")
start_time = time.time()
# 停止异步任务
tasks_stopped = await TaskManager.stop_async_tasks()
# 取消待处理任务
tasks_cancelled = True
if loop and not loop.is_closed():
tasks_cancelled = await TaskManager.cancel_pending_tasks(loop)
shutdown_time = time.time() - start_time
success = tasks_stopped and tasks_cancelled
if success:
logger.info(f"麦麦优雅关闭完成,耗时: {shutdown_time:.2f}")
else:
logger.warning(f"麦麦关闭完成,但部分操作未成功,耗时: {shutdown_time:.2f}")
return success
except Exception as e:
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
return False
@asynccontextmanager
async def create_event_loop_context():
"""创建事件循环的上下文管理器"""
loop = None
try: try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
except Exception as e:
logger.error(f"创建事件循环失败: {e}")
raise
finally:
if loop and not loop.is_closed(): if loop and not loop.is_closed():
try: try:
loop.run_until_complete(graceful_shutdown(maibot.main_system)) await ShutdownManager.graceful_shutdown(loop)
except Exception as ge: # 捕捉优雅关闭时可能发生的错误 except Exception as e:
logger.error(f"优雅关闭时发生错误: {ge}") logger.error(f"关闭事件循环时出错: {e}")
return False finally:
return True try:
except Exception as e: loop.close()
logger.error(f"请求关闭程序时发生错误: {e}") logger.info("事件循环已关闭")
except Exception as e:
logger.error(f"关闭事件循环失败: {e}")
class DatabaseManager:
"""数据库连接管理器"""
def __init__(self):
self._connection = None
async def __aenter__(self):
"""异步上下文管理器入口"""
try:
from src.common.database.database import initialize_sql_database
from src.config.config import global_config
logger.info("正在初始化数据库连接...")
start_time = time.time()
# 使用线程执行器运行潜在的阻塞操作
await asyncio.to_thread(initialize_sql_database, global_config.database)
elapsed_time = time.time() - start_time
logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库,耗时: {elapsed_time:.2f}")
return self
except Exception as e:
logger.error(f"数据库连接初始化失败: {e}")
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步上下文管理器出口"""
if exc_type:
logger.error(f"数据库操作发生异常: {exc_val}")
return False return False
class ConfigurationValidator:
"""配置验证器"""
def easter_egg(): @staticmethod
# 彩蛋 def validate_configuration():
init() """验证关键配置"""
text = "多年以后面对AI行刑队张三将会回想起他2023年在会议上讨论人工智能的那个下午"
rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA]
rainbow_text = ""
for i, char in enumerate(text):
rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char
logger.info(rainbow_text)
async def graceful_shutdown(main_system_instance):
"""优雅地关闭所有系统组件"""
try:
logger.info("正在优雅关闭麦麦...")
# 停止MainSystem中的组件它会处理服务器等
if main_system_instance and hasattr(main_system_instance, "shutdown"):
logger.info("正在关闭MainSystem...")
await main_system_instance.shutdown()
# 停止聊天管理器
try: try:
from src.chat.message_receive.chat_stream import get_chat_manager from src.config.config import global_config
chat_manager = get_chat_manager()
if hasattr(chat_manager, "_stop_auto_save"): # 检查必要的配置节
logger.info("正在停止聊天管理器...") required_sections = ["database", "bot"]
chat_manager._stop_auto_save() for section in required_sections:
if not hasattr(global_config, section):
logger.error(f"配置中缺少{section}配置节")
return False
# 验证数据库配置
db_config = global_config.database
if not hasattr(db_config, "database_type") or not db_config.database_type:
logger.error("数据库配置缺少database_type字段")
return False
if db_config.database_type not in SUPPORTED_DATABASES:
logger.error(f"不支持的数据库类型: {db_config.database_type}")
logger.info(f"支持的数据库类型: {', '.join(SUPPORTED_DATABASES)}")
return False
logger.info("配置验证通过")
return True
except ImportError:
logger.error("无法导入全局配置模块")
return False
except Exception as e: except Exception as e:
logger.warning(f"停止聊天管理器时出错: {e}") logger.error(f"配置验证失败: {e}")
return False
# 停止情绪管理器 class EasterEgg:
try: """彩蛋功能"""
from src.mood.mood_manager import mood_manager
if hasattr(mood_manager, "stop"):
logger.info("正在停止情绪管理器...")
await mood_manager.stop()
except Exception as e:
logger.warning(f"停止情绪管理器时出错: {e}")
# 停止记忆系统 _initialized = False
try:
from src.chat.memory_system.memory_manager import memory_manager
if hasattr(memory_manager, "shutdown"):
logger.info("正在停止记忆系统...")
await memory_manager.shutdown()
except Exception as e:
logger.warning(f"停止记忆系统时出错: {e}")
@classmethod
def show(cls):
"""显示彩色文本"""
if not cls._initialized:
init()
cls._initialized = True
# 停止所有异步任务 text = "多年以后面对AI行刑队张三将会回想起他2023年在会议上讨论人工智能的那个下午"
try: rainbow_colors = [Fore.RED, Fore.YELLOW, Fore.GREEN, Fore.CYAN, Fore.BLUE, Fore.MAGENTA]
await async_task_manager.stop_and_wait_all_tasks() rainbow_text = ""
except Exception as e: for i, char in enumerate(text):
logger.warning(f"停止异步任务管理器时出错: {e}") rainbow_text += rainbow_colors[i % len(rainbow_colors)] + char
logger.info(rainbow_text)
# 获取所有剩余任务,排除当前任务 class MaiBotMain:
remaining_tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if remaining_tasks:
logger.info(f"正在取消 {len(remaining_tasks)} 个剩余任务...")
# 取消所有剩余任务
for task in remaining_tasks:
if not task.done():
task.cancel()
# 等待所有任务完成,设置超时
try:
await asyncio.wait_for(asyncio.gather(*remaining_tasks, return_exceptions=True), timeout=15.0)
logger.info("所有剩余任务已成功取消")
except asyncio.TimeoutError:
logger.warning("等待任务取消超时,强制继续关闭")
except Exception as e:
logger.error(f"等待任务取消时发生异常: {e}")
logger.info("麦麦优雅关闭完成")
# 关闭日志系统,释放文件句柄
shutdown_logging()
# 尝试停止事件循环
try:
loop = asyncio.get_running_loop()
if loop.is_running():
loop.stop()
logger.info("事件循环已请求停止")
except RuntimeError:
pass # 没有正在运行的事件循环
except Exception as e:
logger.error(f"麦麦关闭失败: {e}", exc_info=True)
def check_eula():
"""检查EULA和隐私条款确认状态 - 环境变量版类似Minecraft"""
# 检查环境变量中的EULA确认
eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower()
if eula_confirmed == "true":
logger.info("EULA已通过环境变量确认")
return
# 如果没有确认,提示用户
confirm_logger.critical("您需要同意EULA和隐私条款才能使用MoFox_Bot")
confirm_logger.critical("请阅读以下文件:")
confirm_logger.critical(" - EULA.md (用户许可协议)")
confirm_logger.critical(" - PRIVACY.md (隐私条款)")
confirm_logger.critical("然后编辑 .env 文件,将 'EULA_CONFIRMED=false' 改为 'EULA_CONFIRMED=true'")
# 等待用户确认
while True:
try:
load_dotenv(override=True) # 重新加载.env文件
eula_confirmed = os.getenv("EULA_CONFIRMED", "").lower()
if eula_confirmed == "true":
confirm_logger.info("EULA确认成功感谢您的同意")
return
confirm_logger.critical("请修改 .env 文件中的 EULA_CONFIRMED=true 后重新启动程序")
input("按Enter键检查.env文件状态...")
except KeyboardInterrupt:
confirm_logger.info("用户取消,程序退出")
sys.exit(0)
except Exception as e:
confirm_logger.error(f"检查EULA状态失败: {e}")
sys.exit(1)
class MaiBotMain(BaseMain):
"""麦麦机器人主程序类""" """麦麦机器人主程序类"""
def __init__(self): def __init__(self):
super().__init__()
self.main_system = None self.main_system = None
def setup_timezone(self): def setup_timezone(self):
"""设置时区""" """设置时区"""
if platform.system().lower() != "windows": try:
time.tzset() # type: ignore if platform.system().lower() != "windows":
time.tzset()
def check_and_confirm_eula(self): logger.info("时区设置完成")
"""检查并确认EULA和隐私条款""" else:
check_eula() logger.info("Windows系统跳过时区设置")
logger.info("检查EULA和隐私条款完成") except Exception as e:
logger.warning(f"时区设置失败: {e}")
async def initialize_database(self): async def initialize_database(self):
"""初始化数据库""" """初始化数据库连接"""
async with DatabaseManager():
logger.info("正在初始化数据库连接...") pass
try:
await initialize_sql_database(global_config.database)
logger.info(f"数据库连接初始化成功,使用 {global_config.database.database_type} 数据库")
except Exception as e:
logger.error(f"数据库连接初始化失败: {e}")
raise e
async def initialize_database_async(self): async def initialize_database_async(self):
"""异步初始化数据库表结构""" """异步初始化数据库表结构"""
logger.info("正在初始化数据库表结构...") logger.info("正在初始化数据库表结构...")
try: try:
start_time = time.time()
from src.common.database.sqlalchemy_models import initialize_database as init_db
await init_db() await init_db()
logger.info("数据库表结构初始化完成") elapsed_time = time.time() - start_time
logger.info(f"数据库表结构初始化完成,耗时: {elapsed_time:.2f}")
except Exception as e: except Exception as e:
logger.error(f"数据库表结构初始化失败: {e}") logger.error(f"数据库表结构初始化失败: {e}")
raise e raise
def create_main_system(self): def create_main_system(self):
"""创建MainSystem实例""" """创建MainSystem实例"""
from src.main import MainSystem
self.main_system = MainSystem() self.main_system = MainSystem()
return self.main_system return self.main_system
async def run(self): async def run_sync_init(self):
"""运行主程序""" """执行同步初始化步骤"""
self.setup_timezone() self.setup_timezone()
self.check_and_confirm_eula() await EULAManager.check_eula()
await self.initialize_database()
if not ConfigurationValidator.validate_configuration():
raise RuntimeError("配置验证失败,请检查配置文件")
return self.create_main_system() return self.create_main_system()
async def run_async_init(self, main_system):
"""执行异步初始化步骤"""
# 初始化数据库连接
await self.initialize_database()
if __name__ == "__main__": # 初始化数据库表结构
exit_code = 0 # 用于记录程序最终的退出状态 await self.initialize_database_async()
# 初始化主系统
await main_system.initialize()
# 初始化知识库
from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge
initialize_lpmm_knowledge()
# 显示彩蛋
EasterEgg.show()
async def wait_for_user_input():
"""等待用户输入(异步方式)"""
try: try:
# 创建MaiBotMain实例并获取MainSystem # 在非生产环境下,使用异步方式等待输入
maibot = MaiBotMain() if os.getenv("ENVIRONMENT") != "production":
logger.info("程序执行完成,按 Ctrl+C 退出...")
# 简单的异步等待,避免阻塞事件循环
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("用户中断程序")
return True
except Exception as e:
logger.error(f"等待用户输入时发生错误: {e}")
return False
# 创建事件循环 async def main_async():
loop = asyncio.new_event_loop() """主异步函数"""
asyncio.set_event_loop(loop) exit_code = 0
main_task = None
async with create_event_loop_context() as loop:
try: try:
# 异步初始化数据库和表结构 # 确保环境文件存在
main_system = loop.run_until_complete(maibot.run()) ConfigManager.ensure_env_file()
loop.run_until_complete(maibot.initialize_database_async())
# 执行初始化和任务调度 # 创建主程序实例并执行初始化
loop.run_until_complete(main_system.initialize()) maibot = MaiBotMain()
initialize_lpmm_knowledge() main_system = await maibot.run_sync_init()
# Schedule tasks returns a future that runs forever. await maibot.run_async_init(main_system)
# We can run console_input_loop concurrently.
main_tasks = loop.create_task(main_system.schedule_tasks()) # 运行主任务
loop.run_until_complete(main_tasks) main_task = asyncio.create_task(main_system.schedule_tasks())
logger.info("麦麦机器人启动完成,开始运行主任务...")
# 同时运行主任务和用户输入等待
user_input_done = asyncio.create_task(wait_for_user_input())
# 使用wait等待任意一个任务完成
done, pending = await asyncio.wait(
[main_task, user_input_done],
return_when=asyncio.FIRST_COMPLETED
)
# 如果用户输入任务完成用户按了Ctrl+C取消主任务
if user_input_done in done and main_task not in done:
logger.info("用户请求退出,正在取消主任务...")
main_task.cancel()
try:
await main_task
except asyncio.CancelledError:
logger.info("主任务已取消")
except Exception as e:
logger.error(f"主任务取消时发生错误: {e}")
except KeyboardInterrupt: except KeyboardInterrupt:
logger.warning("收到中断信号,正在优雅关闭...") logger.warning("收到中断信号,正在优雅关闭...")
# The actual shutdown logic is now in the finally block. if main_task and not main_task.done():
main_task.cancel()
except Exception as e:
logger.error(f"主程序发生异常: {e}")
logger.debug(f"异常详情: {traceback.format_exc()}")
exit_code = 1
return exit_code
if __name__ == "__main__":
exit_code = 0
try:
exit_code = asyncio.run(main_async())
except KeyboardInterrupt:
logger.info("程序被用户中断")
exit_code = 130
except Exception as e: except Exception as e:
logger.error(f"程序发生异常: {e!s} {traceback.format_exc()!s}") logger.error(f"程序启动失败: {e}")
exit_code = 1 # 标记发生错误 exit_code = 1
finally: finally:
# 确保 loop 在任何情况下都尝试关闭(如果存在且未关闭) # 确保日志系统正确关闭
if "loop" in locals() and loop and not loop.is_closed(): try:
logger.info("开始执行最终关闭流程...") shutdown_logging()
try: except Exception as e:
# 传递main_system实例 print(f"关闭日志系统时出错: {e}")
loop.run_until_complete(graceful_shutdown(maibot.main_system))
except Exception as ge:
logger.error(f"优雅关闭时发生错误: {ge}")
loop.close()
logger.info("事件循环已关闭")
# 在程序退出前暂停,让你有机会看到输出 sys.exit(exit_code)
# input("按 Enter 键退出...") # <--- 添加这行
sys.exit(exit_code) # <--- 使用记录的退出码

View File

@@ -21,6 +21,7 @@ from .memory_chunk import MemoryChunk as Memory
# 遗忘引擎 # 遗忘引擎
from .memory_forgetting_engine import ForgettingConfig, MemoryForgettingEngine, get_memory_forgetting_engine from .memory_forgetting_engine import ForgettingConfig, MemoryForgettingEngine, get_memory_forgetting_engine
from .memory_formatter import format_memories_bracket_style
# 记忆管理器 # 记忆管理器
from .memory_manager import MemoryManager, MemoryResult, memory_manager from .memory_manager import MemoryManager, MemoryResult, memory_manager
@@ -30,7 +31,6 @@ from .memory_system import MemorySystem, MemorySystemConfig, get_memory_system,
# Vector DB存储系统 # Vector DB存储系统
from .vector_memory_storage_v2 import VectorMemoryStorage, VectorStorageConfig, get_vector_memory_storage from .vector_memory_storage_v2 import VectorMemoryStorage, VectorStorageConfig, get_vector_memory_storage
from .memory_formatter import format_memories_bracket_style
__all__ = [ __all__ = [
# 核心数据结构 # 核心数据结构

View File

@@ -17,8 +17,9 @@
""" """
from __future__ import annotations from __future__ import annotations
from typing import Any, Iterable
import time import time
from collections.abc import Iterable
from typing import Any
def _format_timestamp(ts: Any) -> str: def _format_timestamp(ts: Any) -> str:

View File

@@ -2,9 +2,8 @@
记忆元数据索引。 记忆元数据索引。
""" """
from dataclasses import dataclass, asdict from dataclasses import asdict, dataclass
from typing import Any from typing import Any
from time import time
from src.common.logger import get_logger from src.common.logger import get_logger
@@ -12,6 +11,7 @@ logger = get_logger(__name__)
from inkfox.memory import PyMetadataIndex as _RustIndex # type: ignore from inkfox.memory import PyMetadataIndex as _RustIndex # type: ignore
@dataclass @dataclass
class MemoryMetadataIndexEntry: class MemoryMetadataIndexEntry:
memory_id: str memory_id: str
@@ -51,7 +51,7 @@ class MemoryMetadataIndex:
if payload: if payload:
try: try:
self._rust.batch_add(payload) self._rust.batch_add(payload)
except Exception as ex: # noqa: BLE001 except Exception as ex:
logger.error(f"Rust 元数据批量添加失败: {ex}") logger.error(f"Rust 元数据批量添加失败: {ex}")
def add_or_update(self, entry: MemoryMetadataIndexEntry): def add_or_update(self, entry: MemoryMetadataIndexEntry):
@@ -88,7 +88,7 @@ class MemoryMetadataIndex:
if flexible_mode: if flexible_mode:
return list(self._rust.search_flexible(params)) return list(self._rust.search_flexible(params))
return list(self._rust.search_strict(params)) return list(self._rust.search_strict(params))
except Exception as ex: # noqa: BLE001 except Exception as ex:
logger.error(f"Rust 搜索失败返回空: {ex}") logger.error(f"Rust 搜索失败返回空: {ex}")
return [] return []
@@ -105,18 +105,18 @@ class MemoryMetadataIndex:
"keywords_count": raw.get("keywords_indexed", 0), "keywords_count": raw.get("keywords_indexed", 0),
"tags_count": raw.get("tags_indexed", 0), "tags_count": raw.get("tags_indexed", 0),
} }
except Exception as ex: # noqa: BLE001 except Exception as ex:
logger.warning(f"读取 Rust stats 失败: {ex}") logger.warning(f"读取 Rust stats 失败: {ex}")
return {"total_memories": 0} return {"total_memories": 0}
def save(self): # 仅调用 rust save def save(self): # 仅调用 rust save
try: try:
self._rust.save() self._rust.save()
except Exception as ex: # noqa: BLE001 except Exception as ex:
logger.warning(f"Rust save 失败: {ex}") logger.warning(f"Rust save 失败: {ex}")
__all__ = [ __all__ = [
"MemoryMetadataIndexEntry",
"MemoryMetadataIndex", "MemoryMetadataIndex",
"MemoryMetadataIndexEntry",
] ]

View File

@@ -263,7 +263,7 @@ class MessageRecv(Message):
logger.warning("视频消息中没有base64数据") logger.warning("视频消息中没有base64数据")
return "[收到视频消息,但数据异常]" return "[收到视频消息,但数据异常]"
except Exception as e: except Exception as e:
logger.error(f"视频处理失败: {str(e)}") logger.error(f"视频处理失败: {e!s}")
import traceback import traceback
logger.error(f"错误详情: {traceback.format_exc()}") logger.error(f"错误详情: {traceback.format_exc()}")
@@ -277,7 +277,7 @@ class MessageRecv(Message):
logger.info("未启用视频识别") logger.info("未启用视频识别")
return "[视频]" return "[视频]"
except Exception as e: except Exception as e:
logger.error(f"处理消息段失败: {str(e)}, 类型: {segment.type}, 数据: {segment.data}") logger.error(f"处理消息段失败: {e!s}, 类型: {segment.type}, 数据: {segment.data}")
return f"[处理失败的{segment.type}消息]" return f"[处理失败的{segment.type}消息]"

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""纯 inkfox 视频关键帧分析工具 """纯 inkfox 视频关键帧分析工具
仅依赖 `inkfox.video` 提供的 Rust 扩展能力: 仅依赖 `inkfox.video` 提供的 Rust 扩展能力:
@@ -14,25 +13,25 @@
from __future__ import annotations from __future__ import annotations
import os
import io
import asyncio import asyncio
import base64 import base64
import tempfile
from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any
import hashlib import hashlib
import io
import os
import tempfile
import time import time
from pathlib import Path
from typing import Any
from PIL import Image from PIL import Image
from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import global_config, model_config from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest from src.llm_models.utils_model import LLMRequest
from src.common.database.sqlalchemy_models import Videos, get_db_session # type: ignore
# 简易并发控制:同一 hash 只处理一次 # 简易并发控制:同一 hash 只处理一次
_video_locks: Dict[str, asyncio.Lock] = {} _video_locks: dict[str, asyncio.Lock] = {}
_locks_guard = asyncio.Lock() _locks_guard = asyncio.Lock()
logger = get_logger("utils_video") logger = get_logger("utils_video")
@@ -90,7 +89,7 @@ class VideoAnalyzer:
logger.debug(f"获取系统信息失败: {e}") logger.debug(f"获取系统信息失败: {e}")
# ---- 关键帧提取 ---- # ---- 关键帧提取 ----
async def extract_keyframes(self, video_path: str) -> List[Tuple[str, float]]: async def extract_keyframes(self, video_path: str) -> list[tuple[str, float]]:
"""提取关键帧并返回 (base64, timestamp_seconds) 列表""" """提取关键帧并返回 (base64, timestamp_seconds) 列表"""
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
result = video.extract_keyframes_from_video( # type: ignore[attr-defined] result = video.extract_keyframes_from_video( # type: ignore[attr-defined]
@@ -105,7 +104,7 @@ class VideoAnalyzer:
) )
files = sorted(Path(tmp).glob("keyframe_*.jpg"))[: self.max_frames] files = sorted(Path(tmp).glob("keyframe_*.jpg"))[: self.max_frames]
total_ms = getattr(result, "total_time_ms", 0) total_ms = getattr(result, "total_time_ms", 0)
frames: List[Tuple[str, float]] = [] frames: list[tuple[str, float]] = []
for i, f in enumerate(files): for i, f in enumerate(files):
img = Image.open(f).convert("RGB") img = Image.open(f).convert("RGB")
if max(img.size) > self.max_image_size: if max(img.size) > self.max_image_size:
@@ -119,7 +118,7 @@ class VideoAnalyzer:
return frames return frames
# ---- 批量分析 ---- # ---- 批量分析 ----
async def _analyze_batch(self, frames: List[Tuple[str, float]], question: Optional[str]) -> str: async def _analyze_batch(self, frames: list[tuple[str, float]], question: str | None) -> str:
from src.llm_models.payload_content.message import MessageBuilder, RoleType from src.llm_models.payload_content.message import MessageBuilder, RoleType
from src.llm_models.utils_model import RequestType from src.llm_models.utils_model import RequestType
prompt = self.batch_analysis_prompt.format( prompt = self.batch_analysis_prompt.format(
@@ -149,8 +148,8 @@ class VideoAnalyzer:
return resp.content or "❌ 未获得响应" return resp.content or "❌ 未获得响应"
# ---- 逐帧分析 ---- # ---- 逐帧分析 ----
async def _analyze_sequential(self, frames: List[Tuple[str, float]], question: Optional[str]) -> str: async def _analyze_sequential(self, frames: list[tuple[str, float]], question: str | None) -> str:
results: List[str] = [] results: list[str] = []
for i, (b64, ts) in enumerate(frames): for i, (b64, ts) in enumerate(frames):
prompt = f"分析第{i+1}" + (f" (时间: {ts:.2f}s)" if self.enable_frame_timing else "") prompt = f"分析第{i+1}" + (f" (时间: {ts:.2f}s)" if self.enable_frame_timing else "")
if question: if question:
@@ -174,7 +173,7 @@ class VideoAnalyzer:
return "\n".join(results) return "\n".join(results)
# ---- 主入口 ---- # ---- 主入口 ----
async def analyze_video(self, video_path: str, question: Optional[str] = None) -> Tuple[bool, str]: async def analyze_video(self, video_path: str, question: str | None = None) -> tuple[bool, str]:
if not os.path.exists(video_path): if not os.path.exists(video_path):
return False, "❌ 文件不存在" return False, "❌ 文件不存在"
frames = await self.extract_keyframes(video_path) frames = await self.extract_keyframes(video_path)
@@ -189,10 +188,10 @@ class VideoAnalyzer:
async def analyze_video_from_bytes( async def analyze_video_from_bytes(
self, self,
video_bytes: bytes, video_bytes: bytes,
filename: Optional[str] = None, filename: str | None = None,
prompt: Optional[str] = None, prompt: str | None = None,
question: Optional[str] = None, question: str | None = None,
) -> Dict[str, str]: ) -> dict[str, str]:
"""从内存字节分析视频,兼容旧调用 (prompt / question 二选一) 返回 {"summary": str}.""" """从内存字节分析视频,兼容旧调用 (prompt / question 二选一) 返回 {"summary": str}."""
if not video_bytes: if not video_bytes:
return {"summary": "❌ 空视频数据"} return {"summary": "❌ 空视频数据"}
@@ -271,7 +270,7 @@ class VideoAnalyzer:
# ---- 外部接口 ---- # ---- 外部接口 ----
_INSTANCE: Optional[VideoAnalyzer] = None _INSTANCE: VideoAnalyzer | None = None
def get_video_analyzer() -> VideoAnalyzer: def get_video_analyzer() -> VideoAnalyzer:
@@ -285,7 +284,7 @@ def is_video_analysis_available() -> bool:
return True return True
def get_video_analysis_status() -> Dict[str, Any]: def get_video_analysis_status() -> dict[str, Any]:
try: try:
info = video.get_system_info() # type: ignore[attr-defined] info = video.get_system_info() # type: ignore[attr-defined]
except Exception as e: # pragma: no cover except Exception as e: # pragma: no cover
@@ -297,4 +296,4 @@ def get_video_analysis_status() -> Dict[str, Any]:
"modes": ["auto", "batch", "sequential"], "modes": ["auto", "batch", "sequential"],
"max_frames_default": inst.max_frames, "max_frames_default": inst.max_frames,
"implementation": "inkfox", "implementation": "inkfox",
} }

View File

@@ -53,8 +53,8 @@ class StreamContext(BaseDataModel):
priority_mode: str | None = None priority_mode: str | None = None
priority_info: dict | None = None priority_info: dict | None = None
def add_action_to_message(self, message_id: str, action: str): def add_action_to_message(self, message_id: str, action: str):
""" """
向指定消息添加执行的动作 向指定消息添加执行的动作

View File

@@ -5,7 +5,6 @@ MCP (Model Context Protocol) SSE (Server-Sent Events) 客户端实现
import asyncio import asyncio
import io import io
import json
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
@@ -20,7 +19,6 @@ from ..exceptions import (
NetworkConnectionError, NetworkConnectionError,
ReqAbortException, ReqAbortException,
RespNotOkException, RespNotOkException,
RespParseException,
) )
from ..payload_content.message import Message, RoleType from ..payload_content.message import Message, RoleType
from ..payload_content.resp_format import RespFormat from ..payload_content.resp_format import RespFormat

View File

@@ -5,21 +5,18 @@ import sys
import time import time
import traceback import traceback
from functools import partial from functools import partial
from random import choices
from typing import Any from typing import Any
from maim_message import MessageServer from maim_message import MessageServer
from rich.traceback import install from rich.traceback import install
from src.chat.emoji_system.emoji_manager import get_emoji_manager from src.chat.emoji_system.emoji_manager import get_emoji_manager
# 导入增强记忆系统管理器
from src.chat.memory_system.memory_manager import memory_manager from src.chat.memory_system.memory_manager import memory_manager
from src.chat.message_receive.bot import chat_bot from src.chat.message_receive.bot import chat_bot
from src.chat.message_receive.chat_stream import get_chat_manager from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask
from src.common.logger import get_logger from src.common.logger import get_logger
# 导入消息API和traceback模块
from src.common.message import get_global_api from src.common.message import get_global_api
from src.common.remote import TelemetryHeartBeatTask from src.common.remote import TelemetryHeartBeatTask
from src.common.server import Server, get_global_server from src.common.server import Server, get_global_server
@@ -29,21 +26,34 @@ from src.manager.async_task_manager import async_task_manager
from src.mood.mood_manager import mood_manager from src.mood.mood_manager import mood_manager
from src.plugin_system.base.component_types import EventType from src.plugin_system.base.component_types import EventType
from src.plugin_system.core.event_manager import event_manager from src.plugin_system.core.event_manager import event_manager
# from src.api.main import start_api_server
# 导入新的插件管理器
from src.plugin_system.core.plugin_manager import plugin_manager from src.plugin_system.core.plugin_manager import plugin_manager
from src.schedule.monthly_plan_manager import monthly_plan_manager from src.schedule.monthly_plan_manager import monthly_plan_manager
from src.schedule.schedule_manager import schedule_manager from src.schedule.schedule_manager import schedule_manager
# 插件系统现在使用统一的插件加载器 # 插件系统现在使用统一的插件加载器
install(extra_lines=3) install(extra_lines=3)
logger = get_logger("main") logger = get_logger("main")
# 预定义彩蛋短语,避免在每次初始化时重新创建
EGG_PHRASES: list[tuple[str, int]] = [
("我们的代码里真的没有bug只有'特性'", 10),
("你知道吗?阿范喜欢被切成臊子😡", 10),
("你知道吗,雅诺狐的耳朵其实很好摸", 5),
("你群最高技术力————言柒姐姐!", 20),
("初墨小姐宇宙第一(不是)", 10),
("world.execute(me);", 10),
("正在尝试连接到MaiBot的服务器...连接失败...正在转接到maimaiDX", 10),
("你的bug就像星星一样多而我的代码像太阳一样一出来就看不见了。", 10),
("温馨提示:请不要在代码中留下任何魔法数字,除非你知道它的含义。", 10),
("世界上只有10种人懂二进制的和不懂的。", 10),
("喵喵~你的麦麦被猫娘入侵了喵~", 15),
("恭喜你触发了稀有彩蛋喵:诺狐嗷呜~ ~", 1),
("恭喜你!!!你的开发者模式已成功开启,快来加入我们吧!(๑•̀ㅂ•́)و✧ (小声bb:其实是当黑奴)", 10),
]
def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float):
def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float) -> None:
"""后台任务完成时的回调函数""" """后台任务完成时的回调函数"""
end_time = time.time() end_time = time.time()
duration = end_time - start_time duration = end_time - start_time
@@ -58,10 +68,11 @@ def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float):
class MainSystem: class MainSystem:
def __init__(self): """主系统类,负责协调所有组件"""
def __init__(self) -> None:
# 使用增强记忆系统 # 使用增强记忆系统
self.memory_manager = memory_manager self.memory_manager = memory_manager
self.individuality: Individuality = get_individuality() self.individuality: Individuality = get_individuality()
# 使用消息API替代直接的FastAPI实例 # 使用消息API替代直接的FastAPI实例
@@ -69,15 +80,21 @@ class MainSystem:
self.server: Server = get_global_server() self.server: Server = get_global_server()
# 设置信号处理器用于优雅退出 # 设置信号处理器用于优雅退出
self._shutting_down = False
self._setup_signal_handlers() self._setup_signal_handlers()
def _setup_signal_handlers(self): # 存储清理任务的引用
self._cleanup_tasks: list[asyncio.Task] = []
def _setup_signal_handlers(self) -> None:
"""设置信号处理器""" """设置信号处理器"""
def signal_handler(signum, frame): def signal_handler(signum, frame):
logger.info("收到退出信号,正在优雅关闭系统...") if self._shutting_down:
logger.warning("系统已经在关闭过程中,忽略重复信号")
return
import asyncio self._shutting_down = True
logger.info("收到退出信号,正在优雅关闭系统...")
try: try:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@@ -85,11 +102,15 @@ class MainSystem:
# 如果事件循环正在运行,创建任务并设置回调 # 如果事件循环正在运行,创建任务并设置回调
async def cleanup_and_exit(): async def cleanup_and_exit():
await self._async_cleanup() await self._async_cleanup()
# 给日志系统一点时间刷新
await asyncio.sleep(0.1)
sys.exit(0) sys.exit(0)
task = asyncio.create_task(cleanup_and_exit()) task = asyncio.create_task(cleanup_and_exit())
# 存储清理任务引用
self._cleanup_tasks.append(task)
# 添加任务完成回调,确保程序退出 # 添加任务完成回调,确保程序退出
task.add_done_callback(lambda t: None) task.add_done_callback(lambda t: sys.exit(0) if not t.cancelled() else None)
else: else:
# 如果事件循环未运行,使用同步清理 # 如果事件循环未运行,使用同步清理
self._cleanup() self._cleanup()
@@ -101,7 +122,7 @@ class MainSystem:
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
async def _initialize_interest_calculator(self): async def _initialize_interest_calculator(self) -> None:
"""初始化兴趣值计算组件 - 通过插件系统自动发现和加载""" """初始化兴趣值计算组件 - 通过插件系统自动发现和加载"""
try: try:
logger.info("开始自动发现兴趣值计算组件...") logger.info("开始自动发现兴趣值计算组件...")
@@ -120,21 +141,14 @@ class MainSystem:
logger.warning("未发现任何兴趣计算器组件") logger.warning("未发现任何兴趣计算器组件")
return return
logger.info("发现的兴趣计算器组件:")
for calc_name, calc_info in interest_calculators.items():
enabled = getattr(calc_info, "enabled", True)
default_enabled = getattr(calc_info, "enabled_by_default", True)
logger.info(f" - {calc_name}: 启用: {enabled}, 默认启用: {default_enabled}")
# 初始化兴趣度管理器 # 初始化兴趣度管理器
from src.chat.interest_system.interest_manager import get_interest_manager from src.chat.interest_system.interest_manager import get_interest_manager
interest_manager = get_interest_manager() interest_manager = get_interest_manager()
await interest_manager.initialize() await interest_manager.initialize()
# 尝试注册计算器(单例模式,只注册第一个可用的) # 尝试注册所有可用的计算器
registered_calculator = None registered_calculators = []
# 使用组件注册表获取组件类并注册
for calc_name, calc_info in interest_calculators.items(): for calc_name, calc_info in interest_calculators.items():
enabled = getattr(calc_info, "enabled", True) enabled = getattr(calc_info, "enabled", True)
default_enabled = getattr(calc_info, "enabled_by_default", True) default_enabled = getattr(calc_info, "enabled_by_default", True)
@@ -147,140 +161,188 @@ class MainSystem:
from src.plugin_system.core.component_registry import component_registry from src.plugin_system.core.component_registry import component_registry
component_class = component_registry.get_component_class(calc_name, ComponentType.INTEREST_CALCULATOR) component_class = component_registry.get_component_class(calc_name, ComponentType.INTEREST_CALCULATOR)
if component_class: if not component_class:
logger.info(f"成功获取 {calc_name} 的组件类: {component_class.__name__}")
# 创建组件实例
calculator_instance = component_class()
logger.info(f"成功创建兴趣计算器实例: {calc_name}")
# 初始化组件
if await calculator_instance.initialize():
# 注册到兴趣管理器
success = await interest_manager.register_calculator(calculator_instance)
if success:
registered_calculator = calculator_instance
logger.info(f"成功注册兴趣计算器: {calc_name}")
break # 只注册一个成功的计算器
else:
logger.error(f"兴趣计算器 {calc_name} 注册失败")
else:
logger.error(f"兴趣计算器 {calc_name} 初始化失败")
else:
logger.warning(f"无法找到 {calc_name} 的组件类") logger.warning(f"无法找到 {calc_name} 的组件类")
continue
logger.info(f"成功获取 {calc_name} 的组件类: {component_class.__name__}")
# 创建组件实例
calculator_instance = component_class()
# 初始化组件
if not await calculator_instance.initialize():
logger.error(f"兴趣计算器 {calc_name} 初始化失败")
continue
# 注册到兴趣管理器
if await interest_manager.register_calculator(calculator_instance):
registered_calculators.append(calculator_instance)
logger.info(f"成功注册兴趣计算器: {calc_name}")
else:
logger.error(f"兴趣计算器 {calc_name} 注册失败")
except Exception as e: except Exception as e:
logger.error(f"处理兴趣计算器 {calc_name} 时出错: {e}", exc_info=True) logger.error(f"处理兴趣计算器 {calc_name} 时出错: {e}", exc_info=True)
if registered_calculator: if registered_calculators:
logger.info(f"当前活跃的兴趣度计算器: {registered_calculator.component_name} v{registered_calculator.component_version}") logger.info(f"成功注册了 {len(registered_calculators)} 个兴趣计算器")
for calc in registered_calculators:
logger.info(f" - {calc.component_name} v{calc.component_version}")
else: else:
logger.error("未能成功注册任何兴趣计算器") logger.error("未能成功注册任何兴趣计算器")
except Exception as e: except Exception as e:
logger.error(f"初始化兴趣度计算器失败: {e}", exc_info=True) logger.error(f"初始化兴趣度计算器失败: {e}", exc_info=True)
async def _async_cleanup(self): async def _async_cleanup(self) -> None:
"""异步清理资源""" """异步清理资源"""
if self._shutting_down:
return
self._shutting_down = True
logger.info("开始系统清理流程...")
cleanup_tasks = []
# 停止数据库服务
try: try:
from src.common.database.database import stop_database
# 停止数据库服务 cleanup_tasks.append(("数据库服务", stop_database()))
try:
from src.common.database.database import stop_database
await stop_database()
logger.info("🛑 数据库服务已停止")
except Exception as e:
logger.error(f"停止数据库服务时出错: {e}")
# 停止消息管理器
try:
from src.chat.message_manager import message_manager
await message_manager.stop()
logger.info("🛑 消息管理器已停止")
except Exception as e:
logger.error(f"停止消息管理器时出错: {e}")
# 停止消息重组器
try:
from src.plugin_system import EventType
from src.plugin_system.core.event_manager import event_manager
from src.utils.message_chunker import reassembler
await event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM")
await reassembler.stop_cleanup_task()
logger.info("🛑 消息重组器已停止")
except Exception as e:
logger.error(f"停止消息重组器时出错: {e}")
# 停止增强记忆系统
try:
if global_config.memory.enable_memory:
await self.memory_manager.shutdown()
logger.info("🛑 增强记忆系统已停止")
except Exception as e:
logger.error(f"停止增强记忆系统时出错: {e}")
except Exception as e: except Exception as e:
logger.error(f"异步清理资源时出错: {e}") logger.error(f"准备停止数据库服务时出错: {e}")
def _cleanup(self): # 停止消息管理器
try:
from src.chat.message_manager import message_manager
cleanup_tasks.append(("消息管理器", message_manager.stop()))
except Exception as e:
logger.error(f"准备停止消息管理器时出错: {e}")
# 停止消息重组器
try:
from src.utils.message_chunker import reassembler
cleanup_tasks.append(("消息重组器", reassembler.stop_cleanup_task()))
except Exception as e:
logger.error(f"准备停止消息重组器时出错: {e}")
# 停止增强记忆系统
try:
if global_config.memory.enable_memory:
cleanup_tasks.append(("增强记忆系统", self.memory_manager.shutdown()))
except Exception as e:
logger.error(f"准备停止增强记忆系统时出错: {e}")
# 触发停止事件
try:
from src.plugin_system.core.event_manager import event_manager
cleanup_tasks.append(("插件系统停止事件",
event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM")))
except Exception as e:
logger.error(f"准备触发停止事件时出错: {e}")
# 停止表情管理器
try:
cleanup_tasks.append(("表情管理器",
asyncio.get_event_loop().run_in_executor(None, get_emoji_manager().shutdown)))
except Exception as e:
logger.error(f"准备停止表情管理器时出错: {e}")
# 停止服务器
try:
if self.server:
cleanup_tasks.append(("服务器", self.server.shutdown()))
except Exception as e:
logger.error(f"准备停止服务器时出错: {e}")
# 停止应用
try:
if self.app:
if hasattr(self.app, "shutdown"):
cleanup_tasks.append(("应用", self.app.shutdown()))
elif hasattr(self.app, "stop"):
cleanup_tasks.append(("应用", self.app.stop()))
except Exception as e:
logger.error(f"准备停止应用时出错: {e}")
# 并行执行所有清理任务
if cleanup_tasks:
logger.info(f"开始并行执行 {len(cleanup_tasks)} 个清理任务...")
tasks = [task for _, task in cleanup_tasks]
task_names = [name for name, _ in cleanup_tasks]
# 使用asyncio.gather并行执行设置超时防止卡死
try:
results = await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=30.0 # 30秒超时
)
# 记录结果
for i, (name, result) in enumerate(zip(task_names, results)):
if isinstance(result, Exception):
logger.error(f"停止 {name} 时出错: {result}")
else:
logger.info(f"🛑 {name} 已停止")
except asyncio.TimeoutError:
logger.error("清理任务超时,强制退出")
except Exception as e:
logger.error(f"执行清理任务时发生错误: {e}")
else:
logger.warning("没有需要清理的任务")
def _cleanup(self) -> None:
"""同步清理资源(向后兼容)""" """同步清理资源(向后兼容)"""
import asyncio
try: try:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if loop.is_running(): if loop.is_running():
# 如果循环正在运行,创建异步清理任务 # 如果循环正在运行,创建异步清理任务
asyncio.create_task(self._async_cleanup()) task = asyncio.create_task(self._async_cleanup())
self._cleanup_tasks.append(task)
else: else:
# 如果循环未运行,直接运行异步清理 # 如果循环未运行,直接运行异步清理
loop.run_until_complete(self._async_cleanup()) loop.run_until_complete(self._async_cleanup())
except Exception as e: except Exception as e:
logger.error(f"同步清理资源时出错: {e}") logger.error(f"同步清理资源时出错: {e}")
async def _message_process_wrapper(self, message_data: dict[str, Any]): async def _message_process_wrapper(self, message_data: dict[str, Any]) -> None:
"""并行处理消息的包装器""" """并行处理消息的包装器"""
try: try:
start_time = time.time() start_time = time.time()
message_id = message_data.get("message_info", {}).get("message_id", "UNKNOWN") message_id = message_data.get("message_info", {}).get("message_id", "UNKNOWN")
# 检查系统是否正在关闭
if self._shutting_down:
logger.warning(f"系统正在关闭,拒绝处理消息 {message_id}")
return
# 创建后台任务 # 创建后台任务
task = asyncio.create_task(chat_bot.message_process(message_data)) task = asyncio.create_task(chat_bot.message_process(message_data))
logger.debug(f"已为消息 {message_id} 创建后台处理任务 (ID: {id(task)})") logger.debug(f"已为消息 {message_id} 创建后台处理任务 (ID: {id(task)})")
# 添加一个回调函数,当任务完成时,它会被调用 # 添加一个回调函数,当任务完成时,它会被调用
task.add_done_callback(partial(_task_done_callback, message_id=message_id, start_time=start_time)) task.add_done_callback(partial(_task_done_callback, message_id=message_id, start_time=start_time))
except Exception: except Exception:
logger.error("在创建消息处理任务时发生严重错误:") logger.error("在创建消息处理任务时发生严重错误:")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
async def initialize(self): async def initialize(self) -> None:
"""初始化系统组件""" """初始化系统组件"""
# 检查必要的配置
if not hasattr(global_config, "bot") or not hasattr(global_config.bot, "nickname"):
logger.error("缺少必要的bot配置")
raise ValueError("Bot配置不完整")
logger.info(f"正在唤醒{global_config.bot.nickname}......") logger.info(f"正在唤醒{global_config.bot.nickname}......")
# 其他初始化任务 # 初始化组件
await asyncio.gather(self._init_components()) await self._init_components()
phrases = [
("我们的代码里真的没有bug只有特性.", 10),
("你知道吗?阿范喜欢被切成臊子😡", 10), # 你加的提示出语法问题来了😡😡😡😡😡😡😡
("你知道吗,雅诺狐的耳朵其实很好摸", 5),
("你群最高技术力————言柒姐姐!", 20),
("初墨小姐宇宙第一(不是)", 10), # 15
("world.execute(me);", 10),
("正在尝试连接到MaiBot的服务器...连接失败...正在转接到maimaiDX", 10),
("你的bug就像星星一样多而我的代码像太阳一样一出来就看不见了。", 10),
("温馨提示:请不要在代码中留下任何魔法数字,除非你知道它的含义。", 10),
("世界上只有10种人懂二进制的和不懂的。", 10),
("喵喵~你的麦麦被猫娘入侵了喵~", 15),
("恭喜你触发了稀有彩蛋喵:诺狐嗷呜~ ~", 1),
("恭喜你!!!你的开发者模式已成功开启,快来加入我们吧!(๑•̀ㅂ•́)و✧ (小声bb:其实是当黑奴)", 10),
]
from random import choices
# 分离彩蛋和权重 # 随机选择彩蛋
egg_texts, weights = zip(*phrases, strict=True) egg_texts, weights = zip(*EGG_PHRASES)
selected_egg = choices(egg_texts, weights=weights, k=1)[0]
# 使用choices进行带权重的随机选择
selected_egg = choices(egg_texts, weights=weights, k=1)
eggs = selected_egg[0]
logger.info(f""" logger.info(f"""
全部系统初始化完成,{global_config.bot.nickname}已成功唤醒 全部系统初始化完成,{global_config.bot.nickname}已成功唤醒
========================================================= =========================================================
@@ -292,54 +354,52 @@ MoFox_Bot(第三方修改版)
========================================================= =========================================================
这是基于原版MMC的社区改版包含增强功能和优化(同时也有更多的'特性') 这是基于原版MMC的社区改版包含增强功能和优化(同时也有更多的'特性')
========================================================= =========================================================
小贴士:{eggs} 小贴士:{selected_egg}
""") """)
async def _init_components(self): async def _init_components(self) -> None:
"""初始化其他组件""" """初始化其他组件"""
init_start_time = time.time() init_start_time = time.time()
# 添加在线时间统计任务 # 并行初始化基础组件
await async_task_manager.add_task(OnlineTimeRecordTask()) base_init_tasks = [
async_task_manager.add_task(OnlineTimeRecordTask()),
async_task_manager.add_task(StatisticOutputTask()),
async_task_manager.add_task(TelemetryHeartBeatTask()),
]
# 添加统计信息输出任务 await asyncio.gather(*base_init_tasks, return_exceptions=True)
await async_task_manager.add_task(StatisticOutputTask()) logger.info("基础定时任务初始化成功")
# 添加遥测心跳任务
await async_task_manager.add_task(TelemetryHeartBeatTask())
# 注册默认事件 # 注册默认事件
event_manager.init_default_events() event_manager.init_default_events()
# 初始化权限管理器 # 初始化权限管理器
from src.plugin_system.apis.permission_api import permission_api try:
from src.plugin_system.core.permission_manager import PermissionManager from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.core.permission_manager import PermissionManager
permission_manager = PermissionManager() permission_manager = PermissionManager()
await permission_manager.initialize() await permission_manager.initialize()
permission_api.set_permission_manager(permission_manager) permission_api.set_permission_manager(permission_manager)
logger.info("权限管理器初始化成功") logger.info("权限管理器初始化成功")
except Exception as e:
# 启动API服务器 logger.error(f"权限管理器初始化失败: {e}")
# start_api_server()
# logger.info("API服务器启动成功")
# 注册API路由 # 注册API路由
try: try:
from src.api.message_router import router as message_router from src.api.message_router import router as message_router
self.server.register_router(message_router, prefix="/api") self.server.register_router(message_router, prefix="/api")
logger.info("API路由注册成功") logger.info("API路由注册成功")
except ImportError as e:
logger.error(f"导入API路由失败: {e}")
except Exception as e: except Exception as e:
logger.error(f"注册API路由时发生错误: {e}") logger.error(f"注册API路由失败: {e}")
# 加载所有actions包括默认的和插件 # 加载所有插件
plugin_manager.load_all_plugins() plugin_manager.load_all_plugins()
# 处理所有缓存的事件订阅(插件加载完成后) # 处理所有缓存的事件订阅(插件加载完成后)
event_manager.process_all_pending_subscriptions() event_manager.process_all_pending_subscriptions()
# 初始化MCP工具提供器 # 初始化MCP工具提供器
try: try:
mcp_config = global_config.get("mcp_servers", []) mcp_config = global_config.get("mcp_servers", [])
@@ -350,74 +410,83 @@ MoFox_Bot(第三方修改版)
except Exception as e: except Exception as e:
logger.info(f"MCP工具提供器未配置或初始化失败: {e}") logger.info(f"MCP工具提供器未配置或初始化失败: {e}")
# 初始化表情管理器 # 并行初始化其他管理器
get_emoji_manager().initialize() manager_init_tasks = []
logger.info("表情包管理器初始化成功")
""" # 表情管理器
# 初始化回复后关系追踪系统 manager_init_tasks.append(self._safe_init("表情包管理器", get_emoji_manager().initialize))
try:
from src.plugins.built_in.affinity_flow_chatter.interest_scoring import chatter_interest_scoring_system
from src.plugins.built_in.affinity_flow_chatter.relationship_tracker import ChatterRelationshipTracker
relationship_tracker = ChatterRelationshipTracker(interest_scoring_system=chatter_interest_scoring_system) # 情绪管理器
chatter_interest_scoring_system.relationship_tracker = relationship_tracker manager_init_tasks.append(self._safe_init("情绪管理器", mood_manager.start))
logger.info("回复后关系追踪系统初始化成功")
except Exception as e:
logger.error(f"回复后关系追踪系统初始化失败: {e}")
relationship_tracker = None
"""
# 启动情绪管理器 # 聊天管理器
await mood_manager.start() manager_init_tasks.append(self._safe_init("聊天管理器", get_chat_manager()._initialize))
logger.info("情绪管理器初始化成功")
# 初始化聊天管理器 # 等待所有管理器初始化完成
await get_chat_manager()._initialize() results = await asyncio.gather(*manager_init_tasks, return_exceptions=True)
# 检查初始化结果
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"组件初始化失败: {result}")
# 启动聊天管理器的自动保存任务
asyncio.create_task(get_chat_manager()._auto_save_task()) asyncio.create_task(get_chat_manager()._auto_save_task())
logger.info("聊天管理器初始化成功")
# 初始化增强记忆系统 # 初始化增强记忆系统
await self.memory_manager.initialize() if global_config.memory.enable_memory:
logger.info("增强记忆系统初始化成功") await self._safe_init("增强记忆系统", self.memory_manager.initialize)()
else:
# 老记忆系统已完全删除 logger.info("记忆系统已禁用,跳过初始化")
# 初始化消息兴趣值计算组件 # 初始化消息兴趣值计算组件
await self._initialize_interest_calculator() await self._initialize_interest_calculator()
# 初始化LPMM知识库 # 初始化LPMM知识库
from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge try:
from src.chat.knowledge.knowledge_lib import initialize_lpmm_knowledge
initialize_lpmm_knowledge()
logger.info("LPMM知识库初始化成功")
except Exception as e:
logger.error(f"LPMM知识库初始化失败: {e}")
initialize_lpmm_knowledge() # 将消息处理函数注册到API
logger.info("LPMM知识库初始化成功")
# 异步记忆管理器已禁用,增强记忆系统有内置的优化机制
logger.info("异步记忆管理器已禁用 - 使用增强记忆系统内置优化")
# await asyncio.sleep(0.5) #防止logger输出飞了
# 将bot.py中的chat_bot.message_process消息处理函数注册到api.py的消息处理基类中
self.app.register_message_handler(self._message_process_wrapper) self.app.register_message_handler(self._message_process_wrapper)
# 启动消息重组器的清理任务 # 启动消息重组器
from src.utils.message_chunker import reassembler try:
from src.utils.message_chunker import reassembler
await reassembler.start_cleanup_task() await reassembler.start_cleanup_task()
logger.info("消息重组器已启动") logger.info("消息重组器已启动")
except Exception as e:
logger.error(f"启动消息重组器失败: {e}")
# 启动消息管理器 # 启动消息管理器
from src.chat.message_manager import message_manager try:
from src.chat.message_manager import message_manager
await message_manager.start() await message_manager.start()
logger.info("消息管理器已启动") logger.info("消息管理器已启动")
except Exception as e:
logger.error(f"启动消息管理器失败: {e}")
# 初始化个体特征 # 初始化个体特征
await self.individuality.initialize() await self._safe_init("个体特征", self.individuality.initialize)()
# 初始化计划相关组件
await self._init_planning_components()
# 触发启动事件
try:
await event_manager.trigger_event(EventType.ON_START, permission_group="SYSTEM")
init_time = int(1000 * (time.time() - init_start_time))
logger.info(f"初始化完成,神经元放电{init_time}")
except Exception as e:
logger.error(f"启动事件触发失败: {e}")
async def _init_planning_components(self) -> None:
"""初始化计划相关组件"""
# 初始化月度计划管理器 # 初始化月度计划管理器
if global_config.planning_system.monthly_plan_enable: if global_config.planning_system.monthly_plan_enable:
logger.info("正在初始化月度计划管理器...")
try: try:
await monthly_plan_manager.start_monthly_plan_generation() await monthly_plan_manager.start_monthly_plan_generation()
logger.info("月度计划管理器初始化成功") logger.info("月度计划管理器初始化成功")
@@ -426,23 +495,31 @@ MoFox_Bot(第三方修改版)
# 初始化日程管理器 # 初始化日程管理器
if global_config.planning_system.schedule_enable: if global_config.planning_system.schedule_enable:
logger.info("日程表功能已启用,正在初始化管理器...") try:
await schedule_manager.load_or_generate_today_schedule() await schedule_manager.load_or_generate_today_schedule()
await schedule_manager.start_daily_schedule_generation() await schedule_manager.start_daily_schedule_generation()
logger.info("日程表管理器初始化成功") logger.info("日程表管理器初始化成功")
except Exception as e:
logger.error(f"日程表管理器初始化失败: {e}")
try: async def _safe_init(self, component_name: str, init_func) -> callable:
await event_manager.trigger_event(EventType.ON_START, permission_group="SYSTEM") """安全初始化组件,捕获异常"""
init_time = int(1000 * (time.time() - init_start_time)) async def wrapper():
logger.info(f"初始化完成,神经元放电{init_time}") try:
except Exception as e: result = init_func()
logger.error(f"启动大脑和外部世界失败: {e}") if asyncio.iscoroutine(result):
raise await result
logger.info(f"{component_name}初始化成功")
return True
except Exception as e:
logger.error(f"{component_name}初始化失败: {e}")
return False
return wrapper
async def schedule_tasks(self): async def schedule_tasks(self) -> None:
"""调度定时任务""" """调度定时任务"""
try: try:
while True: while not self._shutting_down:
try: try:
tasks = [ tasks = [
get_emoji_manager().start_periodic_check_register(), get_emoji_manager().start_periodic_check_register(),
@@ -450,23 +527,25 @@ MoFox_Bot(第三方修改版)
self.server.run(), self.server.run(),
] ]
# 增强记忆系统不需要定时任务,已禁用原有记忆系统的定时任务
# 使用 return_exceptions=True 防止单个任务失败导致整个程序崩溃 # 使用 return_exceptions=True 防止单个任务失败导致整个程序崩溃
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
except (ConnectionResetError, OSError) as e: except (ConnectionResetError, OSError) as e:
if self._shutting_down:
break
logger.warning(f"网络连接发生错误,尝试重新启动任务: {e}") logger.warning(f"网络连接发生错误,尝试重新启动任务: {e}")
await asyncio.sleep(1) # 短暂等待后重新开始 await asyncio.sleep(1)
continue
except asyncio.InvalidStateError as e: except asyncio.InvalidStateError as e:
if self._shutting_down:
break
logger.error(f"异步任务状态无效,重新初始化: {e}") logger.error(f"异步任务状态无效,重新初始化: {e}")
await asyncio.sleep(2) # 等待更长时间让系统稳定 await asyncio.sleep(2)
continue
except Exception as e: except Exception as e:
if self._shutting_down:
break
logger.error(f"调度任务发生未预期异常: {e}") logger.error(f"调度任务发生未预期异常: {e}")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
await asyncio.sleep(5) # 发生其他错误时等待更长时间 await asyncio.sleep(5)
continue
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("调度任务被取消,正在退出...") logger.info("调度任务被取消,正在退出...")
@@ -475,52 +554,37 @@ MoFox_Bot(第三方修改版)
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise raise
async def shutdown(self): async def shutdown(self) -> None:
"""关闭系统组件""" """关闭系统组件"""
if self._shutting_down:
return
logger.info("正在关闭MainSystem...") logger.info("正在关闭MainSystem...")
await self._async_cleanup()
# 关闭表情管理器
try:
get_emoji_manager().shutdown()
logger.info("表情管理器已关闭")
except Exception as e:
logger.warning(f"关闭表情管理器时出错: {e}")
# 关闭服务器
try:
if self.server:
await self.server.shutdown()
logger.info("服务器已关闭")
except Exception as e:
logger.warning(f"关闭服务器时出错: {e}")
# 关闭应用 (MessageServer可能没有shutdown方法)
try:
if self.app:
if hasattr(self.app, "shutdown"):
await self.app.shutdown()
logger.info("应用已关闭")
elif hasattr(self.app, "stop"):
await self.app.stop()
logger.info("应用已停止")
else:
logger.info("应用没有shutdown方法跳过关闭")
except Exception as e:
logger.warning(f"关闭应用时出错: {e}")
logger.info("MainSystem关闭完成") logger.info("MainSystem关闭完成")
# 老记忆系统的定时任务已删除 - 增强记忆系统使用内置的维护机制
async def main() -> None:
async def main():
"""主函数""" """主函数"""
system = MainSystem() system = MainSystem()
await asyncio.gather( try:
system.initialize(), await system.initialize()
system.schedule_tasks(), await system.schedule_tasks()
) except KeyboardInterrupt:
logger.info("收到键盘中断信号")
except Exception as e:
logger.error(f"主函数执行失败: {e}")
logger.error(traceback.format_exc())
finally:
await system.shutdown()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("程序被用户中断")
except Exception as e:
logger.error(f"程序执行失败: {e}")
logger.error(traceback.format_exc())
sys.exit(1)

View File

@@ -19,7 +19,7 @@ from src.common.logger import get_logger
from src.plugin_system.base.component_types import ActionInfo from src.plugin_system.base.component_types import ActionInfo
if TYPE_CHECKING: if TYPE_CHECKING:
from src.chat.replyer.default_generator import DefaultReplyer pass
install(extra_lines=3) install(extra_lines=3)

View File

@@ -19,7 +19,7 @@ def get_tool_instance(tool_name: str) -> BaseTool | None:
tool_class: type[BaseTool] = component_registry.get_component_class(tool_name, ComponentType.TOOL) # type: ignore tool_class: type[BaseTool] = component_registry.get_component_class(tool_name, ComponentType.TOOL) # type: ignore
if tool_class: if tool_class:
return tool_class(plugin_config) return tool_class(plugin_config)
# 如果不是常规工具检查是否是MCP工具 # 如果不是常规工具检查是否是MCP工具
# MCP工具不需要返回实例会在execute_tool_call中特殊处理 # MCP工具不需要返回实例会在execute_tool_call中特殊处理
return None return None
@@ -35,7 +35,7 @@ def get_llm_available_tool_definitions():
llm_available_tools = component_registry.get_llm_available_tools() llm_available_tools = component_registry.get_llm_available_tools()
tool_definitions = [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()] tool_definitions = [(name, tool_class.get_tool_definition()) for name, tool_class in llm_available_tools.items()]
# 添加MCP工具 # 添加MCP工具
try: try:
from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider
@@ -45,5 +45,5 @@ def get_llm_available_tool_definitions():
logger.debug(f"已添加 {len(mcp_tools)} 个MCP工具到可用工具列表") logger.debug(f"已添加 {len(mcp_tools)} 个MCP工具到可用工具列表")
except Exception as e: except Exception as e:
logger.debug(f"获取MCP工具失败可能未配置: {e}") logger.debug(f"获取MCP工具失败可能未配置: {e}")
return tool_definitions return tool_definitions

View File

@@ -279,7 +279,7 @@ class ToolExecutor:
logger.info( logger.info(
f"{self.log_prefix} 正在执行工具: [bold green]{function_name}[/bold green] | 参数: {function_args}" f"{self.log_prefix} 正在执行工具: [bold green]{function_name}[/bold green] | 参数: {function_args}"
) )
# 检查是否是MCP工具 # 检查是否是MCP工具
try: try:
from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider from src.plugin_system.utils.mcp_tool_provider import mcp_tool_provider
@@ -295,7 +295,7 @@ class ToolExecutor:
} }
except Exception as e: except Exception as e:
logger.debug(f"检查MCP工具时出错: {e}") logger.debug(f"检查MCP工具时出错: {e}")
function_args["llm_called"] = True # 标记为LLM调用 function_args["llm_called"] = True # 标记为LLM调用
# 检查是否是二步工具的第二步调用 # 检查是否是二步工具的第二步调用

View File

@@ -3,11 +3,9 @@ MCP (Model Context Protocol) 连接器
负责连接MCP服务器获取和执行工具 负责连接MCP服务器获取和执行工具
""" """
import asyncio
from typing import Any from typing import Any
import aiohttp import aiohttp
import orjson
from src.common.logger import get_logger from src.common.logger import get_logger

View File

@@ -3,7 +3,6 @@ MCP工具提供器 - 简化版
直接集成到工具系统,无需复杂的插件架构 直接集成到工具系统,无需复杂的插件架构
""" """
import asyncio
from typing import Any from typing import Any
from src.common.logger import get_logger from src.common.logger import get_logger

View File

@@ -4,9 +4,10 @@
""" """
import time import time
import orjson
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import orjson
from src.chat.interest_system import bot_interest_manager from src.chat.interest_system import bot_interest_manager
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import global_config from src.config.config import global_config

View File

@@ -230,11 +230,11 @@ class ChatterPlanExecutor:
except Exception as e: except Exception as e:
error_message = str(e) error_message = str(e)
logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}") logger.error(f"执行回复动作失败: {action_info.action_type}, 错误: {error_message}")
''' """
# 记录用户关系追踪 # 记录用户关系追踪
if success and action_info.action_message: if success and action_info.action_message:
await self._track_user_interaction(action_info, plan, reply_content) await self._track_user_interaction(action_info, plan, reply_content)
''' """
execution_time = time.time() - start_time execution_time = time.time() - start_time
self.execution_stats["execution_times"].append(execution_time) self.execution_stats["execution_times"].append(execution_time)

View File

@@ -10,10 +10,10 @@ from typing import TYPE_CHECKING, Any
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import global_config from src.config.config import global_config
from src.mood.mood_manager import mood_manager from src.mood.mood_manager import mood_manager
from src.plugin_system.base.component_types import ChatMode
from src.plugins.built_in.affinity_flow_chatter.plan_executor import ChatterPlanExecutor from src.plugins.built_in.affinity_flow_chatter.plan_executor import ChatterPlanExecutor
from src.plugins.built_in.affinity_flow_chatter.plan_filter import ChatterPlanFilter from src.plugins.built_in.affinity_flow_chatter.plan_filter import ChatterPlanFilter
from src.plugins.built_in.affinity_flow_chatter.plan_generator import ChatterPlanGenerator from src.plugins.built_in.affinity_flow_chatter.plan_generator import ChatterPlanGenerator
from src.plugin_system.base.component_types import ChatMode
if TYPE_CHECKING: if TYPE_CHECKING:
from src.chat.planner_actions.action_manager import ChatterActionManager from src.chat.planner_actions.action_manager import ChatterActionManager

View File

@@ -6,9 +6,7 @@ SearXNG search engine implementation
from __future__ import annotations from __future__ import annotations
import asyncio from typing import Any
import functools
from typing import Any, List
import httpx import httpx
@@ -39,13 +37,13 @@ class SearXNGSearchEngine(BaseSearchEngine):
instances = config_api.get_global_config("web_search.searxng_instances", None) instances = config_api.get_global_config("web_search.searxng_instances", None)
if isinstance(instances, list): if isinstance(instances, list):
# 过滤空值 # 过滤空值
self.instances: List[str] = [u.rstrip("/") for u in instances if isinstance(u, str) and u.strip()] self.instances: list[str] = [u.rstrip("/") for u in instances if isinstance(u, str) and u.strip()]
else: else:
self.instances = [] self.instances = []
api_keys = config_api.get_global_config("web_search.searxng_api_keys", None) api_keys = config_api.get_global_config("web_search.searxng_api_keys", None)
if isinstance(api_keys, list): if isinstance(api_keys, list):
self.api_keys: List[str | None] = [k.strip() if isinstance(k, str) and k.strip() else None for k in api_keys] self.api_keys: list[str | None] = [k.strip() if isinstance(k, str) and k.strip() else None for k in api_keys]
else: else:
self.api_keys = [] self.api_keys = []
@@ -85,7 +83,7 @@ class SearXNGSearchEngine(BaseSearchEngine):
results.extend(instance_results) results.extend(instance_results)
if len(results) >= num_results: if len(results) >= num_results:
break break
except Exception as e: # noqa: BLE001 except Exception as e:
logger.warning(f"SearXNG 实例 {base_url} 调用失败: {e}") logger.warning(f"SearXNG 实例 {base_url} 调用失败: {e}")
continue continue
@@ -116,12 +114,12 @@ class SearXNGSearchEngine(BaseSearchEngine):
try: try:
resp = await self._client.get(url, params=params, headers=headers) resp = await self._client.get(url, params=params, headers=headers)
resp.raise_for_status() resp.raise_for_status()
except Exception as e: # noqa: BLE001 except Exception as e:
raise RuntimeError(f"请求失败: {e}") from e raise RuntimeError(f"请求失败: {e}") from e
try: try:
data = resp.json() data = resp.json()
except Exception as e: # noqa: BLE001 except Exception as e:
raise RuntimeError(f"解析 JSON 失败: {e}") from e raise RuntimeError(f"解析 JSON 失败: {e}") from e
raw_results = data.get("results", []) if isinstance(data, dict) else [] raw_results = data.get("results", []) if isinstance(data, dict) else []
@@ -141,5 +139,5 @@ class SearXNGSearchEngine(BaseSearchEngine):
async def __aenter__(self): async def __aenter__(self):
return self return self
async def __aexit__(self, exc_type, exc, tb): # noqa: D401 async def __aexit__(self, exc_type, exc, tb):
await self._client.aclose() await self._client.aclose()

View File

@@ -41,8 +41,8 @@ class WEBSEARCHPLUGIN(BasePlugin):
from .engines.bing_engine import BingSearchEngine from .engines.bing_engine import BingSearchEngine
from .engines.ddg_engine import DDGSearchEngine from .engines.ddg_engine import DDGSearchEngine
from .engines.exa_engine import ExaSearchEngine from .engines.exa_engine import ExaSearchEngine
from .engines.tavily_engine import TavilySearchEngine
from .engines.searxng_engine import SearXNGSearchEngine from .engines.searxng_engine import SearXNGSearchEngine
from .engines.tavily_engine import TavilySearchEngine
# 实例化所有搜索引擎这会触发API密钥管理器的初始化 # 实例化所有搜索引擎这会触发API密钥管理器的初始化
exa_engine = ExaSearchEngine() exa_engine = ExaSearchEngine()

View File

@@ -13,8 +13,8 @@ from src.plugin_system.apis import config_api
from ..engines.bing_engine import BingSearchEngine from ..engines.bing_engine import BingSearchEngine
from ..engines.ddg_engine import DDGSearchEngine from ..engines.ddg_engine import DDGSearchEngine
from ..engines.exa_engine import ExaSearchEngine from ..engines.exa_engine import ExaSearchEngine
from ..engines.tavily_engine import TavilySearchEngine
from ..engines.searxng_engine import SearXNGSearchEngine from ..engines.searxng_engine import SearXNGSearchEngine
from ..engines.tavily_engine import TavilySearchEngine
from ..utils.formatters import deduplicate_results, format_search_results from ..utils.formatters import deduplicate_results, format_search_results
logger = get_logger("web_search_tool") logger = get_logger("web_search_tool")