feat(plugin_system): 引入组件局部状态管理并重构插件API

引入了基于 `stream_id` 的组件局部状态管理机制。这允许在不修改全局配置的情况下,为特定会话临时启用或禁用组件,提供了更高的灵活性。

全面重构了 `plugin_manage_api`,提供了更强大和稳定的插件管理功能:
- 新增 `reload_all_plugins` 和 `get_system_report` API,方便进行批量重载和系统状态诊断。
- 增强了组件卸载逻辑,确保在插件移除时能更彻底地清理资源,特别是对 `EventHandler` 的订阅。
- 重写了内置的 `/system plugin` 命令,以利用新的API,并为相关操作添加了权限控制。

组件注册中心(ComponentRegistry)中的多个 `get_enabled_*` 方法现在可以接受 `stream_id`,以正确反映局部状态。

BREAKING CHANGE: `plugin_manage_api` 中的多个函数已被移除或替换。例如 `list_loaded_plugins` 和 `remove_plugin` 已被移除,加载插件的逻辑已整合到 `register_plugin_from_file` 中。内置的 `/system plugin` 命令的子命令也已更改。
This commit is contained in:
minecraft1024a
2025-11-21 21:05:02 +08:00
parent ccfe17c986
commit a0618fb3c4
5 changed files with 390 additions and 143 deletions

View File

@@ -296,7 +296,7 @@ class ChatBot:
response_data = None
if request_id and response_data:
logger.info(f"[DEBUG bot.py] 收到适配器响应request_id={request_id}")
logger.debug(f"[DEBUG bot.py] 收到适配器响应request_id={request_id}")
put_adapter_response(request_id, response_data)
else:
logger.warning(f"适配器响应消息格式不正确: request_id={request_id}, response_data={response_data}")

View File

@@ -1,117 +1,257 @@
def list_loaded_plugins() -> list[str]:
# -*- coding: utf-8 -*-
import os
from typing import Any
from src.common.logger import get_logger
from src.plugin_system.base.component_types import ComponentType
from src.plugin_system.core.component_registry import ComponentInfo, component_registry
from src.plugin_system.core.plugin_manager import plugin_manager
logger = get_logger("plugin_manage_api")
async def reload_all_plugins() -> bool:
"""
列出所有当前加载的插件。
重新加载所有当前已成功加载的插件。
此操作会先卸载所有插件,然后重新加载它们。
Returns:
List[str]: 当前加载的插件名称列表
bool: 如果所有插件都成功重载,则为 True否则为 False
"""
from src.plugin_system.core.plugin_manager import plugin_manager
logger.info("开始重新加载所有插件...")
# 使用 list() 复制一份列表,防止在迭代时修改原始列表
loaded_plugins = list(plugin_manager.list_loaded_plugins())
all_success = True
return plugin_manager.list_loaded_plugins()
for plugin_name in loaded_plugins:
try:
success = await reload_plugin(plugin_name)
if not success:
all_success = False
logger.error(f"重载插件 {plugin_name} 失败。")
except Exception as e:
all_success = False
logger.error(f"重载插件 {plugin_name} 时发生异常: {e}", exc_info=True)
logger.info("所有插件重载完毕。")
return all_success
def list_registered_plugins() -> list[str]:
async def reload_plugin(name: str) -> bool:
"""
列出所有已注册的插件。
Returns:
List[str]: 已注册的插件名称列表。
"""
from src.plugin_system.core.plugin_manager import plugin_manager
return plugin_manager.list_registered_plugins()
def get_plugin_path(plugin_name: str) -> str:
"""
获取指定插件的路径。
重新加载指定的单个插件。
Args:
plugin_name (str): 插件名称。
name (str): 要重载的插件名称。
Returns:
str: 插件目录的绝对路径
bool: 成功则为 True
Raises:
ValueError: 如果插件不存在
ValueError: 如果插件未找到
"""
from src.plugin_system.core.plugin_manager import plugin_manager
if plugin_path := plugin_manager.get_plugin_path(plugin_name):
return plugin_path
else:
raise ValueError(f"插件 '{plugin_name}' 不存在。")
if name not in plugin_manager.list_registered_plugins():
raise ValueError(f"插件 '{name}' 未注册。")
return await plugin_manager.reload_registered_plugin(name)
async def remove_plugin(plugin_name: str) -> bool:
async def set_component_enabled(name: str, component_type: ComponentType, enabled: bool) -> bool:
"""
卸载指定的插件。
全局范围内启用或禁用一个组件。
**此函数是异步的,确保在异步环境中调用。**
此更改会更新组件注册表中的状态,但不会持久化到文件。
Args:
plugin_name (str): 要卸载的插件名称。
name (str): 件名称。
component_type (ComponentType): 组件类型。
enabled (bool): True 为启用, False 为禁用。
Returns:
bool: 卸载是否成功
bool: 操作成功则为 True
"""
from src.plugin_system.core.plugin_manager import plugin_manager
# Chatter 唯一性保护
if component_type == ComponentType.CHATTER and not enabled:
enabled_chatters = component_registry.get_enabled_components_by_type(ComponentType.CHATTER)
if len(enabled_chatters) <= 1 and name in enabled_chatters:
logger.warning(f"操作被阻止:不能禁用最后一个启用的 Chatter 组件 ('{name}')。")
return False
return await plugin_manager.remove_registered_plugin(plugin_name)
# 注意:这里我们直接修改 ComponentInfo 中的状态
component_info = component_registry.get_component_info(name, component_type)
if not component_info:
logger.error(f"未找到组件 {name} ({component_type.value}),无法更改其状态。")
return False
component_info.enabled = enabled
logger.info(f"组件 {name} ({component_type.value}) 的全局状态已设置为: {enabled}")
return True
async def reload_plugin(plugin_name: str) -> bool:
def set_component_enabled_local(stream_id: str, name: str, component_type: ComponentType, enabled: bool) -> bool:
"""
重新加载指定的插件。
在一个特定的 stream_id 上下文中临时启用或禁用组件。
**此函数是异步的,确保在异步环境中调用。**
此状态仅存于内存,不影响全局状态。
Args:
plugin_name (str): 要重新加载的插件名称
stream_id (str): 上下文标识符
name (str): 组件名称。
component_type (ComponentType): 组件类型。
enabled (bool): True 为启用, False 为禁用。
Returns:
bool: 重新加载是否成功
bool: 操作成功则为 True
"""
from src.plugin_system.core.plugin_manager import plugin_manager
return await plugin_manager.reload_registered_plugin(plugin_name)
component_registry.set_local_component_state(stream_id, name, component_type, enabled)
return True
def load_plugin(plugin_name: str) -> tuple[bool, int]:
def rescan_and_register_plugins(load_after_register: bool = True) -> tuple[int, int]:
"""
加载指定的插件
重新扫描所有插件目录,发现新插件并注册
Args:
plugin_name (str): 要加载的插件名称
load_after_register (bool): 如果为 True新发现的插件将在注册后立即被加载
Returns:
Tuple[bool, int]: 加载是否成功,成功或失败个数。
Tuple[int, int]: (成功数量, 失败数量)
"""
from src.plugin_system.core.plugin_manager import plugin_manager
success_count, fail_count = plugin_manager.rescan_plugin_directory()
if not load_after_register:
return success_count, fail_count
return plugin_manager.load_registered_plugin_classes(plugin_name)
newly_registered = [
p for p in plugin_manager.list_registered_plugins() if p not in plugin_manager.list_loaded_plugins()
]
loaded_success = 0
for plugin_name in newly_registered:
status, _ = plugin_manager.load_registered_plugin_classes(plugin_name)
if status:
loaded_success += 1
return loaded_success, fail_count + (len(newly_registered) - loaded_success)
def add_plugin_directory(plugin_directory: str) -> bool:
def register_plugin_from_file(plugin_name: str, load_after_register: bool = True) -> bool:
"""
添加插件目录。
从默认插件目录中查找、注册并加载一个插件
Args:
plugin_directory (str): 要添加的插件目录路径
plugin_name (str): 插件的名称(即其目录名)
load_after_register (bool): 注册后是否立即加载。
Returns:
bool: 添加是否成功。
bool: 成功则为 True
"""
from src.plugin_system.core.plugin_manager import plugin_manager
if plugin_name in plugin_manager.list_loaded_plugins():
logger.warning(f"插件 '{plugin_name}' 已经加载。")
return True
return plugin_manager.add_plugin_directory(plugin_directory)
# 如果插件未注册,则遍历插件目录去查找
if plugin_name not in plugin_manager.list_registered_plugins():
logger.info(f"插件 '{plugin_name}' 未注册,开始在插件目录中搜索...")
found_path = None
for directory in plugin_manager.plugin_directories:
potential_path = os.path.join(directory, plugin_name)
if os.path.isdir(potential_path):
found_path = potential_path
break
if not found_path:
logger.error(f"在所有插件目录中都未找到名为 '{plugin_name}' 的插件。")
return False
plugin_file = os.path.join(found_path, "plugin.py")
if not os.path.exists(plugin_file):
logger.error(f"'{found_path}' 中未找到 plugin.py 文件。")
return False
module = plugin_manager._load_plugin_module_file(plugin_file)
if not module:
logger.error(f"'{plugin_file}' 加载插件模块失败。")
return False
if plugin_name not in plugin_manager.list_registered_plugins():
logger.error(f"插件 '{plugin_name}' 在加载模块后依然未注册成功。")
return False
logger.info(f"插件 '{plugin_name}' 已成功发现并注册。")
if load_after_register:
status, _ = plugin_manager.load_registered_plugin_classes(plugin_name)
return status
return True
def rescan_plugin_directory() -> tuple[int, int]:
def get_component_count(component_type: ComponentType, stream_id: str | None = None) -> int:
"""
重新扫描插件目录,加载新插件
获取指定类型的已加载并启用的组件的总数
可以根据 stream_id 考虑局部状态。
Args:
component_type (ComponentType): 要查询的组件类型。
stream_id (str | None): 可选的上下文ID。
Returns:
Tuple[int, int]: 成功加载的插件数量和失败的插件数量。
int: 该类型组件的数量。
"""
from src.plugin_system.core.plugin_manager import plugin_manager
return len(component_registry.get_enabled_components_by_type(component_type, stream_id=stream_id))
return plugin_manager.rescan_plugin_directory()
def get_component_info(name: str, component_type: ComponentType) -> ComponentInfo | None:
"""
获取任何一个已注册组件的详细信息。
Args:
name (str): 组件的唯一名称。
component_type (ComponentType): 组件的类型。
Returns:
ComponentInfo: 包含组件信息的对象,如果找不到则返回 None。
"""
return component_registry.get_component_info(name, component_type)
def get_system_report() -> dict[str, Any]:
"""
生成一份详细的系统状态报告。
Returns:
dict: 包含系统、插件和组件状态的详细报告。
"""
loaded_plugins_info = {}
for name, instance in plugin_manager.loaded_plugins.items():
plugin_info = component_registry.get_plugin_info(name)
if not plugin_info:
continue
components_details = []
for comp_info in plugin_info.components:
components_details.append(
{
"name": comp_info.name,
"component_type": comp_info.component_type.value,
"description": comp_info.description,
"enabled": comp_info.enabled,
}
)
# 从 plugin_info (PluginInfo) 而不是 instance (PluginBase) 获取元数据
loaded_plugins_info[name] = {
"display_name": plugin_info.display_name or name,
"version": plugin_info.version,
"author": plugin_info.author,
"enabled": instance.enable_plugin, # enable_plugin 状态还是需要从实例获取
"components": components_details,
}
report = {
"system_info": {
"loaded_plugins_count": len(plugin_manager.loaded_plugins),
"total_components_count": component_registry.get_registry_stats().get("total_components", 0),
},
"plugins": loaded_plugins_info,
"failed_plugins": plugin_manager.failed_plugins,
}
return report

View File

@@ -107,6 +107,9 @@ class ComponentRegistry:
"""chatter名 -> chatter类"""
self._enabled_chatter_registry: dict[str, type["BaseChatter"]] = {}
"""启用的chatter名 -> chatter类"""
# 局部组件状态管理器,用于临时覆盖
self._local_component_states: dict[str, dict[tuple[str, ComponentType], bool]] = {}
"""stream_id -> {(component_name, component_type): enabled_status}"""
logger.info("组件注册中心初始化完成")
# == 注册方法 ==
@@ -459,20 +462,10 @@ class ComponentRegistry:
case ComponentType.EVENT_HANDLER:
# 移除EventHandler注册和事件订阅
from .event_manager import event_manager # 延迟导入防止循环导入问题
self._event_handler_registry.pop(component_name, None)
self._enabled_event_handlers.pop(component_name, None)
try:
handler = event_manager.get_event_handler(component_name)
# 事件处理器可能未找到或未声明 subscribed_events需判空
if handler and hasattr(handler, "subscribed_events"):
for event in getattr(handler, "subscribed_events"):
# 假设 unsubscribe_handler_from_event 是协程;若不是则移除 await
result = event_manager.unsubscribe_handler_from_event(event, component_name)
if hasattr(result, "__await__"):
await result # type: ignore[func-returns-value]
logger.debug(f"已移除EventHandler组件: {component_name}")
logger.debug(f"已移除EventHandler组件: {component_name}")
# 从事件管理器中完全移除事件处理器,包括其所有订阅
event_manager.remove_event_handler(component_name)
logger.debug(f"已通过 event_manager 移除EventHandler组件: {component_name}")
except Exception as e:
logger.warning(f"移除EventHandler事件订阅时出错: {e}")
@@ -480,10 +473,32 @@ class ComponentRegistry:
# 移除Chatter注册
if hasattr(self, "_chatter_registry"):
self._chatter_registry.pop(component_name, None)
if hasattr(self, "_enabled_chatter_registry"):
self._enabled_chatter_registry.pop(component_name, None)
logger.debug(f"已移除Chatter组件: {component_name}")
case ComponentType.INTEREST_CALCULATOR:
# 移除InterestCalculator注册
if hasattr(self, "_interest_calculator_registry"):
self._interest_calculator_registry.pop(component_name, None)
if hasattr(self, "_enabled_interest_calculator_registry"):
self._enabled_interest_calculator_registry.pop(component_name, None)
logger.debug(f"已移除InterestCalculator组件: {component_name}")
case ComponentType.PROMPT:
# 移除Prompt注册
if hasattr(self, "_prompt_registry"):
self._prompt_registry.pop(component_name, None)
if hasattr(self, "_enabled_prompt_registry"):
self._enabled_prompt_registry.pop(component_name, None)
logger.debug(f"已移除Prompt组件: {component_name}")
case ComponentType.ROUTER:
# Router组件的移除比较复杂目前只记录日志
logger.warning(f"Router组件 '{component_name}' 的HTTP端点无法在运行时动态移除将在下次重启后生效。")
case _:
logger.warning(f"未知的组件类型: {component_type}")
logger.warning(f"未知的组件类型: {component_type},无法进行特定的清理操作")
return False
# 移除通用注册信息
@@ -724,10 +739,16 @@ class ComponentRegistry:
"""获取指定类型的所有组件"""
return self._components_by_type.get(component_type, {}).copy()
def get_enabled_components_by_type(self, component_type: ComponentType) -> dict[str, ComponentInfo]:
"""获取指定类型的所有启用组件"""
def get_enabled_components_by_type(
self, component_type: ComponentType, stream_id: str | None = None
) -> dict[str, ComponentInfo]:
"""获取指定类型的所有启用组件, 可选地根据 stream_id 考虑局部状态"""
components = self.get_components_by_type(component_type)
return {name: info for name, info in components.items() if info.enabled}
return {
name: info
for name, info in components.items()
if self.is_component_available(name, component_type, stream_id)
}
# === Action特定查询方法 ===
@@ -740,9 +761,15 @@ class ComponentRegistry:
info = self.get_component_info(action_name, ComponentType.ACTION)
return info if isinstance(info, ActionInfo) else None
def get_default_actions(self) -> dict[str, ActionInfo]:
"""获取默认动作集"""
return self._default_actions.copy()
def get_default_actions(self, stream_id: str | None = None) -> dict[str, ActionInfo]:
"""获取默认(可用)动作集, 可选地根据 stream_id 考虑局部状态"""
all_actions = self.get_components_by_type(ComponentType.ACTION)
available_actions = {
name: info
for name, info in all_actions.items()
if self.is_component_available(name, ComponentType.ACTION, stream_id)
}
return cast(dict[str, ActionInfo], available_actions)
# === Command特定查询方法 ===
@@ -790,9 +817,14 @@ class ComponentRegistry:
"""获取Tool注册表"""
return self._tool_registry.copy()
def get_llm_available_tools(self) -> dict[str, type[BaseTool]]:
"""获取LLM可用的Tool列表"""
return self._llm_available_tools.copy()
def get_llm_available_tools(self, stream_id: str | None = None) -> dict[str, type[BaseTool]]:
"""获取LLM可用的Tool列表, 可选地根据 stream_id 考虑局部状态"""
all_tools = self.get_tool_registry()
available_tools = {}
for name, tool_class in all_tools.items():
if self.is_component_available(name, ComponentType.TOOL, stream_id):
available_tools[name] = tool_class
return available_tools
def get_registered_tool_info(self, tool_name: str) -> ToolInfo | None:
"""获取Tool信息
@@ -836,9 +868,14 @@ class ComponentRegistry:
info = self.get_component_info(handler_name, ComponentType.EVENT_HANDLER)
return info if isinstance(info, EventHandlerInfo) else None
def get_enabled_event_handlers(self) -> dict[str, type[BaseEventHandler]]:
"""获取启用的事件处理器"""
return self._enabled_event_handlers.copy()
def get_enabled_event_handlers(self, stream_id: str | None = None) -> dict[str, type[BaseEventHandler]]:
"""获取启用的事件处理器, 可选地根据 stream_id 考虑局部状态"""
all_handlers = self.get_event_handler_registry()
available_handlers = {}
for name, handler_class in all_handlers.items():
if self.is_component_available(name, ComponentType.EVENT_HANDLER, stream_id):
available_handlers[name] = handler_class
return available_handlers
# === Chatter 特定查询方法 ===
def get_chatter_registry(self) -> dict[str, type[BaseChatter]]:
@@ -847,11 +884,14 @@ class ComponentRegistry:
self._chatter_registry: dict[str, type[BaseChatter]] = {}
return self._chatter_registry.copy()
def get_enabled_chatter_registry(self) -> dict[str, type[BaseChatter]]:
"""获取启用的Chatter注册表"""
if not hasattr(self, "_enabled_chatter_registry"):
self._enabled_chatter_registry: dict[str, type[BaseChatter]] = {}
return self._enabled_chatter_registry.copy()
def get_enabled_chatter_registry(self, stream_id: str | None = None) -> dict[str, type[BaseChatter]]:
"""获取启用的Chatter注册表, 可选地根据 stream_id 考虑局部状态"""
all_chatters = self.get_chatter_registry()
available_chatters = {}
for name, chatter_class in all_chatters.items():
if self.is_component_available(name, ComponentType.CHATTER, stream_id):
available_chatters[name] = chatter_class
return available_chatters
def get_registered_chatter_info(self, chatter_name: str) -> ChatterInfo | None:
"""获取Chatter信息"""
@@ -875,8 +915,12 @@ class ComponentRegistry:
def get_plugin_components(self, plugin_name: str) -> list["ComponentInfo"]:
"""获取插件的所有组件"""
plugin_info = self.get_plugin_info(plugin_name)
logger.info(plugin_info.components)
return plugin_info.components if plugin_info else []
if plugin_info:
# 记录日志时,将组件列表转换为可读的字符串,避免类型错误
component_names = [c.name for c in plugin_info.components]
logger.debug(f"获取到插件 '{plugin_name}' 的组件: {component_names}")
return plugin_info.components
return []
def get_plugin_config(self, plugin_name: str) -> dict:
"""获取插件配置
@@ -952,9 +996,40 @@ class ComponentRegistry:
component_type.value: len(components) for component_type, components in self._components_by_type.items()
},
"enabled_components": len([c for c in self._components.values() if c.enabled]),
"enabled_plugins": len([p for p in self._plugins.values() if p.enabled]),
}
# === 局部状态管理 ===
def set_local_component_state(
self, stream_id: str, component_name: str, component_type: ComponentType, enabled: bool
) -> bool:
"""为指定的 stream_id 设置组件的局部(临时)状态"""
if stream_id not in self._local_component_states:
self._local_component_states[stream_id] = {}
state_key = (component_name, component_type)
self._local_component_states[stream_id][state_key] = enabled
logger.debug(f"已为 stream '{stream_id}' 设置局部状态: {component_name} ({component_type}) -> {'启用' if enabled else '禁用'}")
return True
def is_component_available(self, component_name: str, component_type: ComponentType, stream_id: str | None = None) -> bool:
"""检查组件在给定上下文中是否可用(同时考虑全局和局部状态)"""
component_info = self.get_component_info(component_name, component_type)
# 1. 检查组件是否存在
if not component_info:
return False
# 2. 如果提供了 stream_id检查局部状态
if stream_id and stream_id in self._local_component_states:
state_key = (component_name, component_type)
local_state = self._local_component_states[stream_id].get(state_key)
if local_state is not None:
return local_state # 局部状态存在,覆盖全局状态
# 3. 如果没有局部状态覆盖,则返回全局状态
return component_info.enabled
# === MCP 工具相关方法 ===
async def load_mcp_tools(self) -> None:

View File

@@ -220,6 +220,36 @@ class EventManager:
"""
return self._event_handlers.copy()
def remove_event_handler(self, handler_name: str) -> bool:
"""
完全移除一个事件处理器,包括其所有订阅。
Args:
handler_name (str): 要移除的事件处理器的名称。
Returns:
bool: 如果成功移除则返回 True否则返回 False。
"""
if handler_name not in self._event_handlers:
logger.warning(f"事件处理器 {handler_name} 未注册,无需移除。")
return False
# 从主注册表中删除
del self._event_handlers[handler_name]
logger.debug(f"事件处理器 {handler_name} 已从主注册表移除。")
# 遍历所有事件,取消其订阅
for event in self._events.values():
# 创建订阅者列表的副本进行迭代,以安全地修改原始列表
for subscriber in list(event.subscribers):
if getattr(subscriber, 'handler_name', None) == handler_name:
event.subscribers.remove(subscriber)
logger.debug(f"事件处理器 {handler_name} 已从事件 {event.name} 取消订阅。")
logger.info(f"事件处理器 {handler_name} 已被完全移除。")
return True
def subscribe_handler_to_event(self, handler_name: str, event_name: EventType | str) -> bool:
"""订阅事件处理器到指定事件

View File

@@ -96,16 +96,13 @@ class SystemCommand(PlusCommand):
help_text = """🔌 插件管理命令帮助
📋 基本操作:
• `/system plugin help` - 显示插件管理帮助
• `/system plugin list` - 列出所有注册的插件
• `/system plugin list_enabled` - 列出所有加载(启用)的插件
• `/system plugin report` - 查看系统插件报告
• `/system plugin rescan` - 重新扫描所有插件目录
⚙️ 插件控制:
• `/system plugin load <插件名>` - 加载指定插件
• `/system plugin unload <插件名>` - 卸载指定插件
• `/system plugin reload <插件名>` - 重新加载指定插件
• `/system plugin force_reload <插件名>` - 强制重载指定插件
• `/system plugin add_dir <目录路径>` - 添加插件目录
• `/system plugin reload_all` - 重新加载所有插件
"""
elif target == "permission":
help_text = """📋 权限管理命令帮助
@@ -150,20 +147,16 @@ class SystemCommand(PlusCommand):
if action in ["help", "帮助"]:
await self._show_help("plugin")
elif action in ["list", "列表"]:
await self._list_registered_plugins()
elif action in ["list_enabled", "已启用"]:
await self._list_loaded_plugins()
elif action in ["report", "报告"]:
await self._show_system_report()
elif action in ["rescan", "重扫"]:
await self._rescan_plugin_dirs()
elif action in ["load", "加载"] and len(remaining_args) > 0:
await self._load_plugin(remaining_args[0])
elif action in ["unload", "卸载"] and len(remaining_args) > 0:
await self._unload_plugin(remaining_args[0])
elif action in ["reload", "重载"] and len(remaining_args) > 0:
await self._reload_plugin(remaining_args[0])
elif action in ["force_reload", "强制重载"] and len(remaining_args) > 0:
await self._force_reload_plugin(remaining_args[0])
elif action in ["reload_all", "重载全部"]:
await self._reload_all_plugins()
else:
await self.send_text("❌ 插件管理命令不合法\n使用 /system plugin help 查看帮助")
@@ -429,60 +422,69 @@ class SystemCommand(PlusCommand):
# Permission Management Section
# =================================================================
async def _list_loaded_plugins(self):
"""列出已加载的插件"""
plugins = plugin_manage_api.list_loaded_plugins()
await self.send_text(f"📦 已加载的插件: {', '.join(plugins) if plugins else ''}")
@require_permission("plugin.manage", deny_message="❌ 你没有权限查看插件报告")
async def _show_system_report(self):
"""显示系统插件报告"""
report = plugin_manage_api.get_system_report()
async def _list_registered_plugins(self):
"""列出已注册的插件"""
plugins = plugin_manage_api.list_registered_plugins()
await self.send_text(f"📋 已注册的插件: {', '.join(plugins) if plugins else ''}")
response_parts = [
"📊 **系统插件报告**",
f" - 已加载插件: {report['system_info']['loaded_plugins_count']}",
f" - 组件总数: {report['system_info']['total_components_count']}",
]
if report["plugins"]:
response_parts.append("\n✅ **已加载插件:**")
for name, info in report["plugins"].items():
response_parts.append(f" • **{info['display_name']} (`{name}`)** v{info['version']} by {info['author']}")
if report["failed_plugins"]:
response_parts.append("\n❌ **加载失败的插件:**")
for name, error in report["failed_plugins"].items():
response_parts.append(f" • **`{name}`**: {error}")
await self._send_long_message("\n".join(response_parts))
@require_permission("plugin.manage", deny_message="❌ 你没有权限扫描插件")
async def _rescan_plugin_dirs(self):
"""重新扫描插件目录"""
plugin_manage_api.rescan_plugin_directory()
await self.send_text("🔄 插件目录重新扫描已启动")
await self.send_text("🔄 正在重新扫描插件目录...")
success, fail = plugin_manage_api.rescan_and_register_plugins(load_after_register=True)
await self.send_text(f"✅ 扫描完成!\n新增成功: {success}个, 新增失败: {fail}个。")
@require_permission("plugin.manage", deny_message="❌ 你没有权限加载插件")
async def _load_plugin(self, plugin_name: str):
"""加载指定插件"""
success, count = plugin_manage_api.load_plugin(plugin_name)
success = plugin_manage_api.register_plugin_from_file(plugin_name, load_after_register=True)
if success:
await self.send_text(f"✅ 插件加载成功: `{plugin_name}`")
else:
if count == 0:
await self.send_text(f"⚠️ 插件 `{plugin_name}` 为禁用状态")
else:
await self.send_text(f"❌ 插件加载失败: `{plugin_name}`")
await self.send_text(f"❌ 插件加载失败: `{plugin_name}`。请检查日志获取详细信息。")
async def _unload_plugin(self, plugin_name: str):
"""卸载指定插件"""
success = await plugin_manage_api.remove_plugin(plugin_name)
if success:
await self.send_text(f"✅ 插件卸载成功: `{plugin_name}`")
else:
await self.send_text(f"❌ 插件卸载失败: `{plugin_name}`")
@require_permission("plugin.manage", deny_message="❌ 你没有权限重载插件")
async def _reload_plugin(self, plugin_name: str):
"""重新加载指定插件"""
try:
success = await plugin_manage_api.reload_plugin(plugin_name)
if success:
await self.send_text(f"✅ 插件重新加载成功: `{plugin_name}`")
else:
await self.send_text(f"❌ 插件重新加载失败: `{plugin_name}`")
except ValueError as e:
await self.send_text(f"❌ 操作失败: {e}")
async def _force_reload_plugin(self, plugin_name: str):
"""强制重载指定插件(深度清理)"""
await self.send_text(f"🔄 开始强制重载插件: `{plugin_name}`... (注意: 实际执行reload)")
try:
success = await plugin_manage_api.reload_plugin(plugin_name)
@require_permission("plugin.manage", deny_message="❌ 你没有权限重载所有插件")
async def _reload_all_plugins(self):
"""重新加载所有插件"""
await self.send_text("🔄 正在重新加载所有插件...")
success = await plugin_manage_api.reload_all_plugins()
if success:
await self.send_text(f"插件重载成功: `{plugin_name}`")
await self.send_text("所有插件已成功重载。")
else:
await self.send_text(f"插件重载失败: `{plugin_name}`")
except Exception as e:
await self.send_text(f"❌ 重载过程中发生错误: {e!s}")
await self.send_text("⚠️ 部分插件重载失败,请检查日志。")
# =================================================================