diff --git a/plugins/napcat_adapter_plugin/plugin.py b/plugins/napcat_adapter_plugin/plugin.py
index 79e13e1b3..b5f31e167 100644
--- a/plugins/napcat_adapter_plugin/plugin.py
+++ b/plugins/napcat_adapter_plugin/plugin.py
@@ -12,6 +12,7 @@
 from src.plugin_system.core.event_manager import event_manager
 from src.common.logger import get_logger

+from .src.message_chunker import chunker, reassembler
 from .src.recv_handler.message_handler import message_handler
 from .src.recv_handler.meta_event_handler import meta_event_handler
 from .src.recv_handler.notice_handler import notice_handler
@@ -20,7 +21,7 @@
 from .src.send_handler import send_handler
 from .src.config import global_config
 from .src.config.features_config import features_manager
 from .src.config.migrate_features import auto_migrate_features
-from .src.mmc_com_layer import mmc_start_com, router
+from .src.mmc_com_layer import mmc_start_com, router, mmc_stop_com
 from .src.response_pool import put_response, check_timeout_response
 from .src.websocket_manager import websocket_manager
@@ -36,6 +37,182 @@ def get_classes_in_module(module):
             classes.append(member)
     return classes

+async def message_recv(server_connection: Server.ServerConnection):
+    await message_handler.set_server_connection(server_connection)
+    asyncio.create_task(notice_handler.set_server_connection(server_connection))
+    await send_handler.set_server_connection(server_connection)
+    async for raw_message in server_connection:
+        logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
+        try:
+            # 首先尝试解析原始消息
+            decoded_raw_message: dict = json.loads(raw_message)
+
+            # 检查是否是切片消息 (来自 MMC)
+            if chunker.is_chunk_message(decoded_raw_message):
+                logger.debug("接收到切片消息,尝试重组")
+                # 尝试重组消息
+                reassembled_message = await reassembler.add_chunk(decoded_raw_message)
+                if reassembled_message:
+                    # 重组完成,处理完整消息
+                    logger.debug("消息重组完成,处理完整消息")
+                    decoded_raw_message = reassembled_message
+                else:
+                    # 切片尚未完整,继续等待更多切片
+                    logger.debug("等待更多切片...")
+                    continue
+
+            # 处理完整消息(可能是重组后的,也可能是原本就完整的)
+            post_type = decoded_raw_message.get("post_type")
+            if post_type in ["meta_event", "message", "notice"]:
+                await message_queue.put(decoded_raw_message)
+            elif post_type is None:
+                await put_response(decoded_raw_message)
+
+        except json.JSONDecodeError as e:
+            logger.error(f"消息解析失败: {e}")
+            logger.debug(f"原始消息: {raw_message[:500]}...")
+        except Exception as e:
+            logger.error(f"处理消息时出错: {e}")
+            logger.debug(f"原始消息: {raw_message[:500]}...")
+
+async def message_process():
+    """消息处理主循环"""
+    logger.info("消息处理器已启动")
+    try:
+        while True:
+            try:
+                # 使用超时等待,以便能够响应取消请求
+                message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
+
+                post_type = message.get("post_type")
+                if post_type == "message":
+                    await message_handler.handle_raw_message(message)
+                elif post_type == "meta_event":
+                    await meta_event_handler.handle_meta_event(message)
+                elif post_type == "notice":
+                    await notice_handler.handle_notice(message)
+                else:
+                    logger.warning(f"未知的post_type: {post_type}")
+
+                message_queue.task_done()
+                await asyncio.sleep(0.05)
+
+            except asyncio.TimeoutError:
+                # 超时是正常的,继续循环
+                continue
+            except asyncio.CancelledError:
+                logger.info("消息处理器收到取消信号")
+                break
+            except Exception as e:
+                logger.error(f"处理消息时出错: {e}")
+                # 即使出错也标记任务完成,避免队列阻塞
+                try:
+                    message_queue.task_done()
+                except ValueError:
+                    pass
+                await asyncio.sleep(0.1)
+
+    except asyncio.CancelledError:
+        logger.info("消息处理器已停止")
+        raise
+    except Exception as e:
+        logger.error(f"消息处理器异常: {e}")
+        raise
+    finally:
+        logger.info("消息处理器正在清理...")
+        # 清空剩余的队列项目
+        try:
+            while not message_queue.empty():
+                try:
+                    message_queue.get_nowait()
+                    message_queue.task_done()
+                except asyncio.QueueEmpty:
+                    break
+        except Exception as e:
+            logger.debug(f"清理消息队列时出错: {e}")
+
+async def napcat_server():
+    """启动 Napcat WebSocket 连接(支持正向和反向连接)"""
+    mode = global_config.napcat_server.mode
+    logger.info(f"正在启动 adapter,连接模式: {mode}")
+
+    try:
+        await websocket_manager.start_connection(message_recv)
+    except Exception as e:
+        logger.error(f"启动 WebSocket 连接失败: {e}")
+        raise
+
+async def graceful_shutdown():
+    """优雅关闭所有组件"""
+    try:
+        logger.info("正在关闭adapter...")
+
+        # 停止消息重组器的清理任务
+        try:
+            await reassembler.stop_cleanup_task()
+        except Exception as e:
+            logger.warning(f"停止消息重组器清理任务时出错: {e}")
+
+        # 停止功能管理器文件监控
+        try:
+            await features_manager.stop_file_watcher()
+        except Exception as e:
+            logger.warning(f"停止功能管理器文件监控时出错: {e}")
+
+        # 关闭消息处理器(包括消息缓冲器)
+        try:
+            await message_handler.shutdown()
+        except Exception as e:
+            logger.warning(f"关闭消息处理器时出错: {e}")
+
+        # 关闭 WebSocket 连接
+        try:
+            await websocket_manager.stop_connection()
+        except Exception as e:
+            logger.warning(f"关闭WebSocket连接时出错: {e}")
+
+        # 关闭 MaiBot 连接
+        try:
+            await mmc_stop_com()
+        except Exception as e:
+            logger.warning(f"关闭MaiBot连接时出错: {e}")
+
+        # 取消所有剩余任务
+        current_task = asyncio.current_task()
+        tasks = [t for t in asyncio.all_tasks() if t is not current_task and not t.done()]
+
+        if tasks:
+            logger.info(f"正在取消 {len(tasks)} 个剩余任务...")
+            for task in tasks:
+                task.cancel()
+
+            # 等待任务取消完成,忽略 CancelledError
+            try:
+                await asyncio.wait_for(
+                    asyncio.gather(*tasks, return_exceptions=True),
+                    timeout=10
+                )
+            except asyncio.TimeoutError:
+                logger.warning("部分任务取消超时")
+            except Exception as e:
+                logger.debug(f"任务取消过程中的异常(可忽略): {e}")
+
+        logger.info("Adapter已成功关闭")
+
+    except Exception as e:
+        logger.error(f"Adapter关闭中出现错误: {e}")
+    finally:
+        # 确保消息队列被清空
+        try:
+            while not message_queue.empty():
+                try:
+                    message_queue.get_nowait()
+                    message_queue.task_done()
+                except asyncio.QueueEmpty:
+                    break
+        except Exception:
+            pass

 class LauchNapcatAdapterHandler(BaseEventHandler):
     """自动启动Adapter"""
@@ -46,50 +223,15 @@ class LauchNapcatAdapterHandler(BaseEventHandler):
     intercept_message: bool = False
     init_subscribe = [EventType.ON_START]

-    async def message_recv(self, server_connection: Server.ServerConnection):
-        await message_handler.set_server_connection(server_connection)
-        asyncio.create_task(notice_handler.set_server_connection(server_connection))
-        await send_handler.set_server_connection(server_connection)
-        async for raw_message in server_connection:
-            logger.debug(f"{raw_message[:1500]}..." if (len(raw_message) > 1500) else raw_message)
-            decoded_raw_message: dict = json.loads(raw_message)
-            post_type = decoded_raw_message.get("post_type")
-            if post_type in ["meta_event", "message", "notice"]:
-                await message_queue.put(decoded_raw_message)
-            elif post_type is None:
-                await put_response(decoded_raw_message)
-
-    async def message_process(self):
-        while True:
-            message = await message_queue.get()
-            post_type = message.get("post_type")
-            if post_type == "message":
-                await message_handler.handle_raw_message(message)
-            elif post_type == "meta_event":
-                await meta_event_handler.handle_meta_event(message)
-            elif post_type == "notice":
-                await notice_handler.handle_notice(message)
-            else:
-                logger.warning(f"未知的post_type: {post_type}")
-            message_queue.task_done()
-            await asyncio.sleep(0.05)
-
-    async def napcat_server(self):
-        """启动 Napcat WebSocket 连接(支持正向和反向连接)"""
-        mode = global_config.napcat_server.mode
-        logger.info(f"正在启动 adapter,连接模式: {mode}")
-
-        try:
-            await websocket_manager.start_connection(self.message_recv)
-        except Exception as e:
-            logger.error(f"启动 WebSocket 连接失败: {e}")
-            raise
-
     async def execute(self, kwargs):
         # 执行功能配置迁移(如果需要)
         logger.info("检查功能配置迁移...")
         auto_migrate_features()

+        # 启动消息重组器的清理任务
+        logger.info("启动消息重组器...")
+        await reassembler.start_cleanup_task()
+
         # 初始化功能管理器
         logger.info("正在初始化功能管理器...")
         features_manager.load_config()
@@ -98,11 +240,25 @@ class LauchNapcatAdapterHandler(BaseEventHandler):
         logger.info("开始启动Napcat Adapter")
         message_send_instance.maibot_router = router
         # 创建单独的异步任务,防止阻塞主线程
-        asyncio.create_task(self.napcat_server())
+        asyncio.create_task(napcat_server())
         asyncio.create_task(mmc_start_com())
-        asyncio.create_task(self.message_process())
+        asyncio.create_task(message_process())
         asyncio.create_task(check_timeout_response())

+class StopNapcatAdapterHandler(BaseEventHandler):
+    """关闭Adapter"""
+
+    handler_name: str = "stop_napcat_adapter_handler"
+    handler_description: str = "关闭napcat adapter"
+    weight: int = 100
+    intercept_message: bool = False
+    init_subscribe = [EventType.ON_STOP]
+
+    async def execute(self, kwargs):
+        await graceful_shutdown()
+        return
+
+
 @register_plugin
 class NapcatAdapterPlugin(BasePlugin):
     plugin_name = CONSTS.PLUGIN_NAME
@@ -142,7 +298,7 @@ class NapcatAdapterPlugin(BasePlugin):

         components = []
         components.append((LauchNapcatAdapterHandler.get_handler_info(), LauchNapcatAdapterHandler))
-
+        components.append((StopNapcatAdapterHandler.get_handler_info(), StopNapcatAdapterHandler))
         for handler in get_classes_in_module(event_handlers):
             if issubclass(handler, BaseEventHandler):
                 components.append((handler.get_handler_info(), handler))
diff --git a/plugins/napcat_adapter_plugin/src/config/config.py b/plugins/napcat_adapter_plugin/src/config/config.py
index 5f3c2b8d7..97b3f57e3 100644
--- a/plugins/napcat_adapter_plugin/src/config/config.py
+++ b/plugins/napcat_adapter_plugin/src/config/config.py
@@ -18,6 +18,7 @@ from .official_configs import (
     MaiBotServerConfig,
     NapcatServerConfig,
     NicknameConfig,
+    SlicingConfig,
     VoiceConfig,
 )

@@ -120,6 +121,7 @@ class Config(ConfigBase):
     napcat_server: NapcatServerConfig
     maibot_server: MaiBotServerConfig
     voice: VoiceConfig
+    slicing: SlicingConfig
     debug: DebugConfig
diff --git a/plugins/napcat_adapter_plugin/src/config/official_configs.py b/plugins/napcat_adapter_plugin/src/config/official_configs.py
index 23be6c312..ba2c19e12 100644
--- a/plugins/napcat_adapter_plugin/src/config/official_configs.py
+++ b/plugins/napcat_adapter_plugin/src/config/official_configs.py
@@ -58,7 +58,14 @@ class VoiceConfig(ConfigBase):
     use_tts: bool = False
     """是否启用TTS功能"""

-
+@dataclass
+class SlicingConfig(ConfigBase):
+    max_frame_size: int = 64
+    """WebSocket帧的最大大小,单位为KB,默认64KB"""
+
+    delay_ms: int = 10
+    """切片发送间隔时间,单位为毫秒"""
+
 @dataclass
 class DebugConfig(ConfigBase):
     level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
diff --git a/plugins/napcat_adapter_plugin/src/message_chunker.py b/plugins/napcat_adapter_plugin/src/message_chunker.py
new file mode 100644
index 000000000..9ba27f3c0
--- /dev/null
+++ b/plugins/napcat_adapter_plugin/src/message_chunker.py
@@ -0,0 +1,270 @@
+"""
+消息切片处理模块
+用于在 Ada 发送给 MMC 时进行消息切片,利用 WebSocket 协议的自动重组特性
+仅在 Ada -> MMC 方向进行切片,其他方向(MMC -> Ada,Ada <-> Napcat)不切片
+"""
+import json
+import uuid
+import asyncio
+import time
+from typing import List, Dict, Any, Optional, Union
+from .config import global_config
+
+from src.common.logger import get_logger
+
+logger = get_logger("napcat_adapter")
+
+
+class MessageChunker:
+    """消息切片器,用于处理大消息的分片发送"""
+
+    def __init__(self):
+        self.max_chunk_size = global_config.slicing.max_frame_size * 1024
+
+    def should_chunk_message(self, message: Union[str, Dict[str, Any]]) -> bool:
+        """判断消息是否需要切片"""
+        try:
+            if isinstance(message, dict):
+                message_str = json.dumps(message, ensure_ascii=False)
+            else:
+                message_str = message
+            return len(message_str.encode('utf-8')) > self.max_chunk_size
+        except Exception as e:
+            logger.error(f"检查消息大小时出错: {e}")
+            return False
+
+    def chunk_message(self, message: Union[str, Dict[str, Any]], chunk_id: Optional[str] = None) -> List[Dict[str, Any]]:
+        """
+        将消息切片
+
+        Args:
+            message: 要切片的消息(字符串或字典)
+            chunk_id: 切片组ID,如果不提供则自动生成
+
+        Returns:
+            切片后的消息字典列表
+        """
+        try:
+            # 统一转换为字符串
+            if isinstance(message, dict):
+                message_str = json.dumps(message, ensure_ascii=False)
+            else:
+                message_str = message
+
+            if not self.should_chunk_message(message_str):
+                # 不需要切片的情况,如果输入是字典则返回字典,如果是字符串则包装成非切片标记的字典
+                if isinstance(message, dict):
+                    return [message]
+                else:
+                    return [{"_original_message": message_str}]
+
+            if chunk_id is None:
+                chunk_id = str(uuid.uuid4())
+
+            message_bytes = message_str.encode('utf-8')
+            total_size = len(message_bytes)
+
+            # 计算需要多少个切片
+            num_chunks = (total_size + self.max_chunk_size - 1) // self.max_chunk_size
+
+            chunks = []
+            for i in range(num_chunks):
+                start_pos = i * self.max_chunk_size
+                end_pos = min(start_pos + self.max_chunk_size, total_size)
+
+                chunk_data = message_bytes[start_pos:end_pos]
+
+                # 构建切片消息
+                chunk_message = {
+                    "__mmc_chunk_info__": {
+                        "chunk_id": chunk_id,
+                        "chunk_index": i,
+                        "total_chunks": num_chunks,
+                        "chunk_size": len(chunk_data),
+                        "total_size": total_size,
+                        "timestamp": time.time()
+                    },
+                    "__mmc_chunk_data__": chunk_data.decode('utf-8', errors='ignore'),
+                    "__mmc_is_chunked__": True
+                }
+
+                chunks.append(chunk_message)
+
+            logger.debug(f"消息切片完成: {total_size} bytes -> {num_chunks} chunks (ID: {chunk_id})")
+            return chunks
+
+        except Exception as e:
+            logger.error(f"消息切片时出错: {e}")
+            # 出错时返回原消息
+            if isinstance(message, dict):
+                return [message]
+            else:
+                return [{"_original_message": message}]
+
+    def is_chunk_message(self, message: Union[str, Dict[str, Any]]) -> bool:
+        """判断是否是切片消息"""
+        try:
+            if isinstance(message, str):
+                data = json.loads(message)
+            else:
+                data = message
+
+            return (
+                isinstance(data, dict) and
+                "__mmc_chunk_info__" in data and
+                "__mmc_chunk_data__" in data and
+                "__mmc_is_chunked__" in data
+            )
+        except (json.JSONDecodeError, TypeError):
+            return False
+
+
+class MessageReassembler:
+    """消息重组器,用于重组接收到的切片消息"""
+
+    def __init__(self, timeout: int = 30):
+        self.timeout = timeout
+        self.chunk_buffers: Dict[str, Dict[str, Any]] = {}
+        self._cleanup_task = None
+
+    async def start_cleanup_task(self):
+        """启动清理任务"""
+        if self._cleanup_task is None:
+            self._cleanup_task = asyncio.create_task(self._cleanup_expired_chunks())
+
+    async def stop_cleanup_task(self):
+        """停止清理任务"""
+        if self._cleanup_task:
+            self._cleanup_task.cancel()
+            try:
+                await self._cleanup_task
+            except asyncio.CancelledError:
+                pass
+            self._cleanup_task = None
+
+    async def _cleanup_expired_chunks(self):
+        """清理过期的切片缓冲区"""
+        while True:
+            try:
+                await asyncio.sleep(10)  # 每10秒检查一次
+                current_time = time.time()
+
+                expired_chunks = []
+                for chunk_id, buffer_info in self.chunk_buffers.items():
+                    if current_time - buffer_info['timestamp'] > self.timeout:
+                        expired_chunks.append(chunk_id)
+
+                for chunk_id in expired_chunks:
+                    logger.warning(f"清理过期的切片缓冲区: {chunk_id}")
+                    del self.chunk_buffers[chunk_id]
+
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                logger.error(f"清理过期切片时出错: {e}")
+
+    async def add_chunk(self, message: Union[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+        """
+        添加切片,如果切片完整则返回重组后的消息
+
+        Args:
+            message: 切片消息(字符串或字典)
+
+        Returns:
+            如果切片完整则返回重组后的原始消息字典,否则返回None
+        """
+        try:
+            # 统一转换为字典
+            if isinstance(message, str):
+                chunk_data = json.loads(message)
+            else:
+                chunk_data = message
+
+            # 检查是否是切片消息
+            if not chunker.is_chunk_message(chunk_data):
+                # 不是切片消息,直接返回
+                if "_original_message" in chunk_data:
+                    # 这是一个被包装的非切片消息,解包返回
+                    try:
+                        return json.loads(chunk_data["_original_message"])
+                    except json.JSONDecodeError:
+                        return {"text_message": chunk_data["_original_message"]}
+                else:
+                    return chunk_data
+
+            chunk_info = chunk_data["__mmc_chunk_info__"]
+            chunk_content = chunk_data["__mmc_chunk_data__"]
+
+            chunk_id = chunk_info["chunk_id"]
+            chunk_index = chunk_info["chunk_index"]
+            total_chunks = chunk_info["total_chunks"]
+            chunk_timestamp = chunk_info.get("timestamp", time.time())
+
+            # 初始化缓冲区
+            if chunk_id not in self.chunk_buffers:
+                self.chunk_buffers[chunk_id] = {
+                    "chunks": {},
+                    "total_chunks": total_chunks,
+                    "received_chunks": 0,
+                    "timestamp": chunk_timestamp
+                }
+
+            buffer = self.chunk_buffers[chunk_id]
+
+            # 检查切片是否已经接收过
+            if chunk_index in buffer["chunks"]:
+                logger.warning(f"重复接收切片: {chunk_id}#{chunk_index}")
+                return None
+
+            # 添加切片
+            buffer["chunks"][chunk_index] = chunk_content
+            buffer["received_chunks"] += 1
+            buffer["timestamp"] = time.time()  # 更新时间戳
+
+            logger.debug(f"接收切片: {chunk_id}#{chunk_index} ({buffer['received_chunks']}/{total_chunks})")
+
+            # 检查是否接收完整
+            if buffer["received_chunks"] == total_chunks:
+                # 重组消息
+                reassembled_message = ""
+                for i in range(total_chunks):
+                    if i not in buffer["chunks"]:
+                        logger.error(f"切片 {chunk_id}#{i} 缺失,无法重组")
+                        return None
+                    reassembled_message += buffer["chunks"][i]
+
+                # 清理缓冲区
+                del self.chunk_buffers[chunk_id]
+
+                logger.debug(f"消息重组完成: {chunk_id} ({len(reassembled_message)} chars)")
+
+                # 尝试反序列化重组后的消息
+                try:
+                    return json.loads(reassembled_message)
+                except json.JSONDecodeError:
+                    # 如果不能反序列化为JSON,则作为文本消息返回
+                    return {"text_message": reassembled_message}
+
+            return None
+
+        except (json.JSONDecodeError, KeyError, TypeError) as e:
+            logger.error(f"处理切片消息时出错: {e}")
+            return None
+
+    def get_pending_chunks_info(self) -> Dict[str, Any]:
+        """获取待处理切片信息"""
+        info = {}
+        for chunk_id, buffer in self.chunk_buffers.items():
+            info[chunk_id] = {
+                "received": buffer["received_chunks"],
+                "total": buffer["total_chunks"],
+                "progress": f"{buffer['received_chunks']}/{buffer['total_chunks']}",
+                "age_seconds": time.time() - buffer["timestamp"]
+            }
+        return info
+
+
+# 全局实例
+chunker = MessageChunker()
+reassembler = MessageReassembler()
\ No newline at end of file
diff --git a/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py b/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py
index 9d58828ed..aad211106 100644
--- a/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py
+++ b/plugins/napcat_adapter_plugin/src/recv_handler/message_handler.py
@@ -734,9 +734,83 @@ class MessageHandler:
         return Seg(type="text", data="[向你发送了窗口抖动,现在你的屏幕猛烈地震了一下!]")

     async def handle_json_message(self, raw_message: dict) -> Seg:
-        message_data: str = raw_message.get("data", "").get("data", "")
-        res = json.loads(message_data)
-        return Seg(type="json", data=res)
+        """
+        处理JSON消息
+        Parameters:
+            raw_message: dict: 原始消息
+        Returns:
+            seg_data: Seg: 处理后的消息段
+        """
+        message_data: dict = raw_message.get("data", {})
+        json_data = message_data.get("data", "")
+
+        # 检查JSON消息格式
+        if not message_data or "data" not in message_data:
+            logger.warning("JSON消息格式不正确")
+            return Seg(type="json", data=json.dumps(message_data))
+
+        try:
+            nested_data = json.loads(json_data)
+
+            # 检查是否是QQ小程序分享消息
+            if "app" in nested_data and "com.tencent.miniapp" in str(nested_data.get("app", "")):
+                logger.debug("检测到QQ小程序分享消息,开始提取信息")
+
+                # 提取目标字段
+                extracted_info = {}
+
+                # 提取 meta.detail_1 中的信息
+                meta = nested_data.get("meta", {})
+                detail_1 = meta.get("detail_1", {})
+
+                if detail_1:
+                    extracted_info["title"] = detail_1.get("title", "")
+                    extracted_info["desc"] = detail_1.get("desc", "")
+                    qqdocurl = detail_1.get("qqdocurl", "")
+
+                    # 从qqdocurl中提取b23.tv短链接
+                    if qqdocurl and "b23.tv" in qqdocurl:
+                        # 查找b23.tv链接的起始位置
+                        start_pos = qqdocurl.find("https://b23.tv/")
+                        if start_pos != -1:
+                            # 提取从https://b23.tv/开始的部分
+                            b23_part = qqdocurl[start_pos:]
+                            # 查找第一个?的位置,截取到?之前
+                            question_pos = b23_part.find("?")
+                            if question_pos != -1:
+                                extracted_info["short_url"] = b23_part[:question_pos]
+                            else:
+                                extracted_info["short_url"] = b23_part
+                        else:
+                            extracted_info["short_url"] = qqdocurl
+                    else:
+                        extracted_info["short_url"] = qqdocurl
+
+                # 如果成功提取到关键信息,返回格式化的文本
+                if extracted_info.get("title") or extracted_info.get("desc") or extracted_info.get("short_url"):
+                    content_parts = []
+
+                    if extracted_info.get("title"):
+                        content_parts.append(f"来源: {extracted_info['title']}")
+
+                    if extracted_info.get("desc"):
+                        content_parts.append(f"标题: {extracted_info['desc']}")
+
+                    if extracted_info.get("short_url"):
+                        content_parts.append(f"链接: {extracted_info['short_url']}")
+
+                    formatted_content = "\n".join(content_parts)
+                    return Seg(type="text", data=f"这是一条小程序分享消息,可以根据来源,考虑使用对应解析工具\n{formatted_content}")
+
+            # 如果没有提取到关键信息,返回None
+            return None
+
+        except json.JSONDecodeError as e:
+            logger.error(f"解析JSON消息失败: {e}")
+            return None
+        except Exception as e:
+            logger.error(f"处理JSON消息时出错: {e}")
+            return None

     async def handle_rps_message(self, raw_message: dict) -> Seg:
         message_data: dict = raw_message.get("data", {})
diff --git a/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py b/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py
index 989e541d5..4e6ccba64 100644
--- a/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py
+++ b/plugins/napcat_adapter_plugin/src/recv_handler/message_sending.py
@@ -1,4 +1,9 @@
+import json
+import asyncio
+
 from src.common.logger import get_logger
+from ..message_chunker import chunker
+from ..config import global_config

 logger = get_logger("napcat_adapter")

 from maim_message import MessageBase, Router
@@ -16,18 +21,56 @@ class MessageSending:

     async def message_send(self, message_base: MessageBase) -> bool:
         """
-        发送消息
+        发送消息(Ada -> MMC 方向,需要实现切片)

         Parameters:
             message_base: MessageBase: 消息基类,包含发送目标和消息内容等信息
         """
         try:
-            send_status = await self.maibot_router.send_message(message_base)
-            if not send_status:
-                raise RuntimeError("可能是路由未正确配置或连接异常")
-            return send_status
+            # 检查是否需要切片发送
+            message_dict = message_base.to_dict()
+
+            if chunker.should_chunk_message(message_dict):
+                logger.info("消息过大,进行切片发送到 MaiBot")
+
+                # 切片消息
+                chunks = chunker.chunk_message(message_dict)
+
+                # 逐个发送切片
+                for i, chunk in enumerate(chunks):
+                    logger.debug(f"发送切片 {i+1}/{len(chunks)} 到 MaiBot")
+
+                    # 获取对应的客户端并发送切片
+                    platform = message_base.message_info.platform
+                    if platform not in self.maibot_router.clients:
+                        logger.error(f"平台 {platform} 未连接")
+                        return False
+
+                    client = self.maibot_router.clients[platform]
+                    send_status = await client.send_message(chunk)
+
+                    if not send_status:
+                        logger.error(f"发送切片 {i+1}/{len(chunks)} 失败")
+                        return False
+
+                    # 使用配置中的延迟时间
+                    if i < len(chunks) - 1:
+                        delay_seconds = global_config.slicing.delay_ms / 1000.0
+                        logger.debug(f"切片发送延迟: {global_config.slicing.delay_ms}毫秒")
+                        await asyncio.sleep(delay_seconds)
+
+                logger.debug("所有切片发送完成")
+                return True
+            else:
+                # 直接发送小消息
+                send_status = await self.maibot_router.send_message(message_base)
+                if not send_status:
+                    raise RuntimeError("可能是路由未正确配置或连接异常")
+                return send_status
+
         except Exception as e:
             logger.error(f"发送消息失败: {str(e)}")
             logger.error("请检查与MaiBot之间的连接")
+            return False

 message_send_instance = MessageSending()
diff --git a/plugins/napcat_adapter_plugin/template/features_template.toml b/plugins/napcat_adapter_plugin/template/features_template.toml
index 195cedacd..679267ab2 100644
--- a/plugins/napcat_adapter_plugin/template/features_template.toml
+++ b/plugins/napcat_adapter_plugin/template/features_template.toml
@@ -40,4 +40,4 @@ message_buffer_enable_private = true # 是否启用私聊消息缓冲
 message_buffer_interval = 3.0 # 消息合并间隔时间(秒),在此时间内的连续消息将被合并
 message_buffer_initial_delay = 0.5 # 消息缓冲初始延迟(秒),收到第一条消息后等待此时间开始合并
 message_buffer_max_components = 50 # 单个会话最大缓冲消息组件数量,超过此数量将强制合并
-message_buffer_block_prefixes = ["/", "!", "!", ".", "。", "#", "%"] # 消息缓冲屏蔽前缀,以这些前缀开头的消息不会被缓冲
\ No newline at end of file
+message_buffer_block_prefixes = ["/"] # 消息缓冲屏蔽前缀,以这些前缀开头的消息不会被缓冲
\ No newline at end of file
diff --git a/plugins/napcat_adapter_plugin/template/template_config.toml b/plugins/napcat_adapter_plugin/template/template_config.toml
index 1ddca6cf5..a06906ad3 100644
--- a/plugins/napcat_adapter_plugin/template/template_config.toml
+++ b/plugins/napcat_adapter_plugin/template/template_config.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "0.2.0" # 版本号
+version = "0.2.1" # 版本号
 # 请勿修改版本号,除非你知道自己在做什么

 [nickname] # 现在没用
@@ -20,6 +20,10 @@ port = 8000 # 麦麦在.env文件中设置的端口,即PORT字段

 [voice] # 发送语音设置
 use_tts = false # 是否使用tts语音(请确保你配置了tts并有对应的adapter)

+[slicing] # WebSocket消息切片设置
+max_frame_size = 64 # WebSocket帧的最大大小,单位为KB,默认64KB
+delay_ms = 10 # 切片发送间隔时间,单位为毫秒
+
 [debug]
 level = "INFO" # 日志等级(DEBUG, INFO, WARNING, ERROR, CRITICAL)
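Reviewer note (not part of the diff): a minimal round-trip sketch of the chunking protocol added in message_chunker.py, handy for a quick local check. It assumes the adapter's config has been loaded before import (the module-level chunker reads global_config.slicing.max_frame_size at import time and multiplies it by 1024, i.e. the value is interpreted as KB), and the import path used here is hypothetical and depends on how the plugin package is exposed on sys.path.

import asyncio

# Hypothetical import path; adjust to wherever the plugin package is importable from.
from plugins.napcat_adapter_plugin.src.message_chunker import chunker, reassembler


async def demo() -> None:
    # Build a payload guaranteed to exceed max_frame_size * 1024 bytes,
    # so chunk_message() actually splits it instead of returning it unchanged.
    payload = {"post_type": "message", "text": "x" * (chunker.max_chunk_size + 100)}

    chunks = chunker.chunk_message(payload)
    assert all(chunker.is_chunk_message(c) for c in chunks)

    # add_chunk() returns None for every slice until the last one arrives,
    # then returns the reassembled original dict.
    result = None
    for chunk in chunks:
        result = await reassembler.add_chunk(chunk)
    assert result == payload


asyncio.run(demo())

One observation from tracing this path: chunk_message() slices the UTF-8 byte string and decodes each slice with errors='ignore', so a multibyte character that straddles a chunk boundary can be silently dropped before reassembly; that edge case may be worth a closer look in review.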