This commit is contained in:
春河晴
2025-06-10 16:43:45 +09:00
parent cf39f2fe84
commit b0c553703f
18 changed files with 500 additions and 489 deletions

View File

@@ -28,7 +28,7 @@ class PluginAction(BaseAction, MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, Utils
"""插件动作基类(旧版兼容)
封装了主程序内部依赖提供简化的API接口给插件开发者
⚠️ 此类已弃用,建议使用新的插件系统:
- 新基类src.plugin_system.base.BaseAction
- 新APIsrc.plugin_system.plugin_api

View File

@@ -47,51 +47,52 @@ class ChatBot:
except Exception as e:
logger.error(f"创建PFC聊天失败: {e}")
async def _process_commands_with_new_system(self, message: MessageRecv):
"""使用新插件系统处理命令"""
try:
if not message.processed_plain_text:
await message.process()
text = message.processed_plain_text
# 使用新的组件注册中心查找命令
command_result = component_registry.find_command_by_text(text)
if command_result:
command_class, matched_groups = command_result
# 创建命令实例
command_instance = command_class(message)
command_instance.set_matched_groups(matched_groups)
try:
# 执行命令
success, response = await command_instance.execute()
# 记录命令执行结果
if success:
logger.info(f"命令执行成功: {command_class.__name__}")
else:
logger.warning(f"命令执行失败: {command_class.__name__} - {response}")
return True, response, False # 找到命令,不继续处理
except Exception as e:
logger.error(f"执行命令时出错: {command_class.__name__} - {e}")
import traceback
logger.error(traceback.format_exc())
try:
await command_instance.send_reply(f"命令执行出错: {str(e)}")
except Exception as send_error:
logger.error(f"发送错误消息失败: {send_error}")
return True, str(e), False # 命令出错,不继续处理
# 没有找到命令,继续处理消息
return False, None, True
except Exception as e:
logger.error(f"处理命令时出错: {e}")
return False, None, True # 出错时继续处理消息
@@ -138,10 +139,10 @@ class ChatBot:
# 处理消息内容,生成纯文本
await message.process()
# 命令处理 - 使用新插件系统检查并处理命令
is_command, cmd_result, continue_process = await self._process_commands_with_new_system(message)
# 如果是命令且不需要继续处理,则直接返回
if is_command and not continue_process:
logger.info(f"命令处理完成,跳过后续消息处理: {cmd_result}")

View File

@@ -22,8 +22,10 @@ from .api.main import start_api_server
# 导入actions模块确保装饰器被执行
import src.chat.actions.default_actions # noqa
# 导入新的插件管理器
from src.plugin_system.core.plugin_manager import plugin_manager
# 导入消息API和traceback模块
from src.common.message import global_api
import traceback
@@ -141,9 +143,9 @@ class MainSystem:
try:
# 使用新的插件管理器加载所有插件
plugin_count, component_count = plugin_manager.load_all_plugins()
logger.success(f"插件系统加载成功: {plugin_count} 个插件,{component_count} 个组件")
except Exception as e:
logger.error(f"加载插件失败: {e}")
logger.error(traceback.format_exc())

View File

@@ -9,8 +9,13 @@ from src.plugin_system.base.base_plugin import BasePlugin, register_plugin
from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.base.component_types import (
ComponentType, ActionActivationType, ChatMode,
ComponentInfo, ActionInfo, CommandInfo, PluginInfo
ComponentType,
ActionActivationType,
ChatMode,
ComponentInfo,
ActionInfo,
CommandInfo,
PluginInfo,
)
from src.plugin_system.apis.plugin_api import PluginAPI, create_plugin_api, create_command_api
from src.plugin_system.core.plugin_manager import plugin_manager
@@ -20,28 +25,24 @@ __version__ = "1.0.0"
__all__ = [
# 基础类
'BasePlugin',
'BaseAction',
'BaseCommand',
"BasePlugin",
"BaseAction",
"BaseCommand",
# 类型定义
'ComponentType',
'ActionActivationType',
'ChatMode',
'ComponentInfo',
'ActionInfo',
'CommandInfo',
'PluginInfo',
"ComponentType",
"ActionActivationType",
"ChatMode",
"ComponentInfo",
"ActionInfo",
"CommandInfo",
"PluginInfo",
# API接口
'PluginAPI',
'create_plugin_api',
'create_command_api',
"PluginAPI",
"create_plugin_api",
"create_command_api",
# 管理器
'plugin_manager',
'component_registry',
"plugin_manager",
"component_registry",
# 装饰器
'register_plugin',
]
"register_plugin",
]

View File

@@ -19,19 +19,19 @@ from src.plugin_system.apis.independent_apis import IndependentAPI, StaticAPI
__all__ = [
# 原有统一API
'PluginAPI',
'create_plugin_api',
'create_command_api',
"PluginAPI",
"create_plugin_api",
"create_command_api",
# 原有单独API
'MessageAPI',
'LLMAPI',
'DatabaseAPI',
'ConfigAPI',
'UtilsAPI',
'StreamAPI',
'HearflowAPI',
"MessageAPI",
"LLMAPI",
"DatabaseAPI",
"ConfigAPI",
"UtilsAPI",
"StreamAPI",
"HearflowAPI",
# 新增分类API
'ActionAPI', # 需要Action依赖的API
'IndependentAPI', # 独立API
'StaticAPI', # 静态API
]
"ActionAPI", # 需要Action依赖的API
"IndependentAPI", # 独立API
"StaticAPI", # 静态API
]

View File

@@ -11,37 +11,40 @@ from src.common.logger_manager import get_logger
logger = get_logger("action_apis")
class ActionAPI(MessageAPI, DatabaseAPI):
"""
Action相关API聚合类
聚合了需要Action组件依赖的API功能。这些API需要以下依赖
- _services: 包含chat_stream、expressor、replyer、observations等服务对象
- log_prefix: 日志前缀
- thinking_id: 思考ID
- cycle_timers: 计时器
- action_data: Action数据
使用场景:
- 在Action组件中使用需要发送消息、存储数据等功能
- 需要访问聊天上下文和执行环境的操作
"""
def __init__(self,
chat_stream=None,
expressor=None,
replyer=None,
observations=None,
log_prefix: str = "[ActionAPI]",
thinking_id: str = "",
cycle_timers: dict = None,
action_data: dict = None):
def __init__(
self,
chat_stream=None,
expressor=None,
replyer=None,
observations=None,
log_prefix: str = "[ActionAPI]",
thinking_id: str = "",
cycle_timers: dict = None,
action_data: dict = None,
):
"""
初始化Action相关API
Args:
chat_stream: 聊天流对象
expressor: 表达器对象
expressor: 表达器对象
replyer: 回复器对象
observations: 观察列表
log_prefix: 日志前缀
@@ -54,32 +57,32 @@ class ActionAPI(MessageAPI, DatabaseAPI):
"chat_stream": chat_stream,
"expressor": expressor,
"replyer": replyer,
"observations": observations or []
"observations": observations or [],
}
self.log_prefix = log_prefix
self.thinking_id = thinking_id
self.cycle_timers = cycle_timers or {}
self.action_data = action_data or {}
logger.debug(f"{self.log_prefix} ActionAPI 初始化完成")
def set_chat_stream(self, chat_stream):
"""设置聊天流对象"""
self._services["chat_stream"] = chat_stream
logger.debug(f"{self.log_prefix} 设置聊天流")
def set_expressor(self, expressor):
"""设置表达器对象"""
self._services["expressor"] = expressor
logger.debug(f"{self.log_prefix} 设置表达器")
def set_replyer(self, replyer):
"""设置回复器对象"""
self._services["replyer"] = replyer
logger.debug(f"{self.log_prefix} 设置回复器")
def set_observations(self, observations):
"""设置观察列表"""
self._services["observations"] = observations or []
logger.debug(f"{self.log_prefix} 设置观察列表")
logger.debug(f"{self.log_prefix} 设置观察列表")

View File

@@ -14,93 +14,95 @@ from src.common.logger_manager import get_logger
logger = get_logger("independent_apis")
class IndependentAPI(LLMAPI, ConfigAPI, UtilsAPI, StreamAPI, HearflowAPI):
"""
独立API聚合类
聚合了不需要Action组件依赖的API功能。这些API的特点
- 不需要chat_stream、expressor等服务对象
- 可以独立调用不依赖Action执行上下文
- 主要是工具类方法和配置查询方法
包含的API
- LLMAPI: LLM模型调用仅需要全局配置
- ConfigAPI: 配置读取(使用全局配置)
- UtilsAPI: 工具方法(文件操作、时间处理等)
- StreamAPI: 聊天流查询使用ChatManager
- HearflowAPI: 心流状态控制使用heartflow
使用场景:
- 在Command组件中使用
- 独立的工具函数调用
- 配置查询和系统状态检查
"""
def __init__(self, log_prefix: str = "[IndependentAPI]"):
"""
初始化独立API
Args:
log_prefix: 日志前缀,用于区分不同的调用来源
"""
self.log_prefix = log_prefix
logger.debug(f"{self.log_prefix} IndependentAPI 初始化完成")
# 提供便捷的静态访问方式
class StaticAPI:
"""
静态API类
提供完全静态的API访问方式不需要实例化适合简单的工具调用。
"""
# LLM相关
@staticmethod
def get_available_models():
"""获取可用的LLM模型"""
api = LLMAPI()
return api.get_available_models()
@staticmethod
async def generate_with_model(prompt: str, model_config: dict, **kwargs):
"""使用LLM生成内容"""
api = LLMAPI()
api.log_prefix = "[StaticAPI]"
return await api.generate_with_model(prompt, model_config, **kwargs)
# 配置相关
@staticmethod
def get_global_config(key: str, default=None):
"""获取全局配置"""
api = ConfigAPI()
return api.get_global_config(key, default)
@staticmethod
async def get_user_id_by_name(person_name: str):
"""根据用户名获取用户ID"""
api = ConfigAPI()
return await api.get_user_id_by_person_name(person_name)
# 工具相关
@staticmethod
def get_timestamp():
"""获取当前时间戳"""
api = UtilsAPI()
return api.get_timestamp()
@staticmethod
def format_time(timestamp=None, format_str="%Y-%m-%d %H:%M:%S"):
"""格式化时间"""
api = UtilsAPI()
return api.format_time(timestamp, format_str)
@staticmethod
def generate_unique_id():
"""生成唯一ID"""
api = UtilsAPI()
return api.generate_unique_id()
# 聊天流相关
@staticmethod
def get_chat_stream_by_group_id(group_id: str, platform: str = "qq"):
@@ -108,14 +110,14 @@ class StaticAPI:
api = StreamAPI()
api.log_prefix = "[StaticAPI]"
return api.get_chat_stream_by_group_id(group_id, platform)
@staticmethod
def get_all_group_chat_streams(platform: str = "qq"):
"""获取所有群聊聊天流"""
api = StreamAPI()
api.log_prefix = "[StaticAPI]"
return api.get_all_group_chat_streams(platform)
# 心流相关
@staticmethod
async def get_sub_hearflow_by_chat_id(chat_id: str):
@@ -123,10 +125,10 @@ class StaticAPI:
api = HearflowAPI()
api.log_prefix = "[StaticAPI]"
return await api.get_sub_hearflow_by_chat_id(chat_id)
@staticmethod
async def set_sub_hearflow_chat_state(chat_id: str, target_state):
"""设置子心流状态"""
api = HearflowAPI()
api.log_prefix = "[StaticAPI]"
return await api.set_sub_hearflow_chat_state(chat_id, target_state)
return await api.set_sub_hearflow_chat_state(chat_id, target_state)

View File

@@ -174,9 +174,9 @@ class MessageAPI:
"""
try:
# 安全获取服务和日志前缀
services = getattr(self, '_services', {})
log_prefix = getattr(self, 'log_prefix', '[MessageAPI]')
services = getattr(self, "_services", {})
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
expressor: DefaultExpressor = services.get("expressor")
chat_stream: ChatStream = services.get("chat_stream")
@@ -221,7 +221,7 @@ class MessageAPI:
return success
except Exception as e:
log_prefix = getattr(self, 'log_prefix', '[MessageAPI]')
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
logger.error(f"{log_prefix} 发送消息时出错: {e}")
traceback.print_exc()
return False
@@ -237,9 +237,9 @@ class MessageAPI:
bool: 是否发送成功
"""
# 安全获取服务和日志前缀
services = getattr(self, '_services', {})
log_prefix = getattr(self, 'log_prefix', '[MessageAPI]')
services = getattr(self, "_services", {})
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
expressor: DefaultExpressor = services.get("expressor")
chat_stream: ChatStream = services.get("chat_stream")
@@ -276,10 +276,10 @@ class MessageAPI:
anchor_message.update_chat_stream(chat_stream)
# 调用内部方法发送消息
cycle_timers = getattr(self, 'cycle_timers', {})
reasoning = getattr(self, 'reasoning', '插件生成')
thinking_id = getattr(self, 'thinking_id', 'plugin_thinking')
cycle_timers = getattr(self, "cycle_timers", {})
reasoning = getattr(self, "reasoning", "插件生成")
thinking_id = getattr(self, "thinking_id", "plugin_thinking")
success, _ = await expressor.deal_reply(
cycle_timers=cycle_timers,
action_data=reply_data,
@@ -303,9 +303,9 @@ class MessageAPI:
bool: 是否发送成功
"""
# 安全获取服务和日志前缀
services = getattr(self, '_services', {})
log_prefix = getattr(self, 'log_prefix', '[MessageAPI]')
services = getattr(self, "_services", {})
log_prefix = getattr(self, "log_prefix", "[MessageAPI]")
replyer: DefaultReplyer = services.get("replyer")
chat_stream: ChatStream = services.get("chat_stream")
@@ -342,10 +342,10 @@ class MessageAPI:
anchor_message.update_chat_stream(chat_stream)
# 调用内部方法发送消息
cycle_timers = getattr(self, 'cycle_timers', {})
reasoning = getattr(self, 'reasoning', '插件生成')
thinking_id = getattr(self, 'thinking_id', 'plugin_thinking')
cycle_timers = getattr(self, "cycle_timers", {})
reasoning = getattr(self, "reasoning", "插件生成")
thinking_id = getattr(self, "thinking_id", "plugin_thinking")
success, _ = await replyer.deal_reply(
cycle_timers=cycle_timers,
action_data=reply_data,
@@ -362,7 +362,7 @@ class MessageAPI:
Returns:
str: 聊天类型 ("group""private")
"""
services = getattr(self, '_services', {})
services = getattr(self, "_services", {})
chat_stream: ChatStream = services.get("chat_stream")
if chat_stream and hasattr(chat_stream, "group_info"):
return "group" if chat_stream.group_info else "private"
@@ -378,7 +378,7 @@ class MessageAPI:
List[Dict]: 消息列表,每个消息包含发送者、内容等信息
"""
messages = []
services = getattr(self, '_services', {})
services = getattr(self, "_services", {})
observations = services.get("observations", [])
if observations and len(observations) > 0:

View File

@@ -5,7 +5,6 @@
提供所有插件API功能的统一访问入口
"""
from typing import Dict, Any, Optional
from src.common.logger_manager import get_logger
# 导入所有API模块
@@ -23,28 +22,25 @@ logger = get_logger("plugin_api")
class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI, HearflowAPI):
"""
插件API聚合类
集成了所有可供插件使用的API功能提供统一的访问接口。
插件组件可以直接使用此API实例来访问各种功能。
特性:
- 聚合所有API模块的功能
- 支持依赖注入和配置
- 提供统一的错误处理和日志记录
"""
def __init__(self,
chat_stream=None,
expressor=None,
replyer=None,
observations=None,
log_prefix: str = "[PluginAPI]"):
def __init__(
self, chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[PluginAPI]"
):
"""
初始化插件API
Args:
chat_stream: 聊天流对象
expressor: 表达器对象
expressor: 表达器对象
replyer: 回复器对象
observations: 观察列表
log_prefix: 日志前缀
@@ -54,105 +50,96 @@ class PluginAPI(MessageAPI, LLMAPI, DatabaseAPI, ConfigAPI, UtilsAPI, StreamAPI,
"chat_stream": chat_stream,
"expressor": expressor,
"replyer": replyer,
"observations": observations or []
"observations": observations or [],
}
self.log_prefix = log_prefix
# 调用所有父类的初始化
super().__init__()
logger.debug(f"{self.log_prefix} PluginAPI 初始化完成")
def set_chat_stream(self, chat_stream):
"""设置聊天流对象"""
self._services["chat_stream"] = chat_stream
logger.debug(f"{self.log_prefix} 设置聊天流: {getattr(chat_stream, 'stream_id', 'Unknown')}")
def set_expressor(self, expressor):
"""设置表达器对象"""
self._services["expressor"] = expressor
logger.debug(f"{self.log_prefix} 设置表达器")
def set_replyer(self, replyer):
"""设置回复器对象"""
self._services["replyer"] = replyer
logger.debug(f"{self.log_prefix} 设置回复器")
def set_observations(self, observations):
"""设置观察列表"""
self._services["observations"] = observations or []
logger.debug(f"{self.log_prefix} 设置观察列表,数量: {len(observations or [])}")
def get_service(self, service_name: str):
"""获取指定的服务对象"""
return self._services.get(service_name)
def has_service(self, service_name: str) -> bool:
"""检查是否有指定的服务对象"""
return service_name in self._services and self._services[service_name] is not None
# 便捷的工厂函数
def create_plugin_api(chat_stream=None,
expressor=None,
replyer=None,
observations=None,
log_prefix: str = "[Plugin]") -> PluginAPI:
def create_plugin_api(
chat_stream=None, expressor=None, replyer=None, observations=None, log_prefix: str = "[Plugin]"
) -> PluginAPI:
"""
创建插件API实例的便捷函数
Args:
chat_stream: 聊天流对象
expressor: 表达器对象
replyer: 回复器对象
replyer: 回复器对象
observations: 观察列表
log_prefix: 日志前缀
Returns:
PluginAPI: 配置好的插件API实例
"""
return PluginAPI(
chat_stream=chat_stream,
expressor=expressor,
replyer=replyer,
observations=observations,
log_prefix=log_prefix
chat_stream=chat_stream, expressor=expressor, replyer=replyer, observations=observations, log_prefix=log_prefix
)
def create_command_api(message, log_prefix: str = "[Command]") -> PluginAPI:
"""
为命令创建插件API实例的便捷函数
Args:
message: 消息对象,应该包含 chat_stream 等信息
log_prefix: 日志前缀
Returns:
PluginAPI: 配置好的插件API实例
"""
chat_stream = getattr(message, 'chat_stream', None)
api = PluginAPI(
chat_stream=chat_stream,
log_prefix=log_prefix
)
chat_stream = getattr(message, "chat_stream", None)
api = PluginAPI(chat_stream=chat_stream, log_prefix=log_prefix)
return api
# 导出主要接口
__all__ = [
'PluginAPI',
'create_plugin_api',
'create_command_api',
"PluginAPI",
"create_plugin_api",
"create_command_api",
# 也可以导出各个API类供单独使用
'MessageAPI',
'LLMAPI',
'DatabaseAPI',
'ConfigAPI',
'UtilsAPI',
'StreamAPI',
'HearflowAPI'
]
"MessageAPI",
"LLMAPI",
"DatabaseAPI",
"ConfigAPI",
"UtilsAPI",
"StreamAPI",
"HearflowAPI",
]

View File

@@ -8,20 +8,25 @@ from src.plugin_system.base.base_plugin import BasePlugin, register_plugin
from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.base.component_types import (
ComponentType, ActionActivationType, ChatMode,
ComponentInfo, ActionInfo, CommandInfo, PluginInfo
ComponentType,
ActionActivationType,
ChatMode,
ComponentInfo,
ActionInfo,
CommandInfo,
PluginInfo,
)
__all__ = [
'BasePlugin',
'BaseAction',
'BaseCommand',
'register_plugin',
'ComponentType',
'ActionActivationType',
'ChatMode',
'ComponentInfo',
'ActionInfo',
'CommandInfo',
'PluginInfo',
]
"BasePlugin",
"BaseAction",
"BaseCommand",
"register_plugin",
"ComponentType",
"ActionActivationType",
"ChatMode",
"ComponentInfo",
"ActionInfo",
"CommandInfo",
"PluginInfo",
]

View File

@@ -1,16 +1,17 @@
from abc import ABC, abstractmethod
from typing import Tuple, Dict, Any, Optional
from typing import Tuple
from src.common.logger_manager import get_logger
from src.plugin_system.apis.plugin_api import PluginAPI
from src.plugin_system.base.component_types import ActionActivationType, ChatMode, ActionInfo, ComponentType
logger = get_logger("base_action")
class BaseAction(ABC):
"""Action组件基类
Action是插件的一种组件类型用于处理聊天中的动作逻辑
子类可以通过类属性定义激活条件:
- focus_activation_type: 专注模式激活类型
- normal_activation_type: 普通模式激活类型
@@ -21,7 +22,7 @@ class BaseAction(ABC):
- random_activation_probability: 随机激活概率
- llm_judge_prompt: LLM判断提示词
"""
# 默认激活设置(子类可以覆盖)
focus_activation_type: ActionActivationType = ActionActivationType.NEVER
normal_activation_type: ActionActivationType = ActionActivationType.NEVER
@@ -31,15 +32,10 @@ class BaseAction(ABC):
parallel_action: bool = True
random_activation_probability: float = 0.0
llm_judge_prompt: str = ""
def __init__(self,
action_data: dict,
reasoning: str,
cycle_timers: dict,
thinking_id: str,
**kwargs):
def __init__(self, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str, **kwargs):
"""初始化Action组件
Args:
action_data: 动作数据
reasoning: 执行该动作的理由
@@ -51,51 +47,50 @@ class BaseAction(ABC):
self.reasoning = reasoning
self.cycle_timers = cycle_timers
self.thinking_id = thinking_id
# 创建API实例
self.api = PluginAPI(
chat_stream=kwargs.get("chat_stream"),
expressor=kwargs.get("expressor"),
expressor=kwargs.get("expressor"),
replyer=kwargs.get("replyer"),
observations=kwargs.get("observations"),
log_prefix=kwargs.get("log_prefix", "")
log_prefix=kwargs.get("log_prefix", ""),
)
self.log_prefix = kwargs.get("log_prefix", "")
logger.debug(f"{self.log_prefix} Action组件初始化完成")
async def send_reply(self, content: str) -> bool:
"""发送回复消息
Args:
content: 回复内容
Returns:
bool: 是否发送成功
"""
return await self.api.send_message("text", content)
@classmethod
def get_action_info(cls, name: str = None, description: str = None) -> 'ActionInfo':
def get_action_info(cls, name: str = None, description: str = None) -> "ActionInfo":
"""从类属性生成ActionInfo
Args:
name: Action名称如果不提供则使用类名
description: Action描述如果不提供则使用类文档字符串
Returns:
ActionInfo: 生成的Action信息对象
"""
# 自动生成名称和描述
if name is None:
name = cls.__name__.lower().replace('action', '')
name = cls.__name__.lower().replace("action", "")
if description is None:
description = cls.__doc__ or f"{cls.__name__} Action组件"
description = description.strip().split('\n')[0] # 取第一行作为描述
description = description.strip().split("\n")[0] # 取第一行作为描述
return ActionInfo(
name=name,
component_type=ComponentType.ACTION,
@@ -107,14 +102,14 @@ class BaseAction(ABC):
mode_enable=cls.mode_enable,
parallel_action=cls.parallel_action,
random_activation_probability=cls.random_activation_probability,
llm_judge_prompt=cls.llm_judge_prompt
llm_judge_prompt=cls.llm_judge_prompt,
)
@abstractmethod
async def execute(self) -> Tuple[bool, str]:
"""执行Action的抽象方法子类必须实现
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
pass
pass

View File

@@ -7,107 +7,100 @@ from src.chat.message_receive.message import MessageRecv
logger = get_logger("base_command")
class BaseCommand(ABC):
"""Command组件基类
Command是插件的一种组件类型用于处理命令请求
子类可以通过类属性定义命令模式:
- command_pattern: 命令匹配的正则表达式
- command_help: 命令帮助信息
- command_examples: 命令使用示例列表
"""
# 默认命令设置(子类可以覆盖)
command_pattern: str = ""
command_help: str = ""
command_examples: List[str] = []
def __init__(self, message: MessageRecv):
"""初始化Command组件
Args:
message: 接收到的消息对象
"""
self.message = message
self.matched_groups: Dict[str, str] = {} # 存储正则表达式匹配的命名组
# 创建API实例
self.api = PluginAPI(
chat_stream=message.chat_stream,
log_prefix=f"[Command]"
)
self.log_prefix = f"[Command]"
self.api = PluginAPI(chat_stream=message.chat_stream, log_prefix="[Command]")
self.log_prefix = "[Command]"
logger.debug(f"{self.log_prefix} Command组件初始化完成")
def set_matched_groups(self, groups: Dict[str, str]) -> None:
"""设置正则表达式匹配的命名组
Args:
groups: 正则表达式匹配的命名组
"""
self.matched_groups = groups
@abstractmethod
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行Command的抽象方法子类必须实现
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 可选的回复消息)
"""
pass
async def send_reply(self, content: str) -> None:
"""发送回复消息
Args:
content: 回复内容
"""
# 获取聊天流信息
chat_stream = self.message.chat_stream
if chat_stream.group_info:
# 群聊
await self.api.send_text_to_group(
text=content,
group_id=str(chat_stream.group_info.group_id),
platform=chat_stream.platform
text=content, group_id=str(chat_stream.group_info.group_id), platform=chat_stream.platform
)
else:
# 私聊
await self.api.send_text_to_user(
text=content,
user_id=str(chat_stream.user_info.user_id),
platform=chat_stream.platform
text=content, user_id=str(chat_stream.user_info.user_id), platform=chat_stream.platform
)
@classmethod
def get_command_info(cls, name: str = None, description: str = None) -> 'CommandInfo':
def get_command_info(cls, name: str = None, description: str = None) -> "CommandInfo":
"""从类属性生成CommandInfo
Args:
name: Command名称如果不提供则使用类名
description: Command描述如果不提供则使用类文档字符串
Returns:
CommandInfo: 生成的Command信息对象
"""
# 自动生成名称和描述
if name is None:
name = cls.__name__.lower().replace('command', '')
name = cls.__name__.lower().replace("command", "")
if description is None:
description = cls.__doc__ or f"{cls.__name__} Command组件"
description = description.strip().split('\n')[0] # 取第一行作为描述
description = description.strip().split("\n")[0] # 取第一行作为描述
return CommandInfo(
name=name,
component_type=ComponentType.COMMAND,
description=description,
command_pattern=cls.command_pattern,
command_help=cls.command_help,
command_examples=cls.command_examples.copy() if cls.command_examples else []
)
command_examples=cls.command_examples.copy() if cls.command_examples else [],
)

View File

@@ -5,50 +5,51 @@ import inspect
import toml
from src.common.logger_manager import get_logger
from src.plugin_system.base.component_types import (
PluginInfo, ComponentInfo, ActionInfo, CommandInfo,
ComponentType, ActionActivationType, ChatMode
PluginInfo,
ComponentInfo,
)
from src.plugin_system.core.component_registry import component_registry
logger = get_logger("base_plugin")
# 全局插件类注册表
_plugin_classes: Dict[str, Type['BasePlugin']] = {}
_plugin_classes: Dict[str, Type["BasePlugin"]] = {}
class BasePlugin(ABC):
"""插件基类
所有插件都应该继承这个基类,一个插件可以包含多种组件:
- Action组件处理聊天中的动作
- Command组件处理命令请求
- 未来可扩展Scheduler、Listener等
"""
# 插件基本信息(子类必须定义)
plugin_name: str = "" # 插件名称
plugin_description: str = "" # 插件描述
plugin_version: str = "1.0.0" # 插件版本
plugin_author: str = "" # 插件作者
enable_plugin: bool = True # 是否启用插件
dependencies: List[str] = [] # 依赖的其他插件
plugin_name: str = "" # 插件名称
plugin_description: str = "" # 插件描述
plugin_version: str = "1.0.0" # 插件版本
plugin_author: str = "" # 插件作者
enable_plugin: bool = True # 是否启用插件
dependencies: List[str] = [] # 依赖的其他插件
config_file_name: Optional[str] = None # 配置文件名
def __init__(self, plugin_dir: str = None):
"""初始化插件
Args:
plugin_dir: 插件目录路径,由插件管理器传递
"""
self.config: Dict[str, Any] = {} # 插件配置
self.plugin_dir = plugin_dir # 插件目录路径
self.config: Dict[str, Any] = {} # 插件配置
self.plugin_dir = plugin_dir # 插件目录路径
self.log_prefix = f"[Plugin:{self.plugin_name}]"
# 验证插件信息
self._validate_plugin_info()
# 加载插件配置
self._load_plugin_config()
# 创建插件信息对象
self.plugin_info = PluginInfo(
name=self.plugin_name,
@@ -58,24 +59,24 @@ class BasePlugin(ABC):
enabled=self.enable_plugin,
is_built_in=False,
config_file=self.config_file_name or "",
dependencies=self.dependencies.copy()
dependencies=self.dependencies.copy(),
)
logger.debug(f"{self.log_prefix} 插件基类初始化完成")
def _validate_plugin_info(self):
"""验证插件基本信息"""
if not self.plugin_name:
raise ValueError(f"插件类 {self.__class__.__name__} 必须定义 plugin_name")
if not self.plugin_description:
raise ValueError(f"插件 {self.plugin_name} 必须定义 plugin_description")
def _load_plugin_config(self):
"""加载插件配置文件"""
if not self.config_file_name:
logger.debug(f"{self.log_prefix} 未指定配置文件,跳过加载")
return
# 优先使用传入的插件目录路径
if self.plugin_dir:
plugin_dir = self.plugin_dir
@@ -87,20 +88,20 @@ class BasePlugin(ABC):
except (TypeError, OSError):
# 最后的fallback从模块的__file__属性获取
module = inspect.getmodule(self.__class__)
if module and hasattr(module, '__file__') and module.__file__:
if module and hasattr(module, "__file__") and module.__file__:
plugin_dir = os.path.dirname(module.__file__)
else:
logger.warning(f"{self.log_prefix} 无法获取插件目录路径,跳过配置加载")
return
config_file_path = os.path.join(plugin_dir, self.config_file_name)
if not os.path.exists(config_file_path):
logger.warning(f"{self.log_prefix} 配置文件 {config_file_path} 不存在")
return
file_ext = os.path.splitext(self.config_file_name)[1].lower()
if file_ext == ".toml":
with open(config_file_path, "r", encoding="utf-8") as f:
self.config = toml.load(f) or {}
@@ -108,31 +109,31 @@ class BasePlugin(ABC):
else:
logger.warning(f"{self.log_prefix} 不支持的配置文件格式: {file_ext},仅支持 .toml")
self.config = {}
@abstractmethod
def get_plugin_components(self) -> List[tuple[ComponentInfo, Type]]:
"""获取插件包含的组件列表
子类必须实现此方法,返回组件信息和组件类的列表
Returns:
List[tuple[ComponentInfo, Type]]: [(组件信息, 组件类), ...]
"""
pass
def register_plugin(self) -> bool:
"""注册插件及其所有组件"""
if not self.enable_plugin:
logger.info(f"{self.log_prefix} 插件已禁用,跳过注册")
return False
components = self.get_plugin_components()
# 检查依赖
if not self._check_dependencies():
logger.error(f"{self.log_prefix} 依赖检查失败,跳过注册")
return False
# 注册所有组件
registered_components = []
for component_info, component_class in components:
@@ -141,10 +142,10 @@ class BasePlugin(ABC):
registered_components.append(component_info)
else:
logger.warning(f"{self.log_prefix} 组件 {component_info.name} 注册失败")
# 更新插件信息中的组件列表
self.plugin_info.components = registered_components
# 注册插件
if component_registry.register_plugin(self.plugin_info):
logger.info(f"{self.log_prefix} 插件注册成功,包含 {len(registered_components)} 个组件")
@@ -152,26 +153,26 @@ class BasePlugin(ABC):
else:
logger.error(f"{self.log_prefix} 插件注册失败")
return False
def _check_dependencies(self) -> bool:
"""检查插件依赖"""
if not self.dependencies:
return True
for dep in self.dependencies:
if not component_registry.get_plugin_info(dep):
logger.error(f"{self.log_prefix} 缺少依赖插件: {dep}")
return False
return True
def get_config(self, key: str, default: Any = None) -> Any:
"""获取插件配置值
Args:
key: 配置键名
default: 默认值
Returns:
Any: 配置值或默认值
"""
@@ -180,7 +181,7 @@ class BasePlugin(ABC):
def register_plugin(cls):
"""插件注册装饰器
用法:
@register_plugin
class MyPlugin(BasePlugin):
@@ -191,28 +192,28 @@ def register_plugin(cls):
if not issubclass(cls, BasePlugin):
logger.error(f"{cls.__name__} 不是 BasePlugin 的子类")
return cls
# 只是注册插件类,不立即实例化
# 插件管理器会负责实例化和注册
plugin_name = cls.plugin_name or cls.__name__
_plugin_classes[plugin_name] = cls
logger.debug(f"插件类已注册: {plugin_name}")
return cls
def get_registered_plugin_classes() -> Dict[str, Type['BasePlugin']]:
def get_registered_plugin_classes() -> Dict[str, Type["BasePlugin"]]:
"""获取所有已注册的插件类"""
return _plugin_classes.copy()
def instantiate_and_register_plugin(plugin_class: Type['BasePlugin'], plugin_dir: str = None) -> bool:
def instantiate_and_register_plugin(plugin_class: Type["BasePlugin"], plugin_dir: str = None) -> bool:
"""实例化并注册插件
Args:
plugin_class: 插件类
plugin_dir: 插件目录路径
Returns:
bool: 是否成功
"""
@@ -222,5 +223,6 @@ def instantiate_and_register_plugin(plugin_class: Type['BasePlugin'], plugin_dir
except Exception as e:
logger.error(f"注册插件 {plugin_class.__name__} 时出错: {e}")
import traceback
logger.error(traceback.format_exc())
return False
return False

View File

@@ -2,48 +2,58 @@ from enum import Enum
from typing import Dict, Any, List
from dataclasses import dataclass
# 组件类型枚举
class ComponentType(Enum):
"""组件类型枚举"""
ACTION = "action" # 动作组件
COMMAND = "command" # 命令组件
SCHEDULER = "scheduler" # 定时任务组件(预留)
LISTENER = "listener" # 事件监听组件(预留)
ACTION = "action" # 动作组件
COMMAND = "command" # 命令组件
SCHEDULER = "scheduler" # 定时任务组件(预留)
LISTENER = "listener" # 事件监听组件(预留)
# 动作激活类型枚举
class ActionActivationType(Enum):
"""动作激活类型枚举"""
NEVER = "never" # 从不激活(默认关闭)
ALWAYS = "always" # 默认参与到planner
LLM_JUDGE = "llm_judge" # LLM判定是否启动该action到planner
RANDOM = "random" # 随机启用action到planner
KEYWORD = "keyword" # 关键词触发启用action到planner
NEVER = "never" # 从不激活(默认关闭)
ALWAYS = "always" # 默认参与到planner
LLM_JUDGE = "llm_judge" # LLM判定是否启动该action到planner
RANDOM = "random" # 随机启用action到planner
KEYWORD = "keyword" # 关键词触发启用action到planner
# 聊天模式枚举
class ChatMode(Enum):
"""聊天模式枚举"""
FOCUS = "focus" # Focus聊天模式
FOCUS = "focus" # Focus聊天模式
NORMAL = "normal" # Normal聊天模式
ALL = "all" # 所有聊天模式
ALL = "all" # 所有聊天模式
@dataclass
class ComponentInfo:
"""组件信息"""
name: str # 组件名称
component_type: ComponentType # 组件类型
description: str # 组件描述
enabled: bool = True # 是否启用
plugin_name: str = "" # 所属插件名称
is_built_in: bool = False # 是否为内置组件
metadata: Dict[str, Any] = None # 额外元数据
name: str # 组件名称
component_type: ComponentType # 组件类型
description: str # 组件描述
enabled: bool = True # 是否启用
plugin_name: str = "" # 所属插件名称
is_built_in: bool = False # 是否为内置组件
metadata: Dict[str, Any] = None # 额外元数据
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
@dataclass
class ActionInfo(ComponentInfo):
"""动作组件信息"""
focus_activation_type: ActionActivationType = ActionActivationType.ALWAYS
normal_activation_type: ActionActivationType = ActionActivationType.ALWAYS
random_activation_probability: float = 0.3
@@ -55,7 +65,7 @@ class ActionInfo(ComponentInfo):
action_parameters: Dict[str, Any] = None
action_require: List[str] = None
associated_types: List[str] = None
def __post_init__(self):
super().__post_init__()
if self.activation_keywords is None:
@@ -68,37 +78,41 @@ class ActionInfo(ComponentInfo):
self.associated_types = []
self.component_type = ComponentType.ACTION
@dataclass
class CommandInfo(ComponentInfo):
"""命令组件信息"""
command_pattern: str = "" # 命令匹配模式(正则表达式)
command_help: str = "" # 命令帮助信息
command_pattern: str = "" # 命令匹配模式(正则表达式)
command_help: str = "" # 命令帮助信息
command_examples: List[str] = None # 命令使用示例
def __post_init__(self):
super().__post_init__()
if self.command_examples is None:
self.command_examples = []
self.component_type = ComponentType.COMMAND
@dataclass
class PluginInfo:
"""插件信息"""
name: str # 插件名称
description: str # 插件描述
version: str = "1.0.0" # 插件版本
author: str = "" # 插件作者
enabled: bool = True # 是否启用
is_built_in: bool = False # 是否为内置插件
name: str # 插件名称
description: str # 插件描述
version: str = "1.0.0" # 插件版本
author: str = "" # 插件作者
enabled: bool = True # 是否启用
is_built_in: bool = False # 是否为内置插件
components: List[ComponentInfo] = None # 包含的组件列表
dependencies: List[str] = None # 依赖的其他插件
config_file: str = "" # 配置文件路径
metadata: Dict[str, Any] = None # 额外元数据
dependencies: List[str] = None # 依赖的其他插件
config_file: str = "" # 配置文件路径
metadata: Dict[str, Any] = None # 额外元数据
def __post_init__(self):
if self.components is None:
self.components = []
if self.dependencies is None:
self.dependencies = []
if self.metadata is None:
self.metadata = {}
self.metadata = {}

View File

@@ -8,6 +8,6 @@ from src.plugin_system.core.plugin_manager import plugin_manager
from src.plugin_system.core.component_registry import component_registry
__all__ = [
'plugin_manager',
'component_registry',
]
"plugin_manager",
"component_registry",
]

View File

@@ -1,149 +1,152 @@
from typing import Dict, List, Type, Optional, Any, Pattern
from abc import ABC
import re
from src.common.logger_manager import get_logger
from src.plugin_system.base.component_types import (
ComponentInfo, ActionInfo, CommandInfo, PluginInfo,
ComponentType, ActionActivationType, ChatMode
ComponentInfo,
ActionInfo,
CommandInfo,
PluginInfo,
ComponentType,
)
logger = get_logger("component_registry")
class ComponentRegistry:
"""统一的组件注册中心
负责管理所有插件组件的注册、查询和生命周期管理
"""
def __init__(self):
# 组件注册表
self._components: Dict[str, ComponentInfo] = {} # 组件名 -> 组件信息
self._components: Dict[str, ComponentInfo] = {} # 组件名 -> 组件信息
self._components_by_type: Dict[ComponentType, Dict[str, ComponentInfo]] = {
ComponentType.ACTION: {},
ComponentType.COMMAND: {},
}
self._component_classes: Dict[str, Type] = {} # 组件名 -> 组件类
self._component_classes: Dict[str, Type] = {} # 组件名 -> 组件类
# 插件注册表
self._plugins: Dict[str, PluginInfo] = {} # 插件名 -> 插件信息
self._plugins: Dict[str, PluginInfo] = {} # 插件名 -> 插件信息
# Action特定注册表
self._action_registry: Dict[str, Type] = {} # action名 -> action类
self._default_actions: Dict[str, str] = {} # 启用的action名 -> 描述
# Command特定注册表
self._command_registry: Dict[str, Type] = {} # command名 -> command类
self._command_patterns: Dict[Pattern, Type] = {} # 编译后的正则 -> command类
self._action_registry: Dict[str, Type] = {} # action名 -> action类
self._default_actions: Dict[str, str] = {} # 启用的action名 -> 描述
# Command特定注册表
self._command_registry: Dict[str, Type] = {} # command名 -> command类
self._command_patterns: Dict[Pattern, Type] = {} # 编译后的正则 -> command类
logger.info("组件注册中心初始化完成")
# === 通用组件注册方法 ===
def register_component(self, component_info: ComponentInfo, component_class: Type) -> bool:
"""注册组件
Args:
component_info: 组件信息
component_class: 组件类
Returns:
bool: 是否注册成功
"""
component_name = component_info.name
component_type = component_info.component_type
if component_name in self._components:
logger.warning(f"组件 {component_name} 已存在,跳过注册")
return False
# 注册到通用注册表
self._components[component_name] = component_info
self._components_by_type[component_type][component_name] = component_info
self._component_classes[component_name] = component_class
# 根据组件类型进行特定注册
if component_type == ComponentType.ACTION:
self._register_action_component(component_info, component_class)
elif component_type == ComponentType.COMMAND:
self._register_command_component(component_info, component_class)
logger.info(f"已注册{component_type.value}组件: {component_name} ({component_class.__name__})")
return True
def _register_action_component(self, action_info: ActionInfo, action_class: Type):
"""注册Action组件到Action特定注册表"""
action_name = action_info.name
self._action_registry[action_name] = action_class
# 如果启用,添加到默认动作集
if action_info.enabled:
self._default_actions[action_name] = action_info.description
def _register_command_component(self, command_info: CommandInfo, command_class: Type):
"""注册Command组件到Command特定注册表"""
command_name = command_info.name
self._command_registry[command_name] = command_class
# 编译正则表达式并注册
if command_info.command_pattern:
pattern = re.compile(command_info.command_pattern, re.IGNORECASE | re.DOTALL)
self._command_patterns[pattern] = command_class
# === 组件查询方法 ===
def get_component_info(self, component_name: str) -> Optional[ComponentInfo]:
"""获取组件信息"""
return self._components.get(component_name)
def get_component_class(self, component_name: str) -> Optional[Type]:
"""获取组件类"""
return self._component_classes.get(component_name)
def get_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]:
"""获取指定类型的所有组件"""
return self._components_by_type.get(component_type, {}).copy()
def get_enabled_components_by_type(self, component_type: ComponentType) -> Dict[str, ComponentInfo]:
"""获取指定类型的所有启用组件"""
components = self.get_components_by_type(component_type)
return {name: info for name, info in components.items() if info.enabled}
# === Action特定查询方法 ===
def get_action_registry(self) -> Dict[str, Type]:
"""获取Action注册表用于兼容现有系统"""
return self._action_registry.copy()
def get_default_actions(self) -> Dict[str, str]:
"""获取默认启用的Action列表用于兼容现有系统"""
return self._default_actions.copy()
def get_action_info(self, action_name: str) -> Optional[ActionInfo]:
"""获取Action信息"""
info = self.get_component_info(action_name)
return info if isinstance(info, ActionInfo) else None
# === Command特定查询方法 ===
def get_command_registry(self) -> Dict[str, Type]:
"""获取Command注册表用于兼容现有系统"""
return self._command_registry.copy()
def get_command_patterns(self) -> Dict[Pattern, Type]:
"""获取Command模式注册表用于兼容现有系统"""
return self._command_patterns.copy()
def get_command_info(self, command_name: str) -> Optional[CommandInfo]:
"""获取Command信息"""
info = self.get_component_info(command_name)
return info if isinstance(info, CommandInfo) else None
def find_command_by_text(self, text: str) -> Optional[tuple[Type, dict]]:
"""根据文本查找匹配的命令
Args:
text: 输入文本
Returns:
Optional[tuple[Type, dict]]: (命令类, 匹配的命名组) 或 None
"""
@@ -156,54 +159,54 @@ class ComponentRegistry:
if cls == command_class:
command_name = name
break
# 检查命令是否启用
if command_name:
command_info = self.get_command_info(command_name)
if command_info and command_info.enabled:
return command_class, match.groupdict()
return None
# === 插件管理方法 ===
def register_plugin(self, plugin_info: PluginInfo) -> bool:
"""注册插件
Args:
plugin_info: 插件信息
Returns:
bool: 是否注册成功
"""
plugin_name = plugin_info.name
if plugin_name in self._plugins:
logger.warning(f"插件 {plugin_name} 已存在,跳过注册")
return False
self._plugins[plugin_name] = plugin_info
logger.info(f"已注册插件: {plugin_name} (组件数量: {len(plugin_info.components)})")
return True
def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]:
"""获取插件信息"""
return self._plugins.get(plugin_name)
def get_all_plugins(self) -> Dict[str, PluginInfo]:
"""获取所有插件"""
return self._plugins.copy()
def get_enabled_plugins(self) -> Dict[str, PluginInfo]:
"""获取所有启用的插件"""
return {name: info for name, info in self._plugins.items() if info.enabled}
def get_plugin_components(self, plugin_name: str) -> List[ComponentInfo]:
"""获取插件的所有组件"""
plugin_info = self.get_plugin_info(plugin_name)
return plugin_info.components if plugin_info else []
# === 状态管理方法 ===
def enable_component(self, component_name: str) -> bool:
"""启用组件"""
if component_name in self._components:
@@ -215,7 +218,7 @@ class ComponentRegistry:
logger.info(f"已启用组件: {component_name}")
return True
return False
def disable_component(self, component_name: str) -> bool:
"""禁用组件"""
if component_name in self._components:
@@ -226,15 +229,14 @@ class ComponentRegistry:
logger.info(f"已禁用组件: {component_name}")
return True
return False
def get_registry_stats(self) -> Dict[str, Any]:
"""获取注册中心统计信息"""
return {
"total_components": len(self._components),
"total_plugins": len(self._plugins),
"components_by_type": {
component_type.value: len(components)
for component_type, components in self._components_by_type.items()
component_type.value: len(components) for component_type, components in self._components_by_type.items()
},
"enabled_components": len([c for c in self._components.values() if c.enabled]),
"enabled_plugins": len([p for p in self._plugins.values() if p.enabled]),
@@ -242,4 +244,4 @@ class ComponentRegistry:
# 全局组件注册中心实例
component_registry = ComponentRegistry()
component_registry = ComponentRegistry()

View File

@@ -9,19 +9,20 @@ from src.plugin_system.base.component_types import PluginInfo, ComponentType
logger = get_logger("plugin_manager")
class PluginManager:
"""插件管理器
负责加载、初始化和管理所有插件及其组件
"""
def __init__(self):
self.plugin_directories: List[str] = []
self.loaded_plugins: Dict[str, Any] = {}
self.failed_plugins: Dict[str, str] = {}
logger.info("插件管理器初始化完成")
def add_plugin_directory(self, directory: str):
"""添加插件目录"""
if os.path.exists(directory):
@@ -29,141 +30,142 @@ class PluginManager:
logger.info(f"已添加插件目录: {directory}")
else:
logger.warning(f"插件目录不存在: {directory}")
def load_all_plugins(self) -> tuple[int, int]:
"""加载所有插件目录中的插件
Returns:
tuple[int, int]: (插件数量, 组件数量)
"""
logger.info("开始加载所有插件...")
# 第一阶段:加载所有插件模块(注册插件类)
total_loaded_modules = 0
total_failed_modules = 0
for directory in self.plugin_directories:
loaded, failed = self._load_plugin_modules_from_directory(directory)
total_loaded_modules += loaded
total_failed_modules += failed
logger.info(f"插件模块加载完成 - 成功: {total_loaded_modules}, 失败: {total_failed_modules}")
# 第二阶段:实例化所有已注册的插件类
from src.plugin_system.base.base_plugin import get_registered_plugin_classes, instantiate_and_register_plugin
plugin_classes = get_registered_plugin_classes()
total_registered = 0
total_failed_registration = 0
for plugin_name, plugin_class in plugin_classes.items():
# 尝试找到插件对应的目录
plugin_dir = self._find_plugin_directory(plugin_class)
if instantiate_and_register_plugin(plugin_class, plugin_dir):
total_registered += 1
self.loaded_plugins[plugin_name] = plugin_class
else:
total_failed_registration += 1
self.failed_plugins[plugin_name] = "插件注册失败"
logger.info(f"插件注册完成 - 成功: {total_registered}, 失败: {total_failed_registration}")
# 获取组件统计信息
stats = component_registry.get_registry_stats()
logger.info(f"组件注册统计: {stats}")
# 返回插件数量和组件数量
return total_registered, stats.get('total_components', 0)
return total_registered, stats.get("total_components", 0)
def _find_plugin_directory(self, plugin_class) -> Optional[str]:
"""查找插件类对应的目录路径"""
try:
import inspect
module = inspect.getmodule(plugin_class)
if module and hasattr(module, '__file__') and module.__file__:
if module and hasattr(module, "__file__") and module.__file__:
return os.path.dirname(module.__file__)
except Exception:
pass
return None
def _load_plugin_modules_from_directory(self, directory: str) -> tuple[int, int]:
"""从指定目录加载插件模块"""
loaded_count = 0
failed_count = 0
if not os.path.exists(directory):
logger.warning(f"插件目录不存在: {directory}")
return loaded_count, failed_count
logger.info(f"正在扫描插件目录: {directory}")
# 遍历目录中的所有Python文件和包
for item in os.listdir(directory):
item_path = os.path.join(directory, item)
if os.path.isfile(item_path) and item.endswith('.py') and item != '__init__.py':
if os.path.isfile(item_path) and item.endswith(".py") and item != "__init__.py":
# 单文件插件
if self._load_plugin_module_file(item_path):
loaded_count += 1
else:
failed_count += 1
elif os.path.isdir(item_path) and not item.startswith('.') and not item.startswith('__'):
elif os.path.isdir(item_path) and not item.startswith(".") and not item.startswith("__"):
# 插件包
plugin_file = os.path.join(item_path, 'plugin.py')
plugin_file = os.path.join(item_path, "plugin.py")
if os.path.exists(plugin_file):
if self._load_plugin_module_file(plugin_file):
loaded_count += 1
else:
failed_count += 1
return loaded_count, failed_count
def _load_plugin_module_file(self, plugin_file: str) -> bool:
"""加载单个插件模块文件"""
plugin_name = None
# 生成模块名
plugin_path = Path(plugin_file)
if plugin_path.parent.name != 'plugins':
if plugin_path.parent.name != "plugins":
# 插件包格式parent_dir.plugin
module_name = f"plugins.{plugin_path.parent.name}.plugin"
else:
# 单文件格式plugins.filename
module_name = f"plugins.{plugin_path.stem}"
try:
# 动态导入插件模块
spec = importlib.util.spec_from_file_location(module_name, plugin_file)
if spec is None or spec.loader is None:
logger.error(f"无法创建模块规范: {plugin_file}")
return False
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 模块加载成功,插件类会自动通过装饰器注册
plugin_name = plugin_path.parent.name if plugin_path.parent.name != 'plugins' else plugin_path.stem
plugin_name = plugin_path.parent.name if plugin_path.parent.name != "plugins" else plugin_path.stem
logger.debug(f"插件模块加载成功: {plugin_file}")
return True
except Exception as e:
error_msg = f"加载插件模块 {plugin_file} 失败: {e}"
logger.error(error_msg)
if plugin_name:
self.failed_plugins[plugin_name] = error_msg
return False
def get_loaded_plugins(self) -> List[PluginInfo]:
"""获取所有已加载的插件信息"""
return list(component_registry.get_all_plugins().values())
def get_enabled_plugins(self) -> List[PluginInfo]:
"""获取所有启用的插件信息"""
return list(component_registry.get_enabled_plugins().values())
def enable_plugin(self, plugin_name: str) -> bool:
"""启用插件"""
plugin_info = component_registry.get_plugin_info(plugin_name)
@@ -175,7 +177,7 @@ class PluginManager:
logger.info(f"已启用插件: {plugin_name}")
return True
return False
def disable_plugin(self, plugin_name: str) -> bool:
"""禁用插件"""
plugin_info = component_registry.get_plugin_info(plugin_name)
@@ -187,15 +189,15 @@ class PluginManager:
logger.info(f"已禁用插件: {plugin_name}")
return True
return False
def get_plugin_stats(self) -> Dict[str, Any]:
"""获取插件统计信息"""
all_plugins = component_registry.get_all_plugins()
enabled_plugins = component_registry.get_enabled_plugins()
action_components = component_registry.get_components_by_type(ComponentType.ACTION)
command_components = component_registry.get_components_by_type(ComponentType.COMMAND)
return {
"total_plugins": len(all_plugins),
"enabled_plugins": len(enabled_plugins),
@@ -204,9 +206,9 @@ class PluginManager:
"action_components": len(action_components),
"command_components": len(command_components),
"loaded_plugin_files": len(self.loaded_plugins),
"failed_plugin_details": self.failed_plugins.copy()
"failed_plugin_details": self.failed_plugins.copy(),
}
def reload_plugin(self, plugin_name: str) -> bool:
"""重新加载插件(高级功能,需要谨慎使用)"""
# TODO: 实现插件热重载功能
@@ -219,5 +221,5 @@ plugin_manager = PluginManager()
# 默认插件目录
plugin_manager.add_plugin_directory("src/plugins/built_in")
plugin_manager.add_plugin_directory("src/plugins/examples")
plugin_manager.add_plugin_directory("plugins") # 用户插件目录
plugin_manager.add_plugin_directory("src/plugins/examples")
plugin_manager.add_plugin_directory("plugins") # 用户插件目录

View File

@@ -13,8 +13,13 @@ from typing import List, Tuple, Type, Optional
# 使用简化的导入接口
from src.plugin_system import (
BasePlugin, register_plugin, BaseAction, BaseCommand,
ComponentInfo, ActionInfo, CommandInfo, ActionActivationType, ChatMode
BasePlugin,
register_plugin,
BaseAction,
BaseCommand,
ComponentInfo,
ActionActivationType,
ChatMode,
)
from src.common.logger_manager import get_logger
@@ -23,7 +28,7 @@ logger = get_logger("simple_plugin")
class HelloAction(BaseAction):
"""智能问候Action组件"""
# ✅ 现在可以直接在类中定义激活条件!
focus_activation_type = ActionActivationType.KEYWORD
normal_activation_type = ActionActivationType.KEYWORD
@@ -31,17 +36,17 @@ class HelloAction(BaseAction):
keyword_case_sensitive = False
mode_enable = ChatMode.ALL
parallel_action = False
async def execute(self) -> Tuple[bool, str]:
"""执行问候动作"""
username = self.action_data.get("username", "朋友")
# 使用配置文件中的问候消息
plugin_instance = SimplePlugin()
greeting_template = plugin_instance.get_config("hello_action.greeting_message", "你好,{username}")
enable_emoji = plugin_instance.get_config("hello_action.enable_emoji", True)
enable_llm = plugin_instance.get_config("hello_action.enable_llm_greeting", False)
# 如果启用LLM生成个性化问候
if enable_llm:
try:
@@ -50,99 +55,96 @@ class HelloAction(BaseAction):
if models:
first_model = list(models.values())[0]
prompt = f"为用户名叫{username}的朋友生成一句温暖的个性化问候语不超过30字"
success, response, _, _ = await self.api.generate_with_model(
prompt=prompt,
model_config=first_model
prompt=prompt, model_config=first_model
)
if success:
logger.info(f"{self.log_prefix} 使用LLM生成问候: {response}")
return True, response
except Exception as e:
logger.warning(f"{self.log_prefix} LLM生成问候失败使用默认模板: {e}")
# 构建基础问候消息
response = greeting_template.format(username=username)
if enable_emoji:
response += " 😊"
# 演示存储Action执行记录到数据库
await self.api.store_action_info(
action_build_into_prompt=False,
action_prompt_display=f"问候了用户: {username}",
action_done=True
action_build_into_prompt=False, action_prompt_display=f"问候了用户: {username}", action_done=True
)
logger.info(f"{self.log_prefix} 执行问候动作: {username}")
return True, response
class EchoCommand(BaseCommand):
"""回声命令 - 重复用户输入"""
# ✅ 现在可以直接在类中定义命令模式!
command_pattern = r"^/echo\s+(?P<message>.+)$"
command_help = "重复消息,用法:/echo <消息内容>"
command_examples = ["/echo Hello World", "/echo 你好世界"]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行回声命令"""
# 获取匹配的参数
message = self.matched_groups.get("message", "")
if not message:
response = "请提供要重复的消息!用法:/echo <消息内容>"
else:
response = f"🔊 {message}"
# 发送回复
await self.send_reply(response)
logger.info(f"{self.log_prefix} 执行回声命令: {message}")
return True, response
class StatusCommand(BaseCommand):
"""状态查询Command组件"""
# ✅ 直接定义命令模式
command_pattern = r"^/status\s*(?P<type>\w+)?$"
command_help = "查询系统状态,用法:/status [类型]"
command_examples = ["/status", "/status 系统", "/status 插件"]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行状态查询命令"""
# 获取匹配的参数
query_type = self.matched_groups.get("type", "系统")
# 从配置文件获取设置
plugin_instance = SimplePlugin()
show_detailed = plugin_instance.get_config("status_command.show_detailed_info", True)
allowed_types = plugin_instance.get_config("status_command.allowed_types", ["系统", "插件"])
if query_type not in allowed_types:
response = f"不支持的查询类型: {query_type}\n支持的类型: {', '.join(allowed_types)}"
elif show_detailed:
response = f"📊 {query_type}状态详情:\n✅ 运行正常\n🔧 版本: 1.0.0\n⚡ 性能: 良好"
else:
response = f"{query_type}状态:正常"
# 发送回复
await self.send_reply(response)
logger.info(f"{self.log_prefix} 执行状态查询: {query_type}")
return True, response
class HelpCommand(BaseCommand):
"""帮助命令 - 显示插件功能"""
# ✅ 直接定义命令模式
command_pattern = r"^/help$"
command_help = "显示插件帮助信息"
command_examples = ["/help"]
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行帮助命令"""
help_text = """
@@ -161,9 +163,9 @@ class HelpCommand(BaseCommand):
💡 这是新插件系统的完整示例展示了Action和Command的结合使用。
""".strip()
await self.send_reply(help_text)
logger.info(f"{self.log_prefix} 显示帮助信息")
return True, "已显示帮助信息"
@@ -171,10 +173,10 @@ class HelpCommand(BaseCommand):
@register_plugin
class SimplePlugin(BasePlugin):
"""完整示例插件
包含多个Action和Command组件展示插件系统的完整功能
"""
# 插件基本信息
plugin_name = "simple_plugin"
plugin_description = "完整的示例插件,展示新插件系统的各种功能"
@@ -182,14 +184,14 @@ class SimplePlugin(BasePlugin):
plugin_author = "MaiBot开发团队"
enable_plugin = True
config_file_name = "config.toml" # 配置文件
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
"""返回插件包含的组件列表"""
# ✅ 现在可以直接从类属性生成组件信息!
return [
(HelloAction.get_action_info("hello_action", "智能问候动作,支持自定义消息和表情"), HelloAction),
(EchoCommand.get_command_info("echo_command", "回声命令,重复用户输入的消息"), EchoCommand),
(StatusCommand.get_command_info("status_command", "状态查询命令,支持多种查询类型"), StatusCommand),
(HelpCommand.get_command_info("help_command", "帮助命令,显示插件功能说明"), HelpCommand)
]
(HelpCommand.get_command_info("help_command", "帮助命令,显示插件功能说明"), HelpCommand),
]