feat:重构插件api

This commit is contained in:
SengokuCola
2025-06-10 15:28:36 +08:00
parent 2edece11ea
commit 4d32b3052f
30 changed files with 2429 additions and 471 deletions

View File

@@ -0,0 +1,27 @@
"""
插件基础类模块
提供插件开发的基础类和类型定义
"""
from src.plugin_system.base.base_plugin import BasePlugin, register_plugin
from src.plugin_system.base.base_action import BaseAction
from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.base.component_types import (
ComponentType, ActionActivationType, ChatMode,
ComponentInfo, ActionInfo, CommandInfo, PluginInfo
)
__all__ = [
'BasePlugin',
'BaseAction',
'BaseCommand',
'register_plugin',
'ComponentType',
'ActionActivationType',
'ChatMode',
'ComponentInfo',
'ActionInfo',
'CommandInfo',
'PluginInfo',
]

View File

@@ -0,0 +1,120 @@
from abc import ABC, abstractmethod
from typing import Tuple, Dict, Any, Optional
from src.common.logger_manager import get_logger
from src.plugin_system.apis.plugin_api import PluginAPI
from src.plugin_system.base.component_types import ActionActivationType, ChatMode, ActionInfo, ComponentType
logger = get_logger("base_action")
class BaseAction(ABC):
"""Action组件基类
Action是插件的一种组件类型用于处理聊天中的动作逻辑
子类可以通过类属性定义激活条件:
- focus_activation_type: 专注模式激活类型
- normal_activation_type: 普通模式激活类型
- activation_keywords: 激活关键词列表
- keyword_case_sensitive: 关键词是否区分大小写
- mode_enable: 启用的聊天模式
- parallel_action: 是否允许并行执行
- random_activation_probability: 随机激活概率
- llm_judge_prompt: LLM判断提示词
"""
# 默认激活设置(子类可以覆盖)
focus_activation_type: ActionActivationType = ActionActivationType.NEVER
normal_activation_type: ActionActivationType = ActionActivationType.NEVER
activation_keywords: list = []
keyword_case_sensitive: bool = False
mode_enable: ChatMode = ChatMode.ALL
parallel_action: bool = True
random_activation_probability: float = 0.0
llm_judge_prompt: str = ""
def __init__(self,
action_data: dict,
reasoning: str,
cycle_timers: dict,
thinking_id: str,
**kwargs):
"""初始化Action组件
Args:
action_data: 动作数据
reasoning: 执行该动作的理由
cycle_timers: 计时器字典
thinking_id: 思考ID
**kwargs: 其他参数(包含服务对象)
"""
self.action_data = action_data
self.reasoning = reasoning
self.cycle_timers = cycle_timers
self.thinking_id = thinking_id
# 创建API实例
self.api = PluginAPI(
chat_stream=kwargs.get("chat_stream"),
expressor=kwargs.get("expressor"),
replyer=kwargs.get("replyer"),
observations=kwargs.get("observations"),
log_prefix=kwargs.get("log_prefix", "")
)
self.log_prefix = kwargs.get("log_prefix", "")
logger.debug(f"{self.log_prefix} Action组件初始化完成")
async def send_reply(self, content: str) -> bool:
"""发送回复消息
Args:
content: 回复内容
Returns:
bool: 是否发送成功
"""
return await self.api.send_message("text", content)
@classmethod
def get_action_info(cls, name: str = None, description: str = None) -> 'ActionInfo':
"""从类属性生成ActionInfo
Args:
name: Action名称如果不提供则使用类名
description: Action描述如果不提供则使用类文档字符串
Returns:
ActionInfo: 生成的Action信息对象
"""
# 自动生成名称和描述
if name is None:
name = cls.__name__.lower().replace('action', '')
if description is None:
description = cls.__doc__ or f"{cls.__name__} Action组件"
description = description.strip().split('\n')[0] # 取第一行作为描述
return ActionInfo(
name=name,
component_type=ComponentType.ACTION,
description=description,
focus_activation_type=cls.focus_activation_type,
normal_activation_type=cls.normal_activation_type,
activation_keywords=cls.activation_keywords.copy() if cls.activation_keywords else [],
keyword_case_sensitive=cls.keyword_case_sensitive,
mode_enable=cls.mode_enable,
parallel_action=cls.parallel_action,
random_activation_probability=cls.random_activation_probability,
llm_judge_prompt=cls.llm_judge_prompt
)
@abstractmethod
async def execute(self) -> Tuple[bool, str]:
"""执行Action的抽象方法子类必须实现
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
pass

View File

@@ -0,0 +1,113 @@
from abc import ABC, abstractmethod
from typing import Dict, Tuple, Optional, List
from src.common.logger_manager import get_logger
from src.plugin_system.apis.plugin_api import PluginAPI
from src.plugin_system.base.component_types import CommandInfo, ComponentType
from src.chat.message_receive.message import MessageRecv
logger = get_logger("base_command")
class BaseCommand(ABC):
"""Command组件基类
Command是插件的一种组件类型用于处理命令请求
子类可以通过类属性定义命令模式:
- command_pattern: 命令匹配的正则表达式
- command_help: 命令帮助信息
- command_examples: 命令使用示例列表
"""
# 默认命令设置(子类可以覆盖)
command_pattern: str = ""
command_help: str = ""
command_examples: List[str] = []
def __init__(self, message: MessageRecv):
"""初始化Command组件
Args:
message: 接收到的消息对象
"""
self.message = message
self.matched_groups: Dict[str, str] = {} # 存储正则表达式匹配的命名组
# 创建API实例
self.api = PluginAPI(
chat_stream=message.chat_stream,
log_prefix=f"[Command]"
)
self.log_prefix = f"[Command]"
logger.debug(f"{self.log_prefix} Command组件初始化完成")
def set_matched_groups(self, groups: Dict[str, str]) -> None:
"""设置正则表达式匹配的命名组
Args:
groups: 正则表达式匹配的命名组
"""
self.matched_groups = groups
@abstractmethod
async def execute(self) -> Tuple[bool, Optional[str]]:
"""执行Command的抽象方法子类必须实现
Returns:
Tuple[bool, Optional[str]]: (是否执行成功, 可选的回复消息)
"""
pass
async def send_reply(self, content: str) -> None:
"""发送回复消息
Args:
content: 回复内容
"""
# 获取聊天流信息
chat_stream = self.message.chat_stream
if chat_stream.group_info:
# 群聊
await self.api.send_text_to_group(
text=content,
group_id=str(chat_stream.group_info.group_id),
platform=chat_stream.platform
)
else:
# 私聊
await self.api.send_text_to_user(
text=content,
user_id=str(chat_stream.user_info.user_id),
platform=chat_stream.platform
)
@classmethod
def get_command_info(cls, name: str = None, description: str = None) -> 'CommandInfo':
"""从类属性生成CommandInfo
Args:
name: Command名称如果不提供则使用类名
description: Command描述如果不提供则使用类文档字符串
Returns:
CommandInfo: 生成的Command信息对象
"""
# 自动生成名称和描述
if name is None:
name = cls.__name__.lower().replace('command', '')
if description is None:
description = cls.__doc__ or f"{cls.__name__} Command组件"
description = description.strip().split('\n')[0] # 取第一行作为描述
return CommandInfo(
name=name,
component_type=ComponentType.COMMAND,
description=description,
command_pattern=cls.command_pattern,
command_help=cls.command_help,
command_examples=cls.command_examples.copy() if cls.command_examples else []
)

View File

@@ -0,0 +1,226 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Type, Optional, Any
import os
import inspect
import toml
from src.common.logger_manager import get_logger
from src.plugin_system.base.component_types import (
PluginInfo, ComponentInfo, ActionInfo, CommandInfo,
ComponentType, ActionActivationType, ChatMode
)
from src.plugin_system.core.component_registry import component_registry
logger = get_logger("base_plugin")
# 全局插件类注册表
_plugin_classes: Dict[str, Type['BasePlugin']] = {}
class BasePlugin(ABC):
"""插件基类
所有插件都应该继承这个基类,一个插件可以包含多种组件:
- Action组件处理聊天中的动作
- Command组件处理命令请求
- 未来可扩展Scheduler、Listener等
"""
# 插件基本信息(子类必须定义)
plugin_name: str = "" # 插件名称
plugin_description: str = "" # 插件描述
plugin_version: str = "1.0.0" # 插件版本
plugin_author: str = "" # 插件作者
enable_plugin: bool = True # 是否启用插件
dependencies: List[str] = [] # 依赖的其他插件
config_file_name: Optional[str] = None # 配置文件名
def __init__(self, plugin_dir: str = None):
"""初始化插件
Args:
plugin_dir: 插件目录路径,由插件管理器传递
"""
self.config: Dict[str, Any] = {} # 插件配置
self.plugin_dir = plugin_dir # 插件目录路径
self.log_prefix = f"[Plugin:{self.plugin_name}]"
# 验证插件信息
self._validate_plugin_info()
# 加载插件配置
self._load_plugin_config()
# 创建插件信息对象
self.plugin_info = PluginInfo(
name=self.plugin_name,
description=self.plugin_description,
version=self.plugin_version,
author=self.plugin_author,
enabled=self.enable_plugin,
is_built_in=False,
config_file=self.config_file_name or "",
dependencies=self.dependencies.copy()
)
logger.debug(f"{self.log_prefix} 插件基类初始化完成")
def _validate_plugin_info(self):
"""验证插件基本信息"""
if not self.plugin_name:
raise ValueError(f"插件类 {self.__class__.__name__} 必须定义 plugin_name")
if not self.plugin_description:
raise ValueError(f"插件 {self.plugin_name} 必须定义 plugin_description")
def _load_plugin_config(self):
"""加载插件配置文件"""
if not self.config_file_name:
logger.debug(f"{self.log_prefix} 未指定配置文件,跳过加载")
return
# 优先使用传入的插件目录路径
if self.plugin_dir:
plugin_dir = self.plugin_dir
else:
# fallback尝试从类的模块信息获取路径
try:
plugin_module_path = inspect.getfile(self.__class__)
plugin_dir = os.path.dirname(plugin_module_path)
except (TypeError, OSError):
# 最后的fallback从模块的__file__属性获取
module = inspect.getmodule(self.__class__)
if module and hasattr(module, '__file__') and module.__file__:
plugin_dir = os.path.dirname(module.__file__)
else:
logger.warning(f"{self.log_prefix} 无法获取插件目录路径,跳过配置加载")
return
config_file_path = os.path.join(plugin_dir, self.config_file_name)
if not os.path.exists(config_file_path):
logger.warning(f"{self.log_prefix} 配置文件 {config_file_path} 不存在")
return
file_ext = os.path.splitext(self.config_file_name)[1].lower()
if file_ext == ".toml":
with open(config_file_path, "r", encoding="utf-8") as f:
self.config = toml.load(f) or {}
logger.info(f"{self.log_prefix} 配置已从 {config_file_path} 加载")
else:
logger.warning(f"{self.log_prefix} 不支持的配置文件格式: {file_ext},仅支持 .toml")
self.config = {}
@abstractmethod
def get_plugin_components(self) -> List[tuple[ComponentInfo, Type]]:
"""获取插件包含的组件列表
子类必须实现此方法,返回组件信息和组件类的列表
Returns:
List[tuple[ComponentInfo, Type]]: [(组件信息, 组件类), ...]
"""
pass
def register_plugin(self) -> bool:
"""注册插件及其所有组件"""
if not self.enable_plugin:
logger.info(f"{self.log_prefix} 插件已禁用,跳过注册")
return False
components = self.get_plugin_components()
# 检查依赖
if not self._check_dependencies():
logger.error(f"{self.log_prefix} 依赖检查失败,跳过注册")
return False
# 注册所有组件
registered_components = []
for component_info, component_class in components:
component_info.plugin_name = self.plugin_name
if component_registry.register_component(component_info, component_class):
registered_components.append(component_info)
else:
logger.warning(f"{self.log_prefix} 组件 {component_info.name} 注册失败")
# 更新插件信息中的组件列表
self.plugin_info.components = registered_components
# 注册插件
if component_registry.register_plugin(self.plugin_info):
logger.info(f"{self.log_prefix} 插件注册成功,包含 {len(registered_components)} 个组件")
return True
else:
logger.error(f"{self.log_prefix} 插件注册失败")
return False
def _check_dependencies(self) -> bool:
"""检查插件依赖"""
if not self.dependencies:
return True
for dep in self.dependencies:
if not component_registry.get_plugin_info(dep):
logger.error(f"{self.log_prefix} 缺少依赖插件: {dep}")
return False
return True
def get_config(self, key: str, default: Any = None) -> Any:
"""获取插件配置值
Args:
key: 配置键名
default: 默认值
Returns:
Any: 配置值或默认值
"""
return self.config.get(key, default)
def register_plugin(cls):
"""插件注册装饰器
用法:
@register_plugin
class MyPlugin(BasePlugin):
plugin_name = "my_plugin"
plugin_description = "我的插件"
...
"""
if not issubclass(cls, BasePlugin):
logger.error(f"{cls.__name__} 不是 BasePlugin 的子类")
return cls
# 只是注册插件类,不立即实例化
# 插件管理器会负责实例化和注册
plugin_name = cls.plugin_name or cls.__name__
_plugin_classes[plugin_name] = cls
logger.debug(f"插件类已注册: {plugin_name}")
return cls
def get_registered_plugin_classes() -> Dict[str, Type['BasePlugin']]:
"""获取所有已注册的插件类"""
return _plugin_classes.copy()
def instantiate_and_register_plugin(plugin_class: Type['BasePlugin'], plugin_dir: str = None) -> bool:
"""实例化并注册插件
Args:
plugin_class: 插件类
plugin_dir: 插件目录路径
Returns:
bool: 是否成功
"""
try:
plugin_instance = plugin_class(plugin_dir=plugin_dir)
return plugin_instance.register_plugin()
except Exception as e:
logger.error(f"注册插件 {plugin_class.__name__} 时出错: {e}")
import traceback
logger.error(traceback.format_exc())
return False

View File

@@ -0,0 +1,104 @@
from enum import Enum
from typing import Dict, Any, List
from dataclasses import dataclass
# 组件类型枚举
class ComponentType(Enum):
"""组件类型枚举"""
ACTION = "action" # 动作组件
COMMAND = "command" # 命令组件
SCHEDULER = "scheduler" # 定时任务组件(预留)
LISTENER = "listener" # 事件监听组件(预留)
# 动作激活类型枚举
class ActionActivationType(Enum):
"""动作激活类型枚举"""
NEVER = "never" # 从不激活(默认关闭)
ALWAYS = "always" # 默认参与到planner
LLM_JUDGE = "llm_judge" # LLM判定是否启动该action到planner
RANDOM = "random" # 随机启用action到planner
KEYWORD = "keyword" # 关键词触发启用action到planner
# 聊天模式枚举
class ChatMode(Enum):
"""聊天模式枚举"""
FOCUS = "focus" # Focus聊天模式
NORMAL = "normal" # Normal聊天模式
ALL = "all" # 所有聊天模式
@dataclass
class ComponentInfo:
"""组件信息"""
name: str # 组件名称
component_type: ComponentType # 组件类型
description: str # 组件描述
enabled: bool = True # 是否启用
plugin_name: str = "" # 所属插件名称
is_built_in: bool = False # 是否为内置组件
metadata: Dict[str, Any] = None # 额外元数据
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
@dataclass
class ActionInfo(ComponentInfo):
"""动作组件信息"""
focus_activation_type: ActionActivationType = ActionActivationType.ALWAYS
normal_activation_type: ActionActivationType = ActionActivationType.ALWAYS
random_activation_probability: float = 0.3
llm_judge_prompt: str = ""
activation_keywords: List[str] = None
keyword_case_sensitive: bool = False
mode_enable: ChatMode = ChatMode.ALL
parallel_action: bool = False
action_parameters: Dict[str, Any] = None
action_require: List[str] = None
associated_types: List[str] = None
def __post_init__(self):
super().__post_init__()
if self.activation_keywords is None:
self.activation_keywords = []
if self.action_parameters is None:
self.action_parameters = {}
if self.action_require is None:
self.action_require = []
if self.associated_types is None:
self.associated_types = []
self.component_type = ComponentType.ACTION
@dataclass
class CommandInfo(ComponentInfo):
"""命令组件信息"""
command_pattern: str = "" # 命令匹配模式(正则表达式)
command_help: str = "" # 命令帮助信息
command_examples: List[str] = None # 命令使用示例
def __post_init__(self):
super().__post_init__()
if self.command_examples is None:
self.command_examples = []
self.component_type = ComponentType.COMMAND
@dataclass
class PluginInfo:
"""插件信息"""
name: str # 插件名称
description: str # 插件描述
version: str = "1.0.0" # 插件版本
author: str = "" # 插件作者
enabled: bool = True # 是否启用
is_built_in: bool = False # 是否为内置插件
components: List[ComponentInfo] = None # 包含的组件列表
dependencies: List[str] = None # 依赖的其他插件
config_file: str = "" # 配置文件路径
metadata: Dict[str, Any] = None # 额外元数据
def __post_init__(self):
if self.components is None:
self.components = []
if self.dependencies is None:
self.dependencies = []
if self.metadata is None:
self.metadata = {}