refactor(plugin): 引入旧版Command兼容层并重构基类

为了平滑过渡到新的`PlusCommand`插件架构,本次重构引入了一个兼容层。

`BaseCommand`现在继承自`PlusCommand`,并剥离了大部分重复的功能实现(如消息发送、配置获取等),转而依赖`PlusCommand`的基类实现。这大大简化了`BaseCommand`,使其专注于作为旧版插件的兼容适配器。

在组件注册流程中,增加了对旧版`BaseCommand`的识别。当检测到旧版命令时,会自动使用`create_legacy_command_adapter`工厂函数将其包装成一个标准的`PlusCommand`实例。这使得旧插件无需修改代码即可在新架构下运行,同时会在启动时打印警告,鼓励开发者迁移。
This commit is contained in:
minecraft1024a
2025-11-01 16:12:02 +08:00
parent 5e7f17ebf9
commit 0033de2d32
3 changed files with 91 additions and 240 deletions

View File

@@ -1,10 +1,10 @@
from abc import ABC, abstractmethod from abc import abstractmethod
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from src.common.data_models.database_data_model import DatabaseMessages from src.common.data_models.database_data_model import DatabaseMessages
from src.common.logger import get_logger from src.common.logger import get_logger
from src.plugin_system.apis import send_api
from src.plugin_system.base.component_types import ChatType, CommandInfo, ComponentType from src.plugin_system.base.component_types import ChatType, CommandInfo, ComponentType
from src.plugin_system.base.plus_command import PlusCommand
if TYPE_CHECKING: if TYPE_CHECKING:
from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.chat_stream import ChatStream
@@ -12,17 +12,18 @@ if TYPE_CHECKING:
logger = get_logger("base_command") logger = get_logger("base_command")
class BaseCommand(ABC): class BaseCommand(PlusCommand):
"""Command组件基类 """旧版Command组件基类(兼容层)
Command是插件的一种组件类型用于处理命令请求 此类作为旧版插件的兼容层新的插件开发请使用PlusCommand
子类可以通过类属性定义命令模式: 子类可以通过类属性定义命令模式:
- command_pattern: 命令匹配的正则表达式 - command_pattern: 命令匹配的正则表达式
- command_help: 命令帮助信息
- command_examples: 命令使用示例列表
""" """
# 旧版命令标识
_is_legacy: bool = True
command_name: str = "" command_name: str = ""
"""Command组件的名称""" """Command组件的名称"""
command_description: str = "" command_description: str = ""
@@ -30,237 +31,35 @@ class BaseCommand(ABC):
# 默认命令设置 # 默认命令设置
command_pattern: str = r"" command_pattern: str = r""
"""命令匹配的正则表达式""" """命令匹配的正则表达式"""
chat_type_allow: ChatType = ChatType.ALL
"""允许的聊天类型,默认为所有类型""" # 用于存储正则匹配组
matched_groups: dict[str, str] = {}
def __init__(self, message: DatabaseMessages, plugin_config: dict | None = None): def __init__(self, message: DatabaseMessages, plugin_config: dict | None = None):
"""初始化Command组件 """初始化Command组件"""
# 调用PlusCommand的初始化
Args: super().__init__(message, plugin_config)
message: 接收到的消息对象DatabaseMessages
plugin_config: 插件配置字典
"""
self.message = message
self.matched_groups: dict[str, str] = {} # 存储正则表达式匹配的命名组
self.plugin_config = plugin_config or {} # 直接存储插件配置字典
# 旧版属性兼容
self.log_prefix = "[Command]" self.log_prefix = "[Command]"
self.matched_groups = {} # 初始化为空
# chat_stream 会在运行时被 bot.py 设置
self.chat_stream: "ChatStream | None" = None
# 从类属性获取chat_type_allow设置
self.chat_type_allow = getattr(self.__class__, "chat_type_allow", ChatType.ALL)
logger.debug(f"{self.log_prefix} Command组件初始化完成")
# 验证聊天类型限制
if not self._validate_chat_type():
is_group = message.group_info is not None
logger.warning(
f"{self.log_prefix} Command '{self.command_name}' 不支持当前聊天类型: "
f"{'群聊' if is_group else '私聊'}, 允许类型: {self.chat_type_allow.value}"
)
def set_matched_groups(self, groups: dict[str, str]) -> None: def set_matched_groups(self, groups: dict[str, str]) -> None:
"""设置正则表达式匹配的命名组 """设置正则表达式匹配的命名组"""
Args:
groups: 正则表达式匹配的命名组
"""
self.matched_groups = groups self.matched_groups = groups
def _validate_chat_type(self) -> bool:
"""验证当前聊天类型是否允许执行此Command
Returns:
bool: 如果允许执行返回True否则返回False
"""
if self.chat_type_allow == ChatType.ALL:
return True
# 检查是否为群聊消息DatabaseMessages使用group_info来判断
is_group = self.message.group_info is not None
if self.chat_type_allow == ChatType.GROUP and is_group:
return True
elif self.chat_type_allow == ChatType.PRIVATE and not is_group:
return True
else:
return False
def is_chat_type_allowed(self) -> bool:
"""检查当前聊天类型是否允许执行此Command
这是一个公开的方法,供外部调用检查聊天类型限制
Returns:
bool: 如果允许执行返回True否则返回False
"""
return self._validate_chat_type()
@abstractmethod @abstractmethod
async def execute(self) -> tuple[bool, str | None, bool]: async def execute(self) -> tuple[bool, str | None, bool]:
"""执行Command的抽象方法子类必须实现 """执行Command的抽象方法子类必须实现
Returns: Returns:
Tuple[bool, Optional[str], bool]: (是否执行成功, 可选的回复消息, 是否拦截消息 不进行 后续处理) Tuple[bool, Optional[str], bool]: (是否执行成功, 可选的回复消息, 是否拦截消息)
""" """
pass pass
def get_config(self, key: str, default=None):
"""获取插件配置值,使用嵌套键访问
Args:
key: 配置键名,使用嵌套访问如 "section.subsection.key"
default: 默认值
Returns:
Any: 配置值或默认值
"""
if not self.plugin_config:
return default
# 支持嵌套键访问
keys = key.split(".")
current = self.plugin_config
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return default
return current
async def send_text(self, content: str, reply_to: str = "") -> bool:
"""发送回复消息
Args:
content: 回复内容
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
# 获取聊天流信息
if not self.chat_stream or not hasattr(self.chat_stream, "stream_id"):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
return await send_api.text_to_stream(text=content, stream_id=self.chat_stream.stream_id, reply_to=reply_to)
async def send_type(
self, message_type: str, content: str, display_message: str = "", typing: bool = False, reply_to: str = ""
) -> bool:
"""发送指定类型的回复消息到当前聊天环境
Args:
message_type: 消息类型,如"text""image""emoji"
content: 消息内容
display_message: 显示消息(可选)
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
Returns:
bool: 是否发送成功
"""
# 获取聊天流信息
if not self.chat_stream or not hasattr(self.chat_stream, "stream_id"):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
return await send_api.custom_to_stream(
message_type=message_type,
content=content,
stream_id=self.chat_stream.stream_id,
display_message=display_message,
typing=typing,
reply_to=reply_to,
)
async def send_command(
self, command_name: str, args: dict | None = None, display_message: str = "", storage_message: bool = True
) -> bool:
"""发送命令消息
Args:
command_name: 命令名称
args: 命令参数
display_message: 显示消息
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
try:
# 获取聊天流信息
if not self.chat_stream or not hasattr(self.chat_stream, "stream_id"):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
# 构造命令数据
command_data = {"name": command_name, "args": args or {}}
success = await send_api.command_to_stream(
command=command_data,
stream_id=self.chat_stream.stream_id,
storage_message=storage_message,
display_message=display_message,
)
if success:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")
return False
async def send_emoji(self, emoji_base64: str) -> bool:
"""发送表情包
Args:
emoji_base64: 表情包的base64编码
Returns:
bool: 是否发送成功
"""
if not self.chat_stream or not hasattr(self.chat_stream, "stream_id"):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
return await send_api.emoji_to_stream(emoji_base64, self.chat_stream.stream_id)
async def send_image(self, image_base64: str) -> bool:
"""发送图片
Args:
image_base64: 图片的base64编码
Returns:
bool: 是否发送成功
"""
if not self.chat_stream or not hasattr(self.chat_stream, "stream_id"):
logger.error(f"{self.log_prefix} 缺少聊天流或stream_id")
return False
return await send_api.image_to_stream(image_base64, self.chat_stream.stream_id)
@classmethod @classmethod
def get_command_info(cls) -> "CommandInfo": def get_command_info(cls) -> "CommandInfo":
"""从类属性生成CommandInfo """从类属性生成CommandInfo"""
Args:
name: Command名称如果不提供则使用类名
description: Command描述如果不提供则使用类文档字符串
Returns:
CommandInfo: 生成的Command信息对象
"""
if "." in cls.command_name: if "." in cls.command_name:
logger.error(f"Command名称 '{cls.command_name}' 包含非法字符 '.',请使用下划线替代") logger.error(f"Command名称 '{cls.command_name}' 包含非法字符 '.',请使用下划线替代")
raise ValueError(f"Command名称 '{cls.command_name}' 包含非法字符 '.',请使用下划线替代") raise ValueError(f"Command名称 '{cls.command_name}' 包含非法字符 '.',请使用下划线替代")

View File

@@ -435,3 +435,62 @@ def create_plus_command_adapter(plus_command_class):
return AdapterClass return AdapterClass
def create_legacy_command_adapter(legacy_command_class):
"""为旧版BaseCommand创建适配器的工厂函数
Args:
legacy_command_class: BaseCommand的子类
Returns:
适配器类继承自PlusCommand
"""
class LegacyAdapter(PlusCommand):
# 从旧命令类中继承元数据
command_name = legacy_command_class.command_name
command_description = legacy_command_class.command_description
chat_type_allow = getattr(legacy_command_class, "chat_type_allow", ChatType.ALL)
intercept_message = False # 旧命令默认为False
def __init__(self, message: DatabaseMessages, plugin_config: dict | None = None):
super().__init__(message, plugin_config)
# 实例化旧命令
self.legacy_command = legacy_command_class(message, plugin_config)
# 将chat_stream传递给旧命令实例
self.legacy_command.chat_stream = self.chat_stream
def is_command_match(self) -> bool:
"""使用旧命令的正则表达式进行匹配"""
if not self.message.processed_plain_text:
return False
pattern = getattr(self.legacy_command, "command_pattern", "")
if not pattern:
return False
match = re.match(pattern, self.message.processed_plain_text)
if match:
# 存储匹配组以便旧命令的execute可以访问
self.legacy_command.set_matched_groups(match.groupdict())
return True
return False
async def execute(self, args: CommandArgs) -> tuple[bool, str | None, bool]:
"""执行旧命令的execute方法"""
# 检查聊天类型
if not self.legacy_command.is_chat_type_allowed():
return False, "不支持当前聊天类型", self.intercept_message
# 执行旧命令
try:
# 旧的execute不接收args参数
return await self.legacy_command.execute()
except Exception as e:
logger.error(f"执行旧版命令 '{self.command_name}' 时出错: {e}", exc_info=True)
return False, f"命令执行出错: {e!s}", self.intercept_message
return LegacyAdapter

View File

@@ -26,7 +26,7 @@ from src.plugin_system.base.component_types import (
PromptInfo, PromptInfo,
ToolInfo, ToolInfo,
) )
from src.plugin_system.base.plus_command import PlusCommand from src.plugin_system.base.plus_command import PlusCommand, create_legacy_command_adapter
logger = get_logger("component_registry") logger = get_logger("component_registry")
@@ -221,25 +221,18 @@ class ComponentRegistry:
def _register_command_component(self, command_info: CommandInfo, command_class: type[BaseCommand]) -> bool: def _register_command_component(self, command_info: CommandInfo, command_class: type[BaseCommand]) -> bool:
"""注册Command组件到Command特定注册表""" """注册Command组件到Command特定注册表"""
if not (command_name := command_info.name): # 检查是否为旧版Command
logger.error(f"Command组件 {command_class.__name__} 必须指定名称") if getattr(command_class, "_is_legacy", False):
return False logger.warning(
if not isinstance(command_info, CommandInfo) or not issubclass(command_class, BaseCommand): f"检测到旧版Command组件 '{command_class.command_name}' (来自插件: {command_info.plugin_name})。"
logger.error(f"注册失败: {command_name} 不是有效的Command") "它将通过兼容层运行但建议尽快迁移到PlusCommand以获得更好的性能和功能。"
return False )
_assign_plugin_attrs( # 使用适配器将其转换为PlusCommand
command_class, command_info.plugin_name, self.get_plugin_config(command_info.plugin_name) or {} adapted_class = create_legacy_command_adapter(command_class)
) plus_command_info = adapted_class.get_plus_command_info()
self._command_registry[command_name] = command_class plus_command_info.plugin_name = command_info.plugin_name # 继承插件名
if command_info.enabled and command_info.command_pattern:
pattern = re.compile(command_info.command_pattern, re.IGNORECASE | re.DOTALL) return self._register_plus_command_component(plus_command_info, adapted_class)
if pattern not in self._command_patterns:
self._command_patterns[pattern] = command_name
else:
logger.warning(
f"'{command_name}' 对应的命令模式与 '{self._command_patterns[pattern]}' 重复,忽略此命令"
)
return True
def _register_plus_command_component( def _register_plus_command_component(
self, plus_command_info: PlusCommandInfo, plus_command_class: type[PlusCommand] self, plus_command_info: PlusCommandInfo, plus_command_class: type[PlusCommand]