refactor: 清理代码质量和移除未使用文件
- 移除未使用的导入语句和变量 - 修复代码风格问题(空格、格式化等) - 删除备份文件和测试文件 - 改进异常处理链式调用 - 添加权限系统数据库模型和配置 - 更新版本号至6.4.4 - 优化SQL查询使用正确的布尔表达式
This commit is contained in:
@@ -18,6 +18,7 @@ from src.plugin_system.apis import (
|
||||
plugin_manage_api,
|
||||
send_api,
|
||||
tool_api,
|
||||
permission_api,
|
||||
)
|
||||
from .logging_api import get_logger
|
||||
from .plugin_register_api import register_plugin
|
||||
@@ -38,4 +39,5 @@ __all__ = [
|
||||
"get_logger",
|
||||
"register_plugin",
|
||||
"tool_api",
|
||||
"permission_api",
|
||||
]
|
||||
|
||||
339
src/plugin_system/apis/permission_api.py
Normal file
339
src/plugin_system/apis/permission_api.py
Normal file
@@ -0,0 +1,339 @@
|
||||
"""
|
||||
权限系统API - 提供权限管理相关的API接口
|
||||
|
||||
这个模块提供了权限系统的核心API,包括权限检查、权限节点管理等功能。
|
||||
插件可以通过这些API来检查用户权限和管理权限节点。
|
||||
"""
|
||||
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from src.common.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class PermissionLevel(Enum):
|
||||
"""权限等级枚举"""
|
||||
MASTER = "master" # 最高权限,无视所有权限节点
|
||||
|
||||
|
||||
@dataclass
|
||||
class PermissionNode:
|
||||
"""权限节点数据类"""
|
||||
node_name: str # 权限节点名称,如 "plugin.example.command.test"
|
||||
description: str # 权限节点描述
|
||||
plugin_name: str # 所属插件名称
|
||||
default_granted: bool = False # 默认是否授权
|
||||
|
||||
|
||||
@dataclass
|
||||
class UserInfo:
|
||||
"""用户信息数据类"""
|
||||
platform: str # 平台类型,如 "qq"
|
||||
user_id: str # 用户ID
|
||||
|
||||
def __post_init__(self):
|
||||
"""确保user_id是字符串类型"""
|
||||
self.user_id = str(self.user_id)
|
||||
|
||||
def to_tuple(self) -> tuple[str, str]:
|
||||
"""转换为元组格式"""
|
||||
return (self.platform, self.user_id)
|
||||
|
||||
|
||||
class IPermissionManager(ABC):
|
||||
"""权限管理器接口"""
|
||||
|
||||
@abstractmethod
|
||||
def check_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def is_master(self, user: UserInfo) -> bool:
|
||||
"""
|
||||
检查用户是否为Master用户
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def register_permission_node(self, node: PermissionNode) -> bool:
|
||||
"""
|
||||
注册权限节点
|
||||
|
||||
Args:
|
||||
node: 权限节点
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def grant_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
授权用户权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 授权是否成功
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def revoke_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
撤销用户权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 撤销是否成功
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_user_permissions(self, user: UserInfo) -> List[str]:
|
||||
"""
|
||||
获取用户拥有的所有权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
|
||||
Returns:
|
||||
List[str]: 权限节点列表
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_all_permission_nodes(self) -> List[PermissionNode]:
|
||||
"""
|
||||
获取所有已注册的权限节点
|
||||
|
||||
Returns:
|
||||
List[PermissionNode]: 权限节点列表
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_plugin_permission_nodes(self, plugin_name: str) -> List[PermissionNode]:
|
||||
"""
|
||||
获取指定插件的所有权限节点
|
||||
|
||||
Args:
|
||||
plugin_name: 插件名称
|
||||
|
||||
Returns:
|
||||
List[PermissionNode]: 权限节点列表
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class PermissionAPI:
|
||||
"""权限系统API类"""
|
||||
|
||||
def __init__(self):
|
||||
self._permission_manager: Optional[IPermissionManager] = None
|
||||
|
||||
def set_permission_manager(self, manager: IPermissionManager):
|
||||
"""设置权限管理器实例"""
|
||||
self._permission_manager = manager
|
||||
logger.info("权限管理器已设置")
|
||||
|
||||
def _ensure_manager(self):
|
||||
"""确保权限管理器已设置"""
|
||||
if self._permission_manager is None:
|
||||
raise RuntimeError("权限管理器未设置,请先调用 set_permission_manager")
|
||||
|
||||
def check_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.check_permission(user, permission_node)
|
||||
|
||||
def is_master(self, platform: str, user_id: str) -> bool:
|
||||
"""
|
||||
检查用户是否为Master用户
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.is_master(user)
|
||||
|
||||
def register_permission_node(self, node_name: str, description: str, plugin_name: str,
|
||||
default_granted: bool = False) -> bool:
|
||||
"""
|
||||
注册权限节点
|
||||
|
||||
Args:
|
||||
node_name: 权限节点名称,如 "plugin.example.command.test"
|
||||
description: 权限节点描述
|
||||
plugin_name: 所属插件名称
|
||||
default_granted: 默认是否授权
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
node = PermissionNode(
|
||||
node_name=node_name,
|
||||
description=description,
|
||||
plugin_name=plugin_name,
|
||||
default_granted=default_granted
|
||||
)
|
||||
return self._permission_manager.register_permission_node(node)
|
||||
|
||||
def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
"""
|
||||
授权用户权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 授权是否成功
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.grant_permission(user, permission_node)
|
||||
|
||||
def revoke_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
|
||||
"""
|
||||
撤销用户权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 撤销是否成功
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.revoke_permission(user, permission_node)
|
||||
|
||||
def get_user_permissions(self, platform: str, user_id: str) -> List[str]:
|
||||
"""
|
||||
获取用户拥有的所有权限节点
|
||||
|
||||
Args:
|
||||
platform: 平台类型,如 "qq"
|
||||
user_id: 用户ID
|
||||
|
||||
Returns:
|
||||
List[str]: 权限节点列表
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
user = UserInfo(platform=platform, user_id=str(user_id))
|
||||
return self._permission_manager.get_user_permissions(user)
|
||||
|
||||
def get_all_permission_nodes(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取所有已注册的权限节点
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: 权限节点列表,每个节点包含 node_name, description, plugin_name, default_granted
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
nodes = self._permission_manager.get_all_permission_nodes()
|
||||
return [
|
||||
{
|
||||
"node_name": node.node_name,
|
||||
"description": node.description,
|
||||
"plugin_name": node.plugin_name,
|
||||
"default_granted": node.default_granted
|
||||
}
|
||||
for node in nodes
|
||||
]
|
||||
|
||||
def get_plugin_permission_nodes(self, plugin_name: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
获取指定插件的所有权限节点
|
||||
|
||||
Args:
|
||||
plugin_name: 插件名称
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: 权限节点列表
|
||||
|
||||
Raises:
|
||||
RuntimeError: 权限管理器未设置时抛出
|
||||
"""
|
||||
self._ensure_manager()
|
||||
nodes = self._permission_manager.get_plugin_permission_nodes(plugin_name)
|
||||
return [
|
||||
{
|
||||
"node_name": node.node_name,
|
||||
"description": node.description,
|
||||
"plugin_name": node.plugin_name,
|
||||
"default_granted": node.default_granted
|
||||
}
|
||||
for node in nodes
|
||||
]
|
||||
|
||||
|
||||
# 全局权限API实例
|
||||
permission_api = PermissionAPI()
|
||||
452
src/plugin_system/core/permission_manager.py
Normal file
452
src/plugin_system/core/permission_manager.py
Normal file
@@ -0,0 +1,452 @@
|
||||
"""
|
||||
权限管理器实现
|
||||
|
||||
这个模块提供了权限系统的核心实现,包括权限检查、权限节点管理、用户权限管理等功能。
|
||||
"""
|
||||
|
||||
from typing import List, Set, Tuple
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
||||
from datetime import datetime
|
||||
|
||||
from src.common.logger import get_logger
|
||||
from src.common.database.sqlalchemy_models import get_database_engine, PermissionNodes, UserPermissions
|
||||
from src.plugin_system.apis.permission_api import IPermissionManager, PermissionNode, UserInfo
|
||||
from src.config.config import global_config
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class PermissionManager(IPermissionManager):
|
||||
"""权限管理器实现类"""
|
||||
|
||||
def __init__(self):
|
||||
self.engine = get_database_engine()
|
||||
self.SessionLocal = sessionmaker(bind=self.engine)
|
||||
self._master_users: Set[Tuple[str, str]] = set()
|
||||
self._load_master_users()
|
||||
logger.info("权限管理器初始化完成")
|
||||
|
||||
def _load_master_users(self):
|
||||
"""从配置文件加载Master用户列表"""
|
||||
try:
|
||||
master_users_config = global_config.permission.master_users
|
||||
self._master_users = set()
|
||||
for user_info in master_users_config:
|
||||
if isinstance(user_info, list) and len(user_info) == 2:
|
||||
platform, user_id = user_info
|
||||
self._master_users.add((str(platform), str(user_id)))
|
||||
logger.info(f"已加载 {len(self._master_users)} 个Master用户")
|
||||
except Exception as e:
|
||||
logger.warning(f"加载Master用户配置失败: {e}")
|
||||
self._master_users = set()
|
||||
|
||||
def reload_master_users(self):
|
||||
"""重新加载Master用户配置"""
|
||||
self._load_master_users()
|
||||
logger.info("Master用户配置已重新加载")
|
||||
|
||||
def is_master(self, user: UserInfo) -> bool:
|
||||
"""
|
||||
检查用户是否为Master用户
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
"""
|
||||
user_tuple = (user.platform, user.user_id)
|
||||
is_master = user_tuple in self._master_users
|
||||
if is_master:
|
||||
logger.debug(f"用户 {user.platform}:{user.user_id} 是Master用户")
|
||||
return is_master
|
||||
|
||||
def check_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
try:
|
||||
# Master用户拥有所有权限
|
||||
if self.is_master(user):
|
||||
logger.debug(f"Master用户 {user.platform}:{user.user_id} 拥有权限节点 {permission_node}")
|
||||
return True
|
||||
|
||||
with self.SessionLocal() as session:
|
||||
# 检查权限节点是否存在
|
||||
node = session.query(PermissionNodes).filter_by(node_name=permission_node).first()
|
||||
if not node:
|
||||
logger.warning(f"权限节点 {permission_node} 不存在")
|
||||
return False
|
||||
|
||||
# 检查用户是否有明确的权限设置
|
||||
user_perm = session.query(UserPermissions).filter_by(
|
||||
platform=user.platform,
|
||||
user_id=user.user_id,
|
||||
permission_node=permission_node
|
||||
).first()
|
||||
|
||||
if user_perm:
|
||||
# 有明确设置,返回设置的值
|
||||
result = user_perm.granted
|
||||
logger.debug(f"用户 {user.platform}:{user.user_id} 对权限节点 {permission_node} 的明确设置: {result}")
|
||||
return result
|
||||
else:
|
||||
# 没有明确设置,使用默认值
|
||||
result = node.default_granted
|
||||
logger.debug(f"用户 {user.platform}:{user.user_id} 对权限节点 {permission_node} 使用默认设置: {result}")
|
||||
return result
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"检查权限时数据库错误: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"检查权限时发生未知错误: {e}")
|
||||
return False
|
||||
|
||||
def register_permission_node(self, node: PermissionNode) -> bool:
|
||||
"""
|
||||
注册权限节点
|
||||
|
||||
Args:
|
||||
node: 权限节点
|
||||
|
||||
Returns:
|
||||
bool: 注册是否成功
|
||||
"""
|
||||
try:
|
||||
with self.SessionLocal() as session:
|
||||
# 检查节点是否已存在
|
||||
existing_node = session.query(PermissionNodes).filter_by(node_name=node.node_name).first()
|
||||
if existing_node:
|
||||
# 更新现有节点的信息
|
||||
existing_node.description = node.description
|
||||
existing_node.plugin_name = node.plugin_name
|
||||
existing_node.default_granted = node.default_granted
|
||||
session.commit()
|
||||
logger.debug(f"更新权限节点: {node.node_name}")
|
||||
return True
|
||||
|
||||
# 创建新节点
|
||||
new_node = PermissionNodes(
|
||||
node_name=node.node_name,
|
||||
description=node.description,
|
||||
plugin_name=node.plugin_name,
|
||||
default_granted=node.default_granted,
|
||||
created_at=datetime.utcnow()
|
||||
)
|
||||
session.add(new_node)
|
||||
session.commit()
|
||||
logger.info(f"注册新权限节点: {node.node_name} (插件: {node.plugin_name})")
|
||||
return True
|
||||
|
||||
except IntegrityError as e:
|
||||
logger.error(f"注册权限节点时发生完整性错误: {e}")
|
||||
return False
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"注册权限节点时数据库错误: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"注册权限节点时发生未知错误: {e}")
|
||||
return False
|
||||
|
||||
def grant_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
授权用户权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 授权是否成功
|
||||
"""
|
||||
try:
|
||||
with self.SessionLocal() as session:
|
||||
# 检查权限节点是否存在
|
||||
node = session.query(PermissionNodes).filter_by(node_name=permission_node).first()
|
||||
if not node:
|
||||
logger.error(f"尝试授权不存在的权限节点: {permission_node}")
|
||||
return False
|
||||
|
||||
# 检查是否已有权限记录
|
||||
existing_perm = session.query(UserPermissions).filter_by(
|
||||
platform=user.platform,
|
||||
user_id=user.user_id,
|
||||
permission_node=permission_node
|
||||
).first()
|
||||
|
||||
if existing_perm:
|
||||
# 更新现有记录
|
||||
existing_perm.granted = True
|
||||
existing_perm.granted_at = datetime.utcnow()
|
||||
else:
|
||||
# 创建新记录
|
||||
new_perm = UserPermissions(
|
||||
platform=user.platform,
|
||||
user_id=user.user_id,
|
||||
permission_node=permission_node,
|
||||
granted=True,
|
||||
granted_at=datetime.utcnow()
|
||||
)
|
||||
session.add(new_perm)
|
||||
|
||||
session.commit()
|
||||
logger.info(f"已授权用户 {user.platform}:{user.user_id} 权限节点 {permission_node}")
|
||||
return True
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"授权权限时数据库错误: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"授权权限时发生未知错误: {e}")
|
||||
return False
|
||||
|
||||
def revoke_permission(self, user: UserInfo, permission_node: str) -> bool:
|
||||
"""
|
||||
撤销用户权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 撤销是否成功
|
||||
"""
|
||||
try:
|
||||
with self.SessionLocal() as session:
|
||||
# 检查权限节点是否存在
|
||||
node = session.query(PermissionNodes).filter_by(node_name=permission_node).first()
|
||||
if not node:
|
||||
logger.error(f"尝试撤销不存在的权限节点: {permission_node}")
|
||||
return False
|
||||
|
||||
# 检查是否已有权限记录
|
||||
existing_perm = session.query(UserPermissions).filter_by(
|
||||
platform=user.platform,
|
||||
user_id=user.user_id,
|
||||
permission_node=permission_node
|
||||
).first()
|
||||
|
||||
if existing_perm:
|
||||
# 更新现有记录
|
||||
existing_perm.granted = False
|
||||
existing_perm.granted_at = datetime.utcnow()
|
||||
else:
|
||||
# 创建新记录(明确撤销)
|
||||
new_perm = UserPermissions(
|
||||
platform=user.platform,
|
||||
user_id=user.user_id,
|
||||
permission_node=permission_node,
|
||||
granted=False,
|
||||
granted_at=datetime.utcnow()
|
||||
)
|
||||
session.add(new_perm)
|
||||
|
||||
session.commit()
|
||||
logger.info(f"已撤销用户 {user.platform}:{user.user_id} 权限节点 {permission_node}")
|
||||
return True
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"撤销权限时数据库错误: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"撤销权限时发生未知错误: {e}")
|
||||
return False
|
||||
|
||||
def get_user_permissions(self, user: UserInfo) -> List[str]:
|
||||
"""
|
||||
获取用户拥有的所有权限节点
|
||||
|
||||
Args:
|
||||
user: 用户信息
|
||||
|
||||
Returns:
|
||||
List[str]: 权限节点列表
|
||||
"""
|
||||
try:
|
||||
# Master用户拥有所有权限
|
||||
if self.is_master(user):
|
||||
with self.SessionLocal() as session:
|
||||
all_nodes = session.query(PermissionNodes.node_name).all()
|
||||
return [node.node_name for node in all_nodes]
|
||||
|
||||
permissions = []
|
||||
|
||||
with self.SessionLocal() as session:
|
||||
# 获取所有权限节点
|
||||
all_nodes = session.query(PermissionNodes).all()
|
||||
|
||||
for node in all_nodes:
|
||||
# 检查用户是否有明确的权限设置
|
||||
user_perm = session.query(UserPermissions).filter_by(
|
||||
platform=user.platform,
|
||||
user_id=user.user_id,
|
||||
permission_node=node.node_name
|
||||
).first()
|
||||
|
||||
if user_perm:
|
||||
# 有明确设置,使用设置的值
|
||||
if user_perm.granted:
|
||||
permissions.append(node.node_name)
|
||||
else:
|
||||
# 没有明确设置,使用默认值
|
||||
if node.default_granted:
|
||||
permissions.append(node.node_name)
|
||||
|
||||
return permissions
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"获取用户权限时数据库错误: {e}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"获取用户权限时发生未知错误: {e}")
|
||||
return []
|
||||
|
||||
def get_all_permission_nodes(self) -> List[PermissionNode]:
|
||||
"""
|
||||
获取所有已注册的权限节点
|
||||
|
||||
Returns:
|
||||
List[PermissionNode]: 权限节点列表
|
||||
"""
|
||||
try:
|
||||
with self.SessionLocal() as session:
|
||||
nodes = session.query(PermissionNodes).all()
|
||||
return [
|
||||
PermissionNode(
|
||||
node_name=node.node_name,
|
||||
description=node.description,
|
||||
plugin_name=node.plugin_name,
|
||||
default_granted=node.default_granted
|
||||
)
|
||||
for node in nodes
|
||||
]
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"获取所有权限节点时数据库错误: {e}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"获取所有权限节点时发生未知错误: {e}")
|
||||
return []
|
||||
|
||||
def get_plugin_permission_nodes(self, plugin_name: str) -> List[PermissionNode]:
|
||||
"""
|
||||
获取指定插件的所有权限节点
|
||||
|
||||
Args:
|
||||
plugin_name: 插件名称
|
||||
|
||||
Returns:
|
||||
List[PermissionNode]: 权限节点列表
|
||||
"""
|
||||
try:
|
||||
with self.SessionLocal() as session:
|
||||
nodes = session.query(PermissionNodes).filter_by(plugin_name=plugin_name).all()
|
||||
return [
|
||||
PermissionNode(
|
||||
node_name=node.node_name,
|
||||
description=node.description,
|
||||
plugin_name=node.plugin_name,
|
||||
default_granted=node.default_granted
|
||||
)
|
||||
for node in nodes
|
||||
]
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"获取插件权限节点时数据库错误: {e}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"获取插件权限节点时发生未知错误: {e}")
|
||||
return []
|
||||
|
||||
def delete_plugin_permissions(self, plugin_name: str) -> bool:
|
||||
"""
|
||||
删除指定插件的所有权限节点(用于插件卸载时清理)
|
||||
|
||||
Args:
|
||||
plugin_name: 插件名称
|
||||
|
||||
Returns:
|
||||
bool: 删除是否成功
|
||||
"""
|
||||
try:
|
||||
with self.SessionLocal() as session:
|
||||
# 获取插件的所有权限节点
|
||||
plugin_nodes = session.query(PermissionNodes).filter_by(plugin_name=plugin_name).all()
|
||||
node_names = [node.node_name for node in plugin_nodes]
|
||||
|
||||
if not node_names:
|
||||
logger.info(f"插件 {plugin_name} 没有注册任何权限节点")
|
||||
return True
|
||||
|
||||
# 删除用户权限记录
|
||||
deleted_user_perms = session.query(UserPermissions).filter(
|
||||
UserPermissions.permission_node.in_(node_names)
|
||||
).delete(synchronize_session=False)
|
||||
|
||||
# 删除权限节点
|
||||
deleted_nodes = session.query(PermissionNodes).filter_by(plugin_name=plugin_name).delete()
|
||||
|
||||
session.commit()
|
||||
logger.info(f"已删除插件 {plugin_name} 的 {deleted_nodes} 个权限节点和 {deleted_user_perms} 条用户权限记录")
|
||||
return True
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"删除插件权限时数据库错误: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"删除插件权限时发生未知错误: {e}")
|
||||
return False
|
||||
|
||||
def get_users_with_permission(self, permission_node: str) -> List[Tuple[str, str]]:
|
||||
"""
|
||||
获取拥有指定权限的所有用户
|
||||
|
||||
Args:
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
List[Tuple[str, str]]: 用户列表,格式为 [(platform, user_id), ...]
|
||||
"""
|
||||
try:
|
||||
users = []
|
||||
|
||||
with self.SessionLocal() as session:
|
||||
# 检查权限节点是否存在
|
||||
node = session.query(PermissionNodes).filter_by(node_name=permission_node).first()
|
||||
if not node:
|
||||
logger.warning(f"权限节点 {permission_node} 不存在")
|
||||
return users
|
||||
|
||||
# 获取明确授权的用户
|
||||
granted_users = session.query(UserPermissions).filter_by(
|
||||
permission_node=permission_node,
|
||||
granted=True
|
||||
).all()
|
||||
|
||||
for user_perm in granted_users:
|
||||
users.append((user_perm.platform, user_perm.user_id))
|
||||
|
||||
# 如果是默认授权的权限节点,还需要考虑没有明确设置的用户
|
||||
# 但这里我们只返回明确授权的用户,避免返回所有用户
|
||||
|
||||
# 添加Master用户(他们拥有所有权限)
|
||||
users.extend(list(self._master_users))
|
||||
|
||||
# 去重
|
||||
return list(set(users))
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
logger.error(f"获取拥有权限的用户时数据库错误: {e}")
|
||||
return []
|
||||
except Exception as e:
|
||||
logger.error(f"获取拥有权限的用户时发生未知错误: {e}")
|
||||
return []
|
||||
274
src/plugin_system/utils/permission_decorators.py
Normal file
274
src/plugin_system/utils/permission_decorators.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""
|
||||
权限装饰器
|
||||
|
||||
提供方便的权限检查装饰器,用于插件命令和其他需要权限验证的地方。
|
||||
"""
|
||||
|
||||
from functools import wraps
|
||||
from typing import Callable, Optional
|
||||
from inspect import iscoroutinefunction
|
||||
|
||||
from src.plugin_system.apis.permission_api import permission_api
|
||||
from src.plugin_system.apis.send_api import send_message
|
||||
from src.plugin_system.apis.logging_api import get_logger
|
||||
from src.common.message import ChatStream
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
def require_permission(permission_node: str, deny_message: Optional[str] = None):
|
||||
"""
|
||||
权限检查装饰器
|
||||
|
||||
用于装饰需要特定权限才能执行的函数。如果用户没有权限,会发送拒绝消息并阻止函数执行。
|
||||
|
||||
Args:
|
||||
permission_node: 所需的权限节点名称
|
||||
deny_message: 权限不足时的提示消息,如果为None则使用默认消息
|
||||
|
||||
Example:
|
||||
@require_permission("plugin.example.admin")
|
||||
async def admin_command(message: Message, chat_stream: ChatStream):
|
||||
# 只有拥有 plugin.example.admin 权限的用户才能执行
|
||||
pass
|
||||
"""
|
||||
def decorator(func: Callable):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
# 尝试从参数中提取 ChatStream 对象
|
||||
chat_stream = None
|
||||
for arg in args:
|
||||
if isinstance(arg, ChatStream):
|
||||
chat_stream = arg
|
||||
break
|
||||
|
||||
# 如果在位置参数中没找到,尝试从关键字参数中查找
|
||||
if chat_stream is None:
|
||||
chat_stream = kwargs.get('chat_stream')
|
||||
|
||||
if chat_stream is None:
|
||||
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
|
||||
return
|
||||
|
||||
# 检查权限
|
||||
has_permission = permission_api.check_permission(
|
||||
chat_stream.user_platform,
|
||||
chat_stream.user_id,
|
||||
permission_node
|
||||
)
|
||||
|
||||
if not has_permission:
|
||||
# 权限不足,发送拒绝消息
|
||||
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
|
||||
await send_message(chat_stream, message)
|
||||
return
|
||||
|
||||
# 权限检查通过,执行原函数
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
# 对于同步函数,我们不能发送异步消息,只能记录日志
|
||||
chat_stream = None
|
||||
for arg in args:
|
||||
if isinstance(arg, ChatStream):
|
||||
chat_stream = arg
|
||||
break
|
||||
|
||||
if chat_stream is None:
|
||||
chat_stream = kwargs.get('chat_stream')
|
||||
|
||||
if chat_stream is None:
|
||||
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
|
||||
return
|
||||
|
||||
# 检查权限
|
||||
has_permission = permission_api.check_permission(
|
||||
chat_stream.user_platform,
|
||||
chat_stream.user_id,
|
||||
permission_node
|
||||
)
|
||||
|
||||
if not has_permission:
|
||||
logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 没有权限 {permission_node}")
|
||||
return
|
||||
|
||||
# 权限检查通过,执行原函数
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# 根据函数类型选择包装器
|
||||
if iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def require_master(deny_message: Optional[str] = None):
|
||||
"""
|
||||
Master权限检查装饰器
|
||||
|
||||
用于装饰只有Master用户才能执行的函数。
|
||||
|
||||
Args:
|
||||
deny_message: 权限不足时的提示消息,如果为None则使用默认消息
|
||||
|
||||
Example:
|
||||
@require_master()
|
||||
async def master_only_command(message: Message, chat_stream: ChatStream):
|
||||
# 只有Master用户才能执行
|
||||
pass
|
||||
"""
|
||||
def decorator(func: Callable):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
# 尝试从参数中提取 ChatStream 对象
|
||||
chat_stream = None
|
||||
for arg in args:
|
||||
if isinstance(arg, ChatStream):
|
||||
chat_stream = arg
|
||||
break
|
||||
|
||||
# 如果在位置参数中没找到,尝试从关键字参数中查找
|
||||
if chat_stream is None:
|
||||
chat_stream = kwargs.get('chat_stream')
|
||||
|
||||
if chat_stream is None:
|
||||
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
|
||||
return
|
||||
|
||||
# 检查是否为Master用户
|
||||
is_master = permission_api.is_master(
|
||||
chat_stream.user_platform,
|
||||
chat_stream.user_id
|
||||
)
|
||||
|
||||
if not is_master:
|
||||
# 权限不足,发送拒绝消息
|
||||
message = deny_message or "❌ 此操作仅限Master用户执行"
|
||||
await send_message(chat_stream, message)
|
||||
return
|
||||
|
||||
# 权限检查通过,执行原函数
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
# 对于同步函数,我们不能发送异步消息,只能记录日志
|
||||
chat_stream = None
|
||||
for arg in args:
|
||||
if isinstance(arg, ChatStream):
|
||||
chat_stream = arg
|
||||
break
|
||||
|
||||
if chat_stream is None:
|
||||
chat_stream = kwargs.get('chat_stream')
|
||||
|
||||
if chat_stream is None:
|
||||
logger.error(f"Master权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
|
||||
return
|
||||
|
||||
# 检查是否为Master用户
|
||||
is_master = permission_api.is_master(
|
||||
chat_stream.user_platform,
|
||||
chat_stream.user_id
|
||||
)
|
||||
|
||||
if not is_master:
|
||||
logger.warning(f"用户 {chat_stream.user_platform}:{chat_stream.user_id} 不是Master用户")
|
||||
return
|
||||
|
||||
# 权限检查通过,执行原函数
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# 根据函数类型选择包装器
|
||||
if iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
else:
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class PermissionChecker:
|
||||
"""
|
||||
权限检查工具类
|
||||
|
||||
提供一些便捷的权限检查方法,用于在代码中进行权限验证。
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def check_permission(chat_stream: ChatStream, permission_node: str) -> bool:
|
||||
"""
|
||||
检查用户是否拥有指定权限
|
||||
|
||||
Args:
|
||||
chat_stream: 聊天流对象
|
||||
permission_node: 权限节点名称
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
return permission_api.check_permission(
|
||||
chat_stream.user_platform,
|
||||
chat_stream.user_id,
|
||||
permission_node
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def is_master(chat_stream: ChatStream) -> bool:
|
||||
"""
|
||||
检查用户是否为Master用户
|
||||
|
||||
Args:
|
||||
chat_stream: 聊天流对象
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
"""
|
||||
return permission_api.is_master(
|
||||
chat_stream.user_platform,
|
||||
chat_stream.user_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def ensure_permission(chat_stream: ChatStream, permission_node: str,
|
||||
deny_message: Optional[str] = None) -> bool:
|
||||
"""
|
||||
确保用户拥有指定权限,如果没有权限会发送消息并返回False
|
||||
|
||||
Args:
|
||||
chat_stream: 聊天流对象
|
||||
permission_node: 权限节点名称
|
||||
deny_message: 权限不足时的提示消息
|
||||
|
||||
Returns:
|
||||
bool: 是否拥有权限
|
||||
"""
|
||||
has_permission = PermissionChecker.check_permission(chat_stream, permission_node)
|
||||
|
||||
if not has_permission:
|
||||
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}"
|
||||
await send_message(chat_stream, message)
|
||||
|
||||
return has_permission
|
||||
|
||||
@staticmethod
|
||||
async def ensure_master(chat_stream: ChatStream,
|
||||
deny_message: Optional[str] = None) -> bool:
|
||||
"""
|
||||
确保用户为Master用户,如果不是会发送消息并返回False
|
||||
|
||||
Args:
|
||||
chat_stream: 聊天流对象
|
||||
deny_message: 权限不足时的提示消息
|
||||
|
||||
Returns:
|
||||
bool: 是否为Master用户
|
||||
"""
|
||||
is_master = PermissionChecker.is_master(chat_stream)
|
||||
|
||||
if not is_master:
|
||||
message = deny_message or "❌ 此操作仅限Master用户执行"
|
||||
await send_message(chat_stream, message)
|
||||
|
||||
return is_master
|
||||
Reference in New Issue
Block a user