From 05cba1bb09bb285bbc9b9dec77f27ee351d50269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9B=85=E8=AF=BA=E7=8B=90?= <212194964+foxcyber907@users.noreply.github.com> Date: Tue, 12 Aug 2025 17:32:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E5=8F=91=E9=80=81api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/plugins/api/adapter-command-api.md | 241 ++++++++++++++++++++++++ src/chat/message_receive/bot.py | 26 +++ src/chat/message_receive/storage.py | 5 + src/plugin_system/apis/send_api.py | 167 +++++++++++++++- 4 files changed, 438 insertions(+), 1 deletion(-) create mode 100644 docs/plugins/api/adapter-command-api.md diff --git a/docs/plugins/api/adapter-command-api.md b/docs/plugins/api/adapter-command-api.md new file mode 100644 index 000000000..b3e9863f3 --- /dev/null +++ b/docs/plugins/api/adapter-command-api.md @@ -0,0 +1,241 @@ +# 适配器命令API + +这个API允许插件向适配器发送命令并获取返回值,可以用于获取群列表、好友列表等功能。 + +## 主要功能 + +### 向适配器发送命令并获取返回值 + +```python +async def adapter_command_to_stream( + action: str, + params: dict, + stream_id: Optional[str] = None, + timeout: float = 30.0, + storage_message: bool = False, + show_log: bool = True, +) -> dict: +``` + +向适配器发送命令并等待响应。 + +**Args:** +- `action` (str): 适配器命令动作,如"get_group_list"、"get_friend_list"等 +- `params` (dict): 命令参数字典 +- `stream_id` (Optional[str]): 聊天流ID,可选,如果不提供则自动生成一个 +- `timeout` (float): 超时时间(秒),默认30秒 +- `storage_message` (bool): 是否存储消息到数据库,默认False +- `show_log` (bool): 是否显示日志,默认True + + +简化版的适配器命令调用,自动生成stream_id。 + +**Args:** +- `action` (str): 适配器命令动作 +- `params` (dict): 命令参数字典,默认为空字典 +- `timeout` (float): 超时时间(秒),默认30秒 +- `show_log` (bool): 是否显示日志,默认True + +**Returns:** +- `dict` - 适配器返回的响应,格式为: + - 成功: `{"status": "ok", "data": {...}, "message": "..."}` + - 失败: `{"status": "failed", "message": "错误信息"}` + - 错误: `{"status": "error", "message": "错误信息"}` + +## 使用示例 + +### 1. 获取群列表 + +```python +from src.plugin_system.apis import send_api + +async def get_group_list(stream_id: str): + """获取机器人加入的群列表""" + + response = await send_api.adapter_command_to_stream( + action="get_group_list", + params={}, + stream_id=stream_id + ) + + if response["status"] == "ok": + group_list = response.get("data", []) + logger.info(f"获取到 {len(group_list)} 个群") + return group_list + else: + logger.error(f"获取群列表失败: {response['message']}") + return [] +``` + +### 2. 获取好友列表 + +```python +async def get_friend_list(stream_id: str): + """获取机器人的好友列表""" + + response = await send_api.adapter_command_to_stream( + action="get_friend_list", + params={}, + stream_id=stream_id + ) + + if response["status"] == "ok": + friend_list = response.get("data", []) + logger.info(f"获取到 {len(friend_list)} 个好友") + return friend_list + else: + logger.error(f"获取好友列表失败: {response['message']}") + return [] +``` + +### 3. 获取群成员信息 + +```python +async def get_group_member_info(stream_id: str, group_id: int, user_id: int): + """获取群成员信息""" + + response = await send_api.adapter_command_to_stream( + action="get_group_member_info", + params={ + "group_id": group_id, + "user_id": user_id + }, + stream_id=stream_id + ) + + if response["status"] == "ok": + member_info = response.get("data", {}) + logger.info(f"获取到群成员信息: {member_info}") + return member_info + else: + logger.error(f"获取群成员信息失败: {response['message']}") + return None +``` + +### 4. 在命令插件中使用 + +```python +from src.plugin_system.base.base_command import BaseCommand +from src.plugin_system.apis import send_api +from typing import Tuple + +class GetGroupListCommand(BaseCommand): + """获取群列表命令""" + + command_name = "get_groups" + command_description = "获取机器人加入的群列表" + command_pattern = r"^/get_groups$" + command_help = "获取机器人加入的群列表" + command_examples = ["/get_groups"] + intercept_message = True + + async def execute(self) -> Tuple[bool, str, bool]: + try: + # 获取聊天流ID + stream_id = self.message.chat_stream.stream_id + + # 调用适配器命令API + response = await send_api.adapter_command_to_stream( + action="get_group_list", + params={}, + stream_id=stream_id + ) + + if response["status"] == "ok": + group_list = response.get("data", []) + + if group_list: + # 格式化群列表信息 + group_info = "\\n".join([ + f"群号: {group['group_id']}, 群名: {group['group_name']}" + for group in group_list + ]) + await self.send_text(f"机器人加入的群列表:\\n{group_info}") + else: + await self.send_text("机器人未加入任何群") + + return True, "获取群列表成功", True + else: + await self.send_text(f"获取群列表失败: {response['message']}") + return False, "获取群列表失败", True + + except Exception as e: + logger.error(f"执行获取群列表命令时出错: {e}") + await self.send_text("命令执行失败") + return False, "命令执行失败", True +``` + +### 5. 带超时设置的调用 + +```python +async def get_group_info_with_timeout(stream_id: str, group_id: int): + """获取群信息,设置较短的超时时间""" + + response = await send_api.adapter_command_to_stream( + action="get_group_info", + params={"group_id": group_id}, + stream_id=stream_id, + timeout=10.0, # 10秒超时 + show_log=True + ) + + if response["status"] == "ok": + return response.get("data", {}) + elif response["status"] == "error" and "timeout" in response["message"]: + logger.warning("获取群信息超时") + return None + else: + logger.error(f"获取群信息失败: {response['message']}") + return None +``` + +## 支持的适配器命令 + +以下是一些常用的适配器命令(具体支持的命令取决于适配器实现): + +- `get_group_list` - 获取群列表 +- `get_friend_list` - 获取好友列表 +- `get_group_info` - 获取群信息 +- `get_group_member_list` - 获取群成员列表 +- `get_group_member_info` - 获取群成员信息 +- `get_stranger_info` - 获取陌生人信息 +- `get_msg` - 获取消息 +- `get_forward_msg` - 获取转发消息 + +## 注意事项 + +1. 所有命令都是异步执行的,需要使用`await`关键字 +2. 超时时间建议根据命令复杂度设置,默认30秒 +3. 命令失败时会返回错误信息,建议进行错误处理 +4. 不建议在消息存储中记录适配器命令消息 +5. 适配器命令的具体参数和返回值格式请参考NapCat文档 + +## 错误处理 + +```python +async def safe_adapter_command(stream_id: str, action: str, params: dict): + """安全的适配器命令调用""" + try: + response = await send_api.adapter_command_to_stream( + action=action, + params=params, + stream_id=stream_id, + timeout=15.0 + ) + + if response["status"] == "ok": + return response.get("data") + elif response["status"] == "error": + if "timeout" in response["message"]: + logger.warning(f"适配器命令 {action} 超时") + else: + logger.error(f"适配器命令 {action} 执行错误: {response['message']}") + else: + logger.error(f"适配器命令 {action} 执行失败: {response['message']}") + + return None + + except Exception as e: + logger.error(f"调用适配器命令时出错: {e}") + return None +``` diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 9a8c1b630..e1997bd07 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -151,6 +151,32 @@ class ChatBot: # print(message) return True + + # 处理适配器响应消息 + if hasattr(message, 'message_segment') and message.message_segment: + if message.message_segment.type == "adapter_response": + await self.handle_adapter_response(message) + return True + + return False + + async def handle_adapter_response(self, message: MessageRecv): + """处理适配器命令响应""" + try: + from src.plugin_system.apis.send_api import put_adapter_response + + seg_data = message.message_segment.data + request_id = seg_data.get("request_id") + response_data = seg_data.get("response") + + if request_id and response_data: + logger.debug(f"收到适配器响应: request_id={request_id}") + put_adapter_response(request_id, response_data) + else: + logger.warning("适配器响应消息格式不正确") + + except Exception as e: + logger.error(f"处理适配器响应时出错: {e}") async def do_s4u(self, message_data: Dict[str, Any]): message = MessageRecvS4U(message_data) diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index b09c55e75..c0041efa2 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -118,7 +118,12 @@ class MessageStorage: mmc_message_id = message.message_info.message_id # 修复:正确访问message_id if message.message_segment.type == "notify": qq_message_id = message.message_segment.data.get("id") + elif message.message_segment.type == "text": + qq_message_id = message.message_segment.data.get("id") + elif message.message_segment.type == "reply": + qq_message_id = message.message_segment.data.get("id") logger.info(f"更新消息ID完成,消息ID为{qq_message_id}") + else: logger.info(f"更新消息ID错误,seg类型为{message.message_segment.type}") return diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index 10fbd804e..9a1ce212f 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -17,16 +17,28 @@ # 方式3:使用通用custom_message函数 await send_api.custom_message("video", video_data, "123456", True) + + # 方式4:向适配器发送命令并获取返回值 + response = await send_api.adapter_command_to_stream( + "get_group_list", {}, stream_id + ) + if response["status"] == "ok": + group_list = response.get("data", []) + + """ import traceback import time import difflib -from typing import Optional, Union +import asyncio +from typing import Optional, Union, Dict, Any from src.common.logger import get_logger # 导入依赖 from src.chat.message_receive.chat_stream import get_chat_manager +from maim_message import UserInfo, GroupInfo +from src.chat.message_receive.chat_stream import ChatStream from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.message_receive.message import MessageSending, MessageRecv from src.chat.utils.chat_message_builder import get_raw_msg_before_timestamp_with_chat, replace_user_references_async @@ -36,6 +48,33 @@ from src.config.config import global_config logger = get_logger("send_api") +# 适配器命令响应等待池 +_adapter_response_pool: Dict[str, asyncio.Future] = {} + + +def put_adapter_response(request_id: str, response_data: dict) -> None: + """将适配器响应放入响应池""" + if request_id in _adapter_response_pool: + future = _adapter_response_pool.pop(request_id) + if not future.done(): + future.set_result(response_data) + + +async def wait_adapter_response(request_id: str, timeout: float = 30.0) -> dict: + """等待适配器响应""" + future = asyncio.Future() + _adapter_response_pool[request_id] = future + + try: + response = await asyncio.wait_for(future, timeout=timeout) + return response + except asyncio.TimeoutError: + _adapter_response_pool.pop(request_id, None) + return {"status": "error", "message": "timeout"} + except Exception as e: + _adapter_response_pool.pop(request_id, None) + return {"status": "error", "message": str(e)} + # ============================================================================= # 内部实现函数(不暴露给外部) @@ -367,3 +406,129 @@ async def custom_to_stream( storage_message=storage_message, show_log=show_log, ) + + +async def adapter_command_to_stream( + action: str, + params: dict, + stream_id: Optional[str] = None, + timeout: float = 30.0, + storage_message: bool = False +) -> dict: + """向适配器发送命令并获取返回值 + 雅诺狐的耳朵特别软 + + Args: + action: 适配器命令动作,如"get_group_list"、"get_friend_list"等 + params: 命令参数字典 + stream_id: 聊天流ID,可选,如果不提供则自动生成一个 + timeout: 超时时间(秒) + storage_message: 是否存储消息到数据库 + show_log: 是否显示日志 + + Returns: + dict: 适配器返回的响应,格式为 {"status": "ok/failed", "data": {...}, "message": "..."} + 如果发送失败则返回 {"status": "error", "message": "错误信息"} + """ + try: + + logger.debug(f"[SendAPI] 向适配器发送命令: {action}") + + # 如果没有提供stream_id,则生成一个临时的 + if stream_id is None: + import uuid + stream_id = f"adapter_temp_{uuid.uuid4().hex[:8]}" + logger.debug(f"[SendAPI] 自动生成临时stream_id: {stream_id}") + + # 查找目标聊天流 + target_stream = get_chat_manager().get_stream(stream_id) + if not target_stream: + # 如果是自动生成的stream_id且找不到聊天流,创建一个临时的虚拟流 + if stream_id.startswith("adapter_temp_"): + logger.debug(f"[SendAPI] 创建临时虚拟聊天流: {stream_id}") + + # 创建临时的用户信息和聊天流 + + temp_user_info = UserInfo( + user_id="system", + user_nickname="System", + platform="adapter_command" + ) + + temp_chat_stream = ChatStream( + stream_id=stream_id, + platform="adapter_command", + user_info=temp_user_info, + group_info=None + ) + + target_stream = temp_chat_stream + else: + logger.error(f"[SendAPI] 未找到聊天流: {stream_id}") + return {"status": "error", "message": f"未找到聊天流: {stream_id}"} + + # 创建发送器 + heart_fc_sender = HeartFCSender() + + # 生成消息ID + current_time = time.time() + message_id = f"adapter_cmd_{int(current_time * 1000)}" + + # 构建机器人用户信息 + bot_user_info = UserInfo( + user_id=global_config.bot.qq_account, + user_nickname=global_config.bot.nickname, + platform=target_stream.platform, + ) + + # 构建适配器命令数据 + adapter_command_data = { + "action": action, + "params": params, + "timeout": timeout, + "request_id": message_id, + } + + # 创建消息段 + message_segment = Seg(type="adapter_command", data=adapter_command_data) + + # 构建发送消息对象 + bot_message = MessageSending( + message_id=message_id, + chat_stream=target_stream, + bot_user_info=bot_user_info, + sender_info=target_stream.user_info, + message_segment=message_segment, + display_message=f"适配器命令: {action}", + reply=None, + is_head=True, + is_emoji=False, + thinking_start_time=current_time, + reply_to=None, + ) + + # 发送消息 + sent_msg = await heart_fc_sender.send_message( + bot_message, + typing=False, + set_reply=False, + storage_message=storage_message + ) + + if not sent_msg: + logger.error("[SendAPI] 发送适配器命令失败") + return {"status": "error", "message": "发送适配器命令失败"} + + logger.debug(f"[SendAPI] 已发送适配器命令,等待响应...") + + # 等待适配器响应 + response = await wait_adapter_response(message_id, timeout) + + logger.debug(f"[SendAPI] 收到适配器响应: {response}") + + return response + + except Exception as e: + logger.error(f"[SendAPI] 发送适配器命令时出错: {e}") + traceback.print_exc() + return {"status": "error", "message": f"发送适配器命令时出错: {str(e)}"}