diff --git a/src/chat/actions/plugin_action.py b/src/chat/actions/plugin_action.py index 04f9a545c..f678a0e18 100644 --- a/src/chat/actions/plugin_action.py +++ b/src/chat/actions/plugin_action.py @@ -6,14 +6,14 @@ import inspect import toml # 导入 toml 库 from abc import abstractmethod -# 导入拆分后的API模块 -from src.chat.actions.plugin_api.message_api import MessageAPI -from src.chat.actions.plugin_api.llm_api import LLMAPI -from src.chat.actions.plugin_api.database_api import DatabaseAPI -from src.chat.actions.plugin_api.config_api import ConfigAPI -from src.chat.actions.plugin_api.utils_api import UtilsAPI -from src.chat.actions.plugin_api.stream_api import StreamAPI -from src.chat.actions.plugin_api.hearflow_api import HearflowAPI +# 导入新插件系统的API模块 +from src.plugin_system.apis.message_api import MessageAPI +from src.plugin_system.apis.llm_api import LLMAPI +from src.plugin_system.apis.database_api import DatabaseAPI +from src.plugin_system.apis.config_api import ConfigAPI +from src.plugin_system.apis.utils_api import UtilsAPI +from src.plugin_system.apis.stream_api import StreamAPI +from src.plugin_system.apis.hearflow_api import HearflowAPI # 以下为类型注解需要 from src.chat.message_receive.chat_stream import ChatStream # noqa @@ -25,9 +25,14 @@ logger = get_logger("plugin_action") class PluginAction(BaseAction, MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, HearflowAPI): - """插件动作基类 + """插件动作基类(旧版兼容) 封装了主程序内部依赖,提供简化的API接口给插件开发者 + + ⚠️ 此类已弃用,建议使用新的插件系统: + - 新基类:src.plugin_system.base.BaseAction + - 新API:src.plugin_system.plugin_api + - 新注册:@register_component 装饰器 """ action_config_file_name: Optional[str] = None # 插件可以覆盖此属性来指定配置文件名 diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 29d571905..46d1666d2 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -10,7 +10,7 @@ from src.experimental.PFC.pfc_manager import PFCManager from src.chat.focus_chat.heartflow_message_processor import HeartFCMessageReceiver from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.config.config import global_config -from src.chat.command.command_handler import command_manager # 导入命令管理器 +from src.plugin_system.core.component_registry import component_registry # 导入新插件系统 # 定义日志配置 @@ -47,6 +47,54 @@ class ChatBot: except Exception as e: logger.error(f"创建PFC聊天失败: {e}") + + async def _process_commands_with_new_system(self, message: MessageRecv): + """使用新插件系统处理命令""" + try: + if not message.processed_plain_text: + await message.process() + + text = message.processed_plain_text + + # 使用新的组件注册中心查找命令 + command_result = component_registry.find_command_by_text(text) + if command_result: + command_class, matched_groups = command_result + + # 创建命令实例 + command_instance = command_class(message) + command_instance.set_matched_groups(matched_groups) + + try: + # 执行命令 + success, response = await command_instance.execute() + + # 记录命令执行结果 + if success: + logger.info(f"命令执行成功: {command_class.__name__}") + else: + logger.warning(f"命令执行失败: {command_class.__name__} - {response}") + + return True, response, False # 找到命令,不继续处理 + + except Exception as e: + logger.error(f"执行命令时出错: {command_class.__name__} - {e}") + import traceback + logger.error(traceback.format_exc()) + + try: + await command_instance.send_reply(f"命令执行出错: {str(e)}") + except Exception as send_error: + logger.error(f"发送错误消息失败: {send_error}") + + return True, str(e), False # 命令出错,不继续处理 + + # 没有找到命令,继续处理消息 + return False, None, True + + except Exception as e: + logger.error(f"处理命令时出错: {e}") + return False, None, True # 出错时继续处理消息 async def message_process(self, message_data: Dict[str, Any]) -> None: """处理转化后的统一格式消息 @@ -90,10 +138,10 @@ class ChatBot: # 处理消息内容,生成纯文本 await message.process() - - # 命令处理 - 在消息处理的早期阶段检查并处理命令 - is_command, cmd_result, continue_process = await command_manager.process_command(message) - + + # 命令处理 - 使用新插件系统检查并处理命令 + is_command, cmd_result, continue_process = await self._process_commands_with_new_system(message) + # 如果是命令且不需要继续处理,则直接返回 if is_command and not continue_process: logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}") diff --git a/src/main.py b/src/main.py index 004b68ba2..3052c35ef 100644 --- a/src/main.py +++ b/src/main.py @@ -22,6 +22,11 @@ from .api.main import start_api_server # 导入actions模块,确保装饰器被执行 import src.chat.actions.default_actions # noqa +# 导入新的插件管理器 +from src.plugin_system.core.plugin_manager import plugin_manager +# 导入消息API和traceback模块 +from src.common.message import global_api +import traceback # 条件导入记忆系统 if global_config.memory.enable_memory: @@ -45,8 +50,6 @@ class MainSystem: self.individuality: Individuality = individuality # 使用消息API替代直接的FastAPI实例 - from src.common.message import global_api - self.app: MessageServer = global_api self.server: Server = global_server @@ -134,29 +137,15 @@ class MainSystem: raise def _load_all_actions(self): - """加载所有actions和commands,使用统一的插件加载器""" + """加载所有actions和commands,使用新的插件系统""" try: - # 导入统一的插件加载器 - from src.plugins.plugin_loader import plugin_loader - - # 使用统一的插件加载器加载所有插件组件 - loaded_actions, loaded_commands = plugin_loader.load_all_plugins() - - # 加载命令处理系统 - try: - # 导入命令处理系统 - - logger.success("命令处理系统加载成功") - except Exception as e: - logger.error(f"加载命令处理系统失败: {e}") - import traceback - - logger.error(traceback.format_exc()) - + # 使用新的插件管理器加载所有插件 + plugin_count, component_count = plugin_manager.load_all_plugins() + + logger.success(f"插件系统加载成功: {plugin_count} 个插件,{component_count} 个组件") + except Exception as e: logger.error(f"加载插件失败: {e}") - import traceback - logger.error(traceback.format_exc()) async def schedule_tasks(self): diff --git a/src/plugin_system/README.md b/src/plugin_system/README.md new file mode 100644 index 000000000..b8e943892 --- /dev/null +++ b/src/plugin_system/README.md @@ -0,0 +1,169 @@ +# MaiBot 插件系统 - 重构版 + +## 目录结构说明 + +经过重构,插件系统现在采用清晰的**系统核心**与**插件内容**分离的架构: + +``` +src/ +├── plugin_system/ # 🔧 系统核心 - 插件框架本身 +│ ├── __init__.py # 统一导出接口 +│ ├── core/ # 核心管理 +│ │ ├── plugin_manager.py +│ │ ├── component_registry.py +│ │ └── __init__.py +│ ├── apis/ # API接口 +│ │ ├── plugin_api.py # 统一API聚合 +│ │ ├── message_api.py +│ │ ├── llm_api.py +│ │ ├── database_api.py +│ │ ├── config_api.py +│ │ ├── utils_api.py +│ │ ├── stream_api.py +│ │ ├── hearflow_api.py +│ │ └── __init__.py +│ ├── base/ # 基础类 +│ │ ├── base_plugin.py +│ │ ├── base_action.py +│ │ ├── base_command.py +│ │ ├── component_types.py +│ │ └── __init__.py +│ └── registry/ # 注册相关(预留) +└── plugins/ # 🔌 插件内容 - 具体的插件实现 + ├── built_in/ # 内置插件 + │ ├── system_actions/ # 系统内置Action + │ └── system_commands/# 系统内置Command + └── examples/ # 示例插件 + └── simple_plugin/ + ├── plugin.py + └── config.toml +``` + +## 架构优势 + +### 1. 职责清晰 +- **`src/plugin_system/`** - 系统提供的框架、API和基础设施 +- **`src/plugins/`** - 用户开发或使用的具体插件 + +### 2. 导入简化 +```python +# 统一导入接口 +from src.plugin_system import ( + BasePlugin, register_plugin, BaseAction, BaseCommand, + ActionInfo, CommandInfo, PluginAPI +) +``` + +### 3. 模块化设计 +- 各个子模块都有清晰的职责和接口 +- 支持按需导入特定功能 +- 便于维护和扩展 + +## 快速开始 + +### 创建简单插件 + +```python +from src.plugin_system import BasePlugin, register_plugin, BaseAction, ActionInfo + +class MyAction(BaseAction): + async def execute(self): + return True, "Hello from my plugin!" + +@register_plugin +class MyPlugin(BasePlugin): + plugin_name = "my_plugin" + plugin_description = "我的第一个插件" + + def get_plugin_components(self): + return [( + ActionInfo(name="my_action", description="我的动作"), + MyAction + )] +``` + +### 使用系统API + +```python +class MyAction(BaseAction): + async def execute(self): + # 发送消息 + await self.api.send_text_to_group( + self.api.get_service("chat_stream"), + "Hello World!" + ) + + # 数据库操作 + data = await self.api.db_get("table", "key") + + # LLM调用 + response = await self.api.llm_text_request("你好") + + return True, response +``` + +## 兼容性迁移 + +### 现有Action迁移 +```python +# 旧方式 +from src.chat.actions.base_action import BaseAction, register_action + +# 新方式 +from src.plugin_system import BaseAction, register_plugin +from src.plugin_system.base.component_types import ActionInfo + +# 将Action封装到Plugin中 +@register_plugin +class MyActionPlugin(BasePlugin): + plugin_name = "my_action_plugin" + + def get_plugin_components(self): + return [(ActionInfo(...), MyAction)] +``` + +### 现有Command迁移 +```python +# 旧方式 +from src.chat.command.command_handler import BaseCommand, register_command + +# 新方式 +from src.plugin_system import BaseCommand, register_plugin +from src.plugin_system.base.component_types import CommandInfo + +# 将Command封装到Plugin中 +@register_plugin +class MyCommandPlugin(BasePlugin): + plugin_name = "my_command_plugin" + + def get_plugin_components(self): + return [(CommandInfo(...), MyCommand)] +``` + +## 扩展指南 + +### 添加新的组件类型 +1. 在 `component_types.py` 中定义新的组件类型 +2. 在 `component_registry.py` 中添加对应的注册逻辑 +3. 创建对应的基类 + +### 添加新的API +1. 在 `apis/` 目录下创建新的API模块 +2. 在 `plugin_api.py` 中集成新API +3. 更新 `__init__.py` 导出接口 + +## 最佳实践 + +1. **单一插件包含相关组件** - 一个插件可以包含多个相关的Action和Command +2. **使用配置文件** - 通过TOML配置文件管理插件行为 +3. **合理的组件命名** - 使用描述性的组件名称 +4. **充分的错误处理** - 在组件中妥善处理异常 +5. **详细的文档** - 为插件和组件编写清晰的文档 + +## 内置插件规划 + +- **系统核心插件** - 将现有的内置Action/Command迁移为系统插件 +- **工具插件** - 常用的工具和实用功能 +- **示例插件** - 帮助开发者学习的示例代码 + +这个重构保持了向后兼容性,同时提供了更清晰、更易维护的架构。 \ No newline at end of file diff --git a/src/plugin_system/__init__.py b/src/plugin_system/__init__.py new file mode 100644 index 000000000..ebb51ed79 --- /dev/null +++ b/src/plugin_system/__init__.py @@ -0,0 +1,47 @@ +""" +MaiBot 插件系统 + +提供统一的插件开发和管理框架 +""" + +# 导出主要的公共接口 +from src.plugin_system.base.base_plugin import BasePlugin, register_plugin +from src.plugin_system.base.base_action import BaseAction +from src.plugin_system.base.base_command import BaseCommand +from src.plugin_system.base.component_types import ( + ComponentType, ActionActivationType, ChatMode, + ComponentInfo, ActionInfo, CommandInfo, PluginInfo +) +from src.plugin_system.apis.plugin_api import PluginAPI, create_plugin_api, create_command_api +from src.plugin_system.core.plugin_manager import plugin_manager +from src.plugin_system.core.component_registry import component_registry + +__version__ = "1.0.0" + +__all__ = [ + # 基础类 + 'BasePlugin', + 'BaseAction', + 'BaseCommand', + + # 类型定义 + 'ComponentType', + 'ActionActivationType', + 'ChatMode', + 'ComponentInfo', + 'ActionInfo', + 'CommandInfo', + 'PluginInfo', + + # API接口 + 'PluginAPI', + 'create_plugin_api', + 'create_command_api', + + # 管理器 + 'plugin_manager', + 'component_registry', + + # 装饰器 + 'register_plugin', +] \ No newline at end of file diff --git a/src/plugin_system/apis/API使用指南.md b/src/plugin_system/apis/API使用指南.md new file mode 100644 index 000000000..c34f7d6da --- /dev/null +++ b/src/plugin_system/apis/API使用指南.md @@ -0,0 +1,172 @@ +# API使用指南 + +插件系统提供了多种API访问方式,根据使用场景选择合适的API类。 + +## 📊 API分类 + +### 🔗 ActionAPI - 需要Action依赖 +**适用场景**:在Action组件中使用,需要访问聊天上下文 +```python +from src.plugin_system.apis import ActionAPI + +class MyAction(BaseAction): + async def execute(self): + # Action已内置ActionAPI,可以直接使用 + await self.api.send_message("text", "Hello") + await self.api.store_action_info(action_prompt_display="执行了动作") +``` + +**包含功能**: +- ✅ 发送消息(需要chat_stream、expressor等) +- ✅ 数据库操作(需要thinking_id、action_data等) + +### 🔧 IndependentAPI - 独立功能 +**适用场景**:在Command组件中使用,或需要独立工具功能 +```python +from src.plugin_system.apis import IndependentAPI + +class MyCommand(BaseCommand): + async def execute(self): + # 创建独立API实例 + api = IndependentAPI(log_prefix="[MyCommand]") + + # 使用独立功能 + models = api.get_available_models() + config = api.get_global_config("some_key") + timestamp = api.get_timestamp() +``` + +**包含功能**: +- ✅ LLM模型调用 +- ✅ 配置读取 +- ✅ 工具函数(时间、文件、ID生成等) +- ✅ 聊天流查询 +- ✅ 心流状态控制 + +### ⚡ StaticAPI - 静态访问 +**适用场景**:简单工具调用,不需要实例化 +```python +from src.plugin_system.apis import StaticAPI + +# 直接调用静态方法 +models = StaticAPI.get_available_models() +config = StaticAPI.get_global_config("bot.nickname") +timestamp = StaticAPI.get_timestamp() +unique_id = StaticAPI.generate_unique_id() + +# 异步方法 +result = await StaticAPI.generate_with_model(prompt, model_config) +chat_stream = StaticAPI.get_chat_stream_by_group_id("123456") +``` + +## 🎯 使用建议 + +### Action组件开发 +```python +class MyAction(BaseAction): + # 激活条件直接在类中定义 + focus_activation_type = ActionActivationType.KEYWORD + activation_keywords = ["测试"] + + async def execute(self): + # 使用内置的ActionAPI + success = await self.api.send_message("text", "处理中...") + + # 存储执行记录 + await self.api.store_action_info( + action_prompt_display="执行了测试动作" + ) + + return True, "完成" +``` + +### Command组件开发 +```python +class MyCommand(BaseCommand): + # 命令模式直接在类中定义 + command_pattern = r"^/test\s+(?P\w+)$" + command_help = "测试命令" + + async def execute(self): + # 使用独立API + api = IndependentAPI(log_prefix="[TestCommand]") + + # 获取配置 + max_length = api.get_global_config("test.max_length", 100) + + # 生成内容(如果需要) + if api.get_available_models(): + models = api.get_available_models() + first_model = list(models.values())[0] + + success, response, _, _ = await api.generate_with_model( + "生成测试回复", first_model + ) + + if success: + await self.send_reply(response) +``` + +### 独立工具使用 +```python +# 不在插件环境中的独立使用 +from src.plugin_system.apis import StaticAPI + +def some_utility_function(): + # 获取配置 + bot_name = StaticAPI.get_global_config("bot.nickname", "Bot") + + # 生成ID + request_id = StaticAPI.generate_unique_id() + + # 格式化时间 + current_time = StaticAPI.format_time() + + return f"{bot_name}_{request_id}_{current_time}" +``` + +## 🔄 迁移指南 + +### 从原PluginAPI迁移 + +**原来的用法**: +```python +# 原来需要导入完整PluginAPI +from src.plugin_system.apis import PluginAPI + +api = PluginAPI(chat_stream=..., expressor=...) +await api.send_message("text", "Hello") +config = api.get_global_config("key") +``` + +**新的用法**: +```python +# 方式1:继续使用原PluginAPI(不变) +from src.plugin_system.apis import PluginAPI + +# 方式2:使用分类API(推荐) +from src.plugin_system.apis import ActionAPI, IndependentAPI + +# Action相关功能 +action_api = ActionAPI(chat_stream=..., expressor=...) +await action_api.send_message("text", "Hello") + +# 独立功能 +config = IndependentAPI().get_global_config("key") +# 或者 +config = StaticAPI.get_global_config("key") +``` + +## 📋 API对照表 + +| 功能类别 | 原PluginAPI | ActionAPI | IndependentAPI | StaticAPI | +|---------|-------------|-----------|----------------|-----------| +| 发送消息 | ✅ | ✅ | ❌ | ❌ | +| 数据库操作 | ✅ | ✅ | ❌ | ❌ | +| LLM调用 | ✅ | ❌ | ✅ | ✅ | +| 配置读取 | ✅ | ❌ | ✅ | ✅ | +| 工具函数 | ✅ | ❌ | ✅ | ✅ | +| 聊天流查询 | ✅ | ❌ | ✅ | ✅ | +| 心流控制 | ✅ | ❌ | ✅ | ✅ | + +这样的分类让插件开发者可以更明确地知道需要什么样的API,避免不必要的依赖注入。 \ No newline at end of file diff --git a/src/plugin_system/apis/__init__.py b/src/plugin_system/apis/__init__.py new file mode 100644 index 000000000..5c2948d12 --- /dev/null +++ b/src/plugin_system/apis/__init__.py @@ -0,0 +1,37 @@ +""" +插件API模块 + +提供插件可以使用的各种API接口 +""" + +from src.plugin_system.apis.plugin_api import PluginAPI, create_plugin_api, create_command_api +from src.plugin_system.apis.message_api import MessageAPI +from src.plugin_system.apis.llm_api import LLMAPI +from src.plugin_system.apis.database_api import DatabaseAPI +from src.plugin_system.apis.config_api import ConfigAPI +from src.plugin_system.apis.utils_api import UtilsAPI +from src.plugin_system.apis.stream_api import StreamAPI +from src.plugin_system.apis.hearflow_api import HearflowAPI + +# 新增:分类的API聚合 +from src.plugin_system.apis.action_apis import ActionAPI +from src.plugin_system.apis.independent_apis import IndependentAPI, StaticAPI + +__all__ = [ + # 原有统一API + 'PluginAPI', + 'create_plugin_api', + 'create_command_api', + # 原有单独API + 'MessageAPI', + 'LLMAPI', + 'DatabaseAPI', + 'ConfigAPI', + 'UtilsAPI', + 'StreamAPI', + 'HearflowAPI', + # 新增分类API + 'ActionAPI', # 需要Action依赖的API + 'IndependentAPI', # 独立API + 'StaticAPI', # 静态API +] \ No newline at end of file diff --git a/src/plugin_system/apis/action_apis.py b/src/plugin_system/apis/action_apis.py new file mode 100644 index 000000000..e926bda3c --- /dev/null +++ b/src/plugin_system/apis/action_apis.py @@ -0,0 +1,85 @@ +""" +Action相关API聚合模块 + +聚合了需要Action组件依赖的API,这些API需要通过Action初始化时注入的服务对象才能正常工作。 +包括:MessageAPI、DatabaseAPI等需要chat_stream、expressor等服务的API。 +""" + +from src.plugin_system.apis.message_api import MessageAPI +from src.plugin_system.apis.database_api import DatabaseAPI +from src.common.logger_manager import get_logger + +logger = get_logger("action_apis") + +class ActionAPI(MessageAPI, DatabaseAPI): + """ + Action相关API聚合类 + + 聚合了需要Action组件依赖的API功能。这些API需要以下依赖: + - _services: 包含chat_stream、expressor、replyer、observations等服务对象 + - log_prefix: 日志前缀 + - thinking_id: 思考ID + - cycle_timers: 计时器 + - action_data: Action数据 + + 使用场景: + - 在Action组件中使用,需要发送消息、存储数据等功能 + - 需要访问聊天上下文和执行环境的操作 + """ + + def __init__(self, + chat_stream=None, + expressor=None, + replyer=None, + observations=None, + log_prefix: str = "[ActionAPI]", + thinking_id: str = "", + cycle_timers: dict = None, + action_data: dict = None): + """ + 初始化Action相关API + + Args: + chat_stream: 聊天流对象 + expressor: 表达器对象 + replyer: 回复器对象 + observations: 观察列表 + log_prefix: 日志前缀 + thinking_id: 思考ID + cycle_timers: 计时器字典 + action_data: Action数据 + """ + # 存储依赖对象 + self._services = { + "chat_stream": chat_stream, + "expressor": expressor, + "replyer": replyer, + "observations": observations or [] + } + + self.log_prefix = log_prefix + self.thinking_id = thinking_id + self.cycle_timers = cycle_timers or {} + self.action_data = action_data or {} + + logger.debug(f"{self.log_prefix} ActionAPI 初始化完成") + + def set_chat_stream(self, chat_stream): + """设置聊天流对象""" + self._services["chat_stream"] = chat_stream + logger.debug(f"{self.log_prefix} 设置聊天流") + + def set_expressor(self, expressor): + """设置表达器对象""" + self._services["expressor"] = expressor + logger.debug(f"{self.log_prefix} 设置表达器") + + def set_replyer(self, replyer): + """设置回复器对象""" + self._services["replyer"] = replyer + logger.debug(f"{self.log_prefix} 设置回复器") + + def set_observations(self, observations): + """设置观察列表""" + self._services["observations"] = observations or [] + logger.debug(f"{self.log_prefix} 设置观察列表") \ No newline at end of file diff --git a/src/chat/actions/plugin_api/config_api.py b/src/plugin_system/apis/config_api.py similarity index 100% rename from src/chat/actions/plugin_api/config_api.py rename to src/plugin_system/apis/config_api.py diff --git a/src/chat/actions/plugin_api/database_api.py b/src/plugin_system/apis/database_api.py similarity index 100% rename from src/chat/actions/plugin_api/database_api.py rename to src/plugin_system/apis/database_api.py diff --git a/src/chat/actions/plugin_api/hearflow_api.py b/src/plugin_system/apis/hearflow_api.py similarity index 100% rename from src/chat/actions/plugin_api/hearflow_api.py rename to src/plugin_system/apis/hearflow_api.py diff --git a/src/plugin_system/apis/independent_apis.py b/src/plugin_system/apis/independent_apis.py new file mode 100644 index 000000000..971ed9c5d --- /dev/null +++ b/src/plugin_system/apis/independent_apis.py @@ -0,0 +1,132 @@ +""" +独立API聚合模块 + +聚合了不需要Action组件依赖的API,这些API可以独立使用,不需要注入服务对象。 +包括:LLMAPI、ConfigAPI、UtilsAPI、StreamAPI、HearflowAPI等独立功能的API。 +""" + +from src.plugin_system.apis.llm_api import LLMAPI +from src.plugin_system.apis.config_api import ConfigAPI +from src.plugin_system.apis.utils_api import UtilsAPI +from src.plugin_system.apis.stream_api import StreamAPI +from src.plugin_system.apis.hearflow_api import HearflowAPI +from src.common.logger_manager import get_logger + +logger = get_logger("independent_apis") + +class IndependentAPI(LLMAPI, ConfigAPI, UtilsAPI, StreamAPI, HearflowAPI): + """ + 独立API聚合类 + + 聚合了不需要Action组件依赖的API功能。这些API的特点: + - 不需要chat_stream、expressor等服务对象 + - 可以独立调用,不依赖Action执行上下文 + - 主要是工具类方法和配置查询方法 + + 包含的API: + - LLMAPI: LLM模型调用(仅需要全局配置) + - ConfigAPI: 配置读取(使用全局配置) + - UtilsAPI: 工具方法(文件操作、时间处理等) + - StreamAPI: 聊天流查询(使用ChatManager) + - HearflowAPI: 心流状态控制(使用heartflow) + + 使用场景: + - 在Command组件中使用 + - 独立的工具函数调用 + - 配置查询和系统状态检查 + """ + + def __init__(self, log_prefix: str = "[IndependentAPI]"): + """ + 初始化独立API + + Args: + log_prefix: 日志前缀,用于区分不同的调用来源 + """ + self.log_prefix = log_prefix + + logger.debug(f"{self.log_prefix} IndependentAPI 初始化完成") + +# 提供便捷的静态访问方式 +class StaticAPI: + """ + 静态API类 + + 提供完全静态的API访问方式,不需要实例化,适合简单的工具调用。 + """ + + # LLM相关 + @staticmethod + def get_available_models(): + """获取可用的LLM模型""" + api = LLMAPI() + return api.get_available_models() + + @staticmethod + async def generate_with_model(prompt: str, model_config: dict, **kwargs): + """使用LLM生成内容""" + api = LLMAPI() + api.log_prefix = "[StaticAPI]" + return await api.generate_with_model(prompt, model_config, **kwargs) + + # 配置相关 + @staticmethod + def get_global_config(key: str, default=None): + """获取全局配置""" + api = ConfigAPI() + return api.get_global_config(key, default) + + @staticmethod + async def get_user_id_by_name(person_name: str): + """根据用户名获取用户ID""" + api = ConfigAPI() + return await api.get_user_id_by_person_name(person_name) + + # 工具相关 + @staticmethod + def get_timestamp(): + """获取当前时间戳""" + api = UtilsAPI() + return api.get_timestamp() + + @staticmethod + def format_time(timestamp=None, format_str="%Y-%m-%d %H:%M:%S"): + """格式化时间""" + api = UtilsAPI() + return api.format_time(timestamp, format_str) + + @staticmethod + def generate_unique_id(): + """生成唯一ID""" + api = UtilsAPI() + return api.generate_unique_id() + + # 聊天流相关 + @staticmethod + def get_chat_stream_by_group_id(group_id: str, platform: str = "qq"): + """通过群ID获取聊天流""" + api = StreamAPI() + api.log_prefix = "[StaticAPI]" + return api.get_chat_stream_by_group_id(group_id, platform) + + @staticmethod + def get_all_group_chat_streams(platform: str = "qq"): + """获取所有群聊聊天流""" + api = StreamAPI() + api.log_prefix = "[StaticAPI]" + return api.get_all_group_chat_streams(platform) + + # 心流相关 + @staticmethod + async def get_sub_hearflow_by_chat_id(chat_id: str): + """获取子心流""" + api = HearflowAPI() + api.log_prefix = "[StaticAPI]" + return await api.get_sub_hearflow_by_chat_id(chat_id) + + @staticmethod + async def set_sub_hearflow_chat_state(chat_id: str, target_state): + """设置子心流状态""" + api = HearflowAPI() + api.log_prefix = "[StaticAPI]" + return await api.set_sub_hearflow_chat_state(chat_id, target_state) \ No newline at end of file diff --git a/src/chat/actions/plugin_api/llm_api.py b/src/plugin_system/apis/llm_api.py similarity index 100% rename from src/chat/actions/plugin_api/llm_api.py rename to src/plugin_system/apis/llm_api.py diff --git a/src/chat/actions/plugin_api/message_api.py b/src/plugin_system/apis/message_api.py similarity index 81% rename from src/chat/actions/plugin_api/message_api.py rename to src/plugin_system/apis/message_api.py index 0b4b97f1b..d022d0b6b 100644 --- a/src/chat/actions/plugin_api/message_api.py +++ b/src/plugin_system/apis/message_api.py @@ -173,15 +173,19 @@ class MessageAPI: bool: 是否发送成功 """ try: - expressor: DefaultExpressor = self._services.get("expressor") - chat_stream: ChatStream = self._services.get("chat_stream") + # 安全获取服务和日志前缀 + services = getattr(self, '_services', {}) + log_prefix = getattr(self, 'log_prefix', '[MessageAPI]') + + expressor: DefaultExpressor = services.get("expressor") + chat_stream: ChatStream = services.get("chat_stream") if not expressor or not chat_stream: - logger.error(f"{self.log_prefix} 无法发送消息:缺少必要的内部服务") + logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务") return False # 获取锚定消息(如果有) - observations = self._services.get("observations", []) + observations = services.get("observations", []) if len(observations) > 0: chatting_observation: ChattingObservation = next( @@ -197,7 +201,7 @@ class MessageAPI: # 如果没有找到锚点消息,创建一个占位符 if not anchor_message: - logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") + logger.info(f"{log_prefix} 未找到锚点消息,创建占位符") anchor_message = await create_empty_anchor_message( chat_stream.platform, chat_stream.group_info, chat_stream ) @@ -217,7 +221,8 @@ class MessageAPI: return success except Exception as e: - logger.error(f"{self.log_prefix} 发送消息时出错: {e}") + log_prefix = getattr(self, 'log_prefix', '[MessageAPI]') + logger.error(f"{log_prefix} 发送消息时出错: {e}") traceback.print_exc() return False @@ -231,18 +236,22 @@ class MessageAPI: Returns: bool: 是否发送成功 """ - expressor: DefaultExpressor = self._services.get("expressor") - chat_stream: ChatStream = self._services.get("chat_stream") + # 安全获取服务和日志前缀 + services = getattr(self, '_services', {}) + log_prefix = getattr(self, 'log_prefix', '[MessageAPI]') + + expressor: DefaultExpressor = services.get("expressor") + chat_stream: ChatStream = services.get("chat_stream") if not expressor or not chat_stream: - logger.error(f"{self.log_prefix} 无法发送消息:缺少必要的内部服务") + logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务") return False # 构造简化的动作数据 reply_data = {"text": text, "target": target or "", "emojis": []} # 获取锚定消息(如果有) - observations = self._services.get("observations", []) + observations = services.get("observations", []) # 查找 ChattingObservation 实例 chatting_observation = None @@ -252,14 +261,14 @@ class MessageAPI: break if not chatting_observation: - logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符") + logger.warning(f"{log_prefix} 未找到 ChattingObservation 实例,创建占位符") anchor_message = await create_empty_anchor_message( chat_stream.platform, chat_stream.group_info, chat_stream ) else: anchor_message = chatting_observation.search_message_by_text(reply_data["target"]) if not anchor_message: - logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") + logger.info(f"{log_prefix} 未找到锚点消息,创建占位符") anchor_message = await create_empty_anchor_message( chat_stream.platform, chat_stream.group_info, chat_stream ) @@ -267,12 +276,16 @@ class MessageAPI: anchor_message.update_chat_stream(chat_stream) # 调用内部方法发送消息 + cycle_timers = getattr(self, 'cycle_timers', {}) + reasoning = getattr(self, 'reasoning', '插件生成') + thinking_id = getattr(self, 'thinking_id', 'plugin_thinking') + success, _ = await expressor.deal_reply( - cycle_timers=self.cycle_timers, + cycle_timers=cycle_timers, action_data=reply_data, anchor_message=anchor_message, - reasoning=self.reasoning, - thinking_id=self.thinking_id, + reasoning=reasoning, + thinking_id=thinking_id, ) return success @@ -289,18 +302,22 @@ class MessageAPI: Returns: bool: 是否发送成功 """ - replyer: DefaultReplyer = self._services.get("replyer") - chat_stream: ChatStream = self._services.get("chat_stream") + # 安全获取服务和日志前缀 + services = getattr(self, '_services', {}) + log_prefix = getattr(self, 'log_prefix', '[MessageAPI]') + + replyer: DefaultReplyer = services.get("replyer") + chat_stream: ChatStream = services.get("chat_stream") if not replyer or not chat_stream: - logger.error(f"{self.log_prefix} 无法发送消息:缺少必要的内部服务") + logger.error(f"{log_prefix} 无法发送消息:缺少必要的内部服务") return False # 构造简化的动作数据 reply_data = {"target": target or "", "extra_info_block": extra_info_block} # 获取锚定消息(如果有) - observations = self._services.get("observations", []) + observations = services.get("observations", []) # 查找 ChattingObservation 实例 chatting_observation = None @@ -310,14 +327,14 @@ class MessageAPI: break if not chatting_observation: - logger.warning(f"{self.log_prefix} 未找到 ChattingObservation 实例,创建占位符") + logger.warning(f"{log_prefix} 未找到 ChattingObservation 实例,创建占位符") anchor_message = await create_empty_anchor_message( chat_stream.platform, chat_stream.group_info, chat_stream ) else: anchor_message = chatting_observation.search_message_by_text(reply_data["target"]) if not anchor_message: - logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符") + logger.info(f"{log_prefix} 未找到锚点消息,创建占位符") anchor_message = await create_empty_anchor_message( chat_stream.platform, chat_stream.group_info, chat_stream ) @@ -325,12 +342,16 @@ class MessageAPI: anchor_message.update_chat_stream(chat_stream) # 调用内部方法发送消息 + cycle_timers = getattr(self, 'cycle_timers', {}) + reasoning = getattr(self, 'reasoning', '插件生成') + thinking_id = getattr(self, 'thinking_id', 'plugin_thinking') + success, _ = await replyer.deal_reply( - cycle_timers=self.cycle_timers, + cycle_timers=cycle_timers, action_data=reply_data, anchor_message=anchor_message, - reasoning=self.reasoning, - thinking_id=self.thinking_id, + reasoning=reasoning, + thinking_id=thinking_id, ) return success @@ -341,7 +362,8 @@ class MessageAPI: Returns: str: 聊天类型 ("group" 或 "private") """ - chat_stream: ChatStream = self._services.get("chat_stream") + services = getattr(self, '_services', {}) + chat_stream: ChatStream = services.get("chat_stream") if chat_stream and hasattr(chat_stream, "group_info"): return "group" if chat_stream.group_info else "private" return "unknown" @@ -356,7 +378,8 @@ class MessageAPI: List[Dict]: 消息列表,每个消息包含发送者、内容等信息 """ messages = [] - observations = self._services.get("observations", []) + services = getattr(self, '_services', {}) + observations = services.get("observations", []) if observations and len(observations) > 0: obs = observations[0] diff --git a/src/plugin_system/apis/plugin_api.py b/src/plugin_system/apis/plugin_api.py new file mode 100644 index 000000000..193df766e --- /dev/null +++ b/src/plugin_system/apis/plugin_api.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +""" +统一的插件API聚合模块 + +提供所有插件API功能的统一访问入口 +""" + +from typing import Dict, Any, Optional +from src.common.logger_manager import get_logger + +# 导入所有API模块 +from src.plugin_system.apis.message_api import MessageAPI +from src.plugin_system.apis.llm_api import LLMAPI +from src.plugin_system.apis.database_api import DatabaseAPI +from src.plugin_system.apis.config_api import ConfigAPI +from src.plugin_system.apis.utils_api import UtilsAPI +from src.plugin_system.apis.stream_api import StreamAPI +from src.plugin_system.apis.hearflow_api import HearflowAPI + +logger = get_logger("plugin_api") + + +class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, HearflowAPI): + """ + 插件API聚合类 + + 集成了所有可供插件使用的API功能,提供统一的访问接口。 + 插件组件可以直接使用此API实例来访问各种功能。 + + 特性: + - 聚合所有API模块的功能 + - 支持依赖注入和配置 + - 提供统一的错误处理和日志记录 + """ + + def __init__(self, + chat_stream=None, + expressor=None, + replyer=None, + observations=None, + log_prefix: str = "[PluginAPI]"): + """ + 初始化插件API + + Args: + chat_stream: 聊天流对象 + expressor: 表达器对象 + replyer: 回复器对象 + observations: 观察列表 + log_prefix: 日志前缀 + """ + # 存储依赖对象 + self._services = { + "chat_stream": chat_stream, + "expressor": expressor, + "replyer": replyer, + "observations": observations or [] + } + + self.log_prefix = log_prefix + + # 调用所有父类的初始化 + super().__init__() + + logger.debug(f"{self.log_prefix} PluginAPI 初始化完成") + + def set_chat_stream(self, chat_stream): + """设置聊天流对象""" + self._services["chat_stream"] = chat_stream + logger.debug(f"{self.log_prefix} 设置聊天流: {getattr(chat_stream, 'stream_id', 'Unknown')}") + + def set_expressor(self, expressor): + """设置表达器对象""" + self._services["expressor"] = expressor + logger.debug(f"{self.log_prefix} 设置表达器") + + def set_replyer(self, replyer): + """设置回复器对象""" + self._services["replyer"] = replyer + logger.debug(f"{self.log_prefix} 设置回复器") + + def set_observations(self, observations): + """设置观察列表""" + self._services["observations"] = observations or [] + logger.debug(f"{self.log_prefix} 设置观察列表,数量: {len(observations or [])}") + + def get_service(self, service_name: str): + """获取指定的服务对象""" + return self._services.get(service_name) + + def has_service(self, service_name: str) -> bool: + """检查是否有指定的服务对象""" + return service_name in self._services and self._services[service_name] is not None + + +# 便捷的工厂函数 +def create_plugin_api(chat_stream=None, + expressor=None, + replyer=None, + observations=None, + log_prefix: str = "[Plugin]") -> PluginAPI: + """ + 创建插件API实例的便捷函数 + + Args: + chat_stream: 聊天流对象 + expressor: 表达器对象 + replyer: 回复器对象 + observations: 观察列表 + log_prefix: 日志前缀 + + Returns: + PluginAPI: 配置好的插件API实例 + """ + return PluginAPI( + chat_stream=chat_stream, + expressor=expressor, + replyer=replyer, + observations=observations, + log_prefix=log_prefix + ) + + +def create_command_api(message, log_prefix: str = "[Command]") -> PluginAPI: + """ + 为命令创建插件API实例的便捷函数 + + Args: + message: 消息对象,应该包含 chat_stream 等信息 + log_prefix: 日志前缀 + + Returns: + PluginAPI: 配置好的插件API实例 + """ + chat_stream = getattr(message, 'chat_stream', None) + + api = PluginAPI( + chat_stream=chat_stream, + log_prefix=log_prefix + ) + + return api + + +# 导出主要接口 +__all__ = [ + 'PluginAPI', + 'create_plugin_api', + 'create_command_api', + # 也可以导出各个API类供单独使用 + 'MessageAPI', + 'LLMAPI', + 'DatabaseAPI', + 'ConfigAPI', + 'UtilsAPI', + 'StreamAPI', + 'HearflowAPI' +] \ No newline at end of file diff --git a/src/chat/actions/plugin_api/stream_api.py b/src/plugin_system/apis/stream_api.py similarity index 100% rename from src/chat/actions/plugin_api/stream_api.py rename to src/plugin_system/apis/stream_api.py diff --git a/src/chat/actions/plugin_api/utils_api.py b/src/plugin_system/apis/utils_api.py similarity index 100% rename from src/chat/actions/plugin_api/utils_api.py rename to src/plugin_system/apis/utils_api.py diff --git a/src/plugin_system/base/__init__.py b/src/plugin_system/base/__init__.py new file mode 100644 index 000000000..16648443a --- /dev/null +++ b/src/plugin_system/base/__init__.py @@ -0,0 +1,27 @@ +""" +插件基础类模块 + +提供插件开发的基础类和类型定义 +""" + +from src.plugin_system.base.base_plugin import BasePlugin, register_plugin +from src.plugin_system.base.base_action import BaseAction +from src.plugin_system.base.base_command import BaseCommand +from src.plugin_system.base.component_types import ( + ComponentType, ActionActivationType, ChatMode, + ComponentInfo, ActionInfo, CommandInfo, PluginInfo +) + +__all__ = [ + 'BasePlugin', + 'BaseAction', + 'BaseCommand', + 'register_plugin', + 'ComponentType', + 'ActionActivationType', + 'ChatMode', + 'ComponentInfo', + 'ActionInfo', + 'CommandInfo', + 'PluginInfo', +] \ No newline at end of file diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py new file mode 100644 index 000000000..558fbb3fb --- /dev/null +++ b/src/plugin_system/base/base_action.py @@ -0,0 +1,120 @@ +from abc import ABC, abstractmethod +from typing import Tuple, Dict, Any, Optional +from src.common.logger_manager import get_logger +from src.plugin_system.apis.plugin_api import PluginAPI +from src.plugin_system.base.component_types import ActionActivationType, ChatMode, ActionInfo, ComponentType + +logger = get_logger("base_action") + +class BaseAction(ABC): + """Action组件基类 + + Action是插件的一种组件类型,用于处理聊天中的动作逻辑 + + 子类可以通过类属性定义激活条件: + - focus_activation_type: 专注模式激活类型 + - normal_activation_type: 普通模式激活类型 + - activation_keywords: 激活关键词列表 + - keyword_case_sensitive: 关键词是否区分大小写 + - mode_enable: 启用的聊天模式 + - parallel_action: 是否允许并行执行 + - random_activation_probability: 随机激活概率 + - llm_judge_prompt: LLM判断提示词 + """ + + # 默认激活设置(子类可以覆盖) + focus_activation_type: ActionActivationType = ActionActivationType.NEVER + normal_activation_type: ActionActivationType = ActionActivationType.NEVER + activation_keywords: list = [] + keyword_case_sensitive: bool = False + mode_enable: ChatMode = ChatMode.ALL + parallel_action: bool = True + random_activation_probability: float = 0.0 + llm_judge_prompt: str = "" + + def __init__(self, + action_data: dict, + reasoning: str, + cycle_timers: dict, + thinking_id: str, + **kwargs): + """初始化Action组件 + + Args: + action_data: 动作数据 + reasoning: 执行该动作的理由 + cycle_timers: 计时器字典 + thinking_id: 思考ID + **kwargs: 其他参数(包含服务对象) + """ + self.action_data = action_data + self.reasoning = reasoning + self.cycle_timers = cycle_timers + self.thinking_id = thinking_id + + # 创建API实例 + self.api = PluginAPI( + chat_stream=kwargs.get("chat_stream"), + expressor=kwargs.get("expressor"), + replyer=kwargs.get("replyer"), + observations=kwargs.get("observations"), + log_prefix=kwargs.get("log_prefix", "") + ) + + self.log_prefix = kwargs.get("log_prefix", "") + + logger.debug(f"{self.log_prefix} Action组件初始化完成") + + async def send_reply(self, content: str) -> bool: + """发送回复消息 + + Args: + content: 回复内容 + + Returns: + bool: 是否发送成功 + """ + return await self.api.send_message("text", content) + + @classmethod + def get_action_info(cls, name: str = None, description: str = None) -> 'ActionInfo': + """从类属性生成ActionInfo + + Args: + name: Action名称,如果不提供则使用类名 + description: Action描述,如果不提供则使用类文档字符串 + + Returns: + ActionInfo: 生成的Action信息对象 + """ + + + # 自动生成名称和描述 + if name is None: + name = cls.__name__.lower().replace('action', '') + if description is None: + description = cls.__doc__ or f"{cls.__name__} Action组件" + description = description.strip().split('\n')[0] # 取第一行作为描述 + + return ActionInfo( + name=name, + component_type=ComponentType.ACTION, + description=description, + focus_activation_type=cls.focus_activation_type, + normal_activation_type=cls.normal_activation_type, + activation_keywords=cls.activation_keywords.copy() if cls.activation_keywords else [], + keyword_case_sensitive=cls.keyword_case_sensitive, + mode_enable=cls.mode_enable, + parallel_action=cls.parallel_action, + random_activation_probability=cls.random_activation_probability, + llm_judge_prompt=cls.llm_judge_prompt + ) + + @abstractmethod + async def execute(self) -> Tuple[bool, str]: + """执行Action的抽象方法,子类必须实现 + + Returns: + Tuple[bool, str]: (是否执行成功, 回复文本) + """ + pass \ No newline at end of file diff --git a/src/plugin_system/base/base_command.py b/src/plugin_system/base/base_command.py new file mode 100644 index 000000000..ac2446bed --- /dev/null +++ b/src/plugin_system/base/base_command.py @@ -0,0 +1,113 @@ +from abc import ABC, abstractmethod +from typing import Dict, Tuple, Optional, List +from src.common.logger_manager import get_logger +from src.plugin_system.apis.plugin_api import PluginAPI +from src.plugin_system.base.component_types import CommandInfo, ComponentType +from src.chat.message_receive.message import MessageRecv + +logger = get_logger("base_command") + +class BaseCommand(ABC): + """Command组件基类 + + Command是插件的一种组件类型,用于处理命令请求 + + 子类可以通过类属性定义命令模式: + - command_pattern: 命令匹配的正则表达式 + - command_help: 命令帮助信息 + - command_examples: 命令使用示例列表 + """ + + # 默认命令设置(子类可以覆盖) + command_pattern: str = "" + command_help: str = "" + command_examples: List[str] = [] + + def __init__(self, message: MessageRecv): + """初始化Command组件 + + Args: + message: 接收到的消息对象 + """ + self.message = message + self.matched_groups: Dict[str, str] = {} # 存储正则表达式匹配的命名组 + + # 创建API实例 + self.api = PluginAPI( + chat_stream=message.chat_stream, + log_prefix=f"[Command]" + ) + + self.log_prefix = f"[Command]" + + logger.debug(f"{self.log_prefix} Command组件初始化完成") + + def set_matched_groups(self, groups: Dict[str, str]) -> None: + """设置正则表达式匹配的命名组 + + Args: + groups: 正则表达式匹配的命名组 + """ + self.matched_groups = groups + + @abstractmethod + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行Command的抽象方法,子类必须实现 + + Returns: + Tuple[bool, Optional[str]]: (是否执行成功, 可选的回复消息) + """ + pass + + async def send_reply(self, content: str) -> None: + """发送回复消息 + + Args: + content: 回复内容 + """ + # 获取聊天流信息 + chat_stream = self.message.chat_stream + + if chat_stream.group_info: + # 群聊 + await self.api.send_text_to_group( + text=content, + group_id=str(chat_stream.group_info.group_id), + platform=chat_stream.platform + ) + else: + # 私聊 + await self.api.send_text_to_user( + text=content, + user_id=str(chat_stream.user_info.user_id), + platform=chat_stream.platform + ) + + @classmethod + def get_command_info(cls, name: str = None, description: str = None) -> 'CommandInfo': + """从类属性生成CommandInfo + + Args: + name: Command名称,如果不提供则使用类名 + description: Command描述,如果不提供则使用类文档字符串 + + Returns: + CommandInfo: 生成的Command信息对象 + """ + + + # 自动生成名称和描述 + if name is None: + name = cls.__name__.lower().replace('command', '') + if description is None: + description = cls.__doc__ or f"{cls.__name__} Command组件" + description = description.strip().split('\n')[0] # 取第一行作为描述 + + return CommandInfo( + name=name, + component_type=ComponentType.COMMAND, + description=description, + command_pattern=cls.command_pattern, + command_help=cls.command_help, + command_examples=cls.command_examples.copy() if cls.command_examples else [] + ) \ No newline at end of file diff --git a/src/plugin_system/base/base_plugin.py b/src/plugin_system/base/base_plugin.py new file mode 100644 index 000000000..758edff07 --- /dev/null +++ b/src/plugin_system/base/base_plugin.py @@ -0,0 +1,226 @@ +from abc import ABC, abstractmethod +from typing import Dict, List, Type, Optional, Any +import os +import inspect +import toml +from src.common.logger_manager import get_logger +from src.plugin_system.base.component_types import ( + PluginInfo, ComponentInfo, ActionInfo, CommandInfo, + ComponentType, ActionActivationType, ChatMode +) +from src.plugin_system.core.component_registry import component_registry + +logger = get_logger("base_plugin") + +# 全局插件类注册表 +_plugin_classes: Dict[str, Type['BasePlugin']] = {} + +class BasePlugin(ABC): + """插件基类 + + 所有插件都应该继承这个基类,一个插件可以包含多种组件: + - Action组件:处理聊天中的动作 + - Command组件:处理命令请求 + - 未来可扩展:Scheduler、Listener等 + """ + + # 插件基本信息(子类必须定义) + plugin_name: str = "" # 插件名称 + plugin_description: str = "" # 插件描述 + plugin_version: str = "1.0.0" # 插件版本 + plugin_author: str = "" # 插件作者 + enable_plugin: bool = True # 是否启用插件 + dependencies: List[str] = [] # 依赖的其他插件 + config_file_name: Optional[str] = None # 配置文件名 + + def __init__(self, plugin_dir: str = None): + """初始化插件 + + Args: + plugin_dir: 插件目录路径,由插件管理器传递 + """ + self.config: Dict[str, Any] = {} # 插件配置 + self.plugin_dir = plugin_dir # 插件目录路径 + self.log_prefix = f"[Plugin:{self.plugin_name}]" + + # 验证插件信息 + self._validate_plugin_info() + + # 加载插件配置 + self._load_plugin_config() + + # 创建插件信息对象 + self.plugin_info = PluginInfo( + name=self.plugin_name, + description=self.plugin_description, + version=self.plugin_version, + author=self.plugin_author, + enabled=self.enable_plugin, + is_built_in=False, + config_file=self.config_file_name or "", + dependencies=self.dependencies.copy() + ) + + logger.debug(f"{self.log_prefix} 插件基类初始化完成") + + def _validate_plugin_info(self): + """验证插件基本信息""" + if not self.plugin_name: + raise ValueError(f"插件类 {self.__class__.__name__} 必须定义 plugin_name") + if not self.plugin_description: + raise ValueError(f"插件 {self.plugin_name} 必须定义 plugin_description") + + def _load_plugin_config(self): + """加载插件配置文件""" + if not self.config_file_name: + logger.debug(f"{self.log_prefix} 未指定配置文件,跳过加载") + return + + # 优先使用传入的插件目录路径 + if self.plugin_dir: + plugin_dir = self.plugin_dir + else: + # fallback:尝试从类的模块信息获取路径 + try: + plugin_module_path = inspect.getfile(self.__class__) + plugin_dir = os.path.dirname(plugin_module_path) + except (TypeError, OSError): + # 最后的fallback:从模块的__file__属性获取 + module = inspect.getmodule(self.__class__) + if module and hasattr(module, '__file__') and module.__file__: + plugin_dir = os.path.dirname(module.__file__) + else: + logger.warning(f"{self.log_prefix} 无法获取插件目录路径,跳过配置加载") + return + + config_file_path = os.path.join(plugin_dir, self.config_file_name) + + if not os.path.exists(config_file_path): + logger.warning(f"{self.log_prefix} 配置文件 {config_file_path} 不存在") + return + + file_ext = os.path.splitext(self.config_file_name)[1].lower() + + if file_ext == ".toml": + with open(config_file_path, "r", encoding="utf-8") as f: + self.config = toml.load(f) or {} + logger.info(f"{self.log_prefix} 配置已从 {config_file_path} 加载") + else: + logger.warning(f"{self.log_prefix} 不支持的配置文件格式: {file_ext},仅支持 .toml") + self.config = {} + + @abstractmethod + def get_plugin_components(self) -> List[tuple[ComponentInfo, Type]]: + """获取插件包含的组件列表 + + 子类必须实现此方法,返回组件信息和组件类的列表 + + Returns: + List[tuple[ComponentInfo, Type]]: [(组件信息, 组件类), ...] + """ + pass + + def register_plugin(self) -> bool: + """注册插件及其所有组件""" + if not self.enable_plugin: + logger.info(f"{self.log_prefix} 插件已禁用,跳过注册") + return False + + components = self.get_plugin_components() + + # 检查依赖 + if not self._check_dependencies(): + logger.error(f"{self.log_prefix} 依赖检查失败,跳过注册") + return False + + # 注册所有组件 + registered_components = [] + for component_info, component_class in components: + component_info.plugin_name = self.plugin_name + if component_registry.register_component(component_info, component_class): + registered_components.append(component_info) + else: + logger.warning(f"{self.log_prefix} 组件 {component_info.name} 注册失败") + + # 更新插件信息中的组件列表 + self.plugin_info.components = registered_components + + # 注册插件 + if component_registry.register_plugin(self.plugin_info): + logger.info(f"{self.log_prefix} 插件注册成功,包含 {len(registered_components)} 个组件") + return True + else: + logger.error(f"{self.log_prefix} 插件注册失败") + return False + + def _check_dependencies(self) -> bool: + """检查插件依赖""" + if not self.dependencies: + return True + + for dep in self.dependencies: + if not component_registry.get_plugin_info(dep): + logger.error(f"{self.log_prefix} 缺少依赖插件: {dep}") + return False + + return True + + def get_config(self, key: str, default: Any = None) -> Any: + """获取插件配置值 + + Args: + key: 配置键名 + default: 默认值 + + Returns: + Any: 配置值或默认值 + """ + return self.config.get(key, default) + + +def register_plugin(cls): + """插件注册装饰器 + + 用法: + @register_plugin + class MyPlugin(BasePlugin): + plugin_name = "my_plugin" + plugin_description = "我的插件" + ... + """ + if not issubclass(cls, BasePlugin): + logger.error(f"类 {cls.__name__} 不是 BasePlugin 的子类") + return cls + + # 只是注册插件类,不立即实例化 + # 插件管理器会负责实例化和注册 + plugin_name = cls.plugin_name or cls.__name__ + _plugin_classes[plugin_name] = cls + logger.debug(f"插件类已注册: {plugin_name}") + + return cls + + +def get_registered_plugin_classes() -> Dict[str, Type['BasePlugin']]: + """获取所有已注册的插件类""" + return _plugin_classes.copy() + + +def instantiate_and_register_plugin(plugin_class: Type['BasePlugin'], plugin_dir: str = None) -> bool: + """实例化并注册插件 + + Args: + plugin_class: 插件类 + plugin_dir: 插件目录路径 + + Returns: + bool: 是否成功 + """ + try: + plugin_instance = plugin_class(plugin_dir=plugin_dir) + return plugin_instance.register_plugin() + except Exception as e: + logger.error(f"注册插件 {plugin_class.__name__} 时出错: {e}") + import traceback + logger.error(traceback.format_exc()) + return False \ No newline at end of file diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py new file mode 100644 index 000000000..985af121f --- /dev/null +++ b/src/plugin_system/base/component_types.py @@ -0,0 +1,104 @@ +from enum import Enum +from typing import Dict, Any, List +from dataclasses import dataclass + +# 组件类型枚举 +class ComponentType(Enum): + """组件类型枚举""" + ACTION = "action" # 动作组件 + COMMAND = "command" # 命令组件 + SCHEDULER = "scheduler" # 定时任务组件(预留) + LISTENER = "listener" # 事件监听组件(预留) + +# 动作激活类型枚举 +class ActionActivationType(Enum): + """动作激活类型枚举""" + NEVER = "never" # 从不激活(默认关闭) + ALWAYS = "always" # 默认参与到planner + LLM_JUDGE = "llm_judge" # LLM判定是否启动该action到planner + RANDOM = "random" # 随机启用action到planner + KEYWORD = "keyword" # 关键词触发启用action到planner + +# 聊天模式枚举 +class ChatMode(Enum): + """聊天模式枚举""" + FOCUS = "focus" # Focus聊天模式 + NORMAL = "normal" # Normal聊天模式 + ALL = "all" # 所有聊天模式 + +@dataclass +class ComponentInfo: + """组件信息""" + name: str # 组件名称 + component_type: ComponentType # 组件类型 + description: str # 组件描述 + enabled: bool = True # 是否启用 + plugin_name: str = "" # 所属插件名称 + is_built_in: bool = False # 是否为内置组件 + metadata: Dict[str, Any] = None # 额外元数据 + + def __post_init__(self): + if self.metadata is None: + self.metadata = {} + +@dataclass +class ActionInfo(ComponentInfo): + """动作组件信息""" + focus_activation_type: ActionActivationType = ActionActivationType.ALWAYS + normal_activation_type: ActionActivationType = ActionActivationType.ALWAYS + random_activation_probability: float = 0.3 + llm_judge_prompt: str = "" + activation_keywords: List[str] = None + keyword_case_sensitive: bool = False + mode_enable: ChatMode = ChatMode.ALL + parallel_action: bool = False + action_parameters: Dict[str, Any] = None + action_require: List[str] = None + associated_types: List[str] = None + + def __post_init__(self): + super().__post_init__() + if self.activation_keywords is None: + self.activation_keywords = [] + if self.action_parameters is None: + self.action_parameters = {} + if self.action_require is None: + self.action_require = [] + if self.associated_types is None: + self.associated_types = [] + self.component_type = ComponentType.ACTION + +@dataclass +class CommandInfo(ComponentInfo): + """命令组件信息""" + command_pattern: str = "" # 命令匹配模式(正则表达式) + command_help: str = "" # 命令帮助信息 + command_examples: List[str] = None # 命令使用示例 + + def __post_init__(self): + super().__post_init__() + if self.command_examples is None: + self.command_examples = [] + self.component_type = ComponentType.COMMAND + +@dataclass +class PluginInfo: + """插件信息""" + name: str # 插件名称 + description: str # 插件描述 + version: str = "1.0.0" # 插件版本 + author: str = "" # 插件作者 + enabled: bool = True # 是否启用 + is_built_in: bool = False # 是否为内置插件 + components: List[ComponentInfo] = None # 包含的组件列表 + dependencies: List[str] = None # 依赖的其他插件 + config_file: str = "" # 配置文件路径 + metadata: Dict[str, Any] = None # 额外元数据 + + def __post_init__(self): + if self.components is None: + self.components = [] + if self.dependencies is None: + self.dependencies = [] + if self.metadata is None: + self.metadata = {} \ No newline at end of file diff --git a/src/plugin_system/core/__init__.py b/src/plugin_system/core/__init__.py new file mode 100644 index 000000000..c4e9e7a2e --- /dev/null +++ b/src/plugin_system/core/__init__.py @@ -0,0 +1,13 @@ +""" +插件核心管理模块 + +提供插件的加载、注册和管理功能 +""" + +from src.plugin_system.core.plugin_manager import plugin_manager +from src.plugin_system.core.component_registry import component_registry + +__all__ = [ + 'plugin_manager', + 'component_registry', +] \ No newline at end of file diff --git a/src/plugin_system/core/component_registry.py b/src/plugin_system/core/component_registry.py new file mode 100644 index 000000000..abfd42103 --- /dev/null +++ b/src/plugin_system/core/component_registry.py @@ -0,0 +1,245 @@ +from typing import Dict, List, Type, Optional, Any, Pattern +from abc import ABC +import re +from src.common.logger_manager import get_logger +from src.plugin_system.base.component_types import ( + ComponentInfo, ActionInfo, CommandInfo, PluginInfo, + ComponentType, ActionActivationType, ChatMode +) + +logger = get_logger("component_registry") + +class ComponentRegistry: + """统一的组件注册中心 + + 负责管理所有插件组件的注册、查询和生命周期管理 + """ + + def __init__(self): + # 组件注册表 + self._components: Dict[str, ComponentInfo] = {} # 组件名 -> 组件信息 + self._components_by_type: Dict[ComponentType, Dict[str, ComponentInfo]] = { + ComponentType.ACTION: {}, + ComponentType.COMMAND: {}, + } + self._component_classes: Dict[str, Type] = {} # 组件名 -> 组件类 + + # 插件注册表 + self._plugins: Dict[str, PluginInfo] = {} # 插件名 -> 插件信息 + + # Action特定注册表 + self._action_registry: Dict[str, Type] = {} # action名 -> action类 + self._default_actions: Dict[str, str] = {} # 启用的action名 -> 描述 + + # Command特定注册表 + self._command_registry: Dict[str, Type] = {} # command名 -> command类 + self._command_patterns: Dict[Pattern, Type] = {} # 编译后的正则 -> command类 + + logger.info("组件注册中心初始化完成") + + # === 通用组件注册方法 === + + def register_component(self, component_info: ComponentInfo, component_class: Type) -> bool: + """注册组件 + + Args: + component_info: 组件信息 + component_class: 组件类 + + Returns: + bool: 是否注册成功 + """ + component_name = component_info.name + component_type = component_info.component_type + + if component_name in self._components: + logger.warning(f"组件 {component_name} 已存在,跳过注册") + return False + + # 注册到通用注册表 + self._components[component_name] = component_info + self._components_by_type[component_type][component_name] = component_info + self._component_classes[component_name] = component_class + + # 根据组件类型进行特定注册 + if component_type == ComponentType.ACTION: + self._register_action_component(component_info, component_class) + elif component_type == ComponentType.COMMAND: + self._register_command_component(component_info, component_class) + + logger.info(f"已注册{component_type.value}组件: {component_name} ({component_class.__name__})") + return True + + def _register_action_component(self, action_info: ActionInfo, action_class: Type): + """注册Action组件到Action特定注册表""" + action_name = action_info.name + self._action_registry[action_name] = action_class + + # 如果启用,添加到默认动作集 + if action_info.enabled: + self._default_actions[action_name] = action_info.description + + def _register_command_component(self, command_info: CommandInfo, command_class: Type): + """注册Command组件到Command特定注册表""" + command_name = command_info.name + self._command_registry[command_name] = command_class + + # 编译正则表达式并注册 + if command_info.command_pattern: + pattern = re.compile(command_info.command_pattern, re.IGNORECASE | re.DOTALL) + self._command_patterns[pattern] = command_class + + # === 组件查询方法 === + + def get_component_info(self, component_name: str) -> Optional[ComponentInfo]: + """获取组件信息""" + return self._components.get(component_name) + + def get_component_class(self, component_name: str) -> Optional[Type]: + """获取组件类""" + return self._component_classes.get(component_name) + + def get_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]: + """获取指定类型的所有组件""" + return self._components_by_type.get(component_type, {}).copy() + + def get_enabled_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]: + """获取指定类型的所有启用组件""" + components = self.get_components_by_type(component_type) + return {name: info for name, info in components.items() if info.enabled} + + # === Action特定查询方法 === + + def get_action_registry(self) -> Dict[str, Type]: + """获取Action注册表(用于兼容现有系统)""" + return self._action_registry.copy() + + def get_default_actions(self) -> Dict[str, str]: + """获取默认启用的Action列表(用于兼容现有系统)""" + return self._default_actions.copy() + + def get_action_info(self, action_name: str) -> Optional[ActionInfo]: + """获取Action信息""" + info = self.get_component_info(action_name) + return info if isinstance(info, ActionInfo) else None + + # === Command特定查询方法 === + + def get_command_registry(self) -> Dict[str, Type]: + """获取Command注册表(用于兼容现有系统)""" + return self._command_registry.copy() + + def get_command_patterns(self) -> Dict[Pattern, Type]: + """获取Command模式注册表(用于兼容现有系统)""" + return self._command_patterns.copy() + + def get_command_info(self, command_name: str) -> Optional[CommandInfo]: + """获取Command信息""" + info = self.get_component_info(command_name) + return info if isinstance(info, CommandInfo) else None + + def find_command_by_text(self, text: str) -> Optional[tuple[Type, dict]]: + """根据文本查找匹配的命令 + + Args: + text: 输入文本 + + Returns: + Optional[tuple[Type, dict]]: (命令类, 匹配的命名组) 或 None + """ + for pattern, command_class in self._command_patterns.items(): + match = pattern.match(text) + if match: + command_name = None + # 查找对应的组件信息 + for name, cls in self._command_registry.items(): + if cls == command_class: + command_name = name + break + + # 检查命令是否启用 + if command_name: + command_info = self.get_command_info(command_name) + if command_info and command_info.enabled: + return command_class, match.groupdict() + return None + + # === 插件管理方法 === + + def register_plugin(self, plugin_info: PluginInfo) -> bool: + """注册插件 + + Args: + plugin_info: 插件信息 + + Returns: + bool: 是否注册成功 + """ + plugin_name = plugin_info.name + + if plugin_name in self._plugins: + logger.warning(f"插件 {plugin_name} 已存在,跳过注册") + return False + + self._plugins[plugin_name] = plugin_info + logger.info(f"已注册插件: {plugin_name} (组件数量: {len(plugin_info.components)})") + return True + + def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]: + """获取插件信息""" + return self._plugins.get(plugin_name) + + def get_all_plugins(self) -> Dict[str, PluginInfo]: + """获取所有插件""" + return self._plugins.copy() + + def get_enabled_plugins(self) -> Dict[str, PluginInfo]: + """获取所有启用的插件""" + return {name: info for name, info in self._plugins.items() if info.enabled} + + def get_plugin_components(self, plugin_name: str) -> List[ComponentInfo]: + """获取插件的所有组件""" + plugin_info = self.get_plugin_info(plugin_name) + return plugin_info.components if plugin_info else [] + + # === 状态管理方法 === + + def enable_component(self, component_name: str) -> bool: + """启用组件""" + if component_name in self._components: + self._components[component_name].enabled = True + # 如果是Action,更新默认动作集 + component_info = self._components[component_name] + if isinstance(component_info, ActionInfo): + self._default_actions[component_name] = component_info.description + logger.info(f"已启用组件: {component_name}") + return True + return False + + def disable_component(self, component_name: str) -> bool: + """禁用组件""" + if component_name in self._components: + self._components[component_name].enabled = False + # 如果是Action,从默认动作集中移除 + if component_name in self._default_actions: + del self._default_actions[component_name] + logger.info(f"已禁用组件: {component_name}") + return True + return False + + def get_registry_stats(self) -> Dict[str, Any]: + """获取注册中心统计信息""" + return { + "total_components": len(self._components), + "total_plugins": len(self._plugins), + "components_by_type": { + component_type.value: len(components) + for component_type, components in self._components_by_type.items() + }, + "enabled_components": len([c for c in self._components.values() if c.enabled]), + "enabled_plugins": len([p for p in self._plugins.values() if p.enabled]), + } + + +# 全局组件注册中心实例 +component_registry = ComponentRegistry() \ No newline at end of file diff --git a/src/plugin_system/core/plugin_manager.py b/src/plugin_system/core/plugin_manager.py new file mode 100644 index 000000000..1f6557f04 --- /dev/null +++ b/src/plugin_system/core/plugin_manager.py @@ -0,0 +1,223 @@ +from typing import Dict, List, Optional, Any +import os +import importlib +import importlib.util +from pathlib import Path +from src.common.logger_manager import get_logger +from src.plugin_system.core.component_registry import component_registry +from src.plugin_system.base.component_types import PluginInfo, ComponentType + +logger = get_logger("plugin_manager") + +class PluginManager: + """插件管理器 + + 负责加载、初始化和管理所有插件及其组件 + """ + + def __init__(self): + self.plugin_directories: List[str] = [] + self.loaded_plugins: Dict[str, Any] = {} + self.failed_plugins: Dict[str, str] = {} + + logger.info("插件管理器初始化完成") + + def add_plugin_directory(self, directory: str): + """添加插件目录""" + if os.path.exists(directory): + self.plugin_directories.append(directory) + logger.info(f"已添加插件目录: {directory}") + else: + logger.warning(f"插件目录不存在: {directory}") + + def load_all_plugins(self) -> tuple[int, int]: + """加载所有插件目录中的插件 + + Returns: + tuple[int, int]: (插件数量, 组件数量) + """ + logger.info("开始加载所有插件...") + + # 第一阶段:加载所有插件模块(注册插件类) + total_loaded_modules = 0 + total_failed_modules = 0 + + for directory in self.plugin_directories: + loaded, failed = self._load_plugin_modules_from_directory(directory) + total_loaded_modules += loaded + total_failed_modules += failed + + logger.info(f"插件模块加载完成 - 成功: {total_loaded_modules}, 失败: {total_failed_modules}") + + # 第二阶段:实例化所有已注册的插件类 + from src.plugin_system.base.base_plugin import get_registered_plugin_classes, instantiate_and_register_plugin + + plugin_classes = get_registered_plugin_classes() + total_registered = 0 + total_failed_registration = 0 + + for plugin_name, plugin_class in plugin_classes.items(): + # 尝试找到插件对应的目录 + plugin_dir = self._find_plugin_directory(plugin_class) + + if instantiate_and_register_plugin(plugin_class, plugin_dir): + total_registered += 1 + self.loaded_plugins[plugin_name] = plugin_class + else: + total_failed_registration += 1 + self.failed_plugins[plugin_name] = "插件注册失败" + + logger.info(f"插件注册完成 - 成功: {total_registered}, 失败: {total_failed_registration}") + + # 获取组件统计信息 + stats = component_registry.get_registry_stats() + logger.info(f"组件注册统计: {stats}") + + # 返回插件数量和组件数量 + return total_registered, stats.get('total_components', 0) + + def _find_plugin_directory(self, plugin_class) -> Optional[str]: + """查找插件类对应的目录路径""" + try: + import inspect + module = inspect.getmodule(plugin_class) + if module and hasattr(module, '__file__') and module.__file__: + return os.path.dirname(module.__file__) + except Exception: + pass + return None + + def _load_plugin_modules_from_directory(self, directory: str) -> tuple[int, int]: + """从指定目录加载插件模块""" + loaded_count = 0 + failed_count = 0 + + if not os.path.exists(directory): + logger.warning(f"插件目录不存在: {directory}") + return loaded_count, failed_count + + logger.info(f"正在扫描插件目录: {directory}") + + # 遍历目录中的所有Python文件和包 + for item in os.listdir(directory): + item_path = os.path.join(directory, item) + + if os.path.isfile(item_path) and item.endswith('.py') and item != '__init__.py': + # 单文件插件 + if self._load_plugin_module_file(item_path): + loaded_count += 1 + else: + failed_count += 1 + + elif os.path.isdir(item_path) and not item.startswith('.') and not item.startswith('__'): + # 插件包 + plugin_file = os.path.join(item_path, 'plugin.py') + if os.path.exists(plugin_file): + if self._load_plugin_module_file(plugin_file): + loaded_count += 1 + else: + failed_count += 1 + + return loaded_count, failed_count + + def _load_plugin_module_file(self, plugin_file: str) -> bool: + """加载单个插件模块文件""" + plugin_name = None + + # 生成模块名 + plugin_path = Path(plugin_file) + if plugin_path.parent.name != 'plugins': + # 插件包格式:parent_dir.plugin + module_name = f"plugins.{plugin_path.parent.name}.plugin" + else: + # 单文件格式:plugins.filename + module_name = f"plugins.{plugin_path.stem}" + + try: + # 动态导入插件模块 + spec = importlib.util.spec_from_file_location(module_name, plugin_file) + if spec is None or spec.loader is None: + logger.error(f"无法创建模块规范: {plugin_file}") + return False + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # 模块加载成功,插件类会自动通过装饰器注册 + plugin_name = plugin_path.parent.name if plugin_path.parent.name != 'plugins' else plugin_path.stem + + logger.debug(f"插件模块加载成功: {plugin_file}") + return True + + except Exception as e: + error_msg = f"加载插件模块 {plugin_file} 失败: {e}" + logger.error(error_msg) + if plugin_name: + self.failed_plugins[plugin_name] = error_msg + return False + + def get_loaded_plugins(self) -> List[PluginInfo]: + """获取所有已加载的插件信息""" + return list(component_registry.get_all_plugins().values()) + + def get_enabled_plugins(self) -> List[PluginInfo]: + """获取所有启用的插件信息""" + return list(component_registry.get_enabled_plugins().values()) + + def enable_plugin(self, plugin_name: str) -> bool: + """启用插件""" + plugin_info = component_registry.get_plugin_info(plugin_name) + if plugin_info: + plugin_info.enabled = True + # 启用插件的所有组件 + for component in plugin_info.components: + component_registry.enable_component(component.name) + logger.info(f"已启用插件: {plugin_name}") + return True + return False + + def disable_plugin(self, plugin_name: str) -> bool: + """禁用插件""" + plugin_info = component_registry.get_plugin_info(plugin_name) + if plugin_info: + plugin_info.enabled = False + # 禁用插件的所有组件 + for component in plugin_info.components: + component_registry.disable_component(component.name) + logger.info(f"已禁用插件: {plugin_name}") + return True + return False + + def get_plugin_stats(self) -> Dict[str, Any]: + """获取插件统计信息""" + all_plugins = component_registry.get_all_plugins() + enabled_plugins = component_registry.get_enabled_plugins() + + action_components = component_registry.get_components_by_type(ComponentType.ACTION) + command_components = component_registry.get_components_by_type(ComponentType.COMMAND) + + return { + "total_plugins": len(all_plugins), + "enabled_plugins": len(enabled_plugins), + "failed_plugins": len(self.failed_plugins), + "total_components": len(action_components) + len(command_components), + "action_components": len(action_components), + "command_components": len(command_components), + "loaded_plugin_files": len(self.loaded_plugins), + "failed_plugin_details": self.failed_plugins.copy() + } + + def reload_plugin(self, plugin_name: str) -> bool: + """重新加载插件(高级功能,需要谨慎使用)""" + # TODO: 实现插件热重载功能 + logger.warning("插件热重载功能尚未实现") + return False + + +# 全局插件管理器实例 +plugin_manager = PluginManager() + +# 默认插件目录 +plugin_manager.add_plugin_directory("src/plugins/built_in") +plugin_manager.add_plugin_directory("src/plugins/examples") +plugin_manager.add_plugin_directory("plugins") # 用户插件目录 \ No newline at end of file diff --git a/src/plugins/README.md b/src/plugins/README.md new file mode 100644 index 000000000..e1880b087 --- /dev/null +++ b/src/plugins/README.md @@ -0,0 +1,206 @@ +# MaiBot 插件系统架构 + +## 概述 + +MaiBot 插件系统采用组件化设计,支持插件包含多种组件类型: +- **Action组件**:处理聊天中的动作逻辑 +- **Command组件**:处理命令请求 +- **未来扩展**:Scheduler(定时任务)、Listener(事件监听)等 + +## 目录结构 + +``` +src/plugins/ +├── core/ # 插件核心管理 +│ ├── plugin_manager.py # 插件管理器 +│ ├── plugin_loader.py # 插件加载器(预留) +│ └── component_registry.py # 组件注册中心 +├── apis/ # API模块 +│ ├── plugin_api.py # 统一API聚合 +│ ├── message_api.py # 消息API +│ ├── llm_api.py # LLM API +│ ├── database_api.py # 数据库API +│ ├── config_api.py # 配置API +│ ├── utils_api.py # 工具API +│ ├── stream_api.py # 流API +│ └── hearflow_api.py # 心流API +├── base/ # 基础类 +│ ├── base_plugin.py # 插件基类 +│ ├── base_action.py # Action组件基类 +│ ├── base_command.py # Command组件基类 +│ └── component_types.py # 组件类型定义 +├── built_in/ # 内置组件 +│ ├── actions/ # 内置Action +│ └── commands/ # 内置Command +└── examples/ # 示例插件 + └── simple_plugin/ # 简单插件示例 + ├── plugin.py + └── config.toml +``` + +## 核心特性 + +### 1. 组件化设计 +- 插件可以包含多种组件类型 +- 每种组件有明确的职责和接口 +- 支持组件的独立启用/禁用 + +### 2. 统一的API访问 +- 所有插件组件通过 `PluginAPI` 访问系统功能 +- 包含消息发送、数据库操作、LLM调用等 +- 提供统一的错误处理和日志记录 + +### 3. 灵活的配置系统 +- 支持 TOML 格式的配置文件 +- 插件可以读取自定义配置 +- 支持全局配置和插件特定配置 + +### 4. 统一的注册管理 +- 组件注册中心管理所有组件 +- 支持组件的动态启用/禁用 +- 提供丰富的查询和统计接口 + +## 插件开发指南 + +### 创建基本插件 + +```python +from src.plugins.base.base_plugin import BasePlugin, register_plugin +from src.plugins.base.base_action import BaseAction +from src.plugins.base.component_types import ActionInfo, ActionActivationType + +class MyAction(BaseAction): + async def execute(self) -> tuple[bool, str]: + # 使用API发送消息 + response = "Hello from my plugin!" + return True, response + +@register_plugin +class MyPlugin(BasePlugin): + plugin_name = "my_plugin" + plugin_description = "我的第一个插件" + + def get_plugin_components(self): + action_info = ActionInfo( + name="my_action", + description="我的动作", + activation_keywords=["hello"] + ) + return [(action_info, MyAction)] +``` + +### 创建命令组件 + +```python +from src.plugins.base.base_command import BaseCommand +from src.plugins.base.component_types import CommandInfo + +class MyCommand(BaseCommand): + async def execute(self) -> tuple[bool, str]: + # 获取命令参数 + param = self.matched_groups.get("param", "") + + # 发送回复 + await self.send_reply(f"收到参数: {param}") + return True, f"处理完成: {param}" + +# 在插件中注册 +def get_plugin_components(self): + command_info = CommandInfo( + name="my_command", + description="我的命令", + command_pattern=r"^/mycmd\s+(?P\w+)$", + command_help="用法:/mycmd <参数>" + ) + return [(command_info, MyCommand)] +``` + +### 使用配置文件 + +```toml +# config.toml +[plugin] +name = "my_plugin" +enabled = true + +[my_settings] +max_items = 10 +default_message = "Hello World" +``` + +```python +class MyPlugin(BasePlugin): + config_file_name = "config.toml" + + def get_plugin_components(self): + # 读取配置 + max_items = self.get_config("my_settings.max_items", 5) + message = self.get_config("my_settings.default_message", "Hi") + + # 使用配置创建组件... +``` + +## API使用示例 + +### 消息操作 +```python +# 发送文本消息 +await self.api.send_text_to_group(chat_stream, "Hello!") + +# 发送图片 +await self.api.send_image_to_group(chat_stream, image_path) +``` + +### 数据库操作 +```python +# 查询数据 +data = await self.api.db_get("table_name", "key") + +# 保存数据 +await self.api.db_set("table_name", "key", "value") +``` + +### LLM调用 +```python +# 生成文本 +response = await self.api.llm_text_request("你好,请介绍一下自己") + +# 生成图片 +image_url = await self.api.llm_image_request("一只可爱的猫咪") +``` + +## 内置组件迁移 + +现有的内置Action和Command将迁移到新架构: + +### Action迁移 +- `reply_action.py` → `src/plugins/built_in/actions/reply_action.py` +- `emoji_action.py` → `src/plugins/built_in/actions/emoji_action.py` +- `no_reply_action.py` → `src/plugins/built_in/actions/no_reply_action.py` + +### Command迁移 +- 现有命令系统将封装为内置Command组件 +- 保持现有的命令模式和功能 + +## 兼容性 + +新插件系统保持与现有系统的兼容性: +- 现有的Action和Command继续工作 +- 提供兼容层和适配器 +- 逐步迁移到新架构 + +## 扩展性 + +系统设计支持未来扩展: +- 新的组件类型(Scheduler、Listener等) +- 插件间依赖和通信 +- 插件热重载 +- 插件市场和分发 + +## 最佳实践 + +1. **单一职责**:每个组件专注于特定功能 +2. **配置驱动**:通过配置文件控制行为 +3. **错误处理**:妥善处理异常情况 +4. **日志记录**:记录关键操作和错误 +5. **测试覆盖**:为插件编写单元测试 \ No newline at end of file diff --git a/src/plugins/example_command_plugin/README.md b/src/plugins/example_command_plugin/README.md deleted file mode 100644 index 4dd6bc83a..000000000 --- a/src/plugins/example_command_plugin/README.md +++ /dev/null @@ -1,105 +0,0 @@ -# 发送消息命令插件 - -这个插件提供了多个便捷的消息发送命令,允许管理员向指定群聊或用户发送消息。 - -## 命令列表 - -### 1. `/send` - 基础发送命令 -向指定群聊或用户发送文本消息。 - -**语法:** -``` -/send <消息内容> -``` - -**示例:** -``` -/send group 123456789 大家好! -/send user 987654321 私聊消息 -``` - -### 2. `/sendfull` - 增强发送命令 -支持多种消息类型和平台的发送命令。 - -**语法:** -``` -/sendfull <消息类型> <目标类型> [平台] <内容> -``` - -**消息类型:** -- `text` - 文本消息 -- `image` - 图片消息(提供图片URL) -- `emoji` - 表情消息 - -**示例:** -``` -/sendfull text group 123456789 qq 大家好!这是文本消息 -/sendfull image user 987654321 https://example.com/image.jpg -/sendfull emoji group 123456789 😄 -``` - -### 3. `/msg` - 快速群聊发送 -快速向群聊发送文本消息的简化命令。 - -**语法:** -``` -/msg <群ID> <消息内容> -``` - -**示例:** -``` -/msg 123456789 大家好! -/msg 987654321 这是一条快速消息 -``` - -### 4. `/pm` - 私聊发送 -快速向用户发送私聊消息的命令。 - -**语法:** -``` -/pm <用户ID> <消息内容> -``` - -**示例:** -``` -/pm 123456789 你好! -/pm 987654321 这是私聊消息 -``` - -## 使用前提 - -1. **目标存在**: 目标群聊或用户必须已经在机器人的数据库中存在对应的chat_stream记录 -2. **权限要求**: 机器人必须在目标群聊中有发言权限 -3. **管理员权限**: 这些命令通常需要管理员权限才能使用 - -## 错误处理 - -如果消息发送失败,可能的原因: - -1. **目标不存在**: 指定的群ID或用户ID在数据库中找不到对应记录 -2. **权限不足**: 机器人在目标群聊中没有发言权限 -3. **网络问题**: 网络连接异常 -4. **平台限制**: 目标平台的API限制 - -## 注意事项 - -1. **ID格式**: 群ID和用户ID必须是纯数字 -2. **消息长度**: 注意平台对消息长度的限制 -3. **图片格式**: 发送图片时需要提供有效的图片URL -4. **平台支持**: 目前主要支持QQ平台,其他平台可能需要额外配置 - -## 安全建议 - -1. 限制这些命令的使用权限,避免滥用 -2. 监控发送频率,防止刷屏 -3. 定期检查发送日志,确保合规使用 - -## 故障排除 - -查看日志文件中的详细错误信息: -``` -[INFO] [Command:send] 执行发送消息命令: group:123456789 -> 大家好!... -[ERROR] [Command:send] 发送群聊消息时出错: 未找到群ID为 123456789 的聊天流 -``` - -根据错误信息进行相应的处理。 \ No newline at end of file diff --git a/src/plugins/examples/simple_plugin/config.toml b/src/plugins/examples/simple_plugin/config.toml new file mode 100644 index 000000000..7529453f7 --- /dev/null +++ b/src/plugins/examples/simple_plugin/config.toml @@ -0,0 +1,30 @@ +# 完整示例插件配置文件 + +[plugin] +name = "simple_plugin" +version = "1.1.0" +enabled = true +description = "展示新插件系统完整功能的示例插件" + +[hello_action] +greeting_message = "你好,{username}!欢迎使用MaiBot新插件系统!" +enable_emoji = true +enable_llm_greeting = false # 是否使用LLM生成个性化问候 +default_username = "朋友" + +[status_command] +show_detailed_info = true +allowed_types = ["系统", "插件", "数据库", "内存", "网络"] +default_type = "系统" + +[echo_command] +max_message_length = 500 +enable_formatting = true + +[help_command] +show_extended_help = true +include_config_info = true + +[logging] +level = "INFO" +prefix = "[SimplePlugin]" \ No newline at end of file diff --git a/src/plugins/examples/simple_plugin/plugin.py b/src/plugins/examples/simple_plugin/plugin.py new file mode 100644 index 000000000..7d5516994 --- /dev/null +++ b/src/plugins/examples/simple_plugin/plugin.py @@ -0,0 +1,195 @@ +""" +完整示例插件 + +演示新插件系统的完整功能: +- 使用简化的导入接口 +- 展示Action和Command组件的定义 +- 展示插件配置的使用 +- 提供实用的示例功能 +- 演示API的多种使用方式 +""" + +from typing import List, Tuple, Type, Optional + +# 使用简化的导入接口 +from src.plugin_system import ( + BasePlugin, register_plugin, BaseAction, BaseCommand, + ComponentInfo, ActionInfo, CommandInfo, ActionActivationType, ChatMode +) +from src.common.logger_manager import get_logger + +logger = get_logger("simple_plugin") + + +class HelloAction(BaseAction): + """智能问候Action组件""" + + # ✅ 现在可以直接在类中定义激活条件! + focus_activation_type = ActionActivationType.KEYWORD + normal_activation_type = ActionActivationType.KEYWORD + activation_keywords = ["你好", "hello", "问候", "hi", "嗨"] + keyword_case_sensitive = False + mode_enable = ChatMode.ALL + parallel_action = False + + async def execute(self) -> Tuple[bool, str]: + """执行问候动作""" + username = self.action_data.get("username", "朋友") + + # 使用配置文件中的问候消息 + plugin_instance = SimplePlugin() + greeting_template = plugin_instance.get_config("hello_action.greeting_message", "你好,{username}!") + enable_emoji = plugin_instance.get_config("hello_action.enable_emoji", True) + enable_llm = plugin_instance.get_config("hello_action.enable_llm_greeting", False) + + # 如果启用LLM生成个性化问候 + if enable_llm: + try: + # 演示:使用LLM API生成个性化问候 + models = self.api.get_available_models() + if models: + first_model = list(models.values())[0] + prompt = f"为用户名叫{username}的朋友生成一句温暖的个性化问候语,不超过30字:" + + success, response, _, _ = await self.api.generate_with_model( + prompt=prompt, + model_config=first_model + ) + + if success: + logger.info(f"{self.log_prefix} 使用LLM生成问候: {response}") + return True, response + except Exception as e: + logger.warning(f"{self.log_prefix} LLM生成问候失败,使用默认模板: {e}") + + # 构建基础问候消息 + response = greeting_template.format(username=username) + if enable_emoji: + response += " 😊" + + # 演示:存储Action执行记录到数据库 + await self.api.store_action_info( + action_build_into_prompt=False, + action_prompt_display=f"问候了用户: {username}", + action_done=True + ) + + logger.info(f"{self.log_prefix} 执行问候动作: {username}") + return True, response + + +class EchoCommand(BaseCommand): + """回声命令 - 重复用户输入""" + + # ✅ 现在可以直接在类中定义命令模式! + command_pattern = r"^/echo\s+(?P.+)$" + command_help = "重复消息,用法:/echo <消息内容>" + command_examples = ["/echo Hello World", "/echo 你好世界"] + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行回声命令""" + # 获取匹配的参数 + message = self.matched_groups.get("message", "") + + if not message: + response = "请提供要重复的消息!用法:/echo <消息内容>" + else: + response = f"🔊 {message}" + + # 发送回复 + await self.send_reply(response) + + logger.info(f"{self.log_prefix} 执行回声命令: {message}") + return True, response + + +class StatusCommand(BaseCommand): + """状态查询Command组件""" + + # ✅ 直接定义命令模式 + command_pattern = r"^/status\s*(?P\w+)?$" + command_help = "查询系统状态,用法:/status [类型]" + command_examples = ["/status", "/status 系统", "/status 插件"] + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行状态查询命令""" + # 获取匹配的参数 + query_type = self.matched_groups.get("type", "系统") + + # 从配置文件获取设置 + plugin_instance = SimplePlugin() + show_detailed = plugin_instance.get_config("status_command.show_detailed_info", True) + allowed_types = plugin_instance.get_config("status_command.allowed_types", ["系统", "插件"]) + + if query_type not in allowed_types: + response = f"不支持的查询类型: {query_type}\n支持的类型: {', '.join(allowed_types)}" + elif show_detailed: + response = f"📊 {query_type}状态详情:\n✅ 运行正常\n🔧 版本: 1.0.0\n⚡ 性能: 良好" + else: + response = f"✅ {query_type}状态:正常" + + # 发送回复 + await self.send_reply(response) + + logger.info(f"{self.log_prefix} 执行状态查询: {query_type}") + return True, response + + +class HelpCommand(BaseCommand): + """帮助命令 - 显示插件功能""" + + # ✅ 直接定义命令模式 + command_pattern = r"^/help$" + command_help = "显示插件帮助信息" + command_examples = ["/help"] + + async def execute(self) -> Tuple[bool, Optional[str]]: + """执行帮助命令""" + help_text = """ +🤖 简单示例插件帮助 + +📝 可用命令: +• /echo <消息> - 重复你的消息 +• /status [类型] - 查询系统状态 +• /help - 显示此帮助信息 + +🎯 智能功能: +• 自动问候 - 当消息包含"你好"、"hello"等关键词时触发 + +⚙️ 配置: +本插件支持通过config.toml文件进行个性化配置 + +💡 这是新插件系统的完整示例,展示了Action和Command的结合使用。 + """.strip() + + await self.send_reply(help_text) + + logger.info(f"{self.log_prefix} 显示帮助信息") + return True, "已显示帮助信息" + + +@register_plugin +class SimplePlugin(BasePlugin): + """完整示例插件 + + 包含多个Action和Command组件,展示插件系统的完整功能 + """ + + # 插件基本信息 + plugin_name = "simple_plugin" + plugin_description = "完整的示例插件,展示新插件系统的各种功能" + plugin_version = "1.1.0" + plugin_author = "MaiBot开发团队" + enable_plugin = True + config_file_name = "config.toml" # 配置文件 + + def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]: + """返回插件包含的组件列表""" + + # ✅ 现在可以直接从类属性生成组件信息! + return [ + (HelloAction.get_action_info("hello_action", "智能问候动作,支持自定义消息和表情"), HelloAction), + (EchoCommand.get_command_info("echo_command", "回声命令,重复用户输入的消息"), EchoCommand), + (StatusCommand.get_command_info("status_command", "状态查询命令,支持多种查询类型"), StatusCommand), + (HelpCommand.get_command_info("help_command", "帮助命令,显示插件功能说明"), HelpCommand) + ] \ No newline at end of file diff --git a/src/plugins/plugin_loader.py b/src/plugins/plugin_loader.py deleted file mode 100644 index 107150570..000000000 --- a/src/plugins/plugin_loader.py +++ /dev/null @@ -1,304 +0,0 @@ -import importlib -import pkgutil -import os -from typing import Dict, Tuple -from src.common.logger_manager import get_logger - -logger = get_logger("plugin_loader") - - -class PluginLoader: - """统一的插件加载器,负责加载插件的所有组件(actions、commands等)""" - - def __init__(self): - self.loaded_actions = 0 - self.loaded_commands = 0 - self.plugin_stats: Dict[str, Dict[str, int]] = {} # 统计每个插件加载的组件数量 - self.plugin_sources: Dict[str, str] = {} # 记录每个插件来自哪个路径 - - def load_all_plugins(self) -> Tuple[int, int]: - """加载所有插件的所有组件 - - Returns: - Tuple[int, int]: (加载的动作数量, 加载的命令数量) - """ - # 定义插件搜索路径(优先级从高到低) - plugin_paths = [ - ("plugins", "plugins"), # 项目根目录的plugins文件夹 - ("src.plugins", os.path.join("src", "plugins")), # src下的plugins文件夹 - ] - - total_plugins_found = 0 - - for plugin_import_path, plugin_dir_path in plugin_paths: - try: - plugins_loaded = self._load_plugins_from_path(plugin_import_path, plugin_dir_path) - total_plugins_found += plugins_loaded - - except Exception as e: - logger.error(f"从路径 {plugin_dir_path} 加载插件失败: {e}") - import traceback - - logger.error(traceback.format_exc()) - - if total_plugins_found == 0: - logger.info("未找到任何插件目录或插件") - - # 输出加载统计 - self._log_loading_stats() - - return self.loaded_actions, self.loaded_commands - - def _load_plugins_from_path(self, plugin_import_path: str, plugin_dir_path: str) -> int: - """从指定路径加载插件 - - Args: - plugin_import_path: 插件的导入路径 (如 "plugins" 或 "src.plugins") - plugin_dir_path: 插件目录的文件系统路径 - - Returns: - int: 找到的插件包数量 - """ - # 检查插件目录是否存在 - if not os.path.exists(plugin_dir_path): - logger.debug(f"插件目录 {plugin_dir_path} 不存在,跳过") - return 0 - - logger.info(f"正在从 {plugin_dir_path} 加载插件...") - - # 导入插件包 - try: - plugins_package = importlib.import_module(plugin_import_path) - logger.info(f"成功导入插件包: {plugin_import_path}") - except ImportError as e: - logger.warning(f"导入插件包 {plugin_import_path} 失败: {e}") - return 0 - - # 遍历插件包中的所有子包 - plugins_found = 0 - for _, plugin_name, is_pkg in pkgutil.iter_modules(plugins_package.__path__, plugins_package.__name__ + "."): - if not is_pkg: - continue - - logger.debug(f"检测到插件: {plugin_name}") - # 记录插件来源 - self.plugin_sources[plugin_name] = plugin_dir_path - self._load_single_plugin(plugin_name) - plugins_found += 1 - - if plugins_found > 0: - logger.info(f"从 {plugin_dir_path} 找到 {plugins_found} 个插件包") - else: - logger.debug(f"从 {plugin_dir_path} 未找到任何插件包") - - return plugins_found - - def _load_single_plugin(self, plugin_name: str) -> None: - """加载单个插件的所有组件 - - Args: - plugin_name: 插件名称 - """ - plugin_stats = {"actions": 0, "commands": 0} - - # 加载动作组件 - actions_count = self._load_plugin_actions(plugin_name) - plugin_stats["actions"] = actions_count - self.loaded_actions += actions_count - - # 加载命令组件 - commands_count = self._load_plugin_commands(plugin_name) - plugin_stats["commands"] = commands_count - self.loaded_commands += commands_count - - # 记录插件统计信息 - if actions_count > 0 or commands_count > 0: - self.plugin_stats[plugin_name] = plugin_stats - logger.info(f"插件 {plugin_name} 加载完成: {actions_count} 个动作, {commands_count} 个命令") - - def _load_plugin_actions(self, plugin_name: str) -> int: - """加载插件的动作组件 - - Args: - plugin_name: 插件名称 - - Returns: - int: 加载的动作数量 - """ - loaded_count = 0 - - # 优先检查插件是否有actions子包 - plugin_actions_path = f"{plugin_name}.actions" - plugin_actions_dir = plugin_name.replace(".", os.path.sep) + os.path.sep + "actions" - - actions_loaded_from_subdir = False - - # 首先尝试从actions子目录加载 - if os.path.exists(plugin_actions_dir): - loaded_count += self._load_from_actions_subdir(plugin_name, plugin_actions_path, plugin_actions_dir) - if loaded_count > 0: - actions_loaded_from_subdir = True - - # 如果actions子目录不存在或加载失败,尝试从插件根目录加载 - if not actions_loaded_from_subdir: - loaded_count += self._load_actions_from_root_dir(plugin_name) - - return loaded_count - - def _load_plugin_commands(self, plugin_name: str) -> int: - """加载插件的命令组件 - - Args: - plugin_name: 插件名称 - - Returns: - int: 加载的命令数量 - """ - loaded_count = 0 - - # 优先检查插件是否有commands子包 - plugin_commands_path = f"{plugin_name}.commands" - plugin_commands_dir = plugin_name.replace(".", os.path.sep) + os.path.sep + "commands" - - commands_loaded_from_subdir = False - - # 首先尝试从commands子目录加载 - if os.path.exists(plugin_commands_dir): - loaded_count += self._load_from_commands_subdir(plugin_name, plugin_commands_path, plugin_commands_dir) - if loaded_count > 0: - commands_loaded_from_subdir = True - - # 如果commands子目录不存在或加载失败,尝试从插件根目录加载 - if not commands_loaded_from_subdir: - loaded_count += self._load_commands_from_root_dir(plugin_name) - - return loaded_count - - def _load_from_actions_subdir(self, plugin_name: str, plugin_actions_path: str, plugin_actions_dir: str) -> int: - """从actions子目录加载动作""" - loaded_count = 0 - - try: - # 尝试导入插件的actions包 - actions_module = importlib.import_module(plugin_actions_path) - logger.debug(f"成功加载插件动作模块: {plugin_actions_path}") - - # 遍历actions目录中的所有Python文件 - actions_dir = os.path.dirname(actions_module.__file__) - for file in os.listdir(actions_dir): - if file.endswith(".py") and file != "__init__.py": - action_module_name = f"{plugin_actions_path}.{file[:-3]}" - try: - importlib.import_module(action_module_name) - logger.info(f"成功加载动作: {action_module_name}") - loaded_count += 1 - except Exception as e: - logger.error(f"加载动作失败: {action_module_name}, 错误: {e}") - - except ImportError as e: - logger.debug(f"插件 {plugin_name} 的actions子包导入失败: {e}") - - return loaded_count - - def _load_from_commands_subdir(self, plugin_name: str, plugin_commands_path: str, plugin_commands_dir: str) -> int: - """从commands子目录加载命令""" - loaded_count = 0 - - try: - # 尝试导入插件的commands包 - commands_module = importlib.import_module(plugin_commands_path) - logger.debug(f"成功加载插件命令模块: {plugin_commands_path}") - - # 遍历commands目录中的所有Python文件 - commands_dir = os.path.dirname(commands_module.__file__) - for file in os.listdir(commands_dir): - if file.endswith(".py") and file != "__init__.py": - command_module_name = f"{plugin_commands_path}.{file[:-3]}" - try: - importlib.import_module(command_module_name) - logger.info(f"成功加载命令: {command_module_name}") - loaded_count += 1 - except Exception as e: - logger.error(f"加载命令失败: {command_module_name}, 错误: {e}") - - except ImportError as e: - logger.debug(f"插件 {plugin_name} 的commands子包导入失败: {e}") - - return loaded_count - - def _load_actions_from_root_dir(self, plugin_name: str) -> int: - """从插件根目录加载动作文件""" - loaded_count = 0 - - try: - # 导入插件包本身 - plugin_module = importlib.import_module(plugin_name) - logger.debug(f"尝试从插件根目录加载动作: {plugin_name}") - - # 遍历插件根目录中的所有Python文件 - plugin_dir = os.path.dirname(plugin_module.__file__) - for file in os.listdir(plugin_dir): - if file.endswith(".py") and file != "__init__.py": - # 跳过非动作文件(根据命名约定) - if not (file.endswith("_action.py") or file.endswith("_actions.py") or "action" in file): - continue - - action_module_name = f"{plugin_name}.{file[:-3]}" - try: - importlib.import_module(action_module_name) - logger.info(f"成功加载动作: {action_module_name}") - loaded_count += 1 - except Exception as e: - logger.error(f"加载动作失败: {action_module_name}, 错误: {e}") - - except ImportError as e: - logger.debug(f"插件 {plugin_name} 导入失败: {e}") - - return loaded_count - - def _load_commands_from_root_dir(self, plugin_name: str) -> int: - """从插件根目录加载命令文件""" - loaded_count = 0 - - try: - # 导入插件包本身 - plugin_module = importlib.import_module(plugin_name) - logger.debug(f"尝试从插件根目录加载命令: {plugin_name}") - - # 遍历插件根目录中的所有Python文件 - plugin_dir = os.path.dirname(plugin_module.__file__) - for file in os.listdir(plugin_dir): - if file.endswith(".py") and file != "__init__.py": - # 跳过非命令文件(根据命名约定) - if not (file.endswith("_command.py") or file.endswith("_commands.py") or "command" in file): - continue - - command_module_name = f"{plugin_name}.{file[:-3]}" - try: - importlib.import_module(command_module_name) - logger.info(f"成功加载命令: {command_module_name}") - loaded_count += 1 - except Exception as e: - logger.error(f"加载命令失败: {command_module_name}, 错误: {e}") - - except ImportError as e: - logger.debug(f"插件 {plugin_name} 导入失败: {e}") - - return loaded_count - - def _log_loading_stats(self) -> None: - """输出加载统计信息""" - logger.success(f"插件加载完成: 总计 {self.loaded_actions} 个动作, {self.loaded_commands} 个命令") - - if self.plugin_stats: - logger.info("插件加载详情:") - for plugin_name, stats in self.plugin_stats.items(): - plugin_display_name = plugin_name.split(".")[-1] # 只显示插件名称,不显示完整路径 - source_path = self.plugin_sources.get(plugin_name, "未知路径") - logger.info( - f" {plugin_display_name} (来源: {source_path}): {stats['actions']} 动作, {stats['commands']} 命令" - ) - - -# 创建全局插件加载器实例 -plugin_loader = PluginLoader()