This commit is contained in:
Windpicker-owo
2025-11-09 11:27:05 +08:00
8 changed files with 158 additions and 4151 deletions

View File

@@ -0,0 +1,44 @@
"""
清理权限节点数据库
删除所有旧的权限节点记录,让系统重新注册
"""
import asyncio
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.common.database.compatibility import get_db_session
from src.common.database.core.models import PermissionNodes
from src.common.logger import get_logger
logger = get_logger("CleanPermissionNodes")
async def clean_permission_nodes():
"""清理所有权限节点"""
try:
from sqlalchemy import delete
async with get_db_session() as session:
# 删除所有权限节点
stmt = delete(PermissionNodes)
result = await session.execute(stmt)
await session.commit()
deleted_count = result.rowcount if hasattr(result, "rowcount") else 0
logger.info(f"✅ 已清理 {deleted_count} 个权限节点记录")
print(f"✅ 已清理 {deleted_count} 个权限节点记录")
print("请重启应用以重新注册权限节点")
except Exception as e:
logger.error(f"❌ 清理权限节点失败: {e}")
print(f"❌ 清理权限节点失败: {e}")
raise
if __name__ == "__main__":
asyncio.run(clean_permission_nodes())

View File

@@ -557,11 +557,11 @@ class DefaultReplyer:
participants = [] participants = []
try: try:
# 尝试从聊天流中获取参与者信息 # 尝试从聊天流中获取参与者信息
if hasattr(stream, "chat_history_manager"): if hasattr(stream, "context_manager"):
history_manager = stream.chat_history_manager history_manager = stream.context_manager
# 获取最近的参与者列表 # 获取最近的参与者列表
recent_records = history_manager.get_memory_chat_history( recent_records = history_manager.get_memory_chat_history(
user_id=getattr(stream, "user_id", ""), user_id=getattr(stream.user_info, "user_id", ""),
count=10, count=10,
memory_types=["chat_message", "system_message"] memory_types=["chat_message", "system_message"]
) )
@@ -1411,10 +1411,10 @@ class DefaultReplyer:
safety_guidelines_block = "" safety_guidelines_block = ""
if safety_guidelines: if safety_guidelines:
guidelines_text = "\n".join(f"{i + 1}. {line}" for i, line in enumerate(safety_guidelines)) guidelines_text = "\n".join(f"{i + 1}. {line}" for i, line in enumerate(safety_guidelines))
safety_guidelines_block = f"""### 安全与互动底线 safety_guidelines_block = f"""### 互动规则
在任何情况下,你都必须遵守以下由你的设定者为你定义的原则: 在任何情况下,你都必须遵守以下由你的设定者为你定义的原则:
{guidelines_text} {guidelines_text}
如果遇到违反上述原则的请求,请在保持你核心人设的同时,巧妙地拒绝或转移话题 如果遇到违反上述原则的请求,请在保持你核心人设的同时,以合适的方式进行回应
""" """
if sender and target: if sender and target:
@@ -1740,35 +1740,20 @@ class DefaultReplyer:
if content: if content:
# 移除 [SPLIT] 标记,防止消息被分割 # 移除 [SPLIT] 标记,防止消息被分割
cleaned_content = content.replace("[SPLIT]", "") content = content.replace("[SPLIT]", "")
# 循环移除,以处理模型可能生成的嵌套回复头/尾
# 使用更健壮的正则表达式,通过非贪婪匹配和向后查找来定位真正的消息内容
pattern = re.compile(r"^\s*\[回复<.+?>\s*(?:的消息)?(?P<content>.*)\](?:?说:)?\s*$", re.DOTALL)
temp_content = cleaned_content aggressive_pattern = re.compile(r'\[\s*回复\s*<.+?>.*?\]', re.DOTALL)
while True: original_content_for_aggresive_filter = content
match = pattern.match(temp_content) cleaned_content_by_aggresive_filter = aggressive_pattern.sub('', content).strip()
if match:
new_content = match.group("content").strip()
# 如果内容没有变化,说明可能无法进一步解析,退出循环
if new_content == temp_content:
break
temp_content = new_content
else:
break # 没有匹配到,退出循环
# 在循环处理后,再使用 rsplit 来处理日志中观察到的特殊情况 # 再次检查并移除因嵌套括号可能残留的单个 ']'
# 这可以作为处理复杂嵌套的最后一道防线 if cleaned_content_by_aggresive_filter.startswith(']'):
final_split = temp_content.rsplit("],说:", 1) cleaned_content_by_aggresive_filter = cleaned_content_by_aggresive_filter[1:].strip()
if len(final_split) > 1:
final_content = final_split[1].strip()
else:
final_content = temp_content
if final_content != content: if cleaned_content_by_aggresive_filter != original_content_for_aggresive_filter:
logger.debug(f"清理了模型生成的多余内容,原始内容: '{content}', 清理后: '{final_content}'") logger.warning(f"检测到并清理了模型生成的不规范回复格式。原始内容: '{original_content_for_aggresive_filter}', 清理后: '{cleaned_content_by_aggresive_filter}'")
content = final_content content = cleaned_content_by_aggresive_filter
logger.debug(f"replyer生成内容: {content}") logger.debug(f"replyer生成内容: {content}")
return content, reasoning_content, model_name, tool_calls return content, reasoning_content, model_name, tool_calls

View File

@@ -60,15 +60,6 @@ class IPermissionManager(ABC):
class PermissionAPI: class PermissionAPI:
def __init__(self): def __init__(self):
self._permission_manager: IPermissionManager | None = None self._permission_manager: IPermissionManager | None = None
# 需要保留的前缀(视为绝对节点名,不再自动加 plugins.<plugin>. 前缀)
self.RESERVED_PREFIXES: tuple[str, ...] = ("system.",)
# 系统节点列表 (name, description, default_granted)
self._SYSTEM_NODES: list[tuple[str, str, bool]] = [
("system.superuser", "系统超级管理员:拥有所有权限", False),
("system.permission.manage", "系统权限管理:可管理所有权限节点", False),
("system.permission.view", "系统权限查看:可查看所有权限节点", True),
]
self._system_nodes_initialized: bool = False
def set_permission_manager(self, manager: IPermissionManager): def set_permission_manager(self, manager: IPermissionManager):
self._permission_manager = manager self._permission_manager = manager
@@ -97,53 +88,17 @@ class PermissionAPI:
plugin_name: str, plugin_name: str,
default_granted: bool = False, default_granted: bool = False,
*, *,
system: bool = False,
allow_relative: bool = True, allow_relative: bool = True,
) -> bool: ) -> bool:
self._ensure_manager() self._ensure_manager()
original_name = node_name original_name = node_name
if system:
# 系统节点必须以 system./sys./core. 等保留前缀开头
if not node_name.startswith(("system.", "sys.", "core.")):
node_name = f"system.{node_name}" # 自动补 system.
else:
# 普通插件节点:若不以保留前缀开头,并允许相对,则自动加前缀
if allow_relative and not node_name.startswith(self.RESERVED_PREFIXES):
node_name = f"plugins.{plugin_name}.{node_name}"
if original_name != node_name:
logger.debug(f"规范化权限节点 '{original_name}' -> '{node_name}'")
node = PermissionNode(node_name, description, plugin_name, default_granted) node = PermissionNode(node_name, description, plugin_name, default_granted)
if not self._permission_manager: if not self._permission_manager:
return False return False
return await self._permission_manager.register_permission_node(node) return await self._permission_manager.register_permission_node(node)
async def register_system_permission_node(
self, node_name: str, description: str, default_granted: bool = False
) -> bool:
"""注册系统级权限节点(不绑定具体插件,前缀保持 system./sys./core.)。"""
return await self.register_permission_node(
node_name,
description,
plugin_name="__system__",
default_granted=default_granted,
system=True,
allow_relative=True,
)
async def init_system_nodes(self) -> None:
"""初始化默认系统权限节点(幂等)。
在设置 permission_manager 之后且数据库准备好时调用一次即可。
"""
if self._system_nodes_initialized:
return
self._ensure_manager()
for name, desc, granted in self._SYSTEM_NODES:
try:
await self.register_system_permission_node(name, desc, granted)
except Exception as e: # 防御性
logger.warning(f"注册系统权限节点 {name} 失败: {e}")
self._system_nodes_initialized = True
async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool: async def grant_permission(self, platform: str, user_id: str, permission_node: str) -> bool:
self._ensure_manager() self._ensure_manager()

View File

@@ -62,12 +62,15 @@ async def file_to_stream(
} }
action = "" action = ""
if target_stream.group_info: if target_stream.group_info and target_stream.group_info.group_id:
action = "upload_group_file" action = "upload_group_file"
params["group_id"] = target_stream.group_info.group_id params["group_id"] = target_stream.group_info.group_id
else: elif target_stream.user_info and target_stream.user_info.user_id:
action = "upload_private_file" action = "upload_private_file"
params["user_id"] = target_stream.user_info.user_id params["user_id"] = target_stream.user_info.user_id
else:
logger.error(f"[SendAPI] 无法确定文件发送目标: {stream_id}")
return False
response = await adapter_command_to_stream( response = await adapter_command_to_stream(
action=action, action=action,
@@ -173,10 +176,10 @@ async def wait_adapter_response(request_id: str, timeout: float = 30.0) -> dict:
response = await asyncio.wait_for(future, timeout=timeout) response = await asyncio.wait_for(future, timeout=timeout)
return response return response
except asyncio.TimeoutError: except asyncio.TimeoutError:
await _adapter_response_pool.pop(request_id, None) _adapter_response_pool.pop(request_id, None)
return {"status": "error", "message": "timeout"} return {"status": "error", "message": "timeout"}
except Exception as e: except Exception as e:
await _adapter_response_pool.pop(request_id, None) _adapter_response_pool.pop(request_id, None)
return {"status": "error", "message": str(e)} return {"status": "error", "message": str(e)}
@@ -234,7 +237,7 @@ async def _send_to_target(
# 构建机器人用户信息 # 构建机器人用户信息
bot_user_info = UserInfo( bot_user_info = UserInfo(
user_id=global_config.bot.qq_account, user_id=str(global_config.bot.qq_account),
user_nickname=global_config.bot.nickname, user_nickname=global_config.bot.nickname,
platform=target_stream.platform, platform=target_stream.platform,
) )
@@ -499,6 +502,9 @@ async def adapter_command_to_stream(
logger.debug(f"[SendAPI] 创建临时虚拟聊天流: {stream_id}") logger.debug(f"[SendAPI] 创建临时虚拟聊天流: {stream_id}")
# 创建临时的用户信息和聊天流 # 创建临时的用户信息和聊天流
if not platform:
logger.error("[SendAPI] 创建临时聊天流失败: platform 未提供")
return {"status": "error", "message": "platform 未提供"}
temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform)
@@ -520,7 +526,7 @@ async def adapter_command_to_stream(
# 构建机器人用户信息 # 构建机器人用户信息
bot_user_info = UserInfo( bot_user_info = UserInfo(
user_id=global_config.bot.qq_account, user_id=str(global_config.bot.qq_account),
user_nickname=global_config.bot.nickname, user_nickname=global_config.bot.nickname,
platform=target_stream.platform, platform=target_stream.platform,
) )

View File

@@ -16,7 +16,7 @@ from src.plugin_system.apis.send_api import text_to_stream
logger = get_logger(__name__) logger = get_logger(__name__)
def require_permission(permission_node: str, deny_message: str | None = None): def require_permission(permission_node: str, deny_message: str | None = None, *, use_full_name: bool = False):
""" """
权限检查装饰器 权限检查装饰器
@@ -25,19 +25,29 @@ def require_permission(permission_node: str, deny_message: str | None = None):
Args: Args:
permission_node: 所需的权限节点名称 permission_node: 所需的权限节点名称
deny_message: 权限不足时的提示消息如果为None则使用默认消息 deny_message: 权限不足时的提示消息如果为None则使用默认消息
use_full_name: 是否使用完整的权限节点名称默认False
- True: permission_node 必须是完整的权限节点名称,如 "plugins.plugin_name.action"
- False: permission_node 可以是短名称,如 "action",装饰器会自动添加 "plugins.{plugin_name}." 前缀
Example: Example:
@require_permission("plugin.example.admin") # 使用完整名称(传统方式)
async def admin_command(message: Message, chat_stream: ChatStream): @require_permission("plugins.example.admin")
# 只有拥有 plugin.example.admin 权限的用户才能执行 async def admin_command(self):
pass
# 使用短名称(新方式,类似 PermissionNodeField
@require_permission("admin", use_full_name=True)
async def admin_command(self):
# 会自动转换为 "plugins.{当前插件名}.admin"
pass pass
""" """
def decorator(func: Callable): def decorator(func: Callable):
@wraps(func) @wraps(func)
async def async_wrapper(*args, **kwargs): async def async_wrapper(*args, **kwargs):
# 尝试从参数中提取 ChatStream 对象 # 尝试从参数中提取 ChatStream 对象和插件名
chat_stream = None chat_stream = None
plugin_name = None
# 首先检查位置参数中的 ChatStream # 首先检查位置参数中的 ChatStream
for arg in args: for arg in args:
@@ -45,24 +55,49 @@ def require_permission(permission_node: str, deny_message: str | None = None):
chat_stream = arg chat_stream = arg
break break
# 如果在位置参数中没找到,尝试从关键字参数中查找
if chat_stream is None:
chat_stream = kwargs.get("chat_stream")
# 如果还没找到,检查是否是 PlusCommand 方法调用 # 如果还没找到,检查是否是 PlusCommand 方法调用
if chat_stream is None and args: if args:
instance = args[0] instance = args[0]
# 检查第一个参数是否有 chat_stream 属性PlusCommand 实例) # 检查第一个参数是否有 chat_stream 属性PlusCommand 实例)
if hasattr(instance, "chat_stream"): if hasattr(instance, "chat_stream") and chat_stream is None:
chat_stream = instance.chat_stream chat_stream = instance.chat_stream
# 兼容旧的 message.chat_stream 属性
elif hasattr(instance, "message") and hasattr(instance.message, "chat_stream"): # 尝试获取插件名
chat_stream = instance.message.chat_stream # 方法1: 从类名获取(通过组件注册表)
if not plugin_name and hasattr(instance, "command_name"):
# 从组件注册表查找这个命令属于哪个插件
try:
from src.plugin_system.base.component_types import ComponentType
from src.plugin_system.core.component_registry import component_registry
component_info = component_registry.get_component_info(
instance.command_name, ComponentType.PLUS_COMMAND
)
if component_info:
plugin_name = component_info.plugin_name
except Exception:
pass
if chat_stream is None: if chat_stream is None:
logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}") logger.error(f"权限装饰器无法找到 ChatStream 对象,函数: {func.__name__}")
return None return None
# 构建完整的权限节点名称
full_permission_node = permission_node
if not use_full_name:
# 需要自动构建完整名称
if not plugin_name:
logger.error(
f"权限装饰器无法推断插件名,函数: {func.__name__}"
"请使用 use_full_name=True 或确保在插件类中调用"
)
return None
full_permission_node = f"plugins.{plugin_name}.{permission_node}"
logger.info(f"自动构建权限节点: {permission_node} -> {full_permission_node} (插件: {plugin_name})")
# 检查权限 # 检查权限
if not chat_stream.user_info or not chat_stream.user_info.user_id: if not chat_stream.user_info or not chat_stream.user_info.user_id:
logger.warning(f"权限检查失败chat_stream 中缺少 user_info 或 user_id函数: {func.__name__}") logger.warning(f"权限检查失败chat_stream 中缺少 user_info 或 user_id函数: {func.__name__}")
@@ -71,12 +106,12 @@ def require_permission(permission_node: str, deny_message: str | None = None):
return None return None
has_permission = await permission_api.check_permission( has_permission = await permission_api.check_permission(
chat_stream.platform, chat_stream.user_info.user_id, permission_node chat_stream.platform, chat_stream.user_info.user_id, full_permission_node
) )
if not has_permission: if not has_permission:
# 权限不足,发送拒绝消息 # 权限不足,发送拒绝消息
message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {permission_node}" message = deny_message or f"❌ 你没有执行此操作的权限\n需要权限: {full_permission_node}"
await text_to_stream(message, chat_stream.stream_id) await text_to_stream(message, chat_stream.stream_id)
# 对于PlusCommand的execute方法需要返回适当的元组 # 对于PlusCommand的execute方法需要返回适当的元组
if func.__name__ == "execute" and hasattr(args[0], "send_text"): if func.__name__ == "execute" and hasattr(args[0], "send_text"):

View File

@@ -41,7 +41,7 @@ class SystemCommand(PlusCommand):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@require_permission("system.access", "❌ 你没有权限使用此命令") @require_permission("access", deny_message="❌ 你没有权限使用此命令")
async def execute(self, args: CommandArgs) -> tuple[bool, str | None, bool]: async def execute(self, args: CommandArgs) -> tuple[bool, str | None, bool]:
"""执行系统管理命令""" """执行系统管理命令"""
if args.is_empty: if args.is_empty:
@@ -172,7 +172,7 @@ class SystemCommand(PlusCommand):
else: else:
await self.send_text("❌ 定时任务管理命令不合法\n使用 /system schedule help 查看帮助") await self.send_text("❌ 定时任务管理命令不合法\n使用 /system schedule help 查看帮助")
@require_permission("system.schedule.view", "❌ 你没有查看定时任务的权限") @require_permission("schedule.view", deny_message="❌ 你没有查看定时任务的权限")
async def _list_schedules(self, trigger_type_str: str | None): async def _list_schedules(self, trigger_type_str: str | None):
"""列出定时任务""" """列出定时任务"""
trigger_type = None trigger_type = None
@@ -198,7 +198,7 @@ class SystemCommand(PlusCommand):
) )
await self.send_text("\n".join(response_parts)) await self.send_text("\n".join(response_parts))
@require_permission("system.schedule.view", "❌ 你没有查看定时任务详情的权限") @require_permission("schedule.view", deny_message="❌ 你没有查看定时任务详情的权限")
async def _get_schedule_info(self, schedule_id: str): async def _get_schedule_info(self, schedule_id: str):
"""获取任务详情""" """获取任务详情"""
task_info = await unified_scheduler.get_task_info(schedule_id) task_info = await unified_scheduler.get_task_info(schedule_id)
@@ -211,7 +211,7 @@ class SystemCommand(PlusCommand):
info_str += f"{key}: `{value}`\n" info_str += f"{key}: `{value}`\n"
await self.send_text(info_str) await self.send_text(info_str)
@require_permission("system.schedule.manage", "❌ 你没有管理定时任务的权限") @require_permission("schedule.manage", deny_message="❌ 你没有管理定时任务的权限")
async def _pause_schedule(self, schedule_id: str): async def _pause_schedule(self, schedule_id: str):
"""暂停任务""" """暂停任务"""
success = await unified_scheduler.pause_schedule(schedule_id) success = await unified_scheduler.pause_schedule(schedule_id)
@@ -220,7 +220,7 @@ class SystemCommand(PlusCommand):
else: else:
await self.send_text(f"❌ 暂停任务失败: `{schedule_id}`") await self.send_text(f"❌ 暂停任务失败: `{schedule_id}`")
@require_permission("system.schedule.manage", "❌ 你没有管理定时任务的权限") @require_permission("schedule.manage", deny_message="❌ 你没有管理定时任务的权限")
async def _resume_schedule(self, schedule_id: str): async def _resume_schedule(self, schedule_id: str):
"""恢复任务""" """恢复任务"""
success = await unified_scheduler.resume_schedule(schedule_id) success = await unified_scheduler.resume_schedule(schedule_id)
@@ -300,20 +300,20 @@ class SystemCommand(PlusCommand):
action = args[0].lower() action = args[0].lower()
remaining_args = args[1:] remaining_args = args[1:]
chat_stream = self.message.chat_info.stream_id chat_info = self.message.chat_info
if action in ["grant", "授权", "give"]: if action in ["grant", "授权", "give"]:
await self._grant_permission(chat_stream, remaining_args) await self._grant_permission(chat_info, remaining_args)
elif action in ["revoke", "撤销", "remove"]: elif action in ["revoke", "撤销", "remove"]:
await self._revoke_permission(chat_stream, remaining_args) await self._revoke_permission(chat_info, remaining_args)
elif action in ["list", "列表", "ls"]: elif action in ["list", "列表", "ls"]:
await self._list_permissions(chat_stream, remaining_args) await self._list_permissions(chat_info, remaining_args)
elif action in ["check", "检查"]: elif action in ["check", "检查"]:
await self._check_permission(chat_stream, remaining_args) await self._check_permission(chat_info, remaining_args)
elif action in ["nodes", "节点"]: elif action in ["nodes", "节点"]:
await self._list_nodes(chat_stream, remaining_args) await self._list_nodes(chat_info, remaining_args)
elif action in ["allnodes", "全部节点", "all"]: elif action in ["allnodes", "全部节点", "all"]:
await self._list_all_nodes_with_description(chat_stream) await self._list_all_nodes_with_description(chat_info)
else: else:
await self.send_text(f"❌ 未知的权限子命令: {action}") await self.send_text(f"❌ 未知的权限子命令: {action}")
@@ -327,8 +327,8 @@ class SystemCommand(PlusCommand):
return mention return mention
return None return None
@require_permission("system.permission.manage", "❌ 你没有权限管理的权限") @require_permission("permission.manage", deny_message="❌ 你没有权限管理的权限")
async def _grant_permission(self, chat_stream, args: list[str]): async def _grant_permission(self, chat_info, args: list[str]):
"""授权用户权限""" """授权用户权限"""
if len(args) < 2: if len(args) < 2:
await self.send_text("❌ 用法: /system permission grant <@用户|QQ号> <权限节点>") await self.send_text("❌ 用法: /system permission grant <@用户|QQ号> <权限节点>")
@@ -340,14 +340,14 @@ class SystemCommand(PlusCommand):
return return
permission_node = args[1] permission_node = args[1]
success = await permission_api.grant_permission(chat_stream.platform, user_id, permission_node) success = await permission_api.grant_permission(chat_info.platform, user_id, permission_node)
if success: if success:
await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 `{permission_node}`") await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 `{permission_node}`")
else: else:
await self.send_text("❌ 授权失败") await self.send_text("❌ 授权失败")
@require_permission("system.permission.manage", "❌ 你没有权限管理的权限") @require_permission("permission.manage", deny_message="❌ 你没有权限管理的权限")
async def _revoke_permission(self, chat_stream, args: list[str]): async def _revoke_permission(self, chat_info, args: list[str]):
"""撤销用户权限""" """撤销用户权限"""
if len(args) < 2: if len(args) < 2:
await self.send_text("❌ 用法: /system permission revoke <@用户|QQ号> <权限节点>") await self.send_text("❌ 用法: /system permission revoke <@用户|QQ号> <权限节点>")
@@ -359,14 +359,14 @@ class SystemCommand(PlusCommand):
return return
permission_node = args[1] permission_node = args[1]
success = await permission_api.revoke_permission(chat_stream.platform, user_id, permission_node) success = await permission_api.revoke_permission(chat_info.platform, user_id, permission_node)
if success: if success:
await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 `{permission_node}`") await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 `{permission_node}`")
else: else:
await self.send_text("❌ 撤销失败") await self.send_text("❌ 撤销失败")
@require_permission("system.permission.view", "❌ 你没有查看权限的权限") @require_permission("permission.view", deny_message="❌ 你没有查看权限的权限")
async def _list_permissions(self, chat_stream, args: list[str]): async def _list_permissions(self, chat_info, args: list[str]):
"""列出用户权限""" """列出用户权限"""
target_user_id = None target_user_id = None
if args: if args:
@@ -375,10 +375,10 @@ class SystemCommand(PlusCommand):
await self.send_text("❌ 无效的用户格式") await self.send_text("❌ 无效的用户格式")
return return
else: else:
target_user_id = chat_stream.user_info.user_id target_user_id = chat_info.user_info.user_id
is_master = await permission_api.is_master(chat_stream.platform, target_user_id) is_master = await permission_api.is_master(chat_info.platform, target_user_id)
permissions = await permission_api.get_user_permissions(chat_stream.platform, target_user_id) permissions = await permission_api.get_user_permissions(chat_info.platform, target_user_id)
if is_master: if is_master:
response = f"👑 用户 `{target_user_id}` 是Master用户拥有所有权限" response = f"👑 用户 `{target_user_id}` 是Master用户拥有所有权限"
@@ -390,8 +390,8 @@ class SystemCommand(PlusCommand):
response = f"📋 用户 `{target_user_id}` 没有任何权限" response = f"📋 用户 `{target_user_id}` 没有任何权限"
await self.send_text(response) await self.send_text(response)
@require_permission("system.permission.view", "❌ 你没有查看权限的权限") @require_permission("permission.view", deny_message="❌ 你没有查看权限的权限")
async def _check_permission(self, chat_stream, args: list[str]): async def _check_permission(self, chat_info, args: list[str]):
"""检查用户权限""" """检查用户权限"""
if len(args) < 2: if len(args) < 2:
await self.send_text("❌ 用法: /system permission check <@用户|QQ号> <权限节点>") await self.send_text("❌ 用法: /system permission check <@用户|QQ号> <权限节点>")
@@ -403,8 +403,8 @@ class SystemCommand(PlusCommand):
return return
permission_node = args[1] permission_node = args[1]
has_permission = await permission_api.check_permission(chat_stream.platform, user_id, permission_node) has_permission = await permission_api.check_permission(chat_info.platform, user_id, permission_node)
is_master = await permission_api.is_master(chat_stream.platform, user_id) is_master = await permission_api.is_master(chat_info.platform, user_id)
if has_permission: if has_permission:
response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`" response = f"✅ 用户 `{user_id}` 拥有权限 `{permission_node}`"
@@ -414,8 +414,8 @@ class SystemCommand(PlusCommand):
response = f"❌ 用户 `{user_id}` 没有权限 `{permission_node}`" response = f"❌ 用户 `{user_id}` 没有权限 `{permission_node}`"
await self.send_text(response) await self.send_text(response)
@require_permission("system.permission.view", "❌ 你没有查看权限的权限") @require_permission("permission.view", deny_message="❌ 你没有查看权限的权限")
async def _list_nodes(self, chat_stream, args: list[str]): async def _list_nodes(self, chat_info, args: list[str]):
"""列出权限节点""" """列出权限节点"""
plugin_name = args[0] if args else None plugin_name = args[0] if args else None
if plugin_name: if plugin_name:
@@ -439,7 +439,7 @@ class SystemCommand(PlusCommand):
response = title + "\n" + "\n".join(node_list) response = title + "\n" + "\n".join(node_list)
await self.send_text(response) await self.send_text(response)
@require_permission("system.permission.view", "❌ 你没有查看权限的权限") @require_permission("permission.view", deny_message="❌ 你没有查看权限的权限")
async def _list_all_nodes_with_description(self, chat_stream): async def _list_all_nodes_with_description(self, chat_stream):
"""列出所有插件的权限节点(带详细描述)""" """列出所有插件的权限节点(带详细描述)"""
all_nodes = await permission_api.get_all_permission_nodes() all_nodes = await permission_api.get_all_permission_nodes()
@@ -510,8 +510,8 @@ class SystemManagementPlugin(BasePlugin):
permission_nodes: ClassVar[list[PermissionNodeField]] = [ permission_nodes: ClassVar[list[PermissionNodeField]] = [
PermissionNodeField( PermissionNodeField(
node_name="system.access", node_name="access",
description="权限管理:授权和撤销权限", description="系统访问:可以使用系统管理命令",
), ),
PermissionNodeField( PermissionNodeField(
node_name="permission.manage", node_name="permission.manage",

View File

@@ -89,8 +89,8 @@ background_story = ""
# 描述MoFox-Bot说话的表达风格表达习惯如要修改可以酌情新增内容 # 描述MoFox-Bot说话的表达风格表达习惯如要修改可以酌情新增内容
reply_style = "回复可以简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。" reply_style = "回复可以简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。"
# 安全与互动底线 (Bot在任何情况下都必须遵守的原则) # 互动规则 (Bot在任何情况下都必须遵守的原则)
# 你可以在这里定义Bot的行为红线,例如如何回应不恰当的问题 # 你可以在这里定义Bot在互动中的行为准则
safety_guidelines = [ safety_guidelines = [
"拒绝任何包含骚扰、冒犯、暴力、色情或危险内容的请求。", "拒绝任何包含骚扰、冒犯、暴力、色情或危险内容的请求。",
"在拒绝时,请使用符合你人设的、坚定的语气。", "在拒绝时,请使用符合你人设的、坚定的语气。",
@@ -383,7 +383,7 @@ console_log_level = "INFO" # 控制台日志级别,可选: DEBUG, INFO, WARNIN
file_log_level = "DEBUG" # 文件日志级别,可选: DEBUG, INFO, WARNING, ERROR, CRITICAL file_log_level = "DEBUG" # 文件日志级别,可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
# 第三方库日志控制 # 第三方库日志控制
suppress_libraries = ["faiss","httpx", "urllib3", "asyncio", "websockets", "httpcore", "requests", "peewee", "openai","uvicorn","rjieba","maim_message"] # 完全屏蔽的库 suppress_libraries = ["faiss","httpx", "urllib3", "asyncio", "websockets", "httpcore", "requests", "aiosqlite", "openai","uvicorn","rjieba","maim_message"] # 完全屏蔽的库
library_log_levels = { "aiohttp" = "WARNING"} # 设置特定库的日志级别 library_log_levels = { "aiohttp" = "WARNING"} # 设置特定库的日志级别
[dependency_management] # 插件Python依赖管理配置 [dependency_management] # 插件Python依赖管理配置

4018
uv.lock generated

File diff suppressed because it is too large Load Diff