Refactor permission checks and decorators usage

Refactored permission checks in built-in plugins to use the new @require_permission decorator, improving code clarity and consistency. Enhanced permission_decorators to better extract ChatStream and support PlusCommand. Updated PermissionCommand to use permission decorators for all subcommands, improved user mention parsing, and improved feedback messages. Registered a new permission node for sending feeds in MaiZone plugin and updated command registration to use PlusCommand info.
This commit is contained in:
雅诺狐
2025-08-28 18:35:59 +08:00
parent f42b4adaab
commit 8d77d1cc3f
5 changed files with 140 additions and 120 deletions

View File

@@ -50,7 +50,8 @@ class CommandArgs:
self._parsed_args = self._raw_args.split() self._parsed_args = self._raw_args.split()
return self._parsed_args return self._parsed_args
@property
def is_empty(self) -> bool: def is_empty(self) -> bool:
"""检查参数是否为空 """检查参数是否为空
@@ -73,7 +74,8 @@ class CommandArgs:
if 0 <= index < len(args): if 0 <= index < len(args):
return args[index] return args[index]
return default return default
@property
def get_first(self, default: str = "") -> str: def get_first(self, default: str = "") -> str:
"""获取第一个参数 """获取第一个参数

View File

@@ -9,9 +9,9 @@ from typing import Callable, Optional
from inspect import iscoroutinefunction from inspect import iscoroutinefunction
from src.plugin_system.apis.permission_api import permission_api from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.apis.send_api import send_message from src.plugin_system.apis.send_api import text_to_stream
from src.plugin_system.apis.logging_api import get_logger from src.plugin_system.apis.logging_api import get_logger
from src.common.message import ChatStream from src.chat.message_receive.chat_stream import ChatStream
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -37,6 +37,8 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
async def async_wrapper(*args, **kwargs): async def async_wrapper(*args, **kwargs):
# 尝试从参数中提取 ChatStream 对象 # 尝试从参数中提取 ChatStream 对象
chat_stream = None chat_stream = None
# 首先检查位置参数中的 ChatStream
for arg in args: for arg in args:
if isinstance(arg, ChatStream): if isinstance(arg, ChatStream):
chat_stream = arg chat_stream = arg
@@ -46,21 +48,31 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
if chat_stream is None: if chat_stream is None:
chat_stream = kwargs.get('chat_stream') chat_stream = kwargs.get('chat_stream')
# 如果还没找到,检查是否是 PlusCommand 方法调用
if chat_stream is None and args:
# 检查第一个参数是否有 message.chat_stream 属性PlusCommand 实例)
instance = args[0]
if hasattr(instance, 'message') and hasattr(instance.message, 'chat_stream'):
chat_stream = instance.message.chat_stream
if chat_stream is None: if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}") logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return return
# 检查权限 # 检查权限
has_permission = permission_api.check_permission( has_permission = permission_api.check_permission(
chat_stream.user_platform, chat_stream.platform,
chat_stream.user_id, chat_stream.user_info.user_id,
permission_node permission_node
) )
if not has_permission: if not has_permission:
# 权限不足,发送拒绝消息 # 权限不足,发送拒绝消息
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}" message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
await send_message(chat_stream, message) await text_to_stream(message, chat_stream.stream_id)
# 对于PlusCommand的execute方法需要返回适当的元组
if func.__name__ == 'execute' and hasattr(args[0], 'send_text'):
return False, "权限不足", True
return return
# 权限检查通过,执行原函数 # 权限检查通过,执行原函数
@@ -83,13 +95,13 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
# 检查权限 # 检查权限
has_permission = permission_api.check_permission( has_permission = permission_api.check_permission(
chat_stream.user_platform, chat_stream.platform,
chat_stream.user_id, chat_stream.user_info.user_id,
permission_node permission_node
) )
if not has_permission: if not has_permission:
logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 没有权限 {permission_node}") logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 没有权限 {permission_node}")
return return
# 权限检查通过,执行原函数 # 权限检查通过,执行原函数
@@ -124,6 +136,8 @@ def require_master(deny_message: Optional[str] = None):
async def async_wrapper(*args, **kwargs): async def async_wrapper(*args, **kwargs):
# 尝试从参数中提取 ChatStream 对象 # 尝试从参数中提取 ChatStream 对象
chat_stream = None chat_stream = None
# 首先检查位置参数中的 ChatStream
for arg in args: for arg in args:
if isinstance(arg, ChatStream): if isinstance(arg, ChatStream):
chat_stream = arg chat_stream = arg
@@ -133,20 +147,28 @@ def require_master(deny_message: Optional[str] = None):
if chat_stream is None: if chat_stream is None:
chat_stream = kwargs.get('chat_stream') chat_stream = kwargs.get('chat_stream')
# 如果还没找到,检查是否是 PlusCommand 方法调用
if chat_stream is None and args:
# 检查第一个参数是否有 message.chat_stream 属性PlusCommand 实例)
instance = args[0]
if hasattr(instance, 'message') and hasattr(instance.message, 'chat_stream'):
chat_stream = instance.message.chat_stream
if chat_stream is None: if chat_stream is None:
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}") logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return return
# 检查是否为Master用户 # 检查是否为Master用户
is_master = permission_api.is_master( is_master = permission_api.is_master(
chat_stream.user_platform, chat_stream.platform,
chat_stream.user_id chat_stream.user_info.user_id
) )
if not is_master: if not is_master:
# 权限不足,发送拒绝消息
message = deny_message or "❌ 此操作仅限Master用户执行" message = deny_message or "❌ 此操作仅限Master用户执行"
await send_message(chat_stream, message) await text_to_stream(message, chat_stream.stream_id)
if func.__name__ == 'execute' and hasattr(args[0], 'send_text'):
return False, "需要Master权限", True
return return
# 权限检查通过,执行原函数 # 权限检查通过,执行原函数
@@ -169,12 +191,12 @@ def require_master(deny_message: Optional[str] = None):
# 检查是否为Master用户 # 检查是否为Master用户
is_master = permission_api.is_master( is_master = permission_api.is_master(
chat_stream.user_platform, chat_stream.platform,
chat_stream.user_id chat_stream.user_info.user_id
) )
if not is_master: if not is_master:
logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 不是Master用户") logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 不是Master用户")
return return
# 权限检查通过,执行原函数 # 权限检查通过,执行原函数
@@ -209,8 +231,8 @@ class PermissionChecker:
bool: 是否拥有权限 bool: 是否拥有权限
""" """
return permission_api.check_permission( return permission_api.check_permission(
chat_stream.user_platform, chat_stream.platform,
chat_stream.user_id, chat_stream.user_info.user_id,
permission_node permission_node
) )
@@ -226,8 +248,8 @@ class PermissionChecker:
bool: 是否为Master用户 bool: 是否为Master用户
""" """
return permission_api.is_master( return permission_api.is_master(
chat_stream.user_platform, chat_stream.platform,
chat_stream.user_id chat_stream.user_info.user_id
) )
@staticmethod @staticmethod
@@ -248,7 +270,7 @@ class PermissionChecker:
if not has_permission: if not has_permission:
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}" message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
await send_message(chat_stream, message) await text_to_stream(message, chat_stream.stream_id)
return has_permission return has_permission
@@ -269,6 +291,6 @@ class PermissionChecker:
if not is_master: if not is_master:
message = deny_message or "❌ 此操作仅限Master用户执行" message = deny_message or "❌ 此操作仅限Master用户执行"
await send_message(chat_stream, message) await text_to_stream(message, chat_stream.stream_id)
return is_master return is_master

View File

@@ -5,52 +5,32 @@
from typing import Tuple from typing import Tuple
from src.common.logger import get_logger from src.common.logger import get_logger
from src.plugin_system import BaseCommand from src.plugin_system.base.plus_command import PlusCommand
from src.plugin_system.base.command_args import CommandArgs
from src.plugin_system.utils.permission_decorators import require_permission
from ..services.manager import get_qzone_service, get_config_getter from ..services.manager import get_qzone_service, get_config_getter
logger = get_logger("MaiZone.SendFeedCommand") logger = get_logger("MaiZone.SendFeedCommand")
class SendFeedCommand(BaseCommand): class SendFeedCommand(PlusCommand):
""" """
响应用户通过 `/send_feed` 命令发送说说的请求。 响应用户通过 `/send_feed` 命令发送说说的请求。
""" """
command_name: str = "send_feed" command_name: str = "send_feed"
command_description: str = "发送一条QQ空间说说" command_description: str = "发送一条QQ空间说说"
command_pattern: str = r"^/send_feed(?:\s+(?P<topic>.*))?$" command_aliases = ["发空间"]
command_help: str = "使用 /send_feed [主题] 来发送一条说说"
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def _check_permission(self) -> bool: @require_permission("plugin.send.permission")
"""检查当前用户是否有权限执行此命令""" async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]:
user_id = self.message.message_info.user_info.user_id
if not user_id:
return False
get_config = get_config_getter()
permission_list = get_config("send.permission", [])
permission_type = get_config("send.permission_type", "whitelist")
if not isinstance(permission_list, list):
return False
if permission_type == 'whitelist':
return user_id in permission_list
elif permission_type == 'blacklist':
return user_id not in permission_list
return False
async def execute(self) -> Tuple[bool, str, bool]:
""" """
执行命令的核心逻辑。 执行命令的核心逻辑。
""" """
if not self._check_permission():
await self.send_text("抱歉,你没有权限使用这个命令哦。")
return False, "权限不足", True
topic = self.matched_groups.get("topic", "") topic = args.get_remaining()
stream_id = self.message.chat_stream.stream_id stream_id = self.message.chat_stream.stream_id
await self.send_text(f"收到!正在为你生成关于“{topic or '随机'}”的说说,请稍候...") await self.send_text(f"收到!正在为你生成关于“{topic or '随机'}”的说说,请稍候...")

View File

@@ -13,6 +13,7 @@ from src.plugin_system import (
register_plugin register_plugin
) )
from src.plugin_system.base.config_types import ConfigField from src.plugin_system.base.config_types import ConfigField
from src.plugin_system.apis.permission_api import permission_api
from .actions.read_feed_action import ReadFeedAction from .actions.read_feed_action import ReadFeedAction
from .actions.send_feed_action import SendFeedAction from .actions.send_feed_action import SendFeedAction
@@ -82,7 +83,12 @@ class MaiZoneRefactoredPlugin(BasePlugin):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
permission_api.register_permission_node(
"plugin.send.permission",
"是否可以使用机器人发送说说",
"maiZone",
False
)
content_service = ContentService(self.get_config) content_service = ContentService(self.get_config)
image_service = ImageService(self.get_config) image_service = ImageService(self.get_config)
cookie_service = CookieService(self.get_config) cookie_service = CookieService(self.get_config)
@@ -102,5 +108,5 @@ class MaiZoneRefactoredPlugin(BasePlugin):
return [ return [
(SendFeedAction.get_action_info(), SendFeedAction), (SendFeedAction.get_action_info(), SendFeedAction),
(ReadFeedAction.get_action_info(), ReadFeedAction), (ReadFeedAction.get_action_info(), ReadFeedAction),
(SendFeedCommand.get_command_info(), SendFeedCommand), (SendFeedCommand.get_plus_command_info(), SendFeedCommand),
] ]

View File

@@ -16,6 +16,7 @@ from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.apis.logging_api import get_logger from src.plugin_system.apis.logging_api import get_logger
from src.plugin_system.base.component_types import PlusCommandInfo, ChatType from src.plugin_system.base.component_types import PlusCommandInfo, ChatType
from src.plugin_system.base.config_types import ConfigField from src.plugin_system.base.config_types import ConfigField
from src.plugin_system.utils.permission_decorators import require_permission, require_master, PermissionChecker
logger = get_logger("Permission") logger = get_logger("Permission")
@@ -46,70 +47,38 @@ class PermissionCommand(PlusCommand):
"permission_manager", "permission_manager",
True True
) )
async def execute(self, args: CommandArgs) -> Tuple[bool, Optional[str], bool]: async def execute(self, args: CommandArgs) -> Tuple[bool, Optional[str], bool]:
"""执行权限管理命令""" """执行权限管理命令"""
if args.is_empty(): if args.is_empty:
await self._show_help() await self._show_help()
return True, "显示帮助信息", True return True, "显示帮助信息", True
subcommand = args.get_first().lower() subcommand = args.get_first.lower()
remaining_args = args.get_args()[1:] # 获取除第一个参数外的所有参数 remaining_args = args.get_args()[1:] # 获取除第一个参数外的所有参数
chat_stream = self.message.chat_stream chat_stream = self.message.chat_stream
# 检查基本查看权限
can_view = permission_api.check_permission(
chat_stream.platform,
chat_stream.user_info.user_id,
"plugin.permission.view"
) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
# 检查管理权限
can_manage = permission_api.check_permission(
chat_stream.platform,
chat_stream.user_info.user_id,
"plugin.permission.manage"
) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
if subcommand in ["grant", "授权", "give"]: if subcommand in ["grant", "授权", "give"]:
if not can_manage:
await self.send_text("❌ 你没有权限管理的权限")
return True, "权限不足", True
await self._grant_permission(chat_stream, remaining_args) await self._grant_permission(chat_stream, remaining_args)
return True, "执行授权命令", True return True, "执行授权命令", True
elif subcommand in ["revoke", "撤销", "remove"]: elif subcommand in ["revoke", "撤销", "remove"]:
if not can_manage:
await self.send_text("❌ 你没有权限管理的权限")
return True, "权限不足", True
await self._revoke_permission(chat_stream, remaining_args) await self._revoke_permission(chat_stream, remaining_args)
return True, "执行撤销命令", True return True, "执行撤销命令", True
elif subcommand in ["list", "列表", "ls"]: elif subcommand in ["list", "列表", "ls"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._list_permissions(chat_stream, remaining_args) await self._list_permissions(chat_stream, remaining_args)
return True, "执行列表命令", True return True, "执行列表命令", True
elif subcommand in ["check", "检查"]: elif subcommand in ["check", "检查"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._check_permission(chat_stream, remaining_args) await self._check_permission(chat_stream, remaining_args)
return True, "执行检查命令", True return True, "执行检查命令", True
elif subcommand in ["nodes", "节点"]: elif subcommand in ["nodes", "节点"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._list_nodes(chat_stream, remaining_args) await self._list_nodes(chat_stream, remaining_args)
return True, "执行节点命令", True return True, "执行节点命令", True
elif subcommand in ["allnodes", "全部节点", "all"]: elif subcommand in ["allnodes", "全部节点", "all"]:
if not can_view:
await self.send_text("❌ 你没有查看权限的权限")
return True, "权限不足", True
await self._list_all_nodes_with_description(chat_stream) await self._list_all_nodes_with_description(chat_stream)
return True, "执行全部节点命令", True return True, "执行全部节点命令", True
@@ -149,11 +118,18 @@ class PermissionCommand(PlusCommand):
await self.send_text(help_text) await self.send_text(help_text)
def _parse_user_mention(self, mention: str) -> Optional[str]: def _parse_user_mention(self, mention: str) -> Optional[str]:
"""解析用户提及提取QQ号""" """解析用户提及提取QQ号
支持的格式:
- @<用户名:QQ号> 格式
- [CQ:at,qq=QQ号] 格式
- 直接的QQ号
"""
# 匹配 @<用户名:QQ号> 格式提取QQ号 # 匹配 @<用户名:QQ号> 格式提取QQ号
at_match = re.search(r'@<[^:]+:(\d+)>', mention) at_match = re.search(r'@<[^:]+:(\d+)>', mention)
if at_match: if at_match:
return at_match.group(1) return at_match.group(1)
# 直接是数字 # 直接是数字
if mention.isdigit(): if mention.isdigit():
@@ -161,62 +137,94 @@ class PermissionCommand(PlusCommand):
return None return None
@staticmethod
def parse_user_from_args(args: CommandArgs, index: int = 0) -> Optional[str]:
"""从CommandArgs中解析用户ID
Args:
args: 命令参数对象
index: 参数索引默认为0第一个参数
Returns:
Optional[str]: 解析出的用户ID如果解析失败返回None
"""
if index >= args.count():
return None
mention = args.get_arg(index)
# 匹配 @<用户名:QQ号> 格式提取QQ号
at_match = re.search(r'@<[^:]+:(\d+)>', mention)
if at_match:
return at_match.group(1)
# 匹配传统的 [CQ:at,qq=数字] 格式
cq_match = re.search(r'\[CQ:at,qq=(\d+)\]', mention)
if cq_match:
return cq_match.group(1)
# 直接是数字
if mention.isdigit():
return mention
return None
@require_permission("plugin.permission.manage", "❌ 你没有权限管理的权限")
async def _grant_permission(self, chat_stream, args: List[str]): async def _grant_permission(self, chat_stream, args: List[str]):
"""授权用户权限""" """授权用户权限"""
if len(args) < 2: if len(args) < 2:
await self.send_text("❌ 用法: /permission grant <@用户|QQ号> <权限节点>") await self.send_text("❌ 用法: /permission grant <@用户|QQ号> <权限节点>")
return return
user_mention = args[0] # 解析用户ID - 使用新的解析方法
permission_node = args[1] user_id = self._parse_user_mention(args[0])
# 解析用户ID
user_id = self._parse_user_mention(user_mention)
if not user_id: if not user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return return
permission_node = args[1]
# 执行授权 # 执行授权
success = permission_api.grant_permission(chat_stream.platform, user_id, permission_node) success = permission_api.grant_permission(chat_stream.platform, user_id, permission_node)
if success: if success:
await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 {permission_node}") await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 `{permission_node}`")
else: else:
await self.send_text("❌ 授权失败,请检查权限节点是否存在") await self.send_text("❌ 授权失败,请检查权限节点是否存在")
@require_permission("plugin.permission.manage", "❌ 你没有权限管理的权限")
async def _revoke_permission(self, chat_stream, args: List[str]): async def _revoke_permission(self, chat_stream, args: List[str]):
"""撤销用户权限""" """撤销用户权限"""
if len(args) < 2: if len(args) < 2:
await self.send_text("❌ 用法: /permission revoke <@用户|QQ号> <权限节点>") await self.send_text("❌ 用法: /permission revoke <@用户|QQ号> <权限节点>")
return return
user_mention = args[0] # 解析用户ID - 使用新的解析方法
permission_node = args[1] user_id = self._parse_user_mention(args[0])
# 解析用户ID
user_id = self._parse_user_mention(user_mention)
if not user_id: if not user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return return
permission_node = args[1]
# 执行撤销 # 执行撤销
success = permission_api.revoke_permission(chat_stream.platform, user_id, permission_node) success = permission_api.revoke_permission(chat_stream.platform, user_id, permission_node)
if success: if success:
await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 {permission_node}") await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 `{permission_node}`")
else: else:
await self.send_text("❌ 撤销失败,请检查权限节点是否存在") await self.send_text("❌ 撤销失败,请检查权限节点是否存在")
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _list_permissions(self, chat_stream, args: List[str]): async def _list_permissions(self, chat_stream, args: List[str]):
"""列出用户权限""" """列出用户权限"""
target_user_id = None target_user_id = None
if args: if args:
# 指定了用户 # 指定了用户 - 使用新的解析方法
user_mention = args[0] target_user_id = self._parse_user_mention(args[0])
target_user_id = self._parse_user_mention(user_mention)
if not target_user_id: if not target_user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return return
else: else:
# 查看自己的权限 # 查看自己的权限
@@ -229,45 +237,46 @@ class PermissionCommand(PlusCommand):
permissions = permission_api.get_user_permissions(chat_stream.platform, target_user_id) permissions = permission_api.get_user_permissions(chat_stream.platform, target_user_id)
if is_master: if is_master:
response = f"👑 用户 {target_user_id} 是Master用户拥有所有权限" response = f"👑 用户 `{target_user_id}` 是Master用户拥有所有权限"
else: else:
if permissions: if permissions:
perm_list = "\n".join([f"{perm}" for perm in permissions]) perm_list = "\n".join([f"`{perm}`" for perm in permissions])
response = f"📋 用户 {target_user_id} 拥有的权限:\n{perm_list}" response = f"📋 用户 `{target_user_id}` 拥有的权限:\n{perm_list}"
else: else:
response = f"📋 用户 {target_user_id} 没有任何权限" response = f"📋 用户 `{target_user_id}` 没有任何权限"
await self.send_text(response) await self.send_text(response)
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _check_permission(self, chat_stream, args: List[str]): async def _check_permission(self, chat_stream, args: List[str]):
"""检查用户权限""" """检查用户权限"""
if len(args) < 2: if len(args) < 2:
await self.send_text("❌ 用法: /permission check <@用户|QQ号> <权限节点>") await self.send_text("❌ 用法: /permission check <@用户|QQ号> <权限节点>")
return return
user_mention = args[0] # 解析用户ID - 使用新的解析方法
permission_node = args[1] user_id = self._parse_user_mention(args[0])
# 解析用户ID
user_id = self._parse_user_mention(user_mention)
if not user_id: if not user_id:
await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号")
return return
permission_node = args[1]
# 检查权限 # 检查权限
has_permission = permission_api.check_permission(chat_stream.platform, user_id, permission_node) has_permission = permission_api.check_permission(chat_stream.platform, user_id, permission_node)
is_master = permission_api.is_master(chat_stream.platform, user_id) is_master = permission_api.is_master(chat_stream.platform, user_id)
if has_permission: if has_permission:
if is_master: if is_master:
response = f"✅ 用户 {user_id} 拥有权限 {permission_node}Master用户" response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`Master用户"
else: else:
response = f"✅ 用户 {user_id} 拥有权限 {permission_node}" response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`"
else: else:
response = f"❌ 用户 {user_id} 没有权限 {permission_node}" response = f"❌ 用户 `{user_id}` 没有权限 `{permission_node}`"
await self.send_text(response) await self.send_text(response)
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _list_nodes(self, chat_stream, args: List[str]): async def _list_nodes(self, chat_stream, args: List[str]):
"""列出权限节点""" """列出权限节点"""
plugin_name = args[0] if args else None plugin_name = args[0] if args else None
@@ -300,6 +309,7 @@ class PermissionCommand(PlusCommand):
await self.send_text(response) await self.send_text(response)
@require_permission("plugin.permission.view", "❌ 你没有查看权限的权限")
async def _list_all_nodes_with_description(self, chat_stream): async def _list_all_nodes_with_description(self, chat_stream):
"""列出所有插件的权限节点(带详细描述)""" """列出所有插件的权限节点(带详细描述)"""
# 获取所有权限节点 # 获取所有权限节点