This commit is contained in:
明天好像没什么
2025-11-09 09:13:09 +08:00
parent 402644900f
commit 6c00e41ef7
5 changed files with 118 additions and 4114 deletions

View File

@@ -60,15 +60,6 @@ class IPermissionManager(ABC):
class PermissionAPI:
def __init__(self):
self._permission_manager: IPermissionManager | None = None
# 需要保留的前缀(视为绝对节点名,不再自动加 plugins.<plugin>. 前缀)
self.RESERVED_PREFIXES: tuple[str, ...] = ("system.",)
# 系统节点列表 (name, description, default_granted)
self._SYSTEM_NODES: list[tuple[str, str, bool]] = [
("system.superuser", "系统超级管理员:拥有所有权限", False),
("system.permission.manage", "系统权限管理:可管理所有权限节点", False),
("system.permission.view", "系统权限查看:可查看所有权限节点", True),
]
self._system_nodes_initialized: bool = False
def set_permission_manager(self, manager: IPermissionManager):
self._permission_manager = manager
@@ -97,53 +88,27 @@ class PermissionAPI:
plugin_name: str,
default_granted: bool = False,
*,
system: bool = False,
allow_relative: bool = True,
) -> bool:
self._ensure_manager()
original_name = node_name
if system:
# 系统节点必须以 system./sys./core. 等保留前缀开头
if not node_name.startswith(("system.", "sys.", "core.")):
node_name = f"system.{node_name}" # 自动补 system.
else:
# 普通插件节点:若不以保留前缀开头,并允许相对,则自动加前缀
if allow_relative and not node_name.startswith(self.RESERVED_PREFIXES):
node_name = f"plugins.{plugin_name}.{node_name}"
if original_name != node_name:
logger.debug(f"规范化权限节点 '{original_name}' -> '{node_name}'")
if plugin_name != "__system__":
expected_prefix = f"plugins.{plugin_name}."
if allow_relative and not node_name.startswith("plugins."):
node_name = f"{expected_prefix}{node_name}"
elif not node_name.startswith(expected_prefix):
logger.error(
"权限节点名称不符合规范,期望以 %s 开头: %s",
expected_prefix,
node_name,
)
return False
node = PermissionNode(node_name, description, plugin_name, default_granted)
if not self._permission_manager:
return False
return await self._permission_manager.register_permission_node(node)
async def register_system_permission_node(
self, node_name: str, description: str, default_granted: bool = False
) -> bool:
"""注册系统级权限节点(不绑定具体插件,前缀保持 system./sys./core.)。"""
return await self.register_permission_node(
node_name,
description,
plugin_name="__system__",
default_granted=default_granted,
system=True,
allow_relative=True,
)
async def init_system_nodes(self) -> None:
"""初始化默认系统权限节点(幂等)。
在设置 permission_manager 之后且数据库准备好时调用一次即可。
"""
if self._system_nodes_initialized:
return
self._ensure_manager()
for name, desc, granted in self._SYSTEM_NODES:
try:
await self.register_system_permission_node(name, desc, granted)
except Exception as e: # 防御性
logger.warning(f"注册系统权限节点 {name} 失败: {e}")
self._system_nodes_initialized = True
async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
self._ensure_manager()

View File

@@ -131,12 +131,20 @@ class PermissionManager(IPermissionManager):
bool: 注册是否成功
"""
try:
expected_prefix = f"plugins.{node.plugin_name}."
if node.plugin_name != "__system__" and not node.node_name.startswith(expected_prefix):
logger.error(
"权限节点名称不符合规范,期望以 %s 开头: %s",
expected_prefix,
node.node_name,
)
return False
async with self.SessionLocal() as session:
# 检查节点是否已存在
# 检查节点是否已存在(仅支持规范化后的名称)
result = await session.execute(select(PermissionNodes).filter_by(node_name=node.node_name))
existing_node = result.scalar_one_or_none()
if existing_node:
# 更新现有节点的信息
existing_node.description = node.description
existing_node.plugin_name = node.plugin_name
existing_node.default_granted = node.default_granted
@@ -336,6 +344,12 @@ class PermissionManager(IPermissionManager):
"""
try:
async with self.SessionLocal() as session:
# 移除未规范化的旧权限节点
await session.execute(
delete(PermissionNodes).where(~PermissionNodes.node_name.like("plugins.%"))
)
await session.commit()
result = await session.execute(select(PermissionNodes))
nodes = result.scalars().all()
return [
@@ -367,6 +381,14 @@ class PermissionManager(IPermissionManager):
"""
try:
async with self.SessionLocal() as session:
# 返回前清理未规范化的旧节点
await session.execute(
delete(PermissionNodes)
.where(PermissionNodes.plugin_name == plugin_name)
.where(~PermissionNodes.node_name.like("plugins.%"))
)
await session.commit()
result = await session.execute(select(PermissionNodes).filter_by(plugin_name=plugin_name))
nodes = result.scalars().all()
return [

View File

@@ -16,7 +16,7 @@ from src.plugin_system.apis.send_api import text_to_stream
logger = get_logger(__name__)
def require_permission(permission_node: str, deny_message: str | None = None):
def require_permission(permission_node: str, deny_message: str | None = None, *, use_full_name: bool = True):
"""
权限检查装饰器
@@ -25,19 +25,29 @@ def require_permission(permission_node: str, deny_message: str | None = None):
Args:
permission_node: 所需的权限节点名称
deny_message: 权限不足时的提示消息如果为None则使用默认消息
use_full_name: 是否使用完整的权限节点名称默认False
- True: permission_node 必须是完整的权限节点名称,如 "plugins.plugin_name.action"
- False: permission_node 可以是短名称,如 "action",装饰器会自动添加 "plugins.{plugin_name}." 前缀
Example:
@require_permission("plugin.example.admin")
async def admin_command(message: Message, chat_stream: ChatStream):
# 只有拥有 plugin.example.admin 权限的用户才能执行
# 使用完整名称(传统方式)
@require_permission("plugins.example.admin")
async def admin_command(self):
pass
# 使用短名称(新方式,类似 PermissionNodeField
@require_permission("admin", use_full_name=True)
async def admin_command(self):
# 会自动转换为 "plugins.{当前插件名}.admin"
pass
"""
def decorator(func: Callable):
@wraps(func)
async def async_wrapper(*args, **kwargs):
# 尝试从参数中提取 ChatStream 对象
# 尝试从参数中提取 ChatStream 对象和插件名
chat_stream = None
plugin_name = None
# 首先检查位置参数中的 ChatStream
for arg in args:
@@ -45,24 +55,49 @@ def require_permission(permission_node: str, deny_message: str | None = None):
chat_stream = arg
break
# 如果在位置参数中没找到,尝试从关键字参数中查找
if chat_stream is None:
chat_stream = kwargs.get("chat_stream")
# 如果还没找到,检查是否是 PlusCommand 方法调用
if chat_stream is None and args:
if args:
instance = args[0]
# 检查第一个参数是否有 chat_stream 属性PlusCommand 实例)
if hasattr(instance, "chat_stream"):
if hasattr(instance, "chat_stream") and chat_stream is None:
chat_stream = instance.chat_stream
# 兼容旧的 message.chat_stream 属性
elif hasattr(instance, "message") and hasattr(instance.message, "chat_stream"):
chat_stream = instance.message.chat_stream
# 尝试获取插件名
# 方法1: 从类名获取(通过组件注册表)
if not plugin_name and hasattr(instance, "command_name"):
# 从组件注册表查找这个命令属于哪个插件
try:
from src.plugin_system.base.component_types import ComponentType
from src.plugin_system.core.component_registry import component_registry
component_info = component_registry.get_component_info(
instance.command_name, ComponentType.PLUS_COMMAND
)
if component_info:
plugin_name = component_info.plugin_name
except Exception:
pass
if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return None
# 构建完整的权限节点名称
full_permission_node = permission_node
if not use_full_name:
# 需要自动构建完整名称
if not plugin_name:
logger.error(
f"权限装饰器无法推断插件名,函数: {func.__name__}"
"请使用 use_full_name=True 或确保在插件类中调用"
)
return None
full_permission_node = f"plugins.{plugin_name}.{permission_node}"
logger.debug(f"自动构建权限节点: {permission_node} -> {full_permission_node}")
# 检查权限
if not chat_stream.user_info or not chat_stream.user_info.user_id:
logger.warning(f"权限检查失败chat_stream 中缺少 user_info 或 user_id函数: {func.__name__}")
@@ -71,12 +106,12 @@ def require_permission(permission_node: str, deny_message: str | None = None):
return None
has_permission = await permission_api.check_permission(
chat_stream.platform, chat_stream.user_info.user_id, permission_node
chat_stream.platform, chat_stream.user_info.user_id, full_permission_node
)
if not has_permission:
# 权限不足,发送拒绝消息
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {full_permission_node}"
await text_to_stream(message, chat_stream.stream_id)
# 对于PlusCommand的execute方法需要返回适当的元组
if func.__name__ == "execute" and hasattr(args[0], "send_text"):