This commit is contained in:
SengokuCola
2025-07-23 02:26:57 +08:00
20 changed files with 515 additions and 411 deletions

View File

@@ -28,6 +28,7 @@ from .core import (
component_registry,
dependency_manager,
events_manager,
global_announcement_manager,
)
# 导入工具模块
@@ -67,6 +68,7 @@ __all__ = [
"component_registry",
"dependency_manager",
"events_manager",
"global_announcement_manager",
# 装饰器
"register_plugin",
"ConfigField",

View File

@@ -28,7 +28,6 @@ def register_plugin(cls):
if "." in plugin_name:
logger.error(f"插件名称 '{plugin_name}' 包含非法字符 '.',请使用下划线替代")
raise ValueError(f"插件名称 '{plugin_name}' 包含非法字符 '.',请使用下划线替代")
plugin_manager.plugin_classes[plugin_name] = cls
splitted_name = cls.__module__.split(".")
root_path = Path(__file__)
@@ -40,6 +39,7 @@ def register_plugin(cls):
logger.error(f"注册 {plugin_name} 无法找到项目根目录")
return cls
plugin_manager.plugin_classes[plugin_name] = cls
plugin_manager.plugin_paths[plugin_name] = str(Path(root_path, *splitted_name).resolve())
logger.debug(f"插件类已注册: {plugin_name}, 路径: {plugin_manager.plugin_paths[plugin_name]}")

View File

@@ -65,21 +65,28 @@ class BaseAction(ABC):
self.thinking_id = thinking_id
self.log_prefix = log_prefix
# 保存插件配置
self.plugin_config = plugin_config or {}
"""对应的插件配置"""
# 设置动作基本信息实例属性
self.action_name: str = getattr(self, "action_name", self.__class__.__name__.lower().replace("action", ""))
"""Action的名字"""
self.action_description: str = getattr(self, "action_description", self.__doc__ or "Action组件")
"""Action的描述"""
self.action_parameters: dict = getattr(self.__class__, "action_parameters", {}).copy()
self.action_require: list[str] = getattr(self.__class__, "action_require", []).copy()
# 设置激活类型实例属性(从类属性复制,提供默认值)
self.focus_activation_type = getattr(self.__class__, "focus_activation_type", ActionActivationType.ALWAYS)
"""FOCUS模式下的激活类型"""
self.normal_activation_type = getattr(self.__class__, "normal_activation_type", ActionActivationType.ALWAYS)
"""NORMAL模式下的激活类型"""
self.random_activation_probability: float = getattr(self.__class__, "random_activation_probability", 0.0)
"""当激活类型为RANDOM时的概率"""
self.llm_judge_prompt: str = getattr(self.__class__, "llm_judge_prompt", "")
"""协助LLM进行判断的Prompt"""
self.activation_keywords: list[str] = getattr(self.__class__, "activation_keywords", []).copy()
"""激活类型为KEYWORD时的KEYWORDS列表"""
self.keyword_case_sensitive: bool = getattr(self.__class__, "keyword_case_sensitive", False)
self.mode_enable: ChatMode = getattr(self.__class__, "mode_enable", ChatMode.ALL)
self.parallel_action: bool = getattr(self.__class__, "parallel_action", True)

View File

@@ -21,13 +21,18 @@ class BaseCommand(ABC):
"""
command_name: str = ""
"""Command组件的名称"""
command_description: str = ""
"""Command组件的描述"""
# 默认命令设置(子类可以覆盖)
command_pattern: str = ""
"""命令匹配的正则表达式"""
command_help: str = ""
"""命令帮助信息"""
command_examples: List[str] = []
intercept_message: bool = True # 默认拦截消息,不继续处理
intercept_message: bool = True
"""是否拦截信息,默认拦截,不进行后续处理"""
def __init__(self, message: MessageRecv, plugin_config: Optional[dict] = None):
"""初始化Command组件

View File

@@ -13,16 +13,23 @@ class BaseEventHandler(ABC):
所有事件处理器都应该继承这个基类,提供事件处理的基本接口
"""
event_type: EventType = EventType.UNKNOWN # 事件类型,默认为未知
handler_name: str = "" # 处理器名称
event_type: EventType = EventType.UNKNOWN
"""事件类型,默认为未知"""
handler_name: str = ""
"""处理器名称"""
handler_description: str = ""
weight: int = 0 # 权重,数值越大优先级越高
intercept_message: bool = False # 是否拦截消息,默认为否
"""处理器描述"""
weight: int = 0
"""处理器权重,越大权重越高"""
intercept_message: bool = False
"""是否拦截消息,默认为否"""
def __init__(self):
self.log_prefix = "[EventHandler]"
self.plugin_name = "" # 对应插件名
self.plugin_config: Optional[Dict] = None # 插件配置字典
self.plugin_name = ""
"""对应插件名"""
self.plugin_config: Optional[Dict] = None
"""插件配置字典"""
if self.event_type == EventType.UNKNOWN:
raise NotImplementedError("事件处理器必须指定 event_type")

View File

@@ -8,10 +8,12 @@ from src.plugin_system.core.plugin_manager import plugin_manager
from src.plugin_system.core.component_registry import component_registry
from src.plugin_system.core.dependency_manager import dependency_manager
from src.plugin_system.core.events_manager import events_manager
from src.plugin_system.core.global_announcement_manager import global_announcement_manager
__all__ = [
"plugin_manager",
"component_registry",
"dependency_manager",
"events_manager",
"global_announcement_manager",
]

View File

@@ -27,7 +27,7 @@ class ComponentRegistry:
def __init__(self):
# 组件注册表
self._components: Dict[str, ComponentInfo] = {} # 命名空间式组件名 -> 组件信息
# 类型 -> 命名空间式名称 -> 组件信息
# 类型 -> 组件原名称 -> 组件信息
self._components_by_type: Dict[ComponentType, Dict[str, ComponentInfo]] = {types: {} for types in ComponentType}
# 命名空间式组件名 -> 组件类
self._components_classes: Dict[str, Type[Union[BaseCommand, BaseAction, BaseEventHandler]]] = {}
@@ -110,11 +110,17 @@ class ComponentRegistry:
# 根据组件类型进行特定注册(使用原始名称)
match component_type:
case ComponentType.ACTION:
ret = self._register_action_component(component_info, component_class) # type: ignore
assert isinstance(component_info, ActionInfo)
assert issubclass(component_class, BaseAction)
ret = self._register_action_component(component_info, component_class)
case ComponentType.COMMAND:
ret = self._register_command_component(component_info, component_class) # type: ignore
assert isinstance(component_info, CommandInfo)
assert issubclass(component_class, BaseCommand)
ret = self._register_command_component(component_info, component_class)
case ComponentType.EVENT_HANDLER:
ret = self._register_event_handler_component(component_info, component_class) # type: ignore
assert isinstance(component_info, EventHandlerInfo)
assert issubclass(component_class, BaseEventHandler)
ret = self._register_event_handler_component(component_info, component_class)
case _:
logger.warning(f"未知组件类型: {component_type}")
@@ -160,7 +166,9 @@ class ComponentRegistry:
if pattern not in self._command_patterns:
self._command_patterns[pattern] = command_name
else:
logger.warning(f"'{command_name}' 对应的命令模式与 '{self._command_patterns[pattern]}' 重复,忽略此命令")
logger.warning(
f"'{command_name}' 对应的命令模式与 '{self._command_patterns[pattern]}' 重复,忽略此命令"
)
return True
@@ -176,6 +184,10 @@ class ComponentRegistry:
self._event_handler_registry[handler_name] = handler_class
if not handler_info.enabled:
logger.warning(f"EventHandler组件 {handler_name} 未启用")
return True # 未启用,但是也是注册成功
from .events_manager import events_manager # 延迟导入防止循环导入问题
if events_manager.register_event_subscriber(handler_info, handler_class):
@@ -185,6 +197,98 @@ class ComponentRegistry:
logger.error(f"注册事件处理器 {handler_name} 失败")
return False
# === 组件移除相关 ===
async def remove_component(self, component_name: str, component_type: ComponentType):
target_component_class = self.get_component_class(component_name, component_type)
if not target_component_class:
logger.warning(f"组件 {component_name} 未注册,无法移除")
return
match component_type:
case ComponentType.ACTION:
self._action_registry.pop(component_name, None)
self._default_actions.pop(component_name, None)
case ComponentType.COMMAND:
self._command_registry.pop(component_name, None)
keys_to_remove = [k for k, v in self._command_patterns.items() if v == component_name]
for key in keys_to_remove:
self._command_patterns.pop(key, None)
case ComponentType.EVENT_HANDLER:
from .events_manager import events_manager # 延迟导入防止循环导入问题
self._event_handler_registry.pop(component_name, None)
self._enabled_event_handlers.pop(component_name, None)
await events_manager.unregister_event_subscriber(component_name)
self._components.pop(component_name, None)
self._components_by_type[component_type].pop(component_name, None)
self._components_classes.pop(component_name, None)
logger.info(f"组件 {component_name} 已移除")
# === 组件全局启用/禁用方法 ===
def enable_component(self, component_name: str, component_type: ComponentType) -> bool:
"""全局的启用某个组件
Parameters:
component_name: 组件名称
component_type: 组件类型
Returns:
bool: 启用成功返回True失败返回False
"""
target_component_class = self.get_component_class(component_name, component_type)
target_component_info = self.get_component_info(component_name, component_type)
if not target_component_class or not target_component_info:
logger.warning(f"组件 {component_name} 未注册,无法启用")
return False
target_component_info.enabled = True
match component_type:
case ComponentType.ACTION:
assert isinstance(target_component_info, ActionInfo)
self._default_actions[component_name] = target_component_info
case ComponentType.COMMAND:
assert isinstance(target_component_info, CommandInfo)
pattern = target_component_info.command_pattern
self._command_patterns[re.compile(pattern)] = component_name
case ComponentType.EVENT_HANDLER:
assert isinstance(target_component_info, EventHandlerInfo)
assert issubclass(target_component_class, BaseEventHandler)
self._enabled_event_handlers[component_name] = target_component_class
from .events_manager import events_manager # 延迟导入防止循环导入问题
events_manager.register_event_subscriber(target_component_info, target_component_class)
self._components[component_name].enabled = True
self._components_by_type[component_type][component_name].enabled = True
logger.info(f"组件 {component_name} 已启用")
return True
async def disable_component(self, component_name: str, component_type: ComponentType) -> bool:
"""全局的禁用某个组件
Parameters:
component_name: 组件名称
component_type: 组件类型
Returns:
bool: 禁用成功返回True失败返回False
"""
target_component_class = self.get_component_class(component_name, component_type)
target_component_info = self.get_component_info(component_name, component_type)
if not target_component_class or not target_component_info:
logger.warning(f"组件 {component_name} 未注册,无法禁用")
return False
target_component_info.enabled = False
match component_type:
case ComponentType.ACTION:
self._default_actions.pop(component_name, None)
case ComponentType.COMMAND:
self._command_patterns = {k: v for k, v in self._command_patterns.items() if v != component_name}
case ComponentType.EVENT_HANDLER:
self._enabled_event_handlers.pop(component_name, None)
from .events_manager import events_manager # 延迟导入防止循环导入问题
await events_manager.unregister_event_subscriber(component_name)
self._components[component_name].enabled = False
self._components_by_type[component_type][component_name].enabled = False
logger.info(f"组件 {component_name} 已禁用")
return True
# === 组件查询方法 ===
def get_component_info(
self, component_name: str, component_type: Optional[ComponentType] = None
@@ -287,7 +391,7 @@ class ComponentRegistry:
# === Action特定查询方法 ===
def get_action_registry(self) -> Dict[str, Type[BaseAction]]:
"""获取Action注册表(用于兼容现有系统)"""
"""获取Action注册表"""
return self._action_registry.copy()
def get_registered_action_info(self, action_name: str) -> Optional[ActionInfo]:
@@ -314,7 +418,7 @@ class ComponentRegistry:
"""获取Command模式注册表"""
return self._command_patterns.copy()
def find_command_by_text(self, text: str) -> Optional[Tuple[Type[BaseCommand], dict, bool, str]]:
def find_command_by_text(self, text: str) -> Optional[Tuple[Type[BaseCommand], dict, CommandInfo]]:
# sourcery skip: use-named-expression, use-next
"""根据文本查找匹配的命令
@@ -335,8 +439,7 @@ class ComponentRegistry:
return (
self._command_registry[command_name],
candidates[0].match(text).groupdict(), # type: ignore
command_info.intercept_message,
command_info.plugin_name,
command_info,
)
# === 事件处理器特定查询方法 ===

View File

@@ -6,6 +6,7 @@ from src.chat.message_receive.message import MessageRecv
from src.common.logger import get_logger
from src.plugin_system.base.component_types import EventType, EventHandlerInfo, MaiMessages
from src.plugin_system.base.base_events_handler import BaseEventHandler
from .global_announcement_manager import global_announcement_manager
logger = get_logger("events_manager")
@@ -28,18 +29,16 @@ class EventsManager:
bool: 是否注册成功
"""
handler_name = handler_info.name
plugin_name = getattr(handler_info, "plugin_name", "unknown")
namespace_name = f"{plugin_name}.{handler_name}"
if namespace_name in self._handler_mapping:
logger.warning(f"事件处理器 {namespace_name} 已存在,跳过注册")
if handler_name in self._handler_mapping:
logger.warning(f"事件处理器 {handler_name} 已存在,跳过注册")
return False
if not issubclass(handler_class, BaseEventHandler):
logger.error(f"{handler_class.__name__} 不是 BaseEventHandler 的子类")
return False
self._handler_mapping[namespace_name] = handler_class
self._handler_mapping[handler_name] = handler_class
return self._insert_event_handler(handler_class, handler_info)
async def handle_mai_events(
@@ -55,6 +54,10 @@ class EventsManager:
continue_flag = True
transformed_message = self._transform_event_message(message, llm_prompt, llm_response)
for handler in self._events_subscribers.get(event_type, []):
if message.chat_stream and message.chat_stream.stream_id:
stream_id = message.chat_stream.stream_id
if handler.handler_name in global_announcement_manager.get_disabled_chat_event_handlers(stream_id):
continue
handler.set_plugin_config(component_registry.get_plugin_config(handler.plugin_name) or {})
if handler.intercept_message:
try:
@@ -71,7 +74,7 @@ class EventsManager:
try:
handler_task = asyncio.create_task(handler.execute(transformed_message))
handler_task.add_done_callback(self._task_done_callback)
handler_task.set_name(f"EventHandler-{handler.handler_name}-{event_type.name}")
handler_task.set_name(f"{handler.plugin_name}-{handler.handler_name}")
self._handler_tasks[handler.handler_name].append(handler_task)
except Exception as e:
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}")
@@ -91,7 +94,7 @@ class EventsManager:
return True
def _remove_event_handler(self, handler_class: Type[BaseEventHandler]) -> bool:
def _remove_event_handler_instance(self, handler_class: Type[BaseEventHandler]) -> bool:
"""从事件类型列表中移除事件处理器"""
display_handler_name = handler_class.handler_name or handler_class.__name__
if handler_class.event_type == EventType.UNKNOWN:
@@ -190,5 +193,20 @@ class EventsManager:
finally:
del self._handler_tasks[handler_name]
async def unregister_event_subscriber(self, handler_name: str) -> bool:
"""取消注册事件处理器"""
if handler_name not in self._handler_mapping:
logger.warning(f"事件处理器 {handler_name} 不存在,无法取消注册")
return False
await self.cancel_handler_tasks(handler_name)
handler_class = self._handler_mapping.pop(handler_name)
if not self._remove_event_handler_instance(handler_class):
return False
logger.info(f"事件处理器 {handler_name} 已成功取消注册")
return True
events_manager = EventsManager()

View File

@@ -0,0 +1,90 @@
from typing import List, Dict
from src.common.logger import get_logger
logger = get_logger("global_announcement_manager")
class GlobalAnnouncementManager:
def __init__(self) -> None:
# 用户禁用的动作chat_id -> [action_name]
self._user_disabled_actions: Dict[str, List[str]] = {}
# 用户禁用的命令chat_id -> [command_name]
self._user_disabled_commands: Dict[str, List[str]] = {}
# 用户禁用的事件处理器chat_id -> [handler_name]
self._user_disabled_event_handlers: Dict[str, List[str]] = {}
def disable_specific_chat_action(self, chat_id: str, action_name: str) -> bool:
"""禁用特定聊天的某个动作"""
if chat_id not in self._user_disabled_actions:
self._user_disabled_actions[chat_id] = []
if action_name in self._user_disabled_actions[chat_id]:
logger.warning(f"动作 {action_name} 已经被禁用")
return False
self._user_disabled_actions[chat_id].append(action_name)
return True
def enable_specific_chat_action(self, chat_id: str, action_name: str) -> bool:
"""启用特定聊天的某个动作"""
if chat_id in self._user_disabled_actions:
try:
self._user_disabled_actions[chat_id].remove(action_name)
return True
except ValueError:
return False
return False
def disable_specific_chat_command(self, chat_id: str, command_name: str) -> bool:
"""禁用特定聊天的某个命令"""
if chat_id not in self._user_disabled_commands:
self._user_disabled_commands[chat_id] = []
if command_name in self._user_disabled_commands[chat_id]:
logger.warning(f"命令 {command_name} 已经被禁用")
return False
self._user_disabled_commands[chat_id].append(command_name)
return True
def enable_specific_chat_command(self, chat_id: str, command_name: str) -> bool:
"""启用特定聊天的某个命令"""
if chat_id in self._user_disabled_commands:
try:
self._user_disabled_commands[chat_id].remove(command_name)
return True
except ValueError:
return False
return False
def disable_specific_chat_event_handler(self, chat_id: str, handler_name: str) -> bool:
"""禁用特定聊天的某个事件处理器"""
if chat_id not in self._user_disabled_event_handlers:
self._user_disabled_event_handlers[chat_id] = []
if handler_name in self._user_disabled_event_handlers[chat_id]:
logger.warning(f"事件处理器 {handler_name} 已经被禁用")
return False
self._user_disabled_event_handlers[chat_id].append(handler_name)
return True
def enable_specific_chat_event_handler(self, chat_id: str, handler_name: str) -> bool:
"""启用特定聊天的某个事件处理器"""
if chat_id in self._user_disabled_event_handlers:
try:
self._user_disabled_event_handlers[chat_id].remove(handler_name)
return True
except ValueError:
return False
return False
def get_disabled_chat_actions(self, chat_id: str) -> List[str]:
"""获取特定聊天禁用的所有动作"""
return self._user_disabled_actions.get(chat_id, []).copy()
def get_disabled_chat_commands(self, chat_id: str) -> List[str]:
"""获取特定聊天禁用的所有命令"""
return self._user_disabled_commands.get(chat_id, []).copy()
def get_disabled_chat_event_handlers(self, chat_id: str) -> List[str]:
"""获取特定聊天禁用的所有事件处理器"""
return self._user_disabled_event_handlers.get(chat_id, []).copy()
global_announcement_manager = GlobalAnnouncementManager()

View File

@@ -1,5 +1,4 @@
import os
import inspect
import traceback
from typing import Dict, List, Optional, Tuple, Type, Any
@@ -8,11 +7,11 @@ from pathlib import Path
from src.common.logger import get_logger
from src.plugin_system.core.component_registry import component_registry
from src.plugin_system.core.dependency_manager import dependency_manager
from src.plugin_system.base.plugin_base import PluginBase
from src.plugin_system.base.component_types import ComponentType, PluginInfo, PythonDependency
from src.plugin_system.utils.manifest_utils import VersionComparator
from .component_registry import component_registry
from .dependency_manager import dependency_manager
logger = get_logger("plugin_manager")
@@ -36,19 +35,7 @@ class PluginManager:
self._ensure_plugin_directories()
logger.info("插件管理器初始化完成")
def _ensure_plugin_directories(self) -> None:
"""确保所有插件根目录存在,如果不存在则创建"""
default_directories = ["src/plugins/built_in", "plugins"]
for directory in default_directories:
if not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
logger.info(f"创建插件根目录: {directory}")
if directory not in self.plugin_directories:
self.plugin_directories.append(directory)
logger.debug(f"已添加插件根目录: {directory}")
else:
logger.warning(f"根目录不可重复加载: {directory}")
# === 插件目录管理 ===
def add_plugin_directory(self, directory: str) -> bool:
"""添加插件目录"""
@@ -63,6 +50,8 @@ class PluginManager:
logger.warning(f"插件目录不存在: {directory}")
return False
# === 插件加载管理 ===
def load_all_plugins(self) -> Tuple[int, int]:
"""加载所有插件
@@ -86,7 +75,7 @@ class PluginManager:
total_failed_registration = 0
for plugin_name in self.plugin_classes.keys():
load_status, count = self.load_registered_plugin_classes(plugin_name)
load_status, count = self._load_registered_plugin_classes(plugin_name)
if load_status:
total_registered += 1
else:
@@ -96,90 +85,32 @@ class PluginManager:
return total_registered, total_failed_registration
def load_registered_plugin_classes(self, plugin_name: str) -> Tuple[bool, int]:
# sourcery skip: extract-duplicate-method, extract-method
async def remove_registered_plugin(self, plugin_name: str) -> None:
"""
加载已经注册的插件类
禁用插件模块
"""
plugin_class = self.plugin_classes.get(plugin_name)
if not plugin_class:
logger.error(f"插件 {plugin_name} 的插件类未注册或不存在")
return False, 1
try:
# 使用记录的插件目录路径
plugin_dir = self.plugin_paths.get(plugin_name)
if not plugin_name:
raise ValueError("插件名称不能为空")
if plugin_name not in self.loaded_plugins:
logger.warning(f"插件 {plugin_name} 未加载")
return
plugin_instance = self.loaded_plugins[plugin_name]
plugin_info = plugin_instance.plugin_info
for component in plugin_info.components:
await component_registry.remove_component(component.name, component.component_type)
del self.loaded_plugins[plugin_name]
# 如果没有记录,直接返回失败
if not plugin_dir:
return False, 1
plugin_instance = plugin_class(plugin_dir=plugin_dir) # 实例化插件可能因为缺少manifest而失败
if not plugin_instance:
logger.error(f"插件 {plugin_name} 实例化失败")
return False, 1
# 检查插件是否启用
if not plugin_instance.enable_plugin:
logger.info(f"插件 {plugin_name} 已禁用,跳过加载")
return False, 0
# 检查版本兼容性
is_compatible, compatibility_error = self._check_plugin_version_compatibility(
plugin_name, plugin_instance.manifest_data
)
if not is_compatible:
self.failed_plugins[plugin_name] = compatibility_error
logger.error(f"❌ 插件加载失败: {plugin_name} - {compatibility_error}")
return False, 1
if plugin_instance.register_plugin():
self.loaded_plugins[plugin_name] = plugin_instance
self._show_plugin_components(plugin_name)
return True, 1
else:
self.failed_plugins[plugin_name] = "插件注册失败"
logger.error(f"❌ 插件注册失败: {plugin_name}")
return False, 1
except FileNotFoundError as e:
# manifest文件缺失
error_msg = f"缺少manifest文件: {str(e)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
return False, 1
except ValueError as e:
# manifest文件格式错误或验证失败
traceback.print_exc()
error_msg = f"manifest验证失败: {str(e)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
return False, 1
except Exception as e:
# 其他错误
error_msg = f"未知错误: {str(e)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
logger.debug("详细错误信息: ", exc_info=True)
return False, 1
def unload_registered_plugin_module(self, plugin_name: str) -> None:
"""
卸载插件模块
"""
pass
def reload_registered_plugin_module(self, plugin_name: str) -> None:
async def reload_registered_plugin_module(self, plugin_name: str) -> None:
"""
重载插件模块
"""
self.unload_registered_plugin_module(plugin_name)
self.load_registered_plugin_classes(plugin_name)
await self.remove_registered_plugin(plugin_name)
self._load_registered_plugin_classes(plugin_name)
def rescan_plugin_directory(self) -> None:
"""
重新扫描插件根目录
"""
# --------------------------------------- NEED REFACTORING ---------------------------------------
for directory in self.plugin_directories:
if os.path.exists(directory):
logger.debug(f"重新扫描插件根目录: {directory}")
@@ -195,30 +126,6 @@ class PluginManager:
"""获取所有启用的插件信息"""
return list(component_registry.get_enabled_plugins().values())
# def enable_plugin(self, plugin_name: str) -> bool:
# # -------------------------------- NEED REFACTORING --------------------------------
# """启用插件"""
# if plugin_info := component_registry.get_plugin_info(plugin_name):
# plugin_info.enabled = True
# # 启用插件的所有组件
# for component in plugin_info.components:
# component_registry.enable_component(component.name)
# logger.debug(f"已启用插件: {plugin_name}")
# return True
# return False
# def disable_plugin(self, plugin_name: str) -> bool:
# # -------------------------------- NEED REFACTORING --------------------------------
# """禁用插件"""
# if plugin_info := component_registry.get_plugin_info(plugin_name):
# plugin_info.enabled = False
# # 禁用插件的所有组件
# for component in plugin_info.components:
# component_registry.disable_component(component.name)
# logger.debug(f"已禁用插件: {plugin_name}")
# return True
# return False
def get_plugin_instance(self, plugin_name: str) -> Optional["PluginBase"]:
"""获取插件实例
@@ -230,25 +137,6 @@ class PluginManager:
"""
return self.loaded_plugins.get(plugin_name)
def get_plugin_stats(self) -> Dict[str, Any]:
"""获取插件统计信息"""
all_plugins = component_registry.get_all_plugins()
enabled_plugins = component_registry.get_enabled_plugins()
action_components = component_registry.get_components_by_type(ComponentType.ACTION)
command_components = component_registry.get_components_by_type(ComponentType.COMMAND)
return {
"total_plugins": len(all_plugins),
"enabled_plugins": len(enabled_plugins),
"failed_plugins": len(self.failed_plugins),
"total_components": len(action_components) + len(command_components),
"action_components": len(action_components),
"command_components": len(command_components),
"loaded_plugin_files": len(self.loaded_plugins),
"failed_plugin_details": self.failed_plugins.copy(),
}
def check_all_dependencies(self, auto_install: bool = False) -> Dict[str, Any]:
"""检查所有插件的Python依赖包
@@ -347,6 +235,24 @@ class PluginManager:
return dependency_manager.generate_requirements_file(all_dependencies, output_path)
# === 私有方法 ===
# == 目录管理 ==
def _ensure_plugin_directories(self) -> None:
"""确保所有插件根目录存在,如果不存在则创建"""
default_directories = ["src/plugins/built_in", "plugins"]
for directory in default_directories:
if not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
logger.info(f"创建插件根目录: {directory}")
if directory not in self.plugin_directories:
self.plugin_directories.append(directory)
logger.debug(f"已添加插件根目录: {directory}")
else:
logger.warning(f"根目录不可重复加载: {directory}")
# == 插件加载 ==
def _load_plugin_modules_from_directory(self, directory: str) -> tuple[int, int]:
"""从指定目录加载插件模块"""
loaded_count = 0
@@ -372,18 +278,6 @@ class PluginManager:
return loaded_count, failed_count
def _find_plugin_directory(self, plugin_class: Type[PluginBase]) -> Optional[str]:
"""查找插件类对应的目录路径"""
try:
# module = getmodule(plugin_class)
# if module and hasattr(module, "__file__") and module.__file__:
# return os.path.dirname(module.__file__)
file_path = inspect.getfile(plugin_class)
return os.path.dirname(file_path)
except Exception as e:
logger.debug(f"通过inspect获取插件目录失败: {e}")
return None
def _load_plugin_module_file(self, plugin_file: str) -> bool:
# sourcery skip: extract-method
"""加载单个插件模块文件
@@ -416,6 +310,74 @@ class PluginManager:
self.failed_plugins[module_name] = error_msg
return False
def _load_registered_plugin_classes(self, plugin_name: str) -> Tuple[bool, int]:
# sourcery skip: extract-duplicate-method, extract-method
"""
加载已经注册的插件类
"""
plugin_class = self.plugin_classes.get(plugin_name)
if not plugin_class:
logger.error(f"插件 {plugin_name} 的插件类未注册或不存在")
return False, 1
try:
# 使用记录的插件目录路径
plugin_dir = self.plugin_paths.get(plugin_name)
# 如果没有记录,直接返回失败
if not plugin_dir:
return False, 1
plugin_instance = plugin_class(plugin_dir=plugin_dir) # 实例化插件可能因为缺少manifest而失败
if not plugin_instance:
logger.error(f"插件 {plugin_name} 实例化失败")
return False, 1
# 检查插件是否启用
if not plugin_instance.enable_plugin:
logger.info(f"插件 {plugin_name} 已禁用,跳过加载")
return False, 0
# 检查版本兼容性
is_compatible, compatibility_error = self._check_plugin_version_compatibility(
plugin_name, plugin_instance.manifest_data
)
if not is_compatible:
self.failed_plugins[plugin_name] = compatibility_error
logger.error(f"❌ 插件加载失败: {plugin_name} - {compatibility_error}")
return False, 1
if plugin_instance.register_plugin():
self.loaded_plugins[plugin_name] = plugin_instance
self._show_plugin_components(plugin_name)
return True, 1
else:
self.failed_plugins[plugin_name] = "插件注册失败"
logger.error(f"❌ 插件注册失败: {plugin_name}")
return False, 1
except FileNotFoundError as e:
# manifest文件缺失
error_msg = f"缺少manifest文件: {str(e)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
return False, 1
except ValueError as e:
# manifest文件格式错误或验证失败
traceback.print_exc()
error_msg = f"manifest验证失败: {str(e)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
return False, 1
except Exception as e:
# 其他错误
error_msg = f"未知错误: {str(e)}"
self.failed_plugins[plugin_name] = error_msg
logger.error(f"❌ 插件加载失败: {plugin_name} - {error_msg}")
logger.debug("详细错误信息: ", exc_info=True)
return False, 1
# == 兼容性检查 ==
def _check_plugin_version_compatibility(self, plugin_name: str, manifest_data: Dict[str, Any]) -> Tuple[bool, str]:
"""检查插件版本兼容性
@@ -451,6 +413,8 @@ class PluginManager:
logger.warning(f"插件 {plugin_name} 版本兼容性检查失败: {e}")
return False, f"插件 {plugin_name} 版本兼容性检查失败: {e}" # 检查失败时默认不允许加载
# == 显示统计与插件信息 ==
def _show_stats(self, total_registered: int, total_failed_registration: int):
# sourcery skip: low-code-quality
# 获取组件统计信息
@@ -493,9 +457,15 @@ class PluginManager:
# 组件列表
if plugin_info.components:
action_components = [c for c in plugin_info.components if c.component_type == ComponentType.ACTION]
command_components = [c for c in plugin_info.components if c.component_type == ComponentType.COMMAND]
event_handler_components = [c for c in plugin_info.components if c.component_type == ComponentType.EVENT_HANDLER]
action_components = [
c for c in plugin_info.components if c.component_type == ComponentType.ACTION
]
command_components = [
c for c in plugin_info.components if c.component_type == ComponentType.COMMAND
]
event_handler_components = [
c for c in plugin_info.components if c.component_type == ComponentType.EVENT_HANDLER
]
if action_components:
action_names = [c.name for c in action_components]
@@ -504,7 +474,7 @@ class PluginManager:
if command_components:
command_names = [c.name for c in command_components]
logger.info(f" ⚡ Command组件: {', '.join(command_names)}")
if event_handler_components:
event_handler_names = [c.name for c in event_handler_components]
logger.info(f" 📢 EventHandler组件: {', '.join(event_handler_names)}")