权限
This commit is contained in:
@@ -60,15 +60,6 @@ class IPermissionManager(ABC):
|
||||
class PermissionAPI:
|
||||
def __init__(self):
|
||||
self._permission_manager: IPermissionManager | None = None
|
||||
# 需要保留的前缀(视为绝对节点名,不再自动加 plugins.<plugin>. 前缀)
|
||||
self.RESERVED_PREFIXES: tuple[str, ...] = ("system.",)
|
||||
# 系统节点列表 (name, description, default_granted)
|
||||
self._SYSTEM_NODES: list[tuple[str, str, bool]] = [
|
||||
("system.superuser", "系统超级管理员:拥有所有权限", False),
|
||||
("system.permission.manage", "系统权限管理:可管理所有权限节点", False),
|
||||
("system.permission.view", "系统权限查看:可查看所有权限节点", True),
|
||||
]
|
||||
self._system_nodes_initialized: bool = False
|
||||
|
||||
def set_permission_manager(self, manager: IPermissionManager):
|
||||
self._permission_manager = manager
|
||||
@@ -97,53 +88,27 @@ class PermissionAPI:
|
||||
plugin_name: str,
|
||||
default_granted: bool = False,
|
||||
*,
|
||||
system: bool = False,
|
||||
allow_relative: bool = True,
|
||||
) -> bool:
|
||||
self._ensure_manager()
|
||||
original_name = node_name
|
||||
if system:
|
||||
# 系统节点必须以 system./sys./core. 等保留前缀开头
|
||||
if not node_name.startswith(("system.", "sys.", "core.")):
|
||||
node_name = f"system.{node_name}" # 自动补 system.
|
||||
else:
|
||||
# 普通插件节点:若不以保留前缀开头,并允许相对,则自动加前缀
|
||||
if allow_relative and not node_name.startswith(self.RESERVED_PREFIXES):
|
||||
node_name = f"plugins.{plugin_name}.{node_name}"
|
||||
if original_name != node_name:
|
||||
logger.debug(f"规范化权限节点 '{original_name}' -> '{node_name}'")
|
||||
if plugin_name != "__system__":
|
||||
expected_prefix = f"plugins.{plugin_name}."
|
||||
if allow_relative and not node_name.startswith("plugins."):
|
||||
node_name = f"{expected_prefix}{node_name}"
|
||||
elif not node_name.startswith(expected_prefix):
|
||||
logger.error(
|
||||
"权限节点名称不符合规范,期望以 %s 开头: %s",
|
||||
expected_prefix,
|
||||
node_name,
|
||||
)
|
||||
return False
|
||||
|
||||
node = PermissionNode(node_name, description, plugin_name, default_granted)
|
||||
if not self._permission_manager:
|
||||
return False
|
||||
return await self._permission_manager.register_permission_node(node)
|
||||
|
||||
async def register_system_permission_node(
|
||||
self, node_name: str, description: str, default_granted: bool = False
|
||||
) -> bool:
|
||||
"""注册系统级权限节点(不绑定具体插件,前缀保持 system./sys./core.)。"""
|
||||
return await self.register_permission_node(
|
||||
node_name,
|
||||
description,
|
||||
plugin_name="__system__",
|
||||
default_granted=default_granted,
|
||||
system=True,
|
||||
allow_relative=True,
|
||||
)
|
||||
|
||||
async def init_system_nodes(self) -> None:
|
||||
"""初始化默认系统权限节点(幂等)。
|
||||
|
||||
在设置 permission_manager 之后且数据库准备好时调用一次即可。
|
||||
"""
|
||||
if self._system_nodes_initialized:
|
||||
return
|
||||
self._ensure_manager()
|
||||
for name, desc, granted in self._SYSTEM_NODES:
|
||||
try:
|
||||
await self.register_system_permission_node(name, desc, granted)
|
||||
except Exception as e: # 防御性
|
||||
logger.warning(f"注册系统权限节点 {name} 失败: {e}")
|
||||
self._system_nodes_initialized = True
|
||||
|
||||
async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
self._ensure_manager()
|
||||
|
||||
Reference in New Issue
Block a user