From 0bcc0ba8b54eea8f642d3d7578640e04e699d6ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9B=85=E8=AF=BA=E7=8B=90?= <212194964+foxcyber907@users.noreply.github.com> Date: Mon, 25 Aug 2025 16:16:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=A4=9A=E7=BA=BF=E7=A8=8B=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=92=8C=E6=B6=88=E6=81=AF=E5=88=87=E7=89=87=E9=87=8D=E7=BB=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增视频帧提取的线程池支持,提升大视频文件处理性能 - 集成消息切片重组器,支持长消息的自动重组处理 - 优化视频帧提取算法,使用numpy进行数值计算优化 - 重构权限管理插件,修复属性访问和方法签名问题 - 清理未使用的导入和代码,提升代码质量 - 默认启用插件管理功能 --- plugins/permission_example/plugin.py | 1 - src/chat/message_receive/bot.py | 16 +- src/chat/utils/utils_video.py | 206 +++++++++++++++--- src/chat/utils/video_worker.py | 75 +++++++ src/main.py | 18 ++ .../permission_management/_manifest.json | 39 ++++ .../built_in/permission_management/plugin.py | 59 +++-- .../built_in/plugin_management/plugin.py | 4 +- src/utils/message_chunker.py | 160 ++++++++++++++ 9 files changed, 513 insertions(+), 65 deletions(-) create mode 100644 src/chat/utils/video_worker.py create mode 100644 src/plugins/built_in/permission_management/_manifest.json create mode 100644 src/utils/message_chunker.py diff --git a/plugins/permission_example/plugin.py b/plugins/permission_example/plugin.py index a8dc88330..9fcb1a360 100644 --- a/plugins/permission_example/plugin.py +++ b/plugins/permission_example/plugin.py @@ -9,7 +9,6 @@ from typing import List from src.plugin_system.apis.plugin_register_api import register_plugin from src.plugin_system.base.base_plugin import BasePlugin from src.plugin_system.base.base_command import BaseCommand -from src.plugin_system.apis.permission_api import permission_api from src.plugin_system.apis.logging_api import get_logger from src.plugin_system.base.config_types import ConfigField from src.plugin_system.utils.permission_decorators import require_permission, require_master, PermissionChecker diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 226f2ff7d..4bef61109 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -20,8 +20,6 @@ from src.mais4u.mais4u_chat.s4u_msg_processor import S4UMessageProcessor # 导入反注入系统 from src.chat.antipromptinjector import initialize_anti_injector -# 定义日志配置 - # 获取项目根目录(假设本文件在src/chat/message_receive/下,根目录为上上上级目录) PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) @@ -242,6 +240,20 @@ class ChatBot: - 性能计时 """ try: + # 首先处理可能的切片消息重组 + from src.utils.message_chunker import reassembler + + # 尝试重组切片消息 + reassembled_message = await reassembler.process_chunk(message_data) + if reassembled_message is None: + # 这是一个切片,但还未完整,等待更多切片 + logger.debug("等待更多切片,跳过此次处理") + return + elif reassembled_message != message_data: + # 消息已被重组,使用重组后的消息 + logger.info("使用重组后的完整消息进行处理") + message_data = reassembled_message + # 确保所有任务已启动 await self._ensure_started() diff --git a/src/chat/utils/utils_video.py b/src/chat/utils/utils_video.py index f68118580..2e886e960 100644 --- a/src/chat/utils/utils_video.py +++ b/src/chat/utils/utils_video.py @@ -12,10 +12,14 @@ import asyncio import base64 import hashlib import time +import numpy as np from PIL import Image from pathlib import Path from typing import List, Tuple, Optional, Dict import io +from concurrent.futures import ThreadPoolExecutor +from functools import partial +import numpy as np from src.llm_models.utils_model import LLMRequest from src.config.config import global_config, model_config @@ -33,6 +37,68 @@ video_events = {} video_lock_manager = asyncio.Lock() +def _extract_frames_worker(video_path: str, max_frames: int, frame_quality: int, max_image_size: int) -> List[Tuple[str, float]]: + """线程池中提取视频帧的工作函数""" + frames = [] + try: + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + duration = total_frames / fps if fps > 0 else 0 + + # 使用numpy优化帧间隔计算 + if duration > 0: + frame_interval = max(1, int(duration / max_frames * fps)) + else: + frame_interval = 30 # 默认间隔 + + # 使用numpy计算目标帧位置 + target_frames = np.arange(0, min(max_frames, total_frames // frame_interval + 1)) * frame_interval + target_frames = target_frames[target_frames < total_frames].astype(int) + + for target_frame in target_frames: + # 跳转到目标帧 + cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) + ret, frame = cap.read() + if not ret: + continue + + # 使用numpy优化图像处理 + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + # 转换为PIL图像并使用numpy进行尺寸计算 + height, width = frame_rgb.shape[:2] + max_dim = max(height, width) + + if max_dim > max_image_size: + # 使用numpy计算缩放比例 + ratio = max_image_size / max_dim + new_width = int(width * ratio) + new_height = int(height * ratio) + + # 使用opencv进行高效缩放 + frame_resized = cv2.resize(frame_rgb, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) + pil_image = Image.fromarray(frame_resized) + else: + pil_image = Image.fromarray(frame_rgb) + + # 转换为base64 + buffer = io.BytesIO() + pil_image.save(buffer, format='JPEG', quality=frame_quality) + frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + + # 计算时间戳 + timestamp = target_frame / fps if fps > 0 else 0 + frames.append((frame_base64, timestamp)) + + cap.release() + return frames + + except Exception as e: + # 返回错误信息 + return [("ERROR", str(e))] + + class VideoAnalyzer: """优化的视频分析器类""" @@ -61,6 +127,9 @@ class VideoAnalyzer: self.max_image_size = config.max_image_size self.enable_frame_timing = config.enable_frame_timing self.batch_analysis_prompt = config.batch_analysis_prompt + # 新增的线程池配置 + self.use_multiprocessing = getattr(config, 'use_multiprocessing', True) + self.max_workers = getattr(config, 'max_workers', 2) # 将配置文件中的模式映射到内部使用的模式名称 config_mode = config.analysis_mode @@ -92,6 +161,8 @@ class VideoAnalyzer: self.batch_size = 3 # 批处理时每批处理的帧数 self.timeout = 60.0 # 分析超时时间(秒) self.enable_frame_timing = True + self.use_multiprocessing = True # 默认启用线程池 + self.max_workers = 2 # 默认最大2个线程 self.batch_analysis_prompt = """请分析这个视频的内容。这些图片是从视频中按时间顺序提取的关键帧。 请提供详细的分析,包括: @@ -107,7 +178,7 @@ class VideoAnalyzer: # 系统提示词 self.system_prompt = "你是一个专业的视频内容分析助手。请仔细观察用户提供的视频关键帧,详细描述视频内容。" - logger.info(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}") + logger.info(f"✅ 视频分析器初始化完成,分析模式: {self.analysis_mode}, 线程池: {self.use_multiprocessing}") def _calculate_video_hash(self, video_data: bytes) -> str: """计算视频文件的hash值""" @@ -182,7 +253,66 @@ class VideoAnalyzer: logger.warning(f"无效的分析模式: {mode}") async def extract_frames(self, video_path: str) -> List[Tuple[str, float]]: - """提取视频帧""" + """提取视频帧 - 支持多进程和单线程模式""" + # 先获取视频信息 + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + duration = total_frames / fps if fps > 0 else 0 + cap.release() + + logger.info(f"视频信息: {total_frames}帧, {fps:.2f}FPS, {duration:.2f}秒") + + # 估算提取帧数 + if duration > 0: + frame_interval = max(1, int(duration / self.max_frames * fps)) + estimated_frames = min(self.max_frames, total_frames // frame_interval + 1) + else: + estimated_frames = self.max_frames + + logger.info(f"计算得出帧间隔: {frame_interval} (将提取约{estimated_frames}帧)") + + # 根据配置选择处理方式 + if self.use_multiprocessing: + return await self._extract_frames_multiprocess(video_path) + else: + return await self._extract_frames_fallback(video_path) + + async def _extract_frames_multiprocess(self, video_path: str) -> List[Tuple[str, float]]: + """线程池版本的帧提取""" + loop = asyncio.get_event_loop() + + try: + logger.info("🔄 启动线程池帧提取...") + # 使用线程池,避免进程间的导入问题 + with ThreadPoolExecutor(max_workers=1) as executor: + frames = await loop.run_in_executor( + executor, + _extract_frames_worker, + video_path, + self.max_frames, + self.frame_quality, + self.max_image_size + ) + + # 检查是否有错误 + if frames and frames[0][0] == "ERROR": + logger.error(f"线程池帧提取失败: {frames[0][1]}") + # 降级到单线程模式 + logger.info("🔄 降级到单线程模式...") + return await self._extract_frames_fallback(video_path) + + logger.info(f"✅ 成功提取{len(frames)}帧 (线程池模式)") + return frames + + except Exception as e: + logger.error(f"线程池帧提取失败: {e}") + # 降级到原始方法 + logger.info("🔄 降级到单线程模式...") + return await self._extract_frames_fallback(video_path) + + async def _extract_frames_fallback(self, video_path: str) -> List[Tuple[str, float]]: + """帧提取的降级方法 - 原始异步版本""" frames = [] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) @@ -191,45 +321,61 @@ class VideoAnalyzer: logger.info(f"视频信息: {total_frames}帧, {fps:.2f}FPS, {duration:.2f}秒") - # 动态计算帧间隔 + # 使用numpy优化帧间隔计算 if duration > 0: frame_interval = max(1, int(duration / self.max_frames * fps)) else: frame_interval = 30 # 默认间隔 + + logger.info(f"计算得出帧间隔: {frame_interval} (将提取约{min(self.max_frames, total_frames // frame_interval + 1)}帧)") + + # 使用numpy计算目标帧位置 + target_frames = np.arange(0, min(self.max_frames, total_frames // frame_interval + 1)) * frame_interval + target_frames = target_frames[target_frames < total_frames].astype(int) - frame_count = 0 extracted_count = 0 - while cap.isOpened() and extracted_count < self.max_frames: + for target_frame in target_frames: + # 跳转到目标帧 + cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) ret, frame = cap.read() if not ret: - break + continue - if frame_count % frame_interval == 0: - # 转换为PIL图像并压缩 - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - pil_image = Image.fromarray(frame_rgb) - - # 调整图像大小 - if max(pil_image.size) > self.max_image_size: - ratio = self.max_image_size / max(pil_image.size) - new_size = tuple(int(dim * ratio) for dim in pil_image.size) - pil_image = pil_image.resize(new_size, Image.Resampling.LANCZOS) - - # 转换为base64 - buffer = io.BytesIO() - pil_image.save(buffer, format='JPEG', quality=self.frame_quality) - frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') - - # 计算时间戳 - timestamp = frame_count / fps if fps > 0 else 0 - frames.append((frame_base64, timestamp)) - extracted_count += 1 - - logger.debug(f"提取第{extracted_count}帧 (时间: {timestamp:.2f}s)") + # 使用numpy优化图像处理 + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + # 转换为PIL图像并使用numpy进行尺寸计算 + height, width = frame_rgb.shape[:2] + max_dim = max(height, width) + + if max_dim > self.max_image_size: + # 使用numpy计算缩放比例 + ratio = self.max_image_size / max_dim + new_width = int(width * ratio) + new_height = int(height * ratio) + + # 使用opencv进行高效缩放 + frame_resized = cv2.resize(frame_rgb, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) + pil_image = Image.fromarray(frame_resized) + else: + pil_image = Image.fromarray(frame_rgb) + + # 转换为base64 + buffer = io.BytesIO() + pil_image.save(buffer, format='JPEG', quality=self.frame_quality) + frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + + # 计算时间戳 + timestamp = target_frame / fps if fps > 0 else 0 + frames.append((frame_base64, timestamp)) + extracted_count += 1 + + logger.debug(f"提取第{extracted_count}帧 (时间: {timestamp:.2f}s, 帧号: {target_frame})") + + # 每提取一帧让步一次 + await asyncio.sleep(0.001) - frame_count += 1 - cap.release() logger.info(f"✅ 成功提取{len(frames)}帧") return frames diff --git a/src/chat/utils/video_worker.py b/src/chat/utils/video_worker.py new file mode 100644 index 000000000..cc32d7a40 --- /dev/null +++ b/src/chat/utils/video_worker.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +视频帧提取工作函数 - 用于多进程 +这个模块专门用于子进程,避免重复加载主框架 +""" + +import cv2 +import base64 +import io +import numpy as np +from PIL import Image +from typing import List, Tuple + + +def extract_frames_worker(video_path: str, max_frames: int, frame_quality: int, max_image_size: int) -> List[Tuple[str, float]]: + """在子进程中提取视频帧的工作函数""" + frames = [] + try: + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + duration = total_frames / fps if fps > 0 else 0 + + # 使用numpy优化帧间隔计算 + if duration > 0: + frame_interval = max(1, int(duration / max_frames * fps)) + else: + frame_interval = 30 # 默认间隔 + + # 使用numpy计算目标帧位置 + target_frames = np.arange(0, min(max_frames, total_frames // frame_interval + 1)) * frame_interval + target_frames = target_frames[target_frames < total_frames].astype(int) + + for target_frame in target_frames: + # 跳转到目标帧 + cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame) + ret, frame = cap.read() + if not ret: + continue + + # 使用numpy优化图像处理 + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + # 转换为PIL图像并使用numpy进行尺寸计算 + height, width = frame_rgb.shape[:2] + max_dim = max(height, width) + + if max_dim > max_image_size: + # 使用numpy计算缩放比例 + ratio = max_image_size / max_dim + new_width = int(width * ratio) + new_height = int(height * ratio) + + # 使用opencv进行高效缩放 + frame_resized = cv2.resize(frame_rgb, (new_width, new_height), interpolation=cv2.INTER_LANCZOS4) + pil_image = Image.fromarray(frame_resized) + else: + pil_image = Image.fromarray(frame_rgb) + + # 转换为base64 + buffer = io.BytesIO() + pil_image.save(buffer, format='JPEG', quality=frame_quality) + frame_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + + # 计算时间戳 + timestamp = target_frame / fps if fps > 0 else 0 + frames.append((frame_base64, timestamp)) + + cap.release() + return frames + + except Exception as e: + # 在子进程中不能使用logger,返回错误信息 + return [("ERROR", str(e))] diff --git a/src/main.py b/src/main.py index ceeec4940..ad251e5a7 100644 --- a/src/main.py +++ b/src/main.py @@ -71,6 +71,19 @@ class MainSystem: def _cleanup(self): """清理资源""" + try: + # 停止消息重组器 + from src.utils.message_chunker import reassembler + import asyncio + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.create_task(reassembler.stop_cleanup_task()) + else: + loop.run_until_complete(reassembler.stop_cleanup_task()) + logger.info("🛑 消息重组器已停止") + except Exception as e: + logger.error(f"停止消息重组器时出错: {e}") + try: # 停止插件热重载系统 hot_reload_manager.stop() @@ -208,6 +221,11 @@ MaiMbot-Pro-Max(第三方修改版) # 将bot.py中的chat_bot.message_process消息处理函数注册到api.py的消息处理基类中 self.app.register_message_handler(chat_bot.message_process) + # 启动消息重组器的清理任务 + from src.utils.message_chunker import reassembler + await reassembler.start_cleanup_task() + logger.info("消息重组器已启动") + # 初始化个体特征 await self.individuality.initialize() # 初始化日程管理器 diff --git a/src/plugins/built_in/permission_management/_manifest.json b/src/plugins/built_in/permission_management/_manifest.json new file mode 100644 index 000000000..b4748ce8c --- /dev/null +++ b/src/plugins/built_in/permission_management/_manifest.json @@ -0,0 +1,39 @@ +{ + "manifest_version": 1, + "name": "插件和组件管理 (Plugin and Component Management)", + "version": "1.0.0", + "description": "通过系统API管理插件和组件的生命周期,包括加载、卸载、启用和禁用等操作。", + "author": { + "name": "MaiBot团队", + "url": "https://github.com/MaiM-with-u" + }, + "license": "GPL-v3.0-or-later", + "host_application": { + "min_version": "0.9.1" + }, + "homepage_url": "https://github.com/MaiM-with-u/maibot", + "repository_url": "https://github.com/MaiM-with-u/maibot", + "keywords": [ + "plugins", + "components", + "management", + "built-in" + ], + "categories": [ + "Core System", + "Plugin Management" + ], + "default_locale": "zh-CN", + "locales_path": "_locales", + "plugin_info": { + "is_built_in": true, + "plugin_type": "permission", + "components": [ + { + "type": "command", + "name": "permission_management", + "description": "管理用户权限,包括添加、删除和修改权限等操作。" + } + ] + } +} \ No newline at end of file diff --git a/src/plugins/built_in/permission_management/plugin.py b/src/plugins/built_in/permission_management/plugin.py index e6b02f5f5..d534acc32 100644 --- a/src/plugins/built_in/permission_management/plugin.py +++ b/src/plugins/built_in/permission_management/plugin.py @@ -12,7 +12,6 @@ from src.plugin_system.base.base_plugin import BasePlugin from src.plugin_system.base.base_command import BaseCommand from src.plugin_system.apis.permission_api import permission_api from src.plugin_system.apis.logging_api import get_logger -from src.common.message import ChatStream, Message from src.plugin_system.base.component_types import CommandInfo from src.plugin_system.base.config_types import ConfigField @@ -25,7 +24,7 @@ class PermissionCommand(BaseCommand): command_name = "permission" command_description = "权限管理命令" - command_pattern = r"^/permission(\s[a-zA-Z0-9_]+)*\s*$)" + command_pattern = r"^/permission" command_help = "/permission <子命令> [参数...]" intercept_message = True @@ -44,33 +43,33 @@ class PermissionCommand(BaseCommand): True ) - def can_execute(self, message: Message, chat_stream: ChatStream) -> bool: + def can_execute(self) -> bool: """检查命令是否可以执行""" # 基本权限检查由权限系统处理 return True - async def execute(self, message: Message, chat_stream: ChatStream, args: List[str]) -> None: + async def execute(self, args: List[str]) -> None: """执行权限管理命令""" if not args: - await self._show_help(chat_stream) + await self._show_help() return subcommand = args[0].lower() remaining_args = args[1:] - + chat_stream = self.message.chat_stream # 检查基本查看权限 can_view = permission_api.check_permission( - chat_stream.user_platform, - chat_stream.user_id, + chat_stream.platform, + chat_stream.user_info.user_id, "plugin.permission.view" - ) or permission_api.is_master(chat_stream.user_platform, chat_stream.user_id) - + ) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id) + # 检查管理权限 can_manage = permission_api.check_permission( - chat_stream.user_platform, - chat_stream.user_id, + chat_stream.platform, + chat_stream.user_info.user_id, "plugin.permission.manage" - ) or permission_api.is_master(chat_stream.user_platform, chat_stream.user_id) + ) or permission_api.is_master(chat_stream.platform, chat_stream.user_info.user_id) if subcommand in ["grant", "授权", "give"]: if not can_manage: @@ -108,7 +107,7 @@ class PermissionCommand(BaseCommand): else: await self.send_text(f"❌ 未知的子命令: {subcommand}\n使用 /permission help 查看帮助") - async def _show_help(self, chat_stream: ChatStream): + async def _show_help(self): """显示帮助信息""" help_text = """📋 权限管理命令帮助 @@ -143,8 +142,8 @@ class PermissionCommand(BaseCommand): return mention return None - - async def _grant_permission(self, chat_stream: ChatStream, args: List[str]): + + async def _grant_permission(self, chat_stream , args: List[str]): """授权用户权限""" if len(args) < 2: await self.send_text("❌ 用法: /permission grant <@用户|QQ号> <权限节点>") @@ -160,14 +159,14 @@ class PermissionCommand(BaseCommand): return # 执行授权 - success = permission_api.grant_permission(chat_stream.user_platform, user_id, permission_node) + success = permission_api.grant_permission(chat_stream.platform, user_id, permission_node) if success: await self.send_text(f"✅ 已授权用户 {user_id} 权限节点 {permission_node}") else: await self.send_text("❌ 授权失败,请检查权限节点是否存在") - - async def _revoke_permission(self, chat_stream: ChatStream, args: List[str]): + + async def _revoke_permission(self, chat_stream, args: List[str]): """撤销用户权限""" if len(args) < 2: await self.send_text("❌ 用法: /permission revoke <@用户|QQ号> <权限节点>") @@ -183,14 +182,14 @@ class PermissionCommand(BaseCommand): return # 执行撤销 - success = permission_api.revoke_permission(chat_stream.user_platform, user_id, permission_node) + success = permission_api.revoke_permission(chat_stream.platform, user_id, permission_node) if success: await self.send_text(f"✅ 已撤销用户 {user_id} 权限节点 {permission_node}") else: await self.send_text("❌ 撤销失败,请检查权限节点是否存在") - async def _list_permissions(self, chat_stream: ChatStream, args: List[str]): + async def _list_permissions(self, chat_stream, args: List[str]): """列出用户权限""" target_user_id = None @@ -203,13 +202,13 @@ class PermissionCommand(BaseCommand): return else: # 查看自己的权限 - target_user_id = chat_stream.user_id + target_user_id = chat_stream.user_info.user_id # 检查是否为Master用户 - is_master = permission_api.is_master(chat_stream.user_platform, target_user_id) + is_master = permission_api.is_master(chat_stream.platform, target_user_id) # 获取用户权限 - permissions = permission_api.get_user_permissions(chat_stream.user_platform, target_user_id) + permissions = permission_api.get_user_permissions(chat_stream.platform, target_user_id) if is_master: response = f"👑 用户 {target_user_id} 是Master用户,拥有所有权限" @@ -221,8 +220,8 @@ class PermissionCommand(BaseCommand): response = f"📋 用户 {target_user_id} 没有任何权限" await self.send_text(response) - - async def _check_permission(self, chat_stream: ChatStream, args: List[str]): + + async def _check_permission(self, chat_stream, args: List[str]): """检查用户权限""" if len(args) < 2: await self.send_text("❌ 用法: /permission check <@用户|QQ号> <权限节点>") @@ -238,8 +237,8 @@ class PermissionCommand(BaseCommand): return # 检查权限 - has_permission = permission_api.check_permission(chat_stream.user_platform, user_id, permission_node) - is_master = permission_api.is_master(chat_stream.user_platform, user_id) + has_permission = permission_api.check_permission(chat_stream.platform, user_id, permission_node) + is_master = permission_api.is_master(chat_stream.platform, user_id) if has_permission: if is_master: @@ -250,8 +249,8 @@ class PermissionCommand(BaseCommand): response = f"❌ 用户 {user_id} 没有权限 {permission_node}" await self.send_text(response) - - async def _list_nodes(self, chat_stream: ChatStream, args: List[str]): + + async def _list_nodes(self, chat_stream, args: List[str]): """列出权限节点""" plugin_name = args[0] if args else None diff --git a/src/plugins/built_in/plugin_management/plugin.py b/src/plugins/built_in/plugin_management/plugin.py index c2489a380..34d33bfba 100644 --- a/src/plugins/built_in/plugin_management/plugin.py +++ b/src/plugins/built_in/plugin_management/plugin.py @@ -433,13 +433,13 @@ class ManagementCommand(BaseCommand): @register_plugin class PluginManagementPlugin(BasePlugin): plugin_name: str = "plugin_management_plugin" - enable_plugin: bool = False + enable_plugin: bool = True dependencies: list[str] = [] python_dependencies: list[str] = [] config_file_name: str = "config.toml" config_schema: dict = { "plugin": { - "enabled": ConfigField(bool, default=False, description="是否启用插件"), + "enabled": ConfigField(bool, default=True, description="是否启用插件"), "config_version": ConfigField(type=str, default="1.1.0", description="配置文件版本"), "permission": ConfigField( list, default=[], description="有权限使用插件管理命令的用户列表,请填写字符串形式的用户ID" diff --git a/src/utils/message_chunker.py b/src/utils/message_chunker.py new file mode 100644 index 000000000..66a854f0f --- /dev/null +++ b/src/utils/message_chunker.py @@ -0,0 +1,160 @@ +""" +MaiBot 端的消息切片处理模块 +用于接收和重组来自 Napcat-Adapter 的切片消息 +""" + +import json +import time +import asyncio +from typing import Dict, Any, Optional +from src.common.logger import get_logger + +logger = get_logger("message_chunker") + + +class MessageReassembler: + """消息重组器,用于重组来自 Ada 的切片消息""" + + def __init__(self, timeout: int = 30): + self.timeout = timeout + self.chunk_buffers: Dict[str, Dict[str, Any]] = {} + self._cleanup_task = None + + async def start_cleanup_task(self): + """启动清理任务""" + if self._cleanup_task is None: + self._cleanup_task = asyncio.create_task(self._cleanup_expired_chunks()) + logger.info("消息重组器清理任务已启动") + + async def stop_cleanup_task(self): + """停止清理任务""" + if self._cleanup_task: + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass + self._cleanup_task = None + logger.info("消息重组器清理任务已停止") + + async def _cleanup_expired_chunks(self): + """清理过期的切片缓冲区""" + while True: + try: + await asyncio.sleep(10) # 每10秒检查一次 + current_time = time.time() + + expired_chunks = [] + for chunk_id, buffer_info in self.chunk_buffers.items(): + if current_time - buffer_info['timestamp'] > self.timeout: + expired_chunks.append(chunk_id) + + for chunk_id in expired_chunks: + logger.warning(f"清理过期的切片缓冲区: {chunk_id}") + del self.chunk_buffers[chunk_id] + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"清理过期切片时出错: {e}") + + def is_chunk_message(self, message: Dict[str, Any]) -> bool: + """检查是否是来自 Ada 的切片消息""" + return ( + isinstance(message, dict) and + "__mmc_chunk_info__" in message and + "__mmc_chunk_data__" in message and + "__mmc_is_chunked__" in message + ) + + async def process_chunk(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + 处理切片消息,如果切片完整则返回重组后的消息 + + Args: + message: 可能的切片消息 + + Returns: + 如果切片完整则返回重组后的原始消息,否则返回None + """ + # 如果不是切片消息,直接返回 + if not self.is_chunk_message(message): + return message + + try: + chunk_info = message["__mmc_chunk_info__"] + chunk_content = message["__mmc_chunk_data__"] + + chunk_id = chunk_info["chunk_id"] + chunk_index = chunk_info["chunk_index"] + total_chunks = chunk_info["total_chunks"] + chunk_timestamp = chunk_info.get("timestamp", time.time()) + + # 初始化缓冲区 + if chunk_id not in self.chunk_buffers: + self.chunk_buffers[chunk_id] = { + "chunks": {}, + "total_chunks": total_chunks, + "received_chunks": 0, + "timestamp": chunk_timestamp + } + logger.debug(f"初始化切片缓冲区: {chunk_id} (总计 {total_chunks} 个切片)") + + buffer = self.chunk_buffers[chunk_id] + + # 检查切片是否已经接收过 + if chunk_index in buffer["chunks"]: + logger.warning(f"重复接收切片: {chunk_id}#{chunk_index}") + return None + + # 添加切片 + buffer["chunks"][chunk_index] = chunk_content + buffer["received_chunks"] += 1 + buffer["timestamp"] = time.time() # 更新时间戳 + + logger.debug(f"接收切片: {chunk_id}#{chunk_index} ({buffer['received_chunks']}/{total_chunks})") + + # 检查是否接收完整 + if buffer["received_chunks"] == total_chunks: + # 重组消息 + reassembled_message = "" + for i in range(total_chunks): + if i not in buffer["chunks"]: + logger.error(f"切片 {chunk_id}#{i} 缺失,无法重组") + return None + reassembled_message += buffer["chunks"][i] + + # 清理缓冲区 + del self.chunk_buffers[chunk_id] + + logger.info(f"消息重组完成: {chunk_id} ({len(reassembled_message)} chars)") + + # 尝试反序列化重组后的消息 + try: + return json.loads(reassembled_message) + except json.JSONDecodeError as e: + logger.error(f"重组消息反序列化失败: {e}") + return None + + # 还没收集完所有切片,返回None表示继续等待 + return None + + except (KeyError, TypeError, ValueError) as e: + logger.error(f"处理切片消息时出错: {e}") + return None + + def get_pending_chunks_info(self) -> Dict[str, Any]: + """获取待处理切片信息""" + info = {} + for chunk_id, buffer in self.chunk_buffers.items(): + info[chunk_id] = { + "received": buffer["received_chunks"], + "total": buffer["total_chunks"], + "progress": f"{buffer['received_chunks']}/{buffer['total_chunks']}", + "age_seconds": time.time() - buffer["timestamp"] + } + return info + + +# 全局重组器实例 +reassembler = MessageReassembler()