diff --git a/src/plugin_system/base/command_args.py b/src/plugin_system/base/command_args.py index 46a2b701b..5d8c1ad2c 100644 --- a/src/plugin_system/base/command_args.py +++ b/src/plugin_system/base/command_args.py @@ -50,7 +50,8 @@ class CommandArgs: self._parsed_args = self._raw_args.split() return self._parsed_args - + + @property def is_empty(self) -> bool: """检查参数是否为空 @@ -73,7 +74,8 @@ class CommandArgs: if 0 <= index < len(args): return args[index] return default - + + @property def get_first(self, default: str = "") -> str: """获取第一个参数 diff --git a/src/plugin_system/utils/permission_decorators.py b/src/plugin_system/utils/permission_decorators.py index ae5b48e0e..0f31a94f9 100644 --- a/src/plugin_system/utils/permission_decorators.py +++ b/src/plugin_system/utils/permission_decorators.py @@ -9,9 +9,9 @@ from typing import Callable, Optional from inspect import iscoroutinefunction from src.plugin_system.apis.permission_api import permission_api -from src.plugin_system.apis.send_api import send_message +from src.plugin_system.apis.send_api import text_to_stream from src.plugin_system.apis.logging_api import get_logger -from src.common.message import ChatStream +from src.chat.message_receive.chat_stream import ChatStream logger = get_logger(__name__) @@ -37,6 +37,8 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None) async def async_wrapper(*args, **kwargs): # 尝试从参数中提取 ChatStream 对象 chat_stream = None + + # 首先检查位置参数中的 ChatStream for arg in args: if isinstance(arg, ChatStream): chat_stream = arg @@ -46,21 +48,31 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None) if chat_stream is None: chat_stream = kwargs.get('chat_stream') + # 如果还没找到,检查是否是 PlusCommand 方法调用 + if chat_stream is None and args: + # 检查第一个参数是否有 message.chat_stream 属性(PlusCommand 实例) + instance = args[0] + if hasattr(instance, 'message') and hasattr(instance.message, 'chat_stream'): + chat_stream = instance.message.chat_stream + if chat_stream is None: logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}") return # 检查权限 has_permission = permission_api.check_permission( - chat_stream.user_platform, - chat_stream.user_id, + chat_stream.platform, + chat_stream.user_info.user_id, permission_node ) if not has_permission: # 权限不足,发送拒绝消息 message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}" - await send_message(chat_stream, message) + await text_to_stream(message, chat_stream.stream_id) + # 对于PlusCommand的execute方法,需要返回适当的元组 + if func.__name__ == 'execute' and hasattr(args[0], 'send_text'): + return False, "权限不足", True return # 权限检查通过,执行原函数 @@ -83,13 +95,13 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None) # 检查权限 has_permission = permission_api.check_permission( - chat_stream.user_platform, - chat_stream.user_id, + chat_stream.platform, + chat_stream.user_info.user_id, permission_node ) if not has_permission: - logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 没有权限 {permission_node}") + logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 没有权限 {permission_node}") return # 权限检查通过,执行原函数 @@ -124,6 +136,8 @@ def require_master(deny_message: Optional[str] = None): async def async_wrapper(*args, **kwargs): # 尝试从参数中提取 ChatStream 对象 chat_stream = None + + # 首先检查位置参数中的 ChatStream for arg in args: if isinstance(arg, ChatStream): chat_stream = arg @@ -133,20 +147,28 @@ def require_master(deny_message: Optional[str] = None): if chat_stream is None: chat_stream = kwargs.get('chat_stream') + # 如果还没找到,检查是否是 PlusCommand 方法调用 + if chat_stream is None and args: + # 检查第一个参数是否有 message.chat_stream 属性(PlusCommand 实例) + instance = args[0] + if hasattr(instance, 'message') and hasattr(instance.message, 'chat_stream'): + chat_stream = instance.message.chat_stream + if chat_stream is None: logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}") return # 检查是否为Master用户 is_master = permission_api.is_master( - chat_stream.user_platform, - chat_stream.user_id + chat_stream.platform, + chat_stream.user_info.user_id ) if not is_master: - # 权限不足,发送拒绝消息 message = deny_message or "❌ 此操作仅限Master用户执行" - await send_message(chat_stream, message) + await text_to_stream(message, chat_stream.stream_id) + if func.__name__ == 'execute' and hasattr(args[0], 'send_text'): + return False, "需要Master权限", True return # 权限检查通过,执行原函数 @@ -169,12 +191,12 @@ def require_master(deny_message: Optional[str] = None): # 检查是否为Master用户 is_master = permission_api.is_master( - chat_stream.user_platform, - chat_stream.user_id + chat_stream.platform, + chat_stream.user_info.user_id ) if not is_master: - logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 不是Master用户") + logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 不是Master用户") return # 权限检查通过,执行原函数 @@ -209,8 +231,8 @@ class PermissionChecker: bool: 是否拥有权限 """ return permission_api.check_permission( - chat_stream.user_platform, - chat_stream.user_id, + chat_stream.platform, + chat_stream.user_info.user_id, permission_node ) @@ -226,8 +248,8 @@ class PermissionChecker: bool: 是否为Master用户 """ return permission_api.is_master( - chat_stream.user_platform, - chat_stream.user_id + chat_stream.platform, + chat_stream.user_info.user_id ) @staticmethod @@ -248,7 +270,7 @@ class PermissionChecker: if not has_permission: message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}" - await send_message(chat_stream, message) + await text_to_stream(message, chat_stream.stream_id) return has_permission @@ -269,6 +291,6 @@ class PermissionChecker: if not is_master: message = deny_message or "❌ 此操作仅限Master用户执行" - await send_message(chat_stream, message) + await text_to_stream(message, chat_stream.stream_id) return is_master diff --git a/src/plugins/built_in/maizone_refactored/commands/send_feed_command.py b/src/plugins/built_in/maizone_refactored/commands/send_feed_command.py index 0f6903b2f..3a7be39d5 100644 --- a/src/plugins/built_in/maizone_refactored/commands/send_feed_command.py +++ b/src/plugins/built_in/maizone_refactored/commands/send_feed_command.py @@ -5,52 +5,32 @@ from typing import Tuple from src.common.logger import get_logger -from src.plugin_system import BaseCommand +from src.plugin_system.base.plus_command import PlusCommand +from src.plugin_system.base.command_args import CommandArgs +from src.plugin_system.utils.permission_decorators import require_permission from ..services.manager import get_qzone_service, get_config_getter logger = get_logger("MaiZone.SendFeedCommand") -class SendFeedCommand(BaseCommand): +class SendFeedCommand(PlusCommand): """ 响应用户通过 `/send_feed` 命令发送说说的请求。 """ command_name: str = "send_feed" command_description: str = "发送一条QQ空间说说" - command_pattern: str = r"^/send_feed(?:\s+(?P.*))?$" - command_help: str = "使用 /send_feed [主题] 来发送一条说说" + command_aliases = ["发空间"] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - def _check_permission(self) -> bool: - """检查当前用户是否有权限执行此命令""" - user_id = self.message.message_info.user_info.user_id - if not user_id: - return False - - get_config = get_config_getter() - permission_list = get_config("send.permission", []) - permission_type = get_config("send.permission_type", "whitelist") - - if not isinstance(permission_list, list): - return False - - if permission_type == 'whitelist': - return user_id in permission_list - elif permission_type == 'blacklist': - return user_id not in permission_list - return False - - async def execute(self) -> Tuple[bool, str, bool]: + @require_permission("plugin.send.permission") + async def execute(self, args: CommandArgs) -> Tuple[bool, str, bool]: """ 执行命令的核心逻辑。 """ - if not self._check_permission(): - await self.send_text("抱歉,你没有权限使用这个命令哦。") - return False, "权限不足", True - topic = self.matched_groups.get("topic", "") + topic = args.get_remaining() stream_id = self.message.chat_stream.stream_id await self.send_text(f"收到!正在为你生成关于“{topic or '随机'}”的说说,请稍候...") diff --git a/src/plugins/built_in/maizone_refactored/plugin.py b/src/plugins/built_in/maizone_refactored/plugin.py index 6bc5ecd11..f6c6fbb29 100644 --- a/src/plugins/built_in/maizone_refactored/plugin.py +++ b/src/plugins/built_in/maizone_refactored/plugin.py @@ -13,6 +13,7 @@ from src.plugin_system import ( register_plugin ) from src.plugin_system.base.config_types import ConfigField +from src.plugin_system.apis.permission_api import permission_api from .actions.read_feed_action import ReadFeedAction from .actions.send_feed_action import SendFeedAction @@ -82,7 +83,12 @@ class MaiZoneRefactoredPlugin(BasePlugin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - + permission_api.register_permission_node( + "plugin.send.permission", + "是否可以使用机器人发送说说", + "maiZone", + False + ) content_service = ContentService(self.get_config) image_service = ImageService(self.get_config) cookie_service = CookieService(self.get_config) @@ -102,5 +108,5 @@ class MaiZoneRefactoredPlugin(BasePlugin): return [ (SendFeedAction.get_action_info(), SendFeedAction), (ReadFeedAction.get_action_info(), ReadFeedAction), - (SendFeedCommand.get_command_info(), SendFeedCommand), + (SendFeedCommand.get_plus_command_info(), SendFeedCommand), ] \ No newline at end of file diff --git a/src/plugins/built_in/permission_management/plugin.py b/src/plugins/built_in/permission_management/plugin.py index f5b87b093..d8a39107a 100644 --- a/src/plugins/built_in/permission_management/plugin.py +++ b/src/plugins/built_in/permission_management/plugin.py @@ -16,6 +16,7 @@ from src.plugin_system.apis.permission_api import permission_api from src.plugin_system.apis.logging_api import get_logger from src.plugin_system.base.component_types import PlusCommandInfo, ChatType from src.plugin_system.base.config_types import ConfigField +from src.plugin_system.utils.permission_decorators import require_permission, require_master, PermissionChecker logger = get_logger("Permission") @@ -46,70 +47,38 @@ class PermissionCommand(PlusCommand): "permission_manager", True ) - + async def execute(self, args: CommandArgs) -> Tuple[bool, Optional[str], bool]: """执行权限管理命令""" - if args.is_empty(): + if args.is_empty: await self._show_help() return True, "显示帮助信息", True - subcommand = args.get_first().lower() + subcommand = args.get_first.lower() remaining_args = args.get_args()[1:] # 获取除第一个参数外的所有参数 chat_stream = self.message.chat_stream - # 检查基本查看权限 - can_view = permission_api.check_permission( - chat_stream.platform, - chat_stream.user_info.user_id, - "plugin.permission.view" - ) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id) - - # 检查管理权限 - can_manage = permission_api.check_permission( - chat_stream.platform, - chat_stream.user_info.user_id, - "plugin.permission.manage" - ) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id) - if subcommand in ["grant", "授权", "give"]: - if not can_manage: - await self.send_text("❌ 你没有权限管理的权限") - return True, "权限不足", True await self._grant_permission(chat_stream, remaining_args) return True, "执行授权命令", True elif subcommand in ["revoke", "撤销", "remove"]: - if not can_manage: - await self.send_text("❌ 你没有权限管理的权限") - return True, "权限不足", True await self._revoke_permission(chat_stream, remaining_args) return True, "执行撤销命令", True elif subcommand in ["list", "列表", "ls"]: - if not can_view: - await self.send_text("❌ 你没有查看权限的权限") - return True, "权限不足", True await self._list_permissions(chat_stream, remaining_args) return True, "执行列表命令", True elif subcommand in ["check", "检查"]: - if not can_view: - await self.send_text("❌ 你没有查看权限的权限") - return True, "权限不足", True await self._check_permission(chat_stream, remaining_args) return True, "执行检查命令", True elif subcommand in ["nodes", "节点"]: - if not can_view: - await self.send_text("❌ 你没有查看权限的权限") - return True, "权限不足", True await self._list_nodes(chat_stream, remaining_args) return True, "执行节点命令", True elif subcommand in ["allnodes", "全部节点", "all"]: - if not can_view: - await self.send_text("❌ 你没有查看权限的权限") - return True, "权限不足", True await self._list_all_nodes_with_description(chat_stream) return True, "执行全部节点命令", True @@ -149,11 +118,18 @@ class PermissionCommand(PlusCommand): await self.send_text(help_text) def _parse_user_mention(self, mention: str) -> Optional[str]: - """解析用户提及,提取QQ号""" + """解析用户提及,提取QQ号 + + 支持的格式: + - @<用户名:QQ号> 格式 + - [CQ:at,qq=QQ号] 格式 + - 直接的QQ号 + """ # 匹配 @<用户名:QQ号> 格式,提取QQ号 at_match = re.search(r'@<[^:]+:(\d+)>', mention) if at_match: return at_match.group(1) + # 直接是数字 if mention.isdigit(): @@ -161,62 +137,94 @@ class PermissionCommand(PlusCommand): return None + @staticmethod + def parse_user_from_args(args: CommandArgs, index: int = 0) -> Optional[str]: + """从CommandArgs中解析用户ID + + Args: + args: 命令参数对象 + index: 参数索引,默认为0(第一个参数) + + Returns: + Optional[str]: 解析出的用户ID,如果解析失败返回None + """ + if index >= args.count(): + return None + + mention = args.get_arg(index) + + # 匹配 @<用户名:QQ号> 格式,提取QQ号 + at_match = re.search(r'@<[^:]+:(\d+)>', mention) + if at_match: + return at_match.group(1) + + # 匹配传统的 [CQ:at,qq=数字] 格式 + cq_match = re.search(r'\[CQ:at,qq=(\d+)\]', mention) + if cq_match: + return cq_match.group(1) + + # 直接是数字 + if mention.isdigit(): + return mention + + return None + + @require_permission("plugin.permission.manage", "❌ 你没有权限管理的权限") async def _grant_permission(self, chat_stream, args: List[str]): """授权用户权限""" if len(args) < 2: await self.send_text("❌ 用法: /permission grant <@用户|QQ号> <权限节点>") return - user_mention = args[0] - permission_node = args[1] - - # 解析用户ID - user_id = self._parse_user_mention(user_mention) + # 解析用户ID - 使用新的解析方法 + user_id = self._parse_user_mention(args[0]) if not user_id: - await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") + await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号") return + permission_node = args[1] + # 执行授权 success = permission_api.grant_permission(chat_stream.platform, user_id, permission_node) if success: - await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 {permission_node}") + await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 `{permission_node}`") else: await self.send_text("❌ 授权失败,请检查权限节点是否存在") + @require_permission("plugin.permission.manage", "❌ 你没有权限管理的权限") async def _revoke_permission(self, chat_stream, args: List[str]): """撤销用户权限""" if len(args) < 2: await self.send_text("❌ 用法: /permission revoke <@用户|QQ号> <权限节点>") return - user_mention = args[0] - permission_node = args[1] - - # 解析用户ID - user_id = self._parse_user_mention(user_mention) + # 解析用户ID - 使用新的解析方法 + user_id = self._parse_user_mention(args[0]) if not user_id: - await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") + await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号") return + permission_node = args[1] + # 执行撤销 success = permission_api.revoke_permission(chat_stream.platform, user_id, permission_node) if success: - await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 {permission_node}") + await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 `{permission_node}`") else: await self.send_text("❌ 撤销失败,请检查权限节点是否存在") + @require_permission("plugin.permission.view", "❌ 你没有查看权限的权限") async def _list_permissions(self, chat_stream, args: List[str]): """列出用户权限""" target_user_id = None if args: - # 指定了用户 - user_mention = args[0] - target_user_id = self._parse_user_mention(user_mention) + # 指定了用户 - 使用新的解析方法 + target_user_id = self._parse_user_mention(args[0]) if not target_user_id: - await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") + await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号") return else: # 查看自己的权限 @@ -229,45 +237,46 @@ class PermissionCommand(PlusCommand): permissions = permission_api.get_user_permissions(chat_stream.platform, target_user_id) if is_master: - response = f"👑 用户 {target_user_id} 是Master用户,拥有所有权限" + response = f"👑 用户 `{target_user_id}` 是Master用户,拥有所有权限" else: if permissions: - perm_list = "\n".join([f"• {perm}" for perm in permissions]) - response = f"📋 用户 {target_user_id} 拥有的权限:\n{perm_list}" + perm_list = "\n".join([f"• `{perm}`" for perm in permissions]) + response = f"📋 用户 `{target_user_id}` 拥有的权限:\n{perm_list}" else: - response = f"📋 用户 {target_user_id} 没有任何权限" + response = f"📋 用户 `{target_user_id}` 没有任何权限" await self.send_text(response) + @require_permission("plugin.permission.view", "❌ 你没有查看权限的权限") async def _check_permission(self, chat_stream, args: List[str]): """检查用户权限""" if len(args) < 2: await self.send_text("❌ 用法: /permission check <@用户|QQ号> <权限节点>") return - user_mention = args[0] - permission_node = args[1] - - # 解析用户ID - user_id = self._parse_user_mention(user_mention) + # 解析用户ID - 使用新的解析方法 + user_id = self._parse_user_mention(args[0]) if not user_id: - await self.send_text("❌ 无效的用户格式,请使用 @用户 或直接输入QQ号") + await self.send_text("❌ 无效的用户格式,请使用 @<用户名:QQ号> 或直接输入QQ号") return + permission_node = args[1] + # 检查权限 has_permission = permission_api.check_permission(chat_stream.platform, user_id, permission_node) is_master = permission_api.is_master(chat_stream.platform, user_id) if has_permission: if is_master: - response = f"✅ 用户 {user_id} 拥有权限 {permission_node}(Master用户)" + response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`(Master用户)" else: - response = f"✅ 用户 {user_id} 拥有权限 {permission_node}" + response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`" else: - response = f"❌ 用户 {user_id} 没有权限 {permission_node}" + response = f"❌ 用户 `{user_id}` 没有权限 `{permission_node}`" await self.send_text(response) + @require_permission("plugin.permission.view", "❌ 你没有查看权限的权限") async def _list_nodes(self, chat_stream, args: List[str]): """列出权限节点""" plugin_name = args[0] if args else None @@ -300,6 +309,7 @@ class PermissionCommand(PlusCommand): await self.send_text(response) + @require_permission("plugin.permission.view", "❌ 你没有查看权限的权限") async def _list_all_nodes_with_description(self, chat_stream): """列出所有插件的权限节点(带详细描述)""" # 获取所有权限节点