This commit is contained in:
minecraft1024a
2025-09-21 13:30:30 +08:00
8 changed files with 235 additions and 660 deletions

View File

@@ -1,13 +1,8 @@
"""
权限系统API - 提供权限管理相关的API接口
这个模块提供了权限系统的核心API包括权限检查、权限节点管理等功能。
插件可以通过这些API来检查用户权限和管理权限节点。
"""
"""纯异步权限API定义。所有外部调用方必须使用 await。"""
from typing import Optional, List, Dict, Any
from enum import Enum
from dataclasses import dataclass
from enum import Enum
from abc import ABC, abstractmethod
from src.common.logger import get_logger
@@ -16,325 +11,172 @@ logger = get_logger(__name__)
class PermissionLevel(Enum):
"""权限等级枚举"""
MASTER = "master" # 最高权限,无视所有权限节点
MASTER = "master"
@dataclass
class PermissionNode:
"""权限节点数据类"""
node_name: str # 权限节点名称,如 "plugin.example.command.test"
description: str # 权限节点描述
plugin_name: str # 所属插件名称
default_granted: bool = False # 默认是否授权
node_name: str
description: str
plugin_name: str
default_granted: bool = False
@dataclass
class UserInfo:
"""用户信息数据类"""
platform: str # 平台类型,如 "qq"
user_id: str # 用户ID
platform: str
user_id: str
def __post_init__(self):
"""确保user_id是字符串类型"""
self.user_id = str(self.user_id)
def to_tuple(self) -> tuple[str, str]:
"""转换为元组格式"""
return self.platform, self.user_id
class IPermissionManager(ABC):
"""权限管理器接口"""
@abstractmethod
async def check_permission(self, user: UserInfo, permission_node: str) -> bool: ...
@abstractmethod
def check_permission(self, user: UserInfo, permission_node: str) -> bool:
"""
检查用户是否拥有指定权限节点
Args:
user: 用户信息
permission_node: 权限节点名称
Returns:
bool: 是否拥有权限
"""
pass
def is_master(self, user: UserInfo) -> bool: ... # 同步快速判断
@abstractmethod
def is_master(self, user: UserInfo) -> bool:
"""
检查用户是否为Master用户
Args:
user: 用户信息
Returns:
bool: 是否为Master用户
"""
pass
async def register_permission_node(self, node: PermissionNode) -> bool: ...
@abstractmethod
def register_permission_node(self, node: PermissionNode) -> bool:
"""
注册权限节点
Args:
node: 权限节点
Returns:
bool: 注册是否成功
"""
pass
async def grant_permission(self, user: UserInfo, permission_node: str) -> bool: ...
@abstractmethod
def grant_permission(self, user: UserInfo, permission_node: str) -> bool:
"""
授权用户权限节点
Args:
user: 用户信息
permission_node: 权限节点名称
Returns:
bool: 授权是否成功
"""
pass
async def revoke_permission(self, user: UserInfo, permission_node: str) -> bool: ...
@abstractmethod
def revoke_permission(self, user: UserInfo, permission_node: str) -> bool:
"""
撤销用户权限节点
Args:
user: 用户信息
permission_node: 权限节点名称
Returns:
bool: 撤销是否成功
"""
pass
async def get_user_permissions(self, user: UserInfo) -> List[str]: ...
@abstractmethod
def get_user_permissions(self, user: UserInfo) -> List[str]:
"""
获取用户拥有的所有权限节点
Args:
user: 用户信息
Returns:
List[str]: 权限节点列表
"""
pass
async def get_all_permission_nodes(self) -> List[PermissionNode]: ...
@abstractmethod
def get_all_permission_nodes(self) -> List[PermissionNode]:
"""
获取所有已注册的权限节点
Returns:
List[PermissionNode]: 权限节点列表
"""
pass
@abstractmethod
def get_plugin_permission_nodes(self, plugin_name: str) -> List[PermissionNode]:
"""
获取指定插件的所有权限节点
Args:
plugin_name: 插件名称
Returns:
List[PermissionNode]: 权限节点列表
"""
pass
async def get_plugin_permission_nodes(self, plugin_name: str) -> List[PermissionNode]: ...
class PermissionAPI:
"""权限系统API类"""
def __init__(self):
self._permission_manager: Optional[IPermissionManager] = None
# 需要保留的前缀(视为绝对节点名,不再自动加 plugins.<plugin>. 前缀)
self.RESERVED_PREFIXES: tuple[str, ...] = (
"system.")
# 系统节点列表 (name, description, default_granted)
self._SYSTEM_NODES: list[tuple[str, str, bool]] = [
("system.superuser", "系统超级管理员:拥有所有权限", False),
("system.permission.manage", "系统权限管理:可管理所有权限节点", False),
("system.permission.view", "系统权限查看:可查看所有权限节点", True),
]
self._system_nodes_initialized: bool = False
def set_permission_manager(self, manager: IPermissionManager):
"""设置权限管理器实例"""
self._permission_manager = manager
logger.info("权限管理器已设置")
def _ensure_manager(self):
"""确保权限管理器已设置"""
if self._permission_manager is None:
raise RuntimeError("权限管理器未设置,请先调用 set_permission_manager")
def check_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
"""
检查用户是否拥有指定权限节点
Args:
platform: 平台类型,如 "qq"
user_id: 用户ID
permission_node: 权限节点名称
Returns:
bool: 是否拥有权限
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
async def check_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
self._ensure_manager()
user = UserInfo(platform=platform, user_id=str(user_id))
return self._permission_manager.check_permission(user, permission_node)
return await self._permission_manager.check_permission(UserInfo(platform, user_id), permission_node)
def is_master(self, platform: str, user_id: str) -> bool:
"""
检查用户是否为Master用户
Args:
platform: 平台类型,如 "qq"
user_id: 用户ID
Returns:
bool: 是否为Master用户
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
self._ensure_manager()
user = UserInfo(platform=platform, user_id=str(user_id))
return self._permission_manager.is_master(user)
return self._permission_manager.is_master(UserInfo(platform, user_id))
def register_permission_node(
self, node_name: str, description: str, plugin_name: str, default_granted: bool = False
async def register_permission_node(
self,
node_name: str,
description: str,
plugin_name: str,
default_granted: bool = False,
*,
system: bool = False,
allow_relative: bool = True,
) -> bool:
"""
注册权限节点
Args:
node_name: 权限节点名称,如 "plugin.example.command.test"
description: 权限节点描述
plugin_name: 所属插件名称
default_granted: 默认是否授权
Returns:
bool: 注册是否成功
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
self._ensure_manager()
node = PermissionNode(
node_name=node_name, description=description, plugin_name=plugin_name, default_granted=default_granted
original_name = node_name
if system:
# 系统节点必须以 system./sys./core. 等保留前缀开头
if not node_name.startswith(("system.", "sys.", "core.")):
node_name = f"system.{node_name}" # 自动补 system.
else:
# 普通插件节点:若不以保留前缀开头,并允许相对,则自动加前缀
if allow_relative and not node_name.startswith(self.RESERVED_PREFIXES):
node_name = f"plugins.{plugin_name}.{node_name}"
if original_name != node_name:
logger.debug(f"规范化权限节点 '{original_name}' -> '{node_name}'")
node = PermissionNode(node_name, description, plugin_name, default_granted)
return await self._permission_manager.register_permission_node(node)
async def register_system_permission_node(
self, node_name: str, description: str, default_granted: bool = False
) -> bool:
"""注册系统级权限节点(不绑定具体插件,前缀保持 system./sys./core.)。"""
return await self.register_permission_node(
node_name,
description,
plugin_name="__system__",
default_granted=default_granted,
system=True,
allow_relative=True,
)
return self._permission_manager.register_permission_node(node)
def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
"""
授权用户权限节点
Args:
platform: 平台类型,如 "qq"
user_id: 用户ID
permission_node: 权限节点名称
Returns:
bool: 授权是否成功
Raises:
RuntimeError: 权限管理器未设置时抛出
async def init_system_nodes(self) -> None:
"""初始化默认系统权限节点(幂等)。
在设置 permission_manager 之后且数据库准备好时调用一次即可。
"""
if self._system_nodes_initialized:
return
self._ensure_manager()
user = UserInfo(platform=platform, user_id=str(user_id))
return self._permission_manager.grant_permission(user, permission_node)
for name, desc, granted in self._SYSTEM_NODES:
try:
await self.register_system_permission_node(name, desc, granted)
except Exception as e: # 防御性
logger.warning(f"注册系统权限节点 {name} 失败: {e}")
self._system_nodes_initialized = True
def revoke_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
"""
撤销用户权限节点
Args:
platform: 平台类型,如 "qq"
user_id: 用户ID
permission_node: 权限节点名称
Returns:
bool: 撤销是否成功
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
self._ensure_manager()
user = UserInfo(platform=platform, user_id=str(user_id))
return self._permission_manager.revoke_permission(user, permission_node)
return await self._permission_manager.grant_permission(UserInfo(platform, user_id), permission_node)
def get_user_permissions(self, platform: str, user_id: str) -> List[str]:
"""
获取用户拥有的所有权限节点
Args:
platform: 平台类型,如 "qq"
user_id: 用户ID
Returns:
List[str]: 权限节点列表
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
async def revoke_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
self._ensure_manager()
user = UserInfo(platform=platform, user_id=str(user_id))
return self._permission_manager.get_user_permissions(user)
return await self._permission_manager.revoke_permission(UserInfo(platform, user_id), permission_node)
def get_all_permission_nodes(self) -> List[Dict[str, Any]]:
"""
获取所有已注册的权限节点
Returns:
List[Dict[str, Any]]: 权限节点列表,每个节点包含 node_name, description, plugin_name, default_granted
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
async def get_user_permissions(self, platform: str, user_id: str) -> List[str]:
self._ensure_manager()
nodes = self._permission_manager.get_all_permission_nodes()
return await self._permission_manager.get_user_permissions(UserInfo(platform, user_id))
async def get_all_permission_nodes(self) -> List[Dict[str, Any]]:
self._ensure_manager()
nodes = await self._permission_manager.get_all_permission_nodes()
return [
{
"node_name": node.node_name,
"description": node.description,
"plugin_name": node.plugin_name,
"default_granted": node.default_granted,
"node_name": n.node_name,
"description": n.description,
"plugin_name": n.plugin_name,
"default_granted": n.default_granted,
}
for node in nodes
for n in nodes
]
def get_plugin_permission_nodes(self, plugin_name: str) -> List[Dict[str, Any]]:
"""
获取指定插件的所有权限节点
Args:
plugin_name: 插件名称
Returns:
List[Dict[str, Any]]: 权限节点列表
Raises:
RuntimeError: 权限管理器未设置时抛出
"""
async def get_plugin_permission_nodes(self, plugin_name: str) -> List[Dict[str, Any]]:
self._ensure_manager()
nodes = self._permission_manager.get_plugin_permission_nodes(plugin_name)
nodes = await self._permission_manager.get_plugin_permission_nodes(plugin_name)
return [
{
"node_name": node.node_name,
"description": node.description,
"plugin_name": node.plugin_name,
"default_granted": node.default_granted,
"node_name": n.node_name,
"description": n.description,
"plugin_name": n.plugin_name,
"default_granted": n.default_granted,
}
for node in nodes
for n in nodes
]
# 全局权限API实例
permission_api = PermissionAPI()

View File

@@ -7,6 +7,7 @@
from functools import wraps
from typing import Callable, Optional
from inspect import iscoroutinefunction
import inspect
from src.plugin_system.apis.permission_api import permission_api
from src.plugin_system.apis.send_api import text_to_stream
@@ -61,7 +62,7 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
return None
# 检查权限
has_permission = permission_api.check_permission(
has_permission = await permission_api.check_permission(
chat_stream.platform, chat_stream.user_info.user_id, permission_node
)
@@ -77,40 +78,13 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
# 权限检查通过,执行原函数
return await func(*args, **kwargs)
def sync_wrapper(*args, **kwargs):
# 对于同步函数,我们不能发送异步消息,只能记录日志
chat_stream = None
for arg in args:
if isinstance(arg, ChatStream):
chat_stream = arg
break
if chat_stream is None:
chat_stream = kwargs.get("chat_stream")
if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
if not iscoroutinefunction(func):
logger.warning(f"函数 {func.__name__} 使用 require_permission 但非异步,已强制阻止执行")
async def blocked(*_a, **_k):
logger.error("同步函数不再支持权限装饰器,请改为 async def")
return None
# 检查权限
has_permission = permission_api.check_permission(
chat_stream.platform, chat_stream.user_info.user_id, permission_node
)
if not has_permission:
logger.warning(
f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 没有权限 {permission_node}"
)
return None
# 权限检查通过,执行原函数
return func(*args, **kwargs)
# 根据函数类型选择包装器
if iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return blocked
return async_wrapper
return decorator
@@ -171,36 +145,13 @@ def require_master(deny_message: Optional[str] = None):
# 权限检查通过,执行原函数
return await func(*args, **kwargs)
def sync_wrapper(*args, **kwargs):
# 对于同步函数,我们不能发送异步消息,只能记录日志
chat_stream = None
for arg in args:
if isinstance(arg, ChatStream):
chat_stream = arg
break
if chat_stream is None:
chat_stream = kwargs.get("chat_stream")
if chat_stream is None:
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
if not iscoroutinefunction(func):
logger.warning(f"函数 {func.__name__} 使用 require_master 但非异步,已强制阻止执行")
async def blocked(*_a, **_k):
logger.error("同步函数不再支持 require_master请改为 async def")
return None
# 检查是否为Master用户
is_master = permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
if not is_master:
logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 不是Master用户")
return None
# 权限检查通过,执行原函数
return func(*args, **kwargs)
# 根据函数类型选择包装器
if iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return blocked
return async_wrapper
return decorator
@@ -214,17 +165,7 @@ class PermissionChecker:
@staticmethod
def check_permission(chat_stream: ChatStream, permission_node: str) -> bool:
"""
检查用户是否拥有指定权限
Args:
chat_stream: 聊天流对象
permission_node: 权限节点名称
Returns:
bool: 是否拥有权限
"""
return permission_api.check_permission(chat_stream.platform, chat_stream.user_info.user_id, permission_node)
raise RuntimeError("PermissionChecker.check_permission 已移除同步支持,请直接 await permission_api.check_permission")
@staticmethod
def is_master(chat_stream: ChatStream) -> bool:
@@ -254,12 +195,12 @@ class PermissionChecker:
Returns:
bool: 是否拥有权限
"""
has_permission = PermissionChecker.check_permission(chat_stream, permission_node)
has_permission = await permission_api.check_permission(
chat_stream.platform, chat_stream.user_info.user_id, permission_node
)
if not has_permission:
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
await text_to_stream(message, chat_stream.stream_id)
return has_permission
@staticmethod