更改权限
This commit is contained in:
@@ -1,13 +1,8 @@
|
||||
"""
|
||||
权限系统API - 提供权限管理相关的API接口
|
||||
|
||||
这个模块提供了权限系统的核心API,包括权限检查、权限节点管理等功能。
|
||||
插件可以通过这些API来检查用户权限和管理权限节点。
|
||||
"""
|
||||
"""纯异步权限API定义。所有外部调用方必须使用 await。"""
|
||||
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from src.common.logger import get_logger
|
||||
@@ -16,325 +11,172 @@ logger = get_logger(__name__)
|
||||
|
||||
|
||||
class PermissionLevel(Enum):
|
||||
"""权限等级枚举"""
|
||||
|
||||
MASTER = "master" # 最高权限,无视所有权限节点
|
||||
MASTER = "master"
|
||||
|
||||
|
||||
@dataclass
|
||||
class PermissionNode:
|
||||
"""权限节点数据类"""
|
||||
|
||||
node_name: str # 权限节点名称,如 "plugin.example.command.test"
|
||||
description: str # 权限节点描述
|
||||
plugin_name: str # 所属插件名称
|
||||
default_granted: bool = False # 默认是否授权
|
||||
node_name: str
|
||||
description: str
|
||||
plugin_name: str
|
||||
default_granted: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserInfo:
|
||||
"""用户信息数据类"""
|
||||
|
||||
platform: str # 平台类型,如 "qq"
|
||||
user_id: str # 用户ID
|
||||
platform: str
|
||||
user_id: str
|
||||
|
||||
def __post_init__(self):
|
||||
"""确保user_id是字符串类型"""
|
||||
self.user_id = str(self.user_id)
|
||||
|
||||
def to_tuple(self) -> tuple[str, str]:
|
||||
"""转换为元组格式"""
|
||||
return self.platform, self.user_id
|
||||
|
||||
|
||||
class IPermissionManager(ABC):
|
||||
"""权限管理器接口"""
|
||||
@abstractmethod
|
||||
async def check_permission(self, user: UserInfo, permission_node: str) -> bool: ...
|
||||
|
||||
@abstractmethod
|
||||
def check_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
pass
|
||||
def is_master(self, user: UserInfo) -> bool: ... # 同步快速判断
|
||||
|
||||
@abstractmethod
|
||||
def is_master(self, user: UserInfo) -> bool:
|
||||
"""
|
||||
检查用户是否为Master用户
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
"""
|
||||
pass
|
||||
async def register_permission_node(self, node: PermissionNode) -> bool: ...
|
||||
|
||||
@abstractmethod
|
||||
def register_permission_node(self, node: PermissionNode) -> bool:
|
||||
"""
|
||||
注册权限节点
|
||||
|
||||
Args:
|
||||
node: 权限节点
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
"""
|
||||
pass
|
||||
async def grant_permission(self, user: UserInfo, permission_node: str) -> bool: ...
|
||||
|
||||
@abstractmethod
|
||||
def grant_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
授权用户权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 授权是否成功
|
||||
"""
|
||||
pass
|
||||
async def revoke_permission(self, user: UserInfo, permission_node: str) -> bool: ...
|
||||
|
||||
@abstractmethod
|
||||
def revoke_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
撤销用户权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 撤销是否成功
|
||||
"""
|
||||
pass
|
||||
async def get_user_permissions(self, user: UserInfo) -> List[str]: ...
|
||||
|
||||
@abstractmethod
|
||||
def get_user_permissions(self, user: UserInfo) -> List[str]:
|
||||
"""
|
||||
获取用户拥有的所有权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
|
||||
Returns:
|
||||
List[str]: 权限节点列表
|
||||
"""
|
||||
pass
|
||||
async def get_all_permission_nodes(self) -> List[PermissionNode]: ...
|
||||
|
||||
@abstractmethod
|
||||
def get_all_permission_nodes(self) -> List[PermissionNode]:
|
||||
"""
|
||||
获取所有已注册的权限节点
|
||||
|
||||
Returns:
|
||||
List[PermissionNode]: 权限节点列表
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_plugin_permission_nodes(self, plugin_name: str) -> List[PermissionNode]:
|
||||
"""
|
||||
获取指定插件的所有权限节点
|
||||
|
||||
Args:
|
||||
plugin_name: 插件名称
|
||||
|
||||
Returns:
|
||||
List[PermissionNode]: 权限节点列表
|
||||
"""
|
||||
pass
|
||||
async def get_plugin_permission_nodes(self, plugin_name: str) -> List[PermissionNode]: ...
|
||||
|
||||
|
||||
class PermissionAPI:
|
||||
"""权限系统API类"""
|
||||
|
||||
def __init__(self):
|
||||
self._permission_manager: Optional[IPermissionManager] = None
|
||||
# 需要保留的前缀(视为绝对节点名,不再自动加 plugins.<plugin>. 前缀)
|
||||
self.RESERVED_PREFIXES: tuple[str, ...] = (
|
||||
"system.")
|
||||
# 系统节点列表 (name, description, default_granted)
|
||||
self._SYSTEM_NODES: list[tuple[str, str, bool]] = [
|
||||
("system.superuser", "系统超级管理员:拥有所有权限", False),
|
||||
("system.permission.manage", "系统权限管理:可管理所有权限节点", False),
|
||||
("system.permission.view", "系统权限查看:可查看所有权限节点", True),
|
||||
]
|
||||
self._system_nodes_initialized: bool = False
|
||||
|
||||
def set_permission_manager(self, manager: IPermissionManager):
|
||||
"""设置权限管理器实例"""
|
||||
self._permission_manager = manager
|
||||
logger.info("权限管理器已设置")
|
||||
|
||||
def _ensure_manager(self):
|
||||
"""确保权限管理器已设置"""
|
||||
if self._permission_manager is None:
|
||||
raise RuntimeError("权限管理器未设置,请先调用 set_permission_manager")
|
||||
|
||||
def check_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
async def check_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.check_permission(user, permission_node)
|
||||
return await self._permission_manager.check_permission(UserInfo(platform, user_id), permission_node)
|
||||
|
||||
def is_master(self, platform: str, user_id: str) -> bool:
|
||||
"""
|
||||
检查用户是否为Master用户
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.is_master(user)
|
||||
return self._permission_manager.is_master(UserInfo(platform, user_id))
|
||||
|
||||
def register_permission_node(
|
||||
self, node_name: str, description: str, plugin_name: str, default_granted: bool = False
|
||||
async def register_permission_node(
|
||||
self,
|
||||
node_name: str,
|
||||
description: str,
|
||||
plugin_name: str,
|
||||
default_granted: bool = False,
|
||||
*,
|
||||
system: bool = False,
|
||||
allow_relative: bool = True,
|
||||
) -> bool:
|
||||
"""
|
||||
注册权限节点
|
||||
|
||||
Args:
|
||||
node_name: 权限节点名称,如 "plugin.example.command.test"
|
||||
description: 权限节点描述
|
||||
plugin_name: 所属插件名称
|
||||
default_granted: 默认是否授权
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
node = PermissionNode(
|
||||
node_name=node_name, description=description, plugin_name=plugin_name, default_granted=default_granted
|
||||
original_name = node_name
|
||||
if system:
|
||||
# 系统节点必须以 system./sys./core. 等保留前缀开头
|
||||
if not node_name.startswith(("system.", "sys.", "core.")):
|
||||
node_name = f"system.{node_name}" # 自动补 system.
|
||||
else:
|
||||
# 普通插件节点:若不以保留前缀开头,并允许相对,则自动加前缀
|
||||
if allow_relative and not node_name.startswith(self.RESERVED_PREFIXES):
|
||||
node_name = f"plugins.{plugin_name}.{node_name}"
|
||||
if original_name != node_name:
|
||||
logger.debug(f"规范化权限节点 '{original_name}' -> '{node_name}'")
|
||||
node = PermissionNode(node_name, description, plugin_name, default_granted)
|
||||
return await self._permission_manager.register_permission_node(node)
|
||||
|
||||
async def register_system_permission_node(
|
||||
self, node_name: str, description: str, default_granted: bool = False
|
||||
) -> bool:
|
||||
"""注册系统级权限节点(不绑定具体插件,前缀保持 system./sys./core.)。"""
|
||||
return await self.register_permission_node(
|
||||
node_name,
|
||||
description,
|
||||
plugin_name="__system__",
|
||||
default_granted=default_granted,
|
||||
system=True,
|
||||
allow_relative=True,
|
||||
)
|
||||
return self._permission_manager.register_permission_node(node)
|
||||
|
||||
def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
"""
|
||||
授权用户权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 授权是否成功
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
async def init_system_nodes(self) -> None:
|
||||
"""初始化默认系统权限节点(幂等)。
|
||||
|
||||
在设置 permission_manager 之后且数据库准备好时调用一次即可。
|
||||
"""
|
||||
if self._system_nodes_initialized:
|
||||
return
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.grant_permission(user, permission_node)
|
||||
for name, desc, granted in self._SYSTEM_NODES:
|
||||
try:
|
||||
await self.register_system_permission_node(name, desc, granted)
|
||||
except Exception as e: # 防御性
|
||||
logger.warning(f"注册系统权限节点 {name} 失败: {e}")
|
||||
self._system_nodes_initialized = True
|
||||
|
||||
def revoke_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
"""
|
||||
撤销用户权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 撤销是否成功
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.revoke_permission(user, permission_node)
|
||||
return await self._permission_manager.grant_permission(UserInfo(platform, user_id), permission_node)
|
||||
|
||||
def get_user_permissions(self, platform: str, user_id: str) -> List[str]:
|
||||
"""
|
||||
获取用户拥有的所有权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
|
||||
Returns:
|
||||
List[str]: 权限节点列表
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
async def revoke_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.get_user_permissions(user)
|
||||
return await self._permission_manager.revoke_permission(UserInfo(platform, user_id), permission_node)
|
||||
|
||||
def get_all_permission_nodes(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取所有已注册的权限节点
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: 权限节点列表,每个节点包含 node_name, description, plugin_name, default_granted
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
async def get_user_permissions(self, platform: str, user_id: str) -> List[str]:
|
||||
self._ensure_manager()
|
||||
nodes = self._permission_manager.get_all_permission_nodes()
|
||||
return await self._permission_manager.get_user_permissions(UserInfo(platform, user_id))
|
||||
|
||||
async def get_all_permission_nodes(self) -> List[Dict[str, Any]]:
|
||||
self._ensure_manager()
|
||||
nodes = await self._permission_manager.get_all_permission_nodes()
|
||||
return [
|
||||
{
|
||||
"node_name": node.node_name,
|
||||
"description": node.description,
|
||||
"plugin_name": node.plugin_name,
|
||||
"default_granted": node.default_granted,
|
||||
"node_name": n.node_name,
|
||||
"description": n.description,
|
||||
"plugin_name": n.plugin_name,
|
||||
"default_granted": n.default_granted,
|
||||
}
|
||||
for node in nodes
|
||||
for n in nodes
|
||||
]
|
||||
|
||||
def get_plugin_permission_nodes(self, plugin_name: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取指定插件的所有权限节点
|
||||
|
||||
Args:
|
||||
plugin_name: 插件名称
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: 权限节点列表
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
async def get_plugin_permission_nodes(self, plugin_name: str) -> List[Dict[str, Any]]:
|
||||
self._ensure_manager()
|
||||
nodes = self._permission_manager.get_plugin_permission_nodes(plugin_name)
|
||||
nodes = await self._permission_manager.get_plugin_permission_nodes(plugin_name)
|
||||
return [
|
||||
{
|
||||
"node_name": node.node_name,
|
||||
"description": node.description,
|
||||
"plugin_name": node.plugin_name,
|
||||
"default_granted": node.default_granted,
|
||||
"node_name": n.node_name,
|
||||
"description": n.description,
|
||||
"plugin_name": n.plugin_name,
|
||||
"default_granted": n.default_granted,
|
||||
}
|
||||
for node in nodes
|
||||
for n in nodes
|
||||
]
|
||||
|
||||
|
||||
# 全局权限API实例
|
||||
permission_api = PermissionAPI()
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
from functools import wraps
|
||||
from typing import Callable, Optional
|
||||
from inspect import iscoroutinefunction
|
||||
import inspect
|
||||
|
||||
from src.plugin_system.apis.permission_api import permission_api
|
||||
from src.plugin_system.apis.send_api import text_to_stream
|
||||
@@ -61,7 +62,7 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
|
||||
return None
|
||||
|
||||
# 检查权限
|
||||
has_permission = permission_api.check_permission(
|
||||
has_permission = await permission_api.check_permission(
|
||||
chat_stream.platform, chat_stream.user_info.user_id, permission_node
|
||||
)
|
||||
|
||||
@@ -77,40 +78,13 @@ def require_permission(permission_node: str, deny_message: Optional[str] = None)
|
||||
# 权限检查通过,执行原函数
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
# 对于同步函数,我们不能发送异步消息,只能记录日志
|
||||
chat_stream = None
|
||||
for arg in args:
|
||||
if isinstance(arg, ChatStream):
|
||||
chat_stream = arg
|
||||
break
|
||||
|
||||
if chat_stream is None:
|
||||
chat_stream = kwargs.get("chat_stream")
|
||||
|
||||
if chat_stream is None:
|
||||
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
|
||||
if not iscoroutinefunction(func):
|
||||
logger.warning(f"函数 {func.__name__} 使用 require_permission 但非异步,已强制阻止执行")
|
||||
async def blocked(*_a, **_k):
|
||||
logger.error("同步函数不再支持权限装饰器,请改为 async def")
|
||||
return None
|
||||
|
||||
# 检查权限
|
||||
has_permission = permission_api.check_permission(
|
||||
chat_stream.platform, chat_stream.user_info.user_id, permission_node
|
||||
)
|
||||
|
||||
if not has_permission:
|
||||
logger.warning(
|
||||
f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 没有权限 {permission_node}"
|
||||
)
|
||||
return None
|
||||
|
||||
# 权限检查通过,执行原函数
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# 根据函数类型选择包装器
|
||||
if iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return sync_wrapper
|
||||
return blocked
|
||||
return async_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -171,36 +145,13 @@ def require_master(deny_message: Optional[str] = None):
|
||||
# 权限检查通过,执行原函数
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
# 对于同步函数,我们不能发送异步消息,只能记录日志
|
||||
chat_stream = None
|
||||
for arg in args:
|
||||
if isinstance(arg, ChatStream):
|
||||
chat_stream = arg
|
||||
break
|
||||
|
||||
if chat_stream is None:
|
||||
chat_stream = kwargs.get("chat_stream")
|
||||
|
||||
if chat_stream is None:
|
||||
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
|
||||
if not iscoroutinefunction(func):
|
||||
logger.warning(f"函数 {func.__name__} 使用 require_master 但非异步,已强制阻止执行")
|
||||
async def blocked(*_a, **_k):
|
||||
logger.error("同步函数不再支持 require_master,请改为 async def")
|
||||
return None
|
||||
|
||||
# 检查是否为Master用户
|
||||
is_master = permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id)
|
||||
|
||||
if not is_master:
|
||||
logger.warning(f"用户 {chat_stream.platform}:{chat_stream.user_info.user_id} 不是Master用户")
|
||||
return None
|
||||
|
||||
# 权限检查通过,执行原函数
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# 根据函数类型选择包装器
|
||||
if iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return sync_wrapper
|
||||
return blocked
|
||||
return async_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -214,17 +165,7 @@ class PermissionChecker:
|
||||
|
||||
@staticmethod
|
||||
def check_permission(chat_stream: ChatStream, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限
|
||||
|
||||
Args:
|
||||
chat_stream: 聊天流对象
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
return permission_api.check_permission(chat_stream.platform, chat_stream.user_info.user_id, permission_node)
|
||||
raise RuntimeError("PermissionChecker.check_permission 已移除同步支持,请直接 await permission_api.check_permission")
|
||||
|
||||
@staticmethod
|
||||
def is_master(chat_stream: ChatStream) -> bool:
|
||||
@@ -254,12 +195,12 @@ class PermissionChecker:
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
has_permission = PermissionChecker.check_permission(chat_stream, permission_node)
|
||||
|
||||
has_permission = await permission_api.check_permission(
|
||||
chat_stream.platform, chat_stream.user_info.user_id, permission_node
|
||||
)
|
||||
if not has_permission:
|
||||
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
|
||||
await text_to_stream(message, chat_stream.stream_id)
|
||||
|
||||
return has_permission
|
||||
|
||||
@staticmethod
|
||||
|
||||
Reference in New Issue
Block a user