This commit is contained in:
minecraft1024a
2025-08-29 13:56:27 +08:00
9 changed files with 673 additions and 548 deletions

View File

@@ -1,42 +0,0 @@
{
"manifest_version": 1,
"name": "权限示例插件 (Permission Example Plugin)",
"version": "1.0.0",
"description": "MaiCore权限系统演示插件包含权限节点注册、权限检查和多种权限命令示例。",
"author": {
"name": "MoFox-Studio",
"url": "https://github.com/MoFox-Studio"
},
"license": "GPL-v3.0-or-later",
"host_application": {
"min_version": "0.10.0"
},
"keywords": ["permission", "example", "权限", "admin", "user", "master", "demo", "tutorial"],
"categories": ["Examples", "Tutorial", "Permission"],
"default_locale": "zh-CN",
"locales_path": "_locales",
"plugin_info": {
"is_built_in": false,
"plugin_type": "example",
"components": [
{
"type": "command",
"name": "admin_example",
"description": "管理员权限示例命令",
"pattern": "/admin_example"
},
{
"type": "command",
"name": "user_example",
"description": "用户权限示例命令",
"pattern": "/user_example"
},
{
"type": "command",
"name": "master_example",
"description": "Master专用示例命令",
"pattern": "/master_example"
}
]
}
}

View File

@@ -1,107 +0,0 @@
"""
权限系统示例插件
演示如何在插件中使用权限系统,包括权限节点注册、权限检查等功能。
"""
from typing import List
from src.plugin_system.apis.plugin_register_api import register_plugin
from src.plugin_system.base.base_plugin import BasePlugin
from src.plugin_system.base.base_command import BaseCommand
from src.plugin_system.apis.logging_api import get_logger
from src.plugin_system.base.config_types import ConfigField
from src.plugin_system.utils.permission_decorators import require_permission, require_master
from src.common.message import ChatStream, Message
logger = get_logger(__name__)
class ExampleAdminCommand(BaseCommand):
"""需要管理员权限的示例命令"""
command_name = "admin_example"
command_description = "管理员权限示例命令"
command_pattern = r"^/admin_example$"
command_help = "管理员权限示例命令"
command_examples = ["/admin_example"]
intercept_message = True
def can_execute(self, message: Message, chat_stream: ChatStream) -> bool:
"""基本检查"""
return True
@require_permission("plugin.example.admin")
async def execute(self, message: Message, chat_stream: ChatStream, args: List[str]) -> None:
"""执行管理员命令"""
await self.send_text("✅ 你有管理员权限!这是一个管理员专用功能。")
return True, "执行成功", True
class ExampleUserCommand(BaseCommand):
"""普通用户权限的示例命令"""
command_name = "user_example"
command_description = "用户权限示例命令"
command_pattern = r"^/user_example$"
command_help = "用户权限示例命令"
command_examples = ["/user_example"]
intercept_message = True
def can_execute(self, message: Message, chat_stream: ChatStream) -> bool:
"""基本检查"""
return True
@require_permission("plugin.example.user")
async def execute(self, message: Message, chat_stream: ChatStream, args: List[str]) -> None:
"""执行用户命令"""
await self.send_text("✅ 你有用户权限!这是一个普通用户功能。")
class ExampleMasterCommand(BaseCommand):
"""Master专用的示例命令"""
command_name = "master_example"
command_description = "Master专用示例命令"
command_pattern = r"^/master_example$"
command_help = "Master专用示例命令"
command_examples = ["/master_example"]
intercept_message = True
def can_execute(self, message: Message, chat_stream: ChatStream) -> bool:
"""基本检查"""
return True
@require_master()
async def execute(self, message: Message, chat_stream: ChatStream, args: List[str]) -> None:
"""执行Master命令"""
await self.send_text("👑 你是Master用户这是Master专用功能。")
@register_plugin
class HelloWorldPlugin(BasePlugin):
"""权限系统示例插件"""
# 插件基本信息
plugin_name: str = "permission_example" # 内部标识符
enable_plugin: bool = True
dependencies: List[str] = [] # 插件依赖列表
python_dependencies: List[str] = [] # Python包依赖列表
config_file_name: str = "config.toml" # 配置文件名
# 配置Schema定义
config_schema: dict = {
"plugin": {
"name": ConfigField(type=str, default="permission_example", description="插件名称"),
"version": ConfigField(type=str, default="1.0.0", description="插件版本"),
"enabled": ConfigField(type=bool, default=False, description="是否启用插件"),
}
}
def get_plugin_components(self):
return [(ExampleAdminCommand.get_command_info,ExampleAdminCommand),
(ExampleUserCommand.get_command_info,ExampleUserCommand),
(ExampleMasterCommand.get_command_info,ExampleMasterCommand)
]

View File

@@ -5,7 +5,9 @@
"""
import os
import sys
import time
import importlib
from pathlib import Path
from threading import Thread
from typing import Dict, Set, List, Optional, Tuple
@@ -27,7 +29,8 @@ class PluginFileHandler(FileSystemEventHandler):
self.hot_reload_manager = hot_reload_manager
self.pending_reloads: Set[str] = set() # 待重载的插件名称
self.last_reload_time: Dict[str, float] = {} # 上次重载时间
self.debounce_delay = 1.0 # 防抖延迟(秒)
self.debounce_delay = 2.0 # 增加防抖延迟到2秒确保文件写入完成
self.file_change_cache: Dict[str, float] = {} # 文件变化缓存
def on_modified(self, event):
"""文件修改事件"""
@@ -60,26 +63,42 @@ class PluginFileHandler(FileSystemEventHandler):
plugin_name, source_type = plugin_info
current_time = time.time()
last_time = self.last_reload_time.get(plugin_name, 0)
# 防抖处理,避免频繁重载
if current_time - last_time < self.debounce_delay:
# 文件变化缓存,避免重复处理同一文件的快速连续变化
file_cache_key = f"{file_path}_{change_type}"
last_file_time = self.file_change_cache.get(file_cache_key, 0)
if current_time - last_file_time < 0.5: # 0.5秒内的重复文件变化忽略
return
self.file_change_cache[file_cache_key] = current_time
# 插件级别的防抖处理
last_plugin_time = self.last_reload_time.get(plugin_name, 0)
if current_time - last_plugin_time < self.debounce_delay:
# 如果在防抖期内,更新待重载标记但不立即处理
self.pending_reloads.add(plugin_name)
return
file_name = Path(file_path).name
logger.info(f"📁 检测到插件文件变化: {file_name} ({change_type}) [{source_type}]")
logger.info(f"📁 检测到插件文件变化: {file_name} ({change_type}) [{source_type}] -> {plugin_name}")
# 如果是删除事件,处理关键文件删除
if change_type == "deleted":
# 解析实际的插件名称
actual_plugin_name = self.hot_reload_manager._resolve_plugin_name(plugin_name)
if file_name == "plugin.py":
if plugin_name in plugin_manager.loaded_plugins:
logger.info(f"🗑️ 插件主文件被删除,卸载插件: {plugin_name} [{source_type}]")
self.hot_reload_manager._unload_plugin(plugin_name)
if actual_plugin_name in plugin_manager.loaded_plugins:
logger.info(f"🗑️ 插件主文件被删除,卸载插件: {plugin_name} -> {actual_plugin_name} [{source_type}]")
self.hot_reload_manager._unload_plugin(actual_plugin_name)
else:
logger.info(f"🗑️ 插件主文件被删除,但插件未加载: {plugin_name} -> {actual_plugin_name} [{source_type}]")
return
elif file_name in ("manifest.toml", "_manifest.json"):
if plugin_name in plugin_manager.loaded_plugins:
logger.info(f"🗑️ 插件配置文件被删除,卸载插件: {plugin_name} [{source_type}]")
self.hot_reload_manager._unload_plugin(plugin_name)
if actual_plugin_name in plugin_manager.loaded_plugins:
logger.info(f"🗑️ 插件配置文件被删除,卸载插件: {plugin_name} -> {actual_plugin_name} [{source_type}]")
self.hot_reload_manager._unload_plugin(actual_plugin_name)
else:
logger.info(f"🗑️ 插件配置文件被删除,但插件未加载: {plugin_name} -> {actual_plugin_name} [{source_type}]")
return
# 对于修改和创建事件,都进行重载
@@ -87,29 +106,43 @@ class PluginFileHandler(FileSystemEventHandler):
self.pending_reloads.add(plugin_name)
self.last_reload_time[plugin_name] = current_time
# 延迟重载,避免文件正在写入时重载
# 延迟重载,确保文件写入完成
reload_thread = Thread(
target=self._delayed_reload,
args=(plugin_name, source_type),
args=(plugin_name, source_type, current_time),
daemon=True
)
reload_thread.start()
except Exception as e:
logger.error(f"❌ 处理文件变化时发生错误: {e}")
logger.error(f"❌ 处理文件变化时发生错误: {e}", exc_info=True)
def _delayed_reload(self, plugin_name: str, source_type: str):
def _delayed_reload(self, plugin_name: str, source_type: str, trigger_time: float):
"""延迟重载插件"""
try:
# 等待文件写入完成
time.sleep(self.debounce_delay)
if plugin_name in self.pending_reloads:
self.pending_reloads.remove(plugin_name)
logger.info(f"🔄 延迟重载插件: {plugin_name} [{source_type}]")
self.hot_reload_manager._reload_plugin(plugin_name)
# 检查是否还需要重载(可能在等待期间有更新的变化)
if plugin_name not in self.pending_reloads:
return
# 检查是否有更新的重载请求
if self.last_reload_time.get(plugin_name, 0) > trigger_time:
return
self.pending_reloads.discard(plugin_name)
logger.info(f"🔄 开始延迟重载插件: {plugin_name} [{source_type}]")
# 执行深度重载
success = self.hot_reload_manager._deep_reload_plugin(plugin_name)
if success:
logger.info(f"✅ 插件重载成功: {plugin_name} [{source_type}]")
else:
logger.error(f"❌ 插件重载失败: {plugin_name} [{source_type}]")
except Exception as e:
logger.error(f"❌ 延迟重载插件 {plugin_name} 时发生错误: {e}")
logger.error(f"❌ 延迟重载插件 {plugin_name} 时发生错误: {e}", exc_info=True)
def _get_plugin_info_from_path(self, file_path: str) -> Optional[Tuple[str, str]]:
"""从文件路径获取插件信息
@@ -237,35 +270,131 @@ class PluginHotReloadManager:
logger.info("🛑 插件热重载已停止")
def _reload_plugin(self, plugin_name: str):
"""重载指定插件"""
"""重载指定插件(简单重载)"""
try:
logger.info(f"🔄 开始重载插件: {plugin_name}")
# 解析实际的插件名称
actual_plugin_name = self._resolve_plugin_name(plugin_name)
logger.info(f"🔄 开始简单重载插件: {plugin_name} -> {actual_plugin_name}")
if plugin_manager.reload_plugin(plugin_name):
logger.info(f"✅ 插件重载成功: {plugin_name}")
if plugin_manager.reload_plugin(actual_plugin_name):
logger.info(f"✅ 插件简单重载成功: {actual_plugin_name}")
return True
else:
logger.error(f"❌ 插件重载失败: {plugin_name}")
logger.error(f"❌ 插件简单重载失败: {actual_plugin_name}")
return False
except Exception as e:
logger.error(f"❌ 重载插件 {plugin_name} 时发生错误: {e}")
logger.error(f"❌ 重载插件 {plugin_name} 时发生错误: {e}", exc_info=True)
return False
def _resolve_plugin_name(self, folder_name: str) -> str:
"""
将文件夹名称解析为实际的插件名称
通过检查插件管理器中的路径映射来找到对应的插件名
"""
# 首先检查是否直接匹配
if folder_name in plugin_manager.plugin_classes:
logger.debug(f"🔍 直接匹配插件名: {folder_name}")
return folder_name
# 如果没有直接匹配,搜索路径映射,并优先返回在插件类中存在的名称
matched_plugins = []
for plugin_name, plugin_path in plugin_manager.plugin_paths.items():
# 检查路径是否包含该文件夹名
if folder_name in plugin_path:
matched_plugins.append((plugin_name, plugin_path))
# 在匹配的插件中,优先选择在插件类中存在的
for plugin_name, plugin_path in matched_plugins:
if plugin_name in plugin_manager.plugin_classes:
logger.debug(f"🔍 文件夹名 '{folder_name}' 映射到插件名 '{plugin_name}' (路径: {plugin_path})")
return plugin_name
# 如果还是没找到在插件类中存在的,返回第一个匹配项
if matched_plugins:
plugin_name, plugin_path = matched_plugins[0]
logger.warning(f"⚠️ 文件夹 '{folder_name}' 映射到 '{plugin_name}',但该插件类不存在")
return plugin_name
# 如果还是没找到,返回原文件夹名
logger.warning(f"⚠️ 无法找到文件夹 '{folder_name}' 对应的插件名,使用原名称")
return folder_name
def _deep_reload_plugin(self, plugin_name: str):
"""深度重载指定插件(清理模块缓存)"""
try:
# 解析实际的插件名称
actual_plugin_name = self._resolve_plugin_name(plugin_name)
logger.info(f"🔄 开始深度重载插件: {plugin_name} -> {actual_plugin_name}")
# 强制清理相关模块缓存
self._force_clear_plugin_modules(plugin_name)
# 使用插件管理器的强制重载功能
success = plugin_manager.force_reload_plugin(actual_plugin_name)
if success:
logger.info(f"✅ 插件深度重载成功: {actual_plugin_name}")
return True
else:
logger.error(f"❌ 插件深度重载失败,尝试简单重载: {actual_plugin_name}")
# 如果深度重载失败,尝试简单重载
return self._reload_plugin(actual_plugin_name)
except Exception as e:
logger.error(f"❌ 深度重载插件 {plugin_name} 时发生错误: {e}", exc_info=True)
# 出错时尝试简单重载
return self._reload_plugin(plugin_name)
def _force_clear_plugin_modules(self, plugin_name: str):
"""强制清理插件相关的模块缓存"""
import sys
# 找到所有相关的模块名
modules_to_remove = []
plugin_module_prefix = f"src.plugins.built_in.{plugin_name}"
for module_name in list(sys.modules.keys()):
if plugin_module_prefix in module_name:
modules_to_remove.append(module_name)
# 删除模块缓存
for module_name in modules_to_remove:
if module_name in sys.modules:
logger.debug(f"🗑️ 清理模块缓存: {module_name}")
del sys.modules[module_name]
def _force_reimport_plugin(self, plugin_name: str):
"""强制重新导入插件(委托给插件管理器)"""
try:
# 使用插件管理器的重载功能
success = plugin_manager.reload_plugin(plugin_name)
return success
except Exception as e:
logger.error(f"❌ 强制重新导入插件 {plugin_name} 时发生错误: {e}", exc_info=True)
return False
def _unload_plugin(self, plugin_name: str):
"""卸载指定插件"""
try:
logger.info(f"🗑️ 开始卸载插件: {plugin_name}")
if plugin_manager.unload_plugin(plugin_name):
logger.info(f"✅ 插件卸载成功: {plugin_name}")
return True
else:
logger.error(f"❌ 插件卸载失败: {plugin_name}")
return False
except Exception as e:
logger.error(f"❌ 卸载插件 {plugin_name} 时发生错误: {e}")
logger.error(f"❌ 卸载插件 {plugin_name} 时发生错误: {e}", exc_info=True)
return False
def reload_all_plugins(self):
"""重载所有插件"""
try:
logger.info("🔄 开始重载所有插件...")
logger.info("🔄 开始深度重载所有插件...")
# 获取当前已加载的插件列表
loaded_plugins = list(plugin_manager.loaded_plugins.keys())
@@ -274,15 +403,42 @@ class PluginHotReloadManager:
fail_count = 0
for plugin_name in loaded_plugins:
if plugin_manager.reload_plugin(plugin_name):
logger.info(f"🔄 重载插件: {plugin_name}")
if self._deep_reload_plugin(plugin_name):
success_count += 1
else:
fail_count += 1
logger.info(f"✅ 插件重载完成: 成功 {success_count} 个,失败 {fail_count}")
# 清理全局缓存
importlib.invalidate_caches()
except Exception as e:
logger.error(f"❌ 重载所有插件时发生错误: {e}")
logger.error(f"❌ 重载所有插件时发生错误: {e}", exc_info=True)
def force_reload_plugin(self, plugin_name: str):
"""手动强制重载指定插件(委托给插件管理器)"""
try:
logger.info(f"🔄 手动强制重载插件: {plugin_name}")
# 清理待重载列表中的该插件(避免重复重载)
for handler in self.file_handlers:
handler.pending_reloads.discard(plugin_name)
# 使用插件管理器的强制重载功能
success = plugin_manager.force_reload_plugin(plugin_name)
if success:
logger.info(f"✅ 手动强制重载成功: {plugin_name}")
else:
logger.error(f"❌ 手动强制重载失败: {plugin_name}")
return success
except Exception as e:
logger.error(f"❌ 手动强制重载插件 {plugin_name} 时发生错误: {e}", exc_info=True)
return False
def add_watch_directory(self, directory: str):
"""添加新的监听目录"""
@@ -321,14 +477,33 @@ class PluginHotReloadManager:
def get_status(self) -> dict:
"""获取热重载状态"""
pending_reloads = set()
if self.file_handlers:
for handler in self.file_handlers:
pending_reloads.update(handler.pending_reloads)
return {
"is_running": self.is_running,
"watch_directories": self.watch_directories,
"active_observers": len(self.observers),
"loaded_plugins": len(plugin_manager.loaded_plugins),
"failed_plugins": len(plugin_manager.failed_plugins),
"pending_reloads": list(pending_reloads),
"debounce_delay": self.file_handlers[0].debounce_delay if self.file_handlers else 0,
}
def clear_all_caches(self):
"""清理所有Python模块缓存"""
try:
logger.info("🧹 开始清理所有Python模块缓存...")
# 重新扫描所有插件目录,这会重新加载模块
plugin_manager.rescan_plugin_directory()
logger.info("✅ 模块缓存清理完成")
except Exception as e:
logger.error(f"❌ 清理模块缓存时发生错误: {e}", exc_info=True)
# 全局热重载管理器实例
hot_reload_manager = PluginHotReloadManager()

View File

@@ -2,6 +2,7 @@ import asyncio
import os
import traceback
import sys
import importlib
from typing import Dict, List, Optional, Tuple, Type, Any
from importlib.util import spec_from_file_location, module_from_spec
@@ -289,11 +290,11 @@ class PluginManager:
Args:
plugin_file: 插件文件路径
plugin_name: 插件名称
plugin_dir: 插件目录路径
"""
# 生成模块名
# 生成模块名和插件信息
plugin_path = Path(plugin_file)
plugin_dir = plugin_path.parent # 插件目录
plugin_name = plugin_dir.name # 插件名称
module_name = ".".join(plugin_path.parent.parts)
try:
@@ -307,13 +308,13 @@ class PluginManager:
module.__package__ = module_name # 设置模块包名
spec.loader.exec_module(module)
logger.debug(f"插件模块加载成功: {plugin_file}")
logger.debug(f"插件模块加载成功: {plugin_file} -> {plugin_name} ({plugin_dir})")
return True
except Exception as e:
error_msg = f"加载插件模块 {plugin_file} 失败: {e}"
logger.error(error_msg)
self.failed_plugins[module_name] = error_msg
self.failed_plugins[plugin_name if 'plugin_name' in locals() else module_name] = error_msg
return False
# == 兼容性检查 ==
@@ -527,6 +528,10 @@ class PluginManager:
# 从已加载插件中移除
del self.loaded_plugins[plugin_name]
# 从插件类注册表中移除
if plugin_name in self.plugin_classes:
del self.plugin_classes[plugin_name]
# 从失败列表中移除(如果存在)
if plugin_name in self.failed_plugins:
del self.failed_plugins[plugin_name]
@@ -535,7 +540,7 @@ class PluginManager:
return True
except Exception as e:
logger.error(f"❌ 插件卸载失败: {plugin_name} - {str(e)}")
logger.error(f"❌ 插件卸载失败: {plugin_name} - {str(e)}", exc_info=True)
return False
def reload_plugin(self, plugin_name: str) -> bool:
@@ -548,55 +553,54 @@ class PluginManager:
bool: 重载是否成功
"""
try:
# 先卸载插件
logger.info(f"🔄 开始重载插件: {plugin_name}")
# 卸载插件
if plugin_name in self.loaded_plugins:
self.unload_plugin(plugin_name)
if not self.unload_plugin(plugin_name):
logger.warning(f"⚠️ 插件卸载失败,继续重载: {plugin_name}")
# 清除Python模块缓存
plugin_path = self.plugin_paths.get(plugin_name)
if plugin_path:
plugin_file = os.path.join(plugin_path, "plugin.py")
if os.path.exists(plugin_file):
# 从sys.modules中移除相关模块
modules_to_remove = []
plugin_module_prefix = ".".join(Path(plugin_file).parent.parts)
# 重新扫描插件目录
self.rescan_plugin_directory()
for module_name in sys.modules:
if module_name.startswith(plugin_module_prefix):
modules_to_remove.append(module_name)
for module_name in modules_to_remove:
del sys.modules[module_name]
# 从插件类注册表中移除
if plugin_name in self.plugin_classes:
del self.plugin_classes[plugin_name]
# 重新加载插件模块
if self._load_plugin_module_file(plugin_file):
# 重新加载插件实例
success, _ = self.load_registered_plugin_classes(plugin_name)
if success:
logger.info(f"🔄 插件重载成功: {plugin_name}")
return True
else:
logger.error(f"❌ 插件重载失败: {plugin_name} - 实例化失败")
return False
else:
logger.error(f"❌ 插件重载失败: {plugin_name} - 模块加载失败")
return False
# 重新加载插件实例
if plugin_name in self.plugin_classes:
success, _ = self.load_registered_plugin_classes(plugin_name)
if success:
logger.info(f"✅ 插件重载成功: {plugin_name}")
return True
else:
logger.error(f"❌ 插件重载失败: {plugin_name} - 插件文件不存在")
logger.error(f"❌ 插件重载失败: {plugin_name} - 实例化失败")
return False
else:
logger.error(f"❌ 插件重载失败: {plugin_name} - 插件路径未知")
logger.error(f"❌ 插件重载失败: {plugin_name} - 插件类未找到")
return False
except Exception as e:
logger.error(f"❌ 插件重载失败: {plugin_name} - {str(e)}")
logger.debug("详细错误信息: ", exc_info=True)
logger.error(f"❌ 插件重载失败: {plugin_name} - {str(e)}", exc_info=True)
return False
def force_reload_plugin(self, plugin_name: str) -> bool:
"""强制重载插件(使用简化的方法)
Args:
plugin_name: 插件名称
Returns:
bool: 重载是否成功
"""
return self.reload_plugin(plugin_name)
def clear_all_plugin_caches(self):
"""清理所有插件相关的模块缓存(简化版)"""
try:
logger.info("🧹 清理模块缓存...")
# 清理importlib缓存
importlib.invalidate_caches()
logger.info("🧹 模块缓存清理完成")
except Exception as e:
logger.error(f"❌ 清理模块缓存时发生错误: {e}", exc_info=True)
# 全局插件管理器实例
plugin_manager = PluginManager()

View File

@@ -7,6 +7,7 @@ from typing import Tuple
from src.common.logger import get_logger
from src.plugin_system import BaseAction, ActionActivationType, ChatMode
from src.plugin_system.apis import person_api, generator_api
from src.plugin_system.apis.permission_api import permission_api
from ..services.manager import get_qzone_service, get_config_getter
logger = get_logger("MaiZone.ReadFeedAction")
@@ -32,27 +33,11 @@ class ReadFeedAction(BaseAction):
async def _check_permission(self) -> bool:
"""检查当前用户是否有权限执行此动作"""
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
if not person_id:
return False
platform = self.chat_stream.platform
user_id = self.chat_stream.user_info.user_id
user_id = await person_api.get_person_value(person_id, "user_id")
if not user_id:
return False
get_config = get_config_getter()
permission_list = get_config("read.permission", [])
permission_type = get_config("read.permission_type", "blacklist")
if not isinstance(permission_list, list):
return False
if permission_type == 'whitelist':
return user_id in permission_list
elif permission_type == 'blacklist':
return user_id not in permission_list
return False
# 使用权限API检查用户是否有阅读说说的权限
return permission_api.check_permission(platform, user_id, "plugin.maizone.read_feed")
async def execute(self) -> Tuple[bool, str]:
"""

View File

@@ -7,6 +7,7 @@ from typing import Tuple
from src.common.logger import get_logger
from src.plugin_system import BaseAction, ActionActivationType, ChatMode
from src.plugin_system.apis import person_api, generator_api
from src.plugin_system.apis.permission_api import permission_api
from ..services.manager import get_qzone_service, get_config_getter
logger = get_logger("MaiZone.SendFeedAction")
@@ -32,27 +33,11 @@ class SendFeedAction(BaseAction):
async def _check_permission(self) -> bool:
"""检查当前用户是否有权限执行此动作"""
user_name = self.action_data.get("user_name", "")
person_id = person_api.get_person_id_by_name(user_name)
if not person_id:
return False
platform = self.chat_stream.platform
user_id = self.chat_stream.user_info.user_id
user_id = await person_api.get_person_value(person_id, "user_id")
if not user_id:
return False
get_config = get_config_getter()
permission_list = get_config("send.permission", [])
permission_type = get_config("send.permission_type", "whitelist")
if not isinstance(permission_list, list):
return False
if permission_type == 'whitelist':
return user_id in permission_list
elif permission_type == 'blacklist':
return user_id not in permission_list
return False
# 使用权限API检查用户是否有发送说说的权限
return permission_api.check_permission(platform, user_id, "plugin.maizone.send_feed")
async def execute(self) -> Tuple[bool, str]:
"""

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
发送说说命令
发送说说命令 await self.send_text(f"收到!正在为你生成关于"{topic or '随机'}"的说说,请稍候...【热重载测试成功】")
"""
from typing import Tuple
@@ -16,15 +16,16 @@ logger = get_logger("MaiZone.SendFeedCommand")
class SendFeedCommand(PlusCommand):
"""
响应用户通过 `/send_feed` 命令发送说说的请求。
测试热重载功能 - 这是一个测试注释,现在应该可以正常工作了!
"""
command_name: str = "send_feed"
command_description: str = "一条QQ空间说说"
command_description: str = "发一条QQ空间说说"
command_aliases = ["发空间"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@require_permission("plugin.send.permission")
@require_permission("plugin.maizone.send_feed", "❌ 你没有发送QQ空间说说的权限")
async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]:
"""
执行命令的核心逻辑。

View File

@@ -84,12 +84,19 @@ class MaiZoneRefactoredPlugin(BasePlugin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 注册权限节点
permission_api.register_permission_node(
"plugin.send.permission",
"是否可以使用机器人发送说说",
"plugin.maizone.send_feed",
"是否可以使用机器人发送QQ空间说说",
"maiZone",
False
)
permission_api.register_permission_node(
"plugin.maizone.read_feed",
"是否可以使用机器人读取QQ空间说说",
"maiZone",
True
)
content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config)
cookie_service = CookieService(self.get_config)
@@ -102,10 +109,18 @@ class MaiZoneRefactoredPlugin(BasePlugin):
register_service("reply_tracker", reply_tracker_service)
register_service("get_config", self.get_config)
asyncio.create_task(scheduler_service.start())
asyncio.create_task(monitor_service.start())
# 保存服务引用以便后续启动
self.scheduler_service = scheduler_service
self.monitor_service = monitor_service
logger.info("MaiZone重构版插件已加载服务已注册,后台任务已启动")
logger.info("MaiZone重构版插件已加载服务已注册。")
async def on_plugin_loaded(self):
"""插件加载完成后的回调,启动异步服务"""
if hasattr(self, 'scheduler_service') and hasattr(self, 'monitor_service'):
asyncio.create_task(self.scheduler_service.start())
asyncio.create_task(self.monitor_service.start())
logger.info("MaiZone后台任务已启动。")
def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
return [

View File

@@ -13,226 +13,313 @@ from src.plugin_system import (
ComponentType,
send_api,
)
from src.plugin_system.base.plus_command import PlusCommand
from src.plugin_system.base.command_args import CommandArgs
from src.plugin_system.base.component_types import PlusCommandInfo, ChatType
from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.utils.permission_decorators import require_permission
from src.plugin_system.core.plugin_hot_reload import hot_reload_manager
class ManagementCommand(BaseCommand):
command_name: str = "management"
description: str = "管理命令"
command_pattern: str = r"(?P<manage_command>^/pm(\s[a-zA-Z0-9_]+)*\s*$)"
class ManagementCommand(PlusCommand):
"""插件管理命令 - 使用PlusCommand系统"""
command_name = "pm"
command_description = "插件管理命令,支持插件和组件的管理操作"
command_aliases = ["pluginmanage", "插件管理"]
priority = 10
chat_type_allow = ChatType.ALL
intercept_message = True
async def execute(self) -> Tuple[bool, str, bool]:
# sourcery skip: merge-duplicate-blocks
if (
not self.message
or not self.message.message_info
or not self.message.message_info.user_info
or str(self.message.message_info.user_info.user_id) not in self.get_config("plugin.permission", []) # type: ignore
):
await self._send_message("你没有权限使用插件管理命令")
return False, "没有权限", True
if not self.message.chat_stream:
await self._send_message("无法获取聊天流信息")
return False, "无法获取聊天流信息", True
self.stream_id = self.message.chat_stream.stream_id
if not self.stream_id:
await self._send_message("无法获取聊天流信息")
return False, "无法获取聊天流信息", True
command_list = self.matched_groups["manage_command"].strip().split(" ")
if len(command_list) == 1:
await self.show_help("all")
return True, "帮助已发送", True
if len(command_list) == 2:
match command_list[1]:
case "plugin":
await self.show_help("plugin")
case "component":
await self.show_help("component")
case "help":
await self.show_help("all")
case _:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 3:
if command_list[1] == "plugin":
match command_list[2]:
case "help":
await self.show_help("plugin")
case "list":
await self._list_registered_plugins()
case "list_enabled":
await self._list_loaded_plugins()
case "rescan":
await self._rescan_plugin_dirs()
case _:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
elif command_list[1] == "component":
if command_list[2] == "list":
await self._list_all_registered_components()
elif command_list[2] == "help":
await self.show_help("component")
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 4:
if command_list[1] == "plugin":
match command_list[2]:
case "load":
await self._load_plugin(command_list[3])
case "unload":
await self._unload_plugin(command_list[3])
case "reload":
await self._reload_plugin(command_list[3])
case "add_dir":
await self._add_dir(command_list[3])
case _:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
elif command_list[1] == "component":
if command_list[2] != "list":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[3] == "enabled":
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@require_permission("plugin.management.admin", "❌ 你没有插件管理的权限")
async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]:
"""执行插件管理命令"""
if args.is_empty():
await self._show_help("all")
return True, "显示帮助信息", True
subcommand = args.get_first().lower()
remaining_args = args.get_args()[1:] # 获取除第一个参数外的所有参数
if subcommand in ["plugin", "插件"]:
return await self._handle_plugin_commands(remaining_args)
elif subcommand in ["component", "组件", "comp"]:
return await self._handle_component_commands(remaining_args)
elif subcommand in ["help", "帮助"]:
await self._show_help("all")
return True, "显示帮助信息", True
else:
await self.send_text(f"❌ 未知的子命令: {subcommand}\n使用 /pm help 查看帮助")
return True, "未知子命令", True
async def _handle_plugin_commands(self, args: List[str]) -> Tuple[bool, str, bool]:
"""处理插件相关命令"""
if not args:
await self._show_help("plugin")
return True, "显示插件帮助", True
action = args[0].lower()
if action in ["help", "帮助"]:
await self._show_help("plugin")
elif action in ["list", "列表"]:
await self._list_registered_plugins()
elif action in ["list_enabled", "已启用"]:
await self._list_loaded_plugins()
elif action in ["rescan", "重扫"]:
await self._rescan_plugin_dirs()
elif action in ["load", "加载"] and len(args) > 1:
await self._load_plugin(args[1])
elif action in ["unload", "卸载"] and len(args) > 1:
await self._unload_plugin(args[1])
elif action in ["reload", "重载"] and len(args) > 1:
await self._reload_plugin(args[1])
elif action in ["force_reload", "强制重载"] and len(args) > 1:
await self._force_reload_plugin(args[1])
elif action in ["add_dir", "添加目录"] and len(args) > 1:
await self._add_dir(args[1])
elif action in ["hotreload_status", "热重载状态"]:
await self._show_hotreload_status()
elif action in ["clear_cache", "清理缓存"]:
await self._clear_all_caches()
else:
await self.send_text("插件管理命令不合法\n使用 /pm plugin help 查看帮助")
return False, "命令不合法", True
return True, "插件命令执行完成", True
async def _handle_component_commands(self, args: List[str]) -> Tuple[bool, str, bool]:
"""处理组件相关命令"""
if not args:
await self._show_help("component")
return True, "显示组件帮助", True
action = args[0].lower()
if action in ["help", "帮助"]:
await self._show_help("component")
elif action in ["list", "列表"]:
if len(args) == 1:
await self._list_all_registered_components()
elif len(args) == 2:
if args[1] in ["enabled", "启用"]:
await self._list_enabled_components()
elif command_list[3] == "disabled":
elif args[1] in ["disabled", "禁用"]:
await self._list_disabled_components()
else:
await self._send_message("插件管理命令不合法")
await self.send_text("❌ 组件列表命令不合法")
return False, "命令不合法", True
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 5:
if command_list[1] != "component":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[2] != "list":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[3] == "enabled":
await self._list_enabled_components(target_type=command_list[4])
elif command_list[3] == "disabled":
await self._list_disabled_components(target_type=command_list[4])
elif command_list[3] == "type":
await self._list_registered_components_by_type(command_list[4])
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if len(command_list) == 6:
if command_list[1] != "component":
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
if command_list[2] == "enable":
if command_list[3] == "global":
await self._globally_enable_component(command_list[4], command_list[5])
elif command_list[3] == "local":
await self._locally_enable_component(command_list[4], command_list[5])
elif len(args) == 3:
if args[1] in ["enabled", "启用"]:
await self._list_enabled_components(target_type=args[2])
elif args[1] in ["disabled", "禁用"]:
await self._list_disabled_components(target_type=args[2])
elif args[1] in ["type", "类型"]:
await self._list_registered_components_by_type(args[2])
else:
await self._send_message("插件管理命令不合法")
return False, "命令不合法", True
elif command_list[2] == "disable":
if command_list[3] == "global":
await self._globally_disable_component(command_list[4], command_list[5])
elif command_list[3] == "local":
await self._locally_disable_component(command_list[4], command_list[5])
else:
await self._send_message("插件管理命令不合法")
await self.send_text("❌ 组件列表命令不合法")
return False, "命令不合法", True
elif action in ["enable", "启用"] and len(args) >= 4:
scope = args[1].lower()
component_name = args[2]
component_type = args[3]
if scope in ["global", "全局"]:
await self._globally_enable_component(component_name, component_type)
elif scope in ["local", "本地"]:
await self._locally_enable_component(component_name, component_type)
else:
await self._send_message("插件管理命令不合法")
await self.send_text("❌ 组件启用命令不合法,范围应为 global 或 local")
return False, "命令不合法", True
elif action in ["disable", "禁用"] and len(args) >= 4:
scope = args[1].lower()
component_name = args[2]
component_type = args[3]
if scope in ["global", "全局"]:
await self._globally_disable_component(component_name, component_type)
elif scope in ["local", "本地"]:
await self._locally_disable_component(component_name, component_type)
else:
await self.send_text("❌ 组件禁用命令不合法,范围应为 global 或 local")
return False, "命令不合法", True
else:
await self.send_text("❌ 组件管理命令不合法\n使用 /pm component help 查看帮助")
return False, "命令不合法", True
return True, "组件命令执行完成", True
return True, "命令执行完成", True
async def show_help(self, target: str):
async def _show_help(self, target: str):
"""显示帮助信息"""
help_msg = ""
match target:
case "all":
help_msg = (
"管理命令帮助\n"
"/pm help 管理命令提示\n"
"/pm plugin 插件管理命令\n"
"/pm component 组件管理命令\n"
"使用 /pm plugin help 或 /pm component help 获取具体帮助"
)
case "plugin":
help_msg = (
"插件管理命令帮助\n"
"/pm plugin help 插件管理命令提示\n"
"/pm plugin list 列出所有注册的插件\n"
"/pm plugin list_enabled 列出所有加载(启用)的插件\n"
"/pm plugin rescan 重新扫描所有目录\n"
"/pm plugin load <plugin_name> 加载指定插件\n"
"/pm plugin unload <plugin_name> 卸载指定插件\n"
"/pm plugin reload <plugin_name> 重新加载指定插件\n"
"/pm plugin add_dir <directory_path> 添加插件目录\n"
)
case "component":
help_msg = (
"组件管理命令帮助\n"
"/pm component help 组件管理命令提示\n"
"/pm component list 列出所有注册的组件\n"
"/pm component list enabled <可选: type> 列出所有启用的组件\n"
"/pm component list disabled <可选: type> 列出所有禁用的组件\n"
" - <type> 可选项: local代表当前聊天中的global代表全局的\n"
" - <type> 不填时为 global\n"
"/pm component list type <component_type> 列出已经注册的指定类型的组件\n"
"/pm component enable global <component_name> <component_type> 全局启用组件\n"
"/pm component enable local <component_name> <component_type> 本聊天启用组件\n"
"/pm component disable global <component_name> <component_type> 全局禁用组件\n"
"/pm component disable local <component_name> <component_type> 本聊天禁用组件\n"
" - <component_type> 可选项: action, command, event_handler\n"
)
case _:
return
await self._send_message(help_msg)
if target == "all":
help_msg = """📋 插件管理命令帮助
🔧 主要功能:
• `/pm help` - 显示此帮助
• `/pm plugin` - 插件管理命令
• `/pm component` - 组件管理命令
📝 使用示例:
• `/pm plugin help` - 查看插件管理帮助
• `/pm component help` - 查看组件管理帮助
🔄 别名:可以使用 `/pluginmanage` 或 `/插件管理` 代替 `/pm`"""
elif target == "plugin":
help_msg = """🔌 插件管理命令帮助
📋 基本操作:
• `/pm plugin help` - 显示插件管理帮助
• `/pm plugin list` - 列出所有注册的插件
• `/pm plugin list_enabled` - 列出所有加载(启用)的插件
• `/pm plugin rescan` - 重新扫描所有插件目录
⚙️ 插件控制:
• `/pm plugin load <插件名>` - 加载指定插件
• `/pm plugin unload <插件名>` - 卸载指定插件
• `/pm plugin reload <插件名>` - 重新加载指定插件
• `/pm plugin force_reload <插件名>` - 强制重载指定插件(深度清理)
• `/pm plugin add_dir <目录路径>` - 添加插件目录
<EFBFBD> 热重载管理:
• `/pm plugin hotreload_status` - 查看热重载状态
• `/pm plugin clear_cache` - 清理所有模块缓存
<EFBFBD>📝 示例:
• `/pm plugin load echo_example`
• `/pm plugin force_reload permission_manager_plugin`
• `/pm plugin clear_cache`"""
elif target == "component":
help_msg = """🧩 组件管理命令帮助
📋 基本查看:
• `/pm component help` - 显示组件管理帮助
• `/pm component list` - 列出所有注册的组件
• `/pm component list enabled [类型]` - 列出启用的组件
• `/pm component list disabled [类型]` - 列出禁用的组件
• `/pm component list type <组件类型>` - 列出指定类型的组件
⚙️ 组件控制:
• `/pm component enable global <组件名> <类型>` - 全局启用组件
• `/pm component enable local <组件名> <类型>` - 本聊天启用组件
• `/pm component disable global <组件名> <类型>` - 全局禁用组件
• `/pm component disable local <组件名> <类型>` - 本聊天禁用组件
📝 组件类型:
• `action` - 动作组件
• `command` - 命令组件
• `event_handler` - 事件处理组件
• `plus_command` - 增强命令组件
💡 示例:
• `/pm component list type plus_command`
• `/pm component enable global echo_command command`"""
await self.send_text(help_msg)
async def _list_loaded_plugins(self):
"""列出已加载的插件"""
plugins = plugin_manage_api.list_loaded_plugins()
await self._send_message(f"已加载的插件: {', '.join(plugins)}")
await self.send_text(f"📦 已加载的插件: {', '.join(plugins) if plugins else ''}")
async def _list_registered_plugins(self):
"""列出已注册的插件"""
plugins = plugin_manage_api.list_registered_plugins()
await self._send_message(f"已注册的插件: {', '.join(plugins)}")
await self.send_text(f"📋 已注册的插件: {', '.join(plugins) if plugins else ''}")
async def _rescan_plugin_dirs(self):
"""重新扫描插件目录"""
plugin_manage_api.rescan_plugin_directory()
await self._send_message("插件目录重新扫描执行中")
await self.send_text("🔄 插件目录重新扫描已启动")
async def _load_plugin(self, plugin_name: str):
"""加载指定插件"""
success, count = plugin_manage_api.load_plugin(plugin_name)
if success:
await self._send_message(f"插件加载成功: {plugin_name}")
await self.send_text(f"插件加载成功: `{plugin_name}`")
else:
if count == 0:
await self._send_message(f"插件{plugin_name}为禁用状态")
await self._send_message(f"插件加载失败: {plugin_name}")
await self.send_text(f"⚠️ 插件 `{plugin_name}` 为禁用状态")
else:
await self.send_text(f"❌ 插件加载失败: `{plugin_name}`")
async def _unload_plugin(self, plugin_name: str):
"""卸载指定插件"""
success = await plugin_manage_api.remove_plugin(plugin_name)
if success:
await self._send_message(f"插件卸载成功: {plugin_name}")
await self.send_text(f"插件卸载成功: `{plugin_name}`")
else:
await self._send_message(f"插件卸载失败: {plugin_name}")
await self.send_text(f"插件卸载失败: `{plugin_name}`")
async def _reload_plugin(self, plugin_name: str):
"""重新加载指定插件"""
success = await plugin_manage_api.reload_plugin(plugin_name)
if success:
await self._send_message(f"插件重新加载成功: {plugin_name}")
await self.send_text(f"插件重新加载成功: `{plugin_name}`")
else:
await self._send_message(f"插件重新加载失败: {plugin_name}")
await self.send_text(f"插件重新加载失败: `{plugin_name}`")
async def _force_reload_plugin(self, plugin_name: str):
"""强制重载指定插件(深度清理)"""
await self.send_text(f"🔄 开始强制重载插件: `{plugin_name}`...")
try:
success = hot_reload_manager.force_reload_plugin(plugin_name)
if success:
await self.send_text(f"✅ 插件强制重载成功: `{plugin_name}`")
else:
await self.send_text(f"❌ 插件强制重载失败: `{plugin_name}`")
except Exception as e:
await self.send_text(f"❌ 强制重载过程中发生错误: {str(e)}")
async def _show_hotreload_status(self):
"""显示热重载状态"""
try:
status = hot_reload_manager.get_status()
status_text = f"""🔄 **热重载系统状态**
🟢 **运行状态:** {'运行中' if status['is_running'] else '已停止'}
📂 **监听目录:** {len(status['watch_directories'])}
👁️ **活跃观察者:** {status['active_observers']}
📦 **已加载插件:** {status['loaded_plugins']}
❌ **失败插件:** {status['failed_plugins']}
⏱️ **防抖延迟:** {status.get('debounce_delay', 0)}
📋 **监听的目录:**"""
for i, watch_dir in enumerate(status['watch_directories'], 1):
dir_type = "(内置插件)" if "src" in watch_dir else "(外部插件)"
status_text += f"\n{i}. `{watch_dir}` {dir_type}"
if status.get('pending_reloads'):
status_text += f"\n\n⏳ **待重载插件:** {', '.join([f'`{p}`' for p in status['pending_reloads']])}"
await self.send_text(status_text)
except Exception as e:
await self.send_text(f"❌ 获取热重载状态时发生错误: {str(e)}")
async def _clear_all_caches(self):
"""清理所有模块缓存"""
await self.send_text("🧹 开始清理所有Python模块缓存...")
try:
hot_reload_manager.clear_all_caches()
await self.send_text("✅ 模块缓存清理完成!建议重载相关插件以确保生效。")
except Exception as e:
await self.send_text(f"❌ 清理缓存时发生错误: {str(e)}")
async def _add_dir(self, dir_path: str):
await self._send_message(f"正在添加插件目录: {dir_path}")
"""添加插件目录"""
await self.send_text(f"📁 正在添加插件目录: `{dir_path}`")
success = plugin_manage_api.add_plugin_directory(dir_path)
await asyncio.sleep(0.5) # 防止乱序发送
if success:
await self._send_message(f"插件目录添加成功: {dir_path}")
await self.send_text(f"插件目录添加成功: `{dir_path}`")
else:
await self._send_message(f"插件目录添加失败: {dir_path}")
await self.send_text(f"插件目录添加失败: `{dir_path}`")
def _fetch_all_registered_components(self) -> List[ComponentInfo]:
all_plugin_info = component_manage_api.get_all_plugin_info()
@@ -245,47 +332,55 @@ class ManagementCommand(BaseCommand):
return components_info
def _fetch_locally_disabled_components(self) -> List[str]:
"""获取本地禁用的组件列表"""
stream_id = self.message.chat_stream.stream_id
locally_disabled_components_actions = component_manage_api.get_locally_disabled_components(
self.message.chat_stream.stream_id, ComponentType.ACTION
stream_id, ComponentType.ACTION
)
locally_disabled_components_commands = component_manage_api.get_locally_disabled_components(
self.message.chat_stream.stream_id, ComponentType.COMMAND
stream_id, ComponentType.COMMAND
)
locally_disabled_components_plus_commands = component_manage_api.get_locally_disabled_components(
stream_id, ComponentType.PLUS_COMMAND
)
locally_disabled_components_event_handlers = component_manage_api.get_locally_disabled_components(
self.message.chat_stream.stream_id, ComponentType.EVENT_HANDLER
stream_id, ComponentType.EVENT_HANDLER
)
return (
locally_disabled_components_actions
+ locally_disabled_components_commands
+ locally_disabled_components_plus_commands
+ locally_disabled_components_event_handlers
)
async def _list_all_registered_components(self):
"""列出所有已注册的组件"""
components_info = self._fetch_all_registered_components()
if not components_info:
await self._send_message("没有注册的组件")
await self.send_text("📋 没有注册的组件")
return
all_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in components_info
f"`{component.name}` ({component.component_type})" for component in components_info
)
await self._send_message(f"已注册的组件: {all_components_str}")
await self.send_text(f"📋 已注册的组件:\n{all_components_str}")
async def _list_enabled_components(self, target_type: str = "global"):
"""列出启用的组件"""
components_info = self._fetch_all_registered_components()
if not components_info:
await self._send_message("没有注册的组件")
await self.send_text("📋 没有注册的组件")
return
if target_type == "global":
enabled_components = [component for component in components_info if component.enabled]
if not enabled_components:
await self._send_message("没有满足条件的已启用全局组件")
await self.send_text("📋 没有满足条件的已启用全局组件")
return
enabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in enabled_components
f"`{component.name}` ({component.component_type})" for component in enabled_components
)
await self._send_message(f"满足条件的已启用全局组件: {enabled_components_str}")
await self.send_text(f"满足条件的已启用全局组件:\n{enabled_components_str}")
elif target_type == "local":
locally_disabled_components = self._fetch_locally_disabled_components()
enabled_components = [
@@ -294,28 +389,29 @@ class ManagementCommand(BaseCommand):
if (component.name not in locally_disabled_components and component.enabled)
]
if not enabled_components:
await self._send_message("本聊天没有满足条件的已启用组件")
await self.send_text("📋 本聊天没有满足条件的已启用组件")
return
enabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in enabled_components
f"`{component.name}` ({component.component_type})" for component in enabled_components
)
await self._send_message(f"本聊天满足条件的已启用组件: {enabled_components_str}")
await self.send_text(f"本聊天满足条件的已启用组件:\n{enabled_components_str}")
async def _list_disabled_components(self, target_type: str = "global"):
"""列出禁用的组件"""
components_info = self._fetch_all_registered_components()
if not components_info:
await self._send_message("没有注册的组件")
await self.send_text("📋 没有注册的组件")
return
if target_type == "global":
disabled_components = [component for component in components_info if not component.enabled]
if not disabled_components:
await self._send_message("没有满足条件的已禁用全局组件")
await self.send_text("📋 没有满足条件的已禁用全局组件")
return
disabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in disabled_components
f"`{component.name}` ({component.component_type})" for component in disabled_components
)
await self._send_message(f"满足条件的已禁用全局组件: {disabled_components_str}")
await self.send_text(f"满足条件的已禁用全局组件:\n{disabled_components_str}")
elif target_type == "local":
locally_disabled_components = self._fetch_locally_disabled_components()
disabled_components = [
@@ -324,110 +420,115 @@ class ManagementCommand(BaseCommand):
if (component.name in locally_disabled_components or not component.enabled)
]
if not disabled_components:
await self._send_message("本聊天没有满足条件的已禁用组件")
await self.send_text("📋 本聊天没有满足条件的已禁用组件")
return
disabled_components_str = ", ".join(
f"{component.name} ({component.component_type})" for component in disabled_components
f"`{component.name}` ({component.component_type})" for component in disabled_components
)
await self._send_message(f"本聊天满足条件的已禁用组件: {disabled_components_str}")
await self.send_text(f"本聊天满足条件的已禁用组件:\n{disabled_components_str}")
async def _list_registered_components_by_type(self, target_type: str):
match target_type:
case "action":
component_type = ComponentType.ACTION
case "command":
component_type = ComponentType.COMMAND
case "event_handler":
component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {target_type}")
return
"""按类型列出已注册的组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
component_type = type_mapping.get(target_type.lower())
if not component_type:
await self.send_text(f"❌ 未知组件类型: `{target_type}`\n支持的类型: action, command, event_handler, plus_command")
return
components_info = component_manage_api.get_components_info_by_type(component_type)
if not components_info:
await self._send_message(f"没有注册的 {target_type} 组件")
await self.send_text(f"📋 没有注册的 `{target_type}` 组件")
return
components_str = ", ".join(
f"{name} ({component.component_type})" for name, component in components_info.items()
f"`{name}` ({component.component_type})" for name, component in components_info.items()
)
await self._send_message(f"注册的 {target_type} 组件: {components_str}")
await self.send_text(f"📋 注册的 `{target_type}` 组件:\n{components_str}")
async def _globally_enable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
"""全局启用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
if component_manage_api.globally_enable_component(component_name, target_component_type):
await self._send_message(f"全局启用组件成功: {component_name}")
await self.send_text(f"全局启用组件成功: `{component_name}`")
else:
await self._send_message(f"全局启用组件失败: {component_name}")
await self.send_text(f"全局启用组件失败: `{component_name}`")
async def _globally_disable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
"""全局禁用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
success = await component_manage_api.globally_disable_component(component_name, target_component_type)
if success:
await self._send_message(f"全局禁用组件成功: {component_name}")
await self.send_text(f"全局禁用组件成功: `{component_name}`")
else:
await self._send_message(f"全局禁用组件失败: {component_name}")
await self.send_text(f"全局禁用组件失败: `{component_name}`")
async def _locally_enable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
if component_manage_api.locally_enable_component(
component_name,
target_component_type,
self.message.chat_stream.stream_id,
):
await self._send_message(f"本地启用组件成功: {component_name}")
"""本地启用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
stream_id = self.message.chat_stream.stream_id
if component_manage_api.locally_enable_component(component_name, target_component_type, stream_id):
await self.send_text(f"本地启用组件成功: `{component_name}`")
else:
await self._send_message(f"本地启用组件失败: {component_name}")
await self.send_text(f"本地启用组件失败: `{component_name}`")
async def _locally_disable_component(self, component_name: str, component_type: str):
match component_type:
case "action":
target_component_type = ComponentType.ACTION
case "command":
target_component_type = ComponentType.COMMAND
case "event_handler":
target_component_type = ComponentType.EVENT_HANDLER
case _:
await self._send_message(f"未知组件类型: {component_type}")
return
if component_manage_api.locally_disable_component(
component_name,
target_component_type,
self.message.chat_stream.stream_id,
):
await self._send_message(f"本地禁用组件成功: {component_name}")
"""本地禁用组件"""
type_mapping = {
"action": ComponentType.ACTION,
"command": ComponentType.COMMAND,
"event_handler": ComponentType.EVENT_HANDLER,
"plus_command": ComponentType.PLUS_COMMAND,
}
target_component_type = type_mapping.get(component_type.lower())
if not target_component_type:
await self.send_text(f"❌ 未知组件类型: `{component_type}`")
return
stream_id = self.message.chat_stream.stream_id
if component_manage_api.locally_disable_component(component_name, target_component_type, stream_id):
await self.send_text(f"本地禁用组件成功: `{component_name}`")
else:
await self._send_message(f"本地禁用组件失败: {component_name}")
async def _send_message(self, message: str):
await send_api.text_to_stream(message, self.stream_id, typing=False, storage_message=False)
await self.send_text(f"本地禁用组件失败: `{component_name}`")
@register_plugin
@@ -441,14 +542,22 @@ class PluginManagementPlugin(BasePlugin):
"plugin": {
"enabled": ConfigField(bool, default=False, description="是否启用插件"),
"config_version": ConfigField(type=str, default="1.1.0", description="配置文件版本"),
"permission": ConfigField(
list, default=[], description="有权限使用插件管理命令的用户列表请填写字符串形式的用户ID"
),
},
}
def get_plugin_components(self) -> List[Tuple[CommandInfo, Type[BaseCommand]]]:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 注册权限节点
permission_api.register_permission_node(
"plugin.management.admin",
"插件管理:可以管理插件和组件的加载、卸载、启用、禁用等操作",
"plugin_management",
False
)
def get_plugin_components(self) -> List[Tuple[PlusCommandInfo, Type[PlusCommand]]]:
"""返回插件的PlusCommand组件"""
components = []
if self.get_config("plugin.enabled", True):
components.append((ManagementCommand.get_command_info(), ManagementCommand))
components.append((ManagementCommand.get_plus_command_info(), ManagementCommand))
return components