Refactor plugin system and update permission checks

Removed the permission_example plugin and its files. Refactored plugin hot reload and manager logic for more robust reloading, deep reload, and cache clearing, including improved debounce and plugin name resolution. Updated MaiZone plugin and actions to use the new permission API for feed read/send actions, and registered new permission nodes. Enhanced plugin management commands with PlusCommand, improved help, and added hot reload and cache management commands.
This commit is contained in:
雅诺狐
2025-08-29 13:47:54 +08:00
committed by Windpicker-owo
parent 89bb9e5b3e
commit 71c6b0ef1e
9 changed files with 673 additions and 548 deletions

View File

@@ -7,6 +7,7 @@ from typing import Tuple
from src.common.logger import get_logger
from src.plugin_system import BaseAction, ActionActivationType, ChatMode
from src.plugin_system.apis import person_api, generator_api
from src.plugin_system.apis.permission_api import permission_api
from ..services.manager import get_qzone_service, get_config_getter
logger = get_logger("MaiZone.ReadFeedAction")
@@ -32,27 +33,11 @@ class ReadFeedAction(BaseAction):
async def _check_permission(self) -> bool:
"""检查当前用户是否有权限执行此动作"""
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
if not person_id:
return False
platform = self.chat_stream.platform
user_id = self.chat_stream.user_info.user_id
user_id = await person_api.get_person_value(person_id, "user_id")
if not user_id:
return False
get_config = get_config_getter()
permission_list = get_config("read.permission", [])
permission_type = get_config("read.permission_type", "blacklist")
if not isinstance(permission_list, list):
return False
if permission_type == 'whitelist':
return user_id in permission_list
elif permission_type == 'blacklist':
return user_id not in permission_list
return False
# 使用权限API检查用户是否有阅读说说的权限
return permission_api.check_permission(platform, user_id, "plugin.maizone.read_feed")
async def execute(self) -> Tuple[bool, str]:
"""

View File

@@ -7,6 +7,7 @@ from typing import Tuple
from src.common.logger import get_logger
from src.plugin_system import BaseAction, ActionActivationType, ChatMode
from src.plugin_system.apis import person_api, generator_api
from src.plugin_system.apis.permission_api import permission_api
from ..services.manager import get_qzone_service, get_config_getter
logger = get_logger("MaiZone.SendFeedAction")
@@ -32,27 +33,11 @@ class SendFeedAction(BaseAction):
async def _check_permission(self) -> bool:
"""检查当前用户是否有权限执行此动作"""
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
if not person_id:
return False
platform = self.chat_stream.platform
user_id = self.chat_stream.user_info.user_id
user_id = await person_api.get_person_value(person_id, "user_id")
if not user_id:
return False
get_config = get_config_getter()
permission_list = get_config("send.permission", [])
permission_type = get_config("send.permission_type", "whitelist")
if not isinstance(permission_list, list):
return False
if permission_type == 'whitelist':
return user_id in permission_list
elif permission_type == 'blacklist':
return user_id not in permission_list
return False
# 使用权限API检查用户是否有发送说说的权限
return permission_api.check_permission(platform, user_id, "plugin.maizone.send_feed")
async def execute(self) -> Tuple[bool, str]:
"""

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
发送说说命令
发送说说命令 await self.send_text(f"收到!正在为你生成关于"{topic or '随机'}"的说说,请稍候...【热重载测试成功】")
"""
from typing import Tuple
@@ -16,15 +16,16 @@ logger = get_logger("MaiZone.SendFeedCommand")
class SendFeedCommand(PlusCommand):
"""
响应用户通过 `/send_feed` 命令发送说说的请求。
测试热重载功能 - 这是一个测试注释,现在应该可以正常工作了!
"""
command_name: str = "send_feed"
command_description: str = "一条QQ空间说说"
command_description: str = "发一条QQ空间说说"
command_aliases = ["发空间"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@require_permission("plugin.send.permission")
@require_permission("plugin.maizone.send_feed", "❌ 你没有发送QQ空间说说的权限")
async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]:
"""
执行命令的核心逻辑。

View File

@@ -83,12 +83,19 @@ class MaiZoneRefactoredPlugin(BasePlugin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 注册权限节点
permission_api.register_permission_node(
"plugin.send.permission",
"是否可以使用机器人发送说说",
"plugin.maizone.send_feed",
"是否可以使用机器人发送QQ空间说说",
"maiZone",
False
)
permission_api.register_permission_node(
"plugin.maizone.read_feed",
"是否可以使用机器人读取QQ空间说说",
"maiZone",
True
)
content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config)
cookie_service = CookieService(self.get_config)
@@ -99,10 +106,18 @@ class MaiZoneRefactoredPlugin(BasePlugin):
register_service("qzone", qzone_service)
register_service("get_config", self.get_config)
asyncio.create_task(scheduler_service.start())
asyncio.create_task(monitor_service.start())
# 保存服务引用以便后续启动
self.scheduler_service = scheduler_service
self.monitor_service = monitor_service
logger.info("MaiZone重构版插件已加载服务已注册,后台任务已启动")
logger.info("MaiZone重构版插件已加载服务已注册。")
async def on_plugin_loaded(self):
"""插件加载完成后的回调,启动异步服务"""
if hasattr(self, 'scheduler_service') and hasattr(self, 'monitor_service'):
asyncio.create_task(self.scheduler_service.start())
asyncio.create_task(self.monitor_service.start())
logger.info("MaiZone后台任务已启动。")
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
return [

View File

@@ -13,226 +13,313 @@ from src.plugin_system import (
ComponentType,
send_api,
)
from src.plugin_system.base.plus_command import PlusCommand
from src.plugin_system.base.command_args import CommandArgs
from src.plugin_system.base.component_types import PlusCommandInfo, ChatType
from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.utils.permission_decorators import require_permission
from src.plugin_system.core.plugin_hot_reload import hot_reload_manager
class ManagementCommand(BaseCommand):
command_name: str = "management"
description: str = "管理命令"
command_pattern: str = r"(?P<manage_command>^/pm(\s[a-zA-Z0-9_]+)*\s*$)"
class ManagementCommand(PlusCommand):
"""插件管理命令 - 使用PlusCommand系统"""
command_name = "pm"
command_description = "插件管理命令,支持插件和组件的管理操作"
command_aliases = ["pluginmanage", "插件管理"]
priority = 10
chat_type_allow = ChatType.ALL
intercept_message = True
async def execute(self) -> Tuple[bool, str, bool]:
# sourcery skip: merge-duplicate-blocks
if (
not self.message
or not self.message.message_info
or not self.message.message_info.user_info
or str(self.message.message_info.user_info.user_id) not in self.get_config("plugin.permission", []) # type: ignore
):
await self._send_message("你没有权限使用插件管理命令")
return False, "没有权限", True
if not self.message.chat_stream:
await self._send_message("无法获取聊天流信息")
return False, "无法获取聊天流信息", True
self.stream_id = self.message.chat_stream.stream_id
if not self.stream_id:
await self._send_message("无法获取聊天流信息")
return False, "无法获取聊天流信息", True
command_list = self.matched_groups["manage_command"].strip().split(" ")
if len(command_list) == 1:
await self.show_help("all")
return True, "帮助已发送", True
if len(command_list) == 2:
match command_list[1]:
case "plugin":
await self.show_help("plugin")
case "component":
await self.show_help("component")
case "help":
await self.show_help("all")
case _:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 3:
if command_list[1] == "plugin":
match command_list[2]:
case "help":
await self.show_help("plugin")
case "list":
await self._list_registered_plugins()
case "list_enabled":
await self._list_loaded_plugins()
case "rescan":
await self._rescan_plugin_dirs()
case _:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
elif command_list[1] == "component":
if command_list[2] == "list":
await self._list_all_registered_components()
elif command_list[2] == "help":
await self.show_help("component")
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 4:
if command_list[1] == "plugin":
match command_list[2]:
case "load":
await self._load_plugin(command_list[3])
case "unload":
await self._unload_plugin(command_list[3])
case "reload":
await self._reload_plugin(command_list[3])
case "add_dir":
await self._add_dir(command_list[3])
case _:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
elif command_list[1] == "component":
if command_list[2] != "list":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[3] == "enabled":
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@require_permission("plugin.management.admin", "❌ 你没有插件管理的权限")
async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]:
"""执行插件管理命令"""
if args.is_empty():
await self._show_help("all")
return True, "显示帮助信息", True
subcommand = args.get_first().lower()
remaining_args = args.get_args()[1:] # 获取除第一个参数外的所有参数
if subcommand in ["plugin", "插件"]:
return await self._handle_plugin_commands(remaining_args)
elif subcommand in ["component", "组件", "comp"]:
return await self._handle_component_commands(remaining_args)
elif subcommand in ["help", "帮助"]:
await self._show_help("all")
return True, "显示帮助信息", True
else:
await self.send_text(f"❌ 未知的子命令: {subcommand}\n使用 /pm help 查看帮助")
return True, "未知子命令", True
async def _handle_plugin_commands(self, args: List[str]) -> Tuple[bool, str, bool]:
"""处理插件相关命令"""
if not args:
await self._show_help("plugin")
return True, "显示插件帮助", True
action = args[0].lower()
if action in ["help", "帮助"]:
await self._show_help("plugin")
elif action in ["list", "列表"]:
await self._list_registered_plugins()
elif action in ["list_enabled", "已启用"]:
await self._list_loaded_plugins()
elif action in ["rescan", "重扫"]:
await self._rescan_plugin_dirs()
elif action in ["load", "加载"] and len(args) > 1:
await self._load_plugin(args[1])
elif action in ["unload", "卸载"] and len(args) > 1:
await self._unload_plugin(args[1])
elif action in ["reload", "重载"] and len(args) > 1:
await self._reload_plugin(args[1])
elif action in ["force_reload", "强制重载"] and len(args) > 1:
await self._force_reload_plugin(args[1])
elif action in ["add_dir", "添加目录"] and len(args) > 1:
await self._add_dir(args[1])
elif action in ["hotreload_status", "热重载状态"]:
await self._show_hotreload_status()
elif action in ["clear_cache", "清理缓存"]:
await self._clear_all_caches()
else:
await self.send_text("插件管理命令不合法\n使用 /pm plugin help 查看帮助")
return False, "命令不合法", True
return True, "插件命令执行完成", True
async def _handle_component_commands(self, args: List[str]) -> Tuple[bool, str, bool]:
"""处理组件相关命令"""
if not args:
await self._show_help("component")
return True, "显示组件帮助", True
action = args[0].lower()
if action in ["help", "帮助"]:
await self._show_help("component")
elif action in ["list", "列表"]:
if len(args) == 1:
await self._list_all_registered_components()
elif len(args) == 2:
if args[1] in ["enabled", "启用"]:
await self._list_enabled_components()
elif command_list[3] == "disabled":
elif args[1] in ["disabled", "禁用"]:
await self._list_disabled_components()
else:
await self._send_message("插件管理命令不合法")
await self.send_text("❌ 组件列表命令不合法")
return False, "命令不合法", True
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 5:
if command_list[1] != "component":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[2] != "list":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[3] == "enabled":
await self._list_enabled_components(target_type=command_list[4])
elif command_list[3] == "disabled":
await self._list_disabled_components(target_type=command_list[4])
elif command_list[3] == "type":
await self._list_registered_components_by_type(command_list[4])
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 6:
if command_list[1] != "component":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[2] == "enable":
if command_list[3] == "global":
await self._globally_enable_component(command_list[4], command_list[5])
elif command_list[3] == "local":
await self._locally_enable_component(command_list[4], command_list[5])
elif len(args) == 3:
if args[1] in ["enabled", "启用"]:
await self._list_enabled_components(target_type=args[2])
elif args[1] in ["disabled", "禁用"]:
await self._list_disabled_components(target_type=args[2])
elif args[1] in ["type", "类型"]:
await self._list_registered_components_by_type(args[2])
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
elif command_list[2] == "disable":
if command_list[3] == "global":
await self._globally_disable_component(command_list[4], command_list[5])
elif command_list[3] == "local":
await self._locally_disable_component(command_list[4], command_list[5])
else:
await self._send_message("插件管理命令不合法")
await self.send_text("❌ 组件列表命令不合法")
return False, "命令不合法", True
elif action in ["enable", "启用"] and len(args) >= 4:
scope = args[1].lower()
component_name = args[2]
component_type = args[3]
if scope in ["global", "全局"]:
await self._globally_enable_component(component_name, component_type)
elif scope in ["local", "本地"]:
await self._locally_enable_component(component_name, component_type)
else:
await self._send_message("插件管理命令不合法")
await self.send_text("❌ 组件启用命令不合法,范围应为 global 或 local")
return False, "命令不合法", True
elif action in ["disable", "禁用"] and len(args) >= 4:
scope = args[1].lower()
component_name = args[2]
component_type = args[3]
if scope in ["global", "全局"]:
await self._globally_disable_component(component_name, component_type)
elif scope in ["local", "本地"]:
await self._locally_disable_component(component_name, component_type)
else:
await self.send_text("❌ 组件禁用命令不合法,范围应为 global 或 local")
return False, "命令不合法", True
else:
await self.send_text("❌ 组件管理命令不合法\n使用 /pm component help 查看帮助")
return False, "命令不合法", True
return True, "组件命令执行完成", True
return True, "命令执行完成", True
async def show_help(self, target: str):
async def _show_help(self, target: str):
"""显示帮助信息"""
help_msg = ""
match target:
case "all":
help_msg = (
"管理命令帮助\n"
"/pm help 管理命令提示\n"
"/pm plugin 插件管理命令\n"
"/pm component 组件管理命令\n"
"使用 /pm plugin help 或 /pm component help 获取具体帮助"
)
case "plugin":
help_msg = (
"插件管理命令帮助\n"
"/pm plugin help 插件管理命令提示\n"
"/pm plugin list 列出所有注册的插件\n"
"/pm plugin list_enabled 列出所有加载(启用)的插件\n"
"/pm plugin rescan 重新扫描所有目录\n"
"/pm plugin load <plugin_name> 加载指定插件\n"
"/pm plugin unload <plugin_name> 卸载指定插件\n"
"/pm plugin reload <plugin_name> 重新加载指定插件\n"
"/pm plugin add_dir <directory_path> 添加插件目录\n"
)
case "component":
help_msg = (
"组件管理命令帮助\n"
"/pm component help 组件管理命令提示\n"
"/pm component list 列出所有注册的组件\n"
"/pm component list enabled <可选: type> 列出所有启用的组件\n"
"/pm component list disabled <可选: type> 列出所有禁用的组件\n"
" - <type> 可选项: local代表当前聊天中的global代表全局的\n"
" - <type> 不填时为 global\n"
"/pm component list type <component_type> 列出已经注册的指定类型的组件\n"
"/pm component enable global <component_name> <component_type> 全局启用组件\n"
"/pm component enable local <component_name> <component_type> 本聊天启用组件\n"
"/pm component disable global <component_name> <component_type> 全局禁用组件\n"
"/pm component disable local <component_name> <component_type> 本聊天禁用组件\n"
" - <component_type> 可选项: action, command, event_handler\n"
)
case _:
return
await self._send_message(help_msg)
if target == "all":
help_msg = """📋 插件管理命令帮助
🔧 主要功能:
• `/pm help` - 显示此帮助
• `/pm plugin` - 插件管理命令
• `/pm component` - 组件管理命令
📝 使用示例:
• `/pm plugin help` - 查看插件管理帮助
• `/pm component help` - 查看组件管理帮助
🔄 别名:可以使用 `/pluginmanage` 或 `/插件管理` 代替 `/pm`"""
elif target == "plugin":
help_msg = """🔌 插件管理命令帮助
📋 基本操作:
• `/pm plugin help` - 显示插件管理帮助
• `/pm plugin list` - 列出所有注册的插件
• `/pm plugin list_enabled` - 列出所有加载(启用)的插件
• `/pm plugin rescan` - 重新扫描所有插件目录
⚙️ 插件控制:
• `/pm plugin load <插件名>` - 加载指定插件
• `/pm plugin unload <插件名>` - 卸载指定插件
• `/pm plugin reload <插件名>` - 重新加载指定插件
• `/pm plugin force_reload <插件名>` - 强制重载指定插件(深度清理)
• `/pm plugin add_dir <目录路径>` - 添加插件目录
<EFBFBD> 热重载管理:
• `/pm plugin hotreload_status` - 查看热重载状态
• `/pm plugin clear_cache` - 清理所有模块缓存
<EFBFBD>📝 示例:
• `/pm plugin load echo_example`
• `/pm plugin force_reload permission_manager_plugin`
• `/pm plugin clear_cache`"""
elif target == "component":
help_msg = """🧩 组件管理命令帮助
📋 基本查看:
• `/pm component help` - 显示组件管理帮助
• `/pm component list` - 列出所有注册的组件
• `/pm component list enabled [类型]` - 列出启用的组件
• `/pm component list disabled [类型]` - 列出禁用的组件
• `/pm component list type <组件类型>` - 列出指定类型的组件
⚙️ 组件控制:
• `/pm component enable global <组件名> <类型>` - 全局启用组件
• `/pm component enable local <组件名> <类型>` - 本聊天启用组件
• `/pm component disable global <组件名> <类型>` - 全局禁用组件
• `/pm component disable local <组件名> <类型>` - 本聊天禁用组件
📝 组件类型:
• `action` - 动作组件
• `command` - 命令组件
• `event_handler` - 事件处理组件
• `plus_command` - 增强命令组件
💡 示例:
• `/pm component list type plus_command`
• `/pm component enable global echo_command command`"""
await self.send_text(help_msg)
async def _list_loaded_plugins(self):
"""列出已加载的插件"""
plugins = plugin_manage_api.list_loaded_plugins()
await self._send_message(f"已加载的插件: {', '.join(plugins)}")
await self.send_text(f"📦 已加载的插件: {', '.join(plugins) if plugins else ''}")
async def _list_registered_plugins(self):
"""列出已注册的插件"""
plugins = plugin_manage_api.list_registered_plugins()
await self._send_message(f"已注册的插件: {', '.join(plugins)}")
await self.send_text(f"📋 已注册的插件: {', '.join(plugins) if plugins else ''}")
async def _rescan_plugin_dirs(self):
"""重新扫描插件目录"""
plugin_manage_api.rescan_plugin_directory()
await self._send_message("插件目录重新扫描执行中")
await self.send_text("🔄 插件目录重新扫描已启动")
async def _load_plugin(self, plugin_name: str):
"""加载指定插件"""
success, count = plugin_manage_api.load_plugin(plugin_name)
if success:
await self._send_message(f"插件加载成功: {plugin_name}")
await self.send_text(f"插件加载成功: `{plugin_name}`")
else:
if count == 0:
await self._send_message(f"插件{plugin_name}为禁用状态")
await self._send_message(f"插件加载失败: {plugin_name}")
await self.send_text(f"⚠️ 插件 `{plugin_name}` 为禁用状态")
else:
await self.send_text(f"❌ 插件加载失败: `{plugin_name}`")
async def _unload_plugin(self, plugin_name: str):
"""卸载指定插件"""
success = await plugin_manage_api.remove_plugin(plugin_name)
if success:
await self._send_message(f"插件卸载成功: {plugin_name}")
await self.send_text(f"插件卸载成功: `{plugin_name}`")
else:
await self._send_message(f"插件卸载失败: {plugin_name}")
await self.send_text(f"插件卸载失败: `{plugin_name}`")
async def _reload_plugin(self, plugin_name: str):
"""重新加载指定插件"""
success = await plugin_manage_api.reload_plugin(plugin_name)
if success:
await self._send_message(f"插件重新加载成功: {plugin_name}")
await self.send_text(f"插件重新加载成功: `{plugin_name}`")
else:
await self._send_message(f"插件重新加载失败: {plugin_name}")
await self.send_text(f"插件重新加载失败: `{plugin_name}`")
async def _force_reload_plugin(self, plugin_name: str):
"""强制重载指定插件(深度清理)"""
await self.send_text(f"🔄 开始强制重载插件: `{plugin_name}`...")
try:
success = hot_reload_manager.force_reload_plugin(plugin_name)
if success:
await self.send_text(f"✅ 插件强制重载成功: `{plugin_name}`")
else:
await self.send_text(f"❌ 插件强制重载失败: `{plugin_name}`")
except Exception as e:
await self.send_text(f"❌ 强制重载过程中发生错误: {str(e)}")
async def _show_hotreload_status(self):
"""显示热重载状态"""
try:
status = hot_reload_manager.get_status()
status_text = f"""🔄 **热重载系统状态**
🟢 **运行状态:** {'运行中' if status['is_running'] else '已停止'}
📂 **监听目录:** {len(status['watch_directories'])}
👁️ **活跃观察者:** {status['active_observers']}
📦 **已加载插件:** {status['loaded_plugins']}
❌ **失败插件:** {status['failed_plugins']}
⏱️ **防抖延迟:** {status.get('debounce_delay', 0)}
📋 **监听的目录:**"""
for i, watch_dir in enumerate(status['watch_directories'], 1):
dir_type = "(内置插件)" if "src" in watch_dir else "(外部插件)"
status_text += f"\n{i}. `{watch_dir}` {dir_type}"
if status.get('pending_reloads'):
status_text += f"\n\n⏳ **待重载插件:** {', '.join([f'`{p}`' for p in status['pending_reloads']])}"
await self.send_text(status_text)
except Exception as e:
await self.send_text(f"❌ 获取热重载状态时发生错误: {str(e)}")
async def _clear_all_caches(self):
"""清理所有模块缓存"""
await self.send_text("🧹 开始清理所有Python模块缓存...")
try:
hot_reload_manager.clear_all_caches()
await self.send_text("✅ 模块缓存清理完成!建议重载相关插件以确保生效。")
except Exception as e:
await self.send_text(f"❌ 清理缓存时发生错误: {str(e)}")
async def _add_dir(self, dir_path: str):
await self._send_message(f"正在添加插件目录: {dir_path}")
"""添加插件目录"""
await self.send_text(f"📁 正在添加插件目录: `{dir_path}`")
success = plugin_manage_api.add_plugin_directory(dir_path)
await asyncio.sleep(0.5) # 防止乱序发送
if success:
await self._send_message(f"插件目录添加成功: {dir_path}")
await self.send_text(f"插件目录添加成功: `{dir_path}`")
else:
await self._send_message(f"插件目录添加失败: {dir_path}")
await self.send_text(f"插件目录添加失败: `{dir_path}`")
def _fetch_all_registered_components(self) -> List[ComponentInfo]:
all_plugin_info = component_manage_api.get_all_plugin_info()
@@ -245,47 +332,55 @@ class ManagementCommand(BaseCommand):
return components_info
def _fetch_locally_disabled_components(self) -> List[str]:
"""获取本地禁用的组件列表"""
stream_id = self.message.chat_stream.stream_id
locally_disabled_components_actions = component_manage_api.get_locally_disabled_components(
self.message.chat_stream.stream_id, ComponentType.ACTION
stream_id, ComponentType.ACTION
)
locally_disabled_components_commands = component_manage_api.get_locally_disabled_components(
self.message.chat_stream.stream_id, ComponentType.COMMAND
stream_id, ComponentType.COMMAND
)
locally_disabled_components_plus_commands = component_manage_api.get_locally_disabled_components(
stream_id, ComponentType.PLUS_COMMAND
)
locally_disabled_components_event_handlers = component_manage_api.get_locally_disabled_components(
self.message.chat_stream.stream_id, ComponentType.EVENT_HANDLER
stream_id, ComponentType.EVENT_HANDLER
)
return (
locally_disabled_components_actions
+ locally_disabled_components_commands
+ locally_disabled_components_plus_commands
+ locally_disabled_components_event_handlers
)
async def _list_all_registered_components(self):
"""列出所有已注册的组件"""
components_info = self._fetch_all_registered_components()
if not components_info:
await self._send_message("没有注册的组件")
await self.send_text("📋 没有注册的组件")
return
all_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in components_info
f"`{component.name}` ({component.component_type})" for component in components_info
)
await self._send_message(f"已注册的组件: {all_components_str}")
await self.send_text(f"📋 已注册的组件:\n{all_components_str}")
async def _list_enabled_components(self, target_type: str = "global"):
"""列出启用的组件"""
components_info = self._fetch_all_registered_components()
if not components_info:
await self._send_message("没有注册的组件")
await self.send_text("📋 没有注册的组件")
return
if target_type == "global":
enabled_components = [component for component in components_info if component.enabled]
if not enabled_components:
await self._send_message("没有满足条件的已启用全局组件")
await self.send_text("📋 没有满足条件的已启用全局组件")
return
enabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in enabled_components
f"`{component.name}` ({component.component_type})" for component in enabled_components
)
await self._send_message(f"满足条件的已启用全局组件: {enabled_components_str}")
await self.send_text(f"满足条件的已启用全局组件:\n{enabled_components_str}")
elif target_type == "local":
locally_disabled_components = self._fetch_locally_disabled_components()
enabled_components = [
@@ -294,28 +389,29 @@ class ManagementCommand(BaseCommand):
if (component.name not in locally_disabled_components and component.enabled)
]
if not enabled_components:
await self._send_message("本聊天没有满足条件的已启用组件")
await self.send_text("📋 本聊天没有满足条件的已启用组件")
return
enabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in enabled_components
f"`{component.name}` ({component.component_type})" for component in enabled_components
)
await self._send_message(f"本聊天满足条件的已启用组件: {enabled_components_str}")
await self.send_text(f"本聊天满足条件的已启用组件:\n{enabled_components_str}")
async def _list_disabled_components(self, target_type: str = "global"):
"""列出禁用的组件"""
components_info = self._fetch_all_registered_components()
if not components_info:
await self._send_message("没有注册的组件")
await self.send_text("📋 没有注册的组件")
return
if target_type == "global":
disabled_components = [component for component in components_info if not component.enabled]
if not disabled_components:
await self._send_message("没有满足条件的已禁用全局组件")
await self.send_text("📋 没有满足条件的已禁用全局组件")
return
disabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in disabled_components
f"`{component.name}` ({component.component_type})" for component in disabled_components
)
await self._send_message(f"满足条件的已禁用全局组件: {disabled_components_str}")
await self.send_text(f"满足条件的已禁用全局组件:\n{disabled_components_str}")
elif target_type == "local":
locally_disabled_components = self._fetch_locally_disabled_components()
disabled_components = [
@@ -324,110 +420,115 @@ class ManagementCommand(BaseCommand):
if (component.name in locally_disabled_components or not component.enabled)
]
if not disabled_components:
await self._send_message("本聊天没有满足条件的已禁用组件")
await self.send_text("📋 本聊天没有满足条件的已禁用组件")
return
disabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in disabled_components
f"`{component.name}` ({component.component_type})" for component in disabled_components
)
await self._send_message(f"本聊天满足条件的已禁用组件: {disabled_components_str}")
await self.send_text(f"本聊天满足条件的已禁用组件:\n{disabled_components_str}")
async def _list_registered_components_by_type(self, target_type: str):
match target_type:
case "action":
component_type = ComponentType.ACTION
case "command":
component_type = ComponentType.COMMAND
case "event_handler":
component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {target_type}")
return
"""按类型列出已注册的组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
component_type = type_mapping.get(target_type.lower())
if not component_type:
await self.send_text(f"❌ 未知组件类型: `{target_type}`\n支持的类型: action, command, event_handler, plus_command")
return
components_info = component_manage_api.get_components_info_by_type(component_type)
if not components_info:
await self._send_message(f"没有注册的 {target_type} 组件")
await self.send_text(f"📋 没有注册的 `{target_type}` 组件")
return
components_str = ", ".join(
f"{name} ({component.component_type})" for name, component in components_info.items()
f"`{name}` ({component.component_type})" for name, component in components_info.items()
)
await self._send_message(f"注册的 {target_type} 组件: {components_str}")
await self.send_text(f"📋 注册的 `{target_type}` 组件:\n{components_str}")
async def _globally_enable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
"""全局启用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
if component_manage_api.globally_enable_component(component_name, target_component_type):
await self._send_message(f"全局启用组件成功: {component_name}")
await self.send_text(f"全局启用组件成功: `{component_name}`")
else:
await self._send_message(f"全局启用组件失败: {component_name}")
await self.send_text(f"全局启用组件失败: `{component_name}`")
async def _globally_disable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
"""全局禁用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
success = await component_manage_api.globally_disable_component(component_name, target_component_type)
if success:
await self._send_message(f"全局禁用组件成功: {component_name}")
await self.send_text(f"全局禁用组件成功: `{component_name}`")
else:
await self._send_message(f"全局禁用组件失败: {component_name}")
await self.send_text(f"全局禁用组件失败: `{component_name}`")
async def _locally_enable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
if component_manage_api.locally_enable_component(
component_name,
target_component_type,
self.message.chat_stream.stream_id,
):
await self._send_message(f"本地启用组件成功: {component_name}")
"""本地启用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
stream_id = self.message.chat_stream.stream_id
if component_manage_api.locally_enable_component(component_name, target_component_type, stream_id):
await self.send_text(f"本地启用组件成功: `{component_name}`")
else:
await self._send_message(f"本地启用组件失败: {component_name}")
await self.send_text(f"本地启用组件失败: `{component_name}`")
async def _locally_disable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
if component_manage_api.locally_disable_component(
component_name,
target_component_type,
self.message.chat_stream.stream_id,
):
await self._send_message(f"本地禁用组件成功: {component_name}")
"""本地禁用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
stream_id = self.message.chat_stream.stream_id
if component_manage_api.locally_disable_component(component_name, target_component_type, stream_id):
await self.send_text(f"本地禁用组件成功: `{component_name}`")
else:
await self._send_message(f"本地禁用组件失败: {component_name}")
async def _send_message(self, message: str):
await send_api.text_to_stream(message, self.stream_id, typing=False, storage_message=False)
await self.send_text(f"本地禁用组件失败: `{component_name}`")
@register_plugin
@@ -441,14 +542,22 @@ class PluginManagementPlugin(BasePlugin):
"plugin": {
"enabled": ConfigField(bool, default=False, description="是否启用插件"),
"config_version": ConfigField(type=str, default="1.1.0", description="配置文件版本"),
"permission": ConfigField(
list, default=[], description="有权限使用插件管理命令的用户列表请填写字符串形式的用户ID"
),
},
}
def get_plugin_components(self) -> List[Tuple[CommandInfo, Type[BaseCommand]]]:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 注册权限节点
permission_api.register_permission_node(
"plugin.management.admin",
"插件管理:可以管理插件和组件的加载、卸载、启用、禁用等操作",
"plugin_management",
False
)
def get_plugin_components(self) -> List[Tuple[PlusCommandInfo, Type[PlusCommand]]]:
"""返回插件的PlusCommand组件"""
components = []
if self.get_config("plugin.enabled", True):
components.append((ManagementCommand.get_command_info(), ManagementCommand))
components.append((ManagementCommand.get_plus_command_info(), ManagementCommand))
return components