""" 发送API模块 专门负责发送各种类型的消息,采用标准Python包设计模式 使用方式: from src.plugin_system.apis import send_api # 方式1:直接使用stream_id(推荐) await send_api.text_to_stream("hello", stream_id) await send_api.emoji_to_stream(emoji_base64, stream_id) await send_api.custom_to_stream("video", video_data, stream_id) # 方式2:使用群聊/私聊指定函数 await send_api.text_to_group("hello", "123456") await send_api.text_to_user("hello", "987654") # 方式3:使用通用custom_message函数 await send_api.custom_message("video", video_data, "123456", True) # 方式4:向适配器发送命令并获取返回值 response = await send_api.adapter_command_to_stream( "get_group_list", {}, stream_id ) if response["status"] == "ok": group_list = response.get("data", []) """ from pathlib import Path async def file_to_stream( file_path: str, stream_id: str, file_name: str | None = None, storage_message: bool = True, set_reply: bool = True ) -> bool: """向指定流发送文件 Args: file_path: 文件的本地路径 stream_id: 聊天流ID file_name: 发送到对方时显示的文件名,如果为 None 则使用原始文件名 storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ target_stream = await get_chat_manager().get_stream(stream_id) if not target_stream: logger.error(f"[SendAPI] 未找到聊天流: {stream_id}") return False if not file_name: file_name = Path(file_path).name params = { "file": file_path, "name": file_name, } action = "" if target_stream.group_info and target_stream.group_info.group_id: action = "upload_group_file" params["group_id"] = target_stream.group_info.group_id elif target_stream.user_info and target_stream.user_info.user_id: action = "upload_private_file" params["user_id"] = target_stream.user_info.user_id else: logger.error(f"[SendAPI] 无法确定文件发送目标: {stream_id}") return False response = await adapter_command_to_stream( action=action, params=params, stream_id=stream_id, timeout=300.0 # 文件上传可能需要更长时间 ) if response.get("status") == "ok": logger.info(f"文件 {file_name} 已成功发送到 {stream_id}") return True else: logger.error(f"文件 {file_name} 发送到 {stream_id} 失败: {response.get('message')}") return False import asyncio import time import traceback import uuid from typing import TYPE_CHECKING, Any from mofox_wire import MessageEnvelope from src.common.data_models.database_data_model import DatabaseUserInfo if TYPE_CHECKING: from src.common.data_models.database_data_model import DatabaseMessages # 导入依赖 from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager from src.chat.message_receive.uni_message_sender import HeartFCSender from src.common.logger import get_logger from src.config.config import global_config # 日志记录器 logger = get_logger("send_api") # 适配器命令响应等待池 _adapter_response_pool: dict[str, asyncio.Future] = {} def message_dict_to_db_message(message_dict: dict[str, Any]) -> "DatabaseMessages | None": """从消息字典构建 DatabaseMessages 对象 Args: message_dict: 消息字典或 DatabaseMessages 对象 Returns: Optional[DatabaseMessages]: 构建的消息对象,如果构建失败则返回None """ from src.common.data_models.database_data_model import DatabaseMessages # 如果已经是 DatabaseMessages,直接返回 if isinstance(message_dict, DatabaseMessages): return message_dict # 从字典提取信息 user_platform = message_dict.get("user_platform", "") user_id = message_dict.get("user_id", "") user_nickname = message_dict.get("user_nickname", "") user_cardname = message_dict.get("user_cardname", "") chat_info_group_id = message_dict.get("chat_info_group_id") chat_info_group_platform = message_dict.get("chat_info_group_platform", "") chat_info_group_name = message_dict.get("chat_info_group_name", "") chat_info_platform = message_dict.get("chat_info_platform", "") message_id = message_dict.get("message_id") or message_dict.get("chat_info_message_id") or message_dict.get("id") time_val = message_dict.get("time", time.time()) additional_config = message_dict.get("additional_config") processed_plain_text = message_dict.get("processed_plain_text", "") # DatabaseMessages 使用扁平参数构造 db_message = DatabaseMessages( message_id=message_id or "temp_reply_id", time=time_val, user_id=user_id, user_nickname=user_nickname, user_cardname=user_cardname, user_platform=user_platform, chat_info_group_id=chat_info_group_id, chat_info_group_name=chat_info_group_name, chat_info_group_platform=chat_info_group_platform, chat_info_platform=chat_info_platform, processed_plain_text=processed_plain_text, additional_config=additional_config ) logger.info(f"[SendAPI] 构建回复消息对象,发送者: {user_nickname}") return db_message def put_adapter_response(request_id: str, response_data: dict) -> None: """将适配器响应放入响应池""" if request_id in _adapter_response_pool: future = _adapter_response_pool.pop(request_id) if not future.done(): future.set_result(response_data) async def wait_adapter_response(request_id: str, timeout: float = 30.0) -> dict: """等待适配器响应""" future = asyncio.Future() _adapter_response_pool[request_id] = future try: response = await asyncio.wait_for(future, timeout=timeout) return response except asyncio.TimeoutError: _adapter_response_pool.pop(request_id, None) return {"status": "error", "message": "timeout"} except Exception as e: _adapter_response_pool.pop(request_id, None) return {"status": "error", "message": str(e)} def _build_message_envelope( *, message_id: str, target_stream: "ChatStream", bot_user_info: DatabaseUserInfo, message_segment: dict[str, Any], timestamp: float, ) -> MessageEnvelope: """构建发送的 MessageEnvelope 数据结构""" message_info: dict[str, Any] = { "message_id": message_id, "time": timestamp, "platform": target_stream.platform, "user_info": { "user_id": bot_user_info.user_id, "user_nickname": bot_user_info.user_nickname, "user_cardname": getattr(bot_user_info, "user_cardname", None), "platform": bot_user_info.platform, }, } if target_stream.group_info: message_info["group_info"] = { "group_id": target_stream.group_info.group_id, "group_name": target_stream.group_info.group_name, "platform": target_stream.group_info.group_platform, } return { "id": str(uuid.uuid4()), "direction": "outgoing", "platform": target_stream.platform, "message_info": message_info, "message_segment": message_segment, } # ============================================================================= # 内部实现函数(不暴露给外部) # ============================================================================= async def _send_to_target( message_type: str, content: str | dict, stream_id: str, display_message: str = "", typing: bool = False, reply_to: str = "", set_reply: bool = False, reply_to_message: dict[str, Any] | None = None, storage_message: bool = True, show_log: bool = True, ) -> bool: """向指定目标发送消息的内部实现""" try: if reply_to: logger.warning("[SendAPI] 自 0.10.0 起 reply_to 已弃用,请使用 reply_to_message") if show_log: logger.debug(f"[SendAPI] 发送 {message_type} 消息到 {stream_id}") target_stream = await get_chat_manager().get_stream(stream_id) if not target_stream: logger.error(f"[SendAPI] 未找到聊天流: {stream_id}") return False heart_fc_sender = HeartFCSender() current_time = time.time() message_id = f"send_api_{int(current_time * 1000)}" bot_user_info = DatabaseUserInfo( user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=target_stream.platform, ) anchor_message = None reply_to_platform_id = None if reply_to: import re match = re.match(r"(.+)\((\d+)\)", reply_to) if match: sender_name, sender_id = match.groups() temp_message_dict = { "user_nickname": sender_name, "user_id": sender_id, "chat_info_platform": target_stream.platform, "message_id": "temp_reply_id", "time": time.time(), } anchor_message = message_dict_to_db_message(message_dict=temp_message_dict) if anchor_message: reply_to_platform_id = f"{target_stream.platform}:{sender_id}" elif reply_to_message: anchor_message = message_dict_to_db_message(message_dict=reply_to_message) if anchor_message: reply_to_platform_id = f"{anchor_message.chat_info.platform}:{anchor_message.user_info.user_id}" base_segment: dict[str, Any] = {"type": message_type, "data": content} message_segment: dict[str, Any] if set_reply and anchor_message and anchor_message.message_id: message_segment = { "type": "seglist", "data": [ {"type": "reply", "data": anchor_message.message_id}, base_segment, ], } else: message_segment = base_segment if reply_to_platform_id: message_segment["reply_to"] = reply_to_platform_id envelope = _build_message_envelope( message_id=message_id, target_stream=target_stream, bot_user_info=bot_user_info, message_segment=message_segment, timestamp=current_time, ) sent_msg = await heart_fc_sender.send_message( envelope, chat_stream=target_stream, typing=typing, storage_message=storage_message, show_log=show_log, thinking_start_time=current_time, display_message=display_message or (content if isinstance(content, str) else ""), ) if sent_msg: logger.debug(f"[SendAPI] 成功发送消息到 {stream_id}") return True logger.error("[SendAPI] 发送消息失败") return False except Exception as e: logger.error(f"[SendAPI] 发送消息时出错: {e}") traceback.print_exc() return False async def text_to_stream( text: str, stream_id: str, typing: bool = False, reply_to: str = "", reply_to_message: dict[str, Any] | None = None, set_reply: bool = True, storage_message: bool = True, ) -> bool: """向指定流发送文本消息 Args: text: 要发送的文本内容 stream_id: 聊天流ID typing: 是否显示正在输入 reply_to: 回复消息,格式为"发送者:消息内容" storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ return await _send_to_target( "text", text, stream_id, "", typing, reply_to, set_reply=set_reply, reply_to_message=reply_to_message, storage_message=storage_message, ) async def emoji_to_stream( emoji_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = True ) -> bool: """向指定流发送表情包 Args: emoji_base64: 表情包的base64编码 stream_id: 聊天流ID storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ return await _send_to_target( "emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply ) async def image_to_stream( image_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = True ) -> bool: """向指定流发送图片 Args: image_base64: 图片的base64编码 stream_id: 聊天流ID storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ return await _send_to_target( "image", image_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply ) async def command_to_stream( command: str | dict, stream_id: str, storage_message: bool = True, display_message: str = "", set_reply: bool = True, ) -> bool: """向指定流发送命令 Args: command: 命令 stream_id: 聊天流ID storage_message: 是否存储消息到数据库 Returns: bool: 是否发送成功 """ return await _send_to_target( "command", command, stream_id, display_message, typing=False, storage_message=storage_message ) async def custom_to_stream( message_type: str, content: str | dict, stream_id: str, display_message: str = "", typing: bool = False, reply_to: str = "", reply_to_message: dict[str, Any] | None = None, set_reply: bool = True, storage_message: bool = True, show_log: bool = True, ) -> bool: """向指定流发送自定义类型消息 Args: message_type: 消息类型,如"text"、"image"、"emoji"、"video"、"file"等 content: 消息内容(通常是base64编码或文本) stream_id: 聊天流ID display_message: 显示消息 typing: 是否显示正在输入 reply_to: 回复消息,格式为"发送者:消息内容" storage_message: 是否存储消息到数据库 show_log: 是否显示日志 Returns: bool: 是否发送成功 """ return await _send_to_target( message_type=message_type, content=content, stream_id=stream_id, display_message=display_message, typing=typing, reply_to=reply_to, reply_to_message=reply_to_message, set_reply=set_reply, storage_message=storage_message, show_log=show_log, ) async def adapter_command_to_stream( action: str, params: dict, platform: str | None = "qq", stream_id: str | None = None, timeout: float = 30.0, storage_message: bool = False, ) -> dict: """向适配器发送命令并获取返回值""" if not stream_id and not platform: raise ValueError("必须提供stream_id或platform") try: logger.debug(f"[SendAPI] 准备发送适配器命令: {action}") if stream_id is None: stream_id = f"adapter_temp_{uuid.uuid4().hex[:8]}" logger.debug(f"[SendAPI] 自动生成临时stream_id: {stream_id}") target_stream = await get_chat_manager().get_stream(stream_id) if not target_stream: if stream_id.startswith("adapter_temp_"): logger.debug(f"[SendAPI] 创建临时聊天流: {stream_id}") if not platform: logger.error("[SendAPI] 创建临时聊天流失败: platform 未提供") return {"status": "error", "message": "platform 未提供"} temp_user_info = DatabaseUserInfo(user_id="system", user_nickname="System", platform=platform) temp_chat_stream = ChatStream( stream_id=stream_id, platform=platform, user_info=temp_user_info, group_info=None ) target_stream = temp_chat_stream else: logger.error(f"[SendAPI] 未找到聊天流: {stream_id}") return {"status": "error", "message": f"未找到聊天流: {stream_id}"} heart_fc_sender = HeartFCSender() current_time = time.time() message_id = f"adapter_cmd_{int(current_time * 1000)}" bot_user_info = DatabaseUserInfo( user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=target_stream.platform, ) adapter_command_data = { "action": action, "params": params, "timeout": timeout, "request_id": message_id, } message_segment = {"type": "adapter_command", "data": adapter_command_data} envelope = _build_message_envelope( message_id=message_id, target_stream=target_stream, bot_user_info=bot_user_info, message_segment=message_segment, timestamp=current_time, ) sent_msg = await heart_fc_sender.send_message( envelope, chat_stream=target_stream, typing=False, storage_message=storage_message, show_log=True, thinking_start_time=current_time, display_message=f"发送适配器命令: {action}", ) if not sent_msg: logger.error("[SendAPI] 发送适配器命令失败") return {"status": "error", "message": "发送适配器命令失败"} logger.debug("[SendAPI] 已发送适配器命令,等待响应...") response = await wait_adapter_response(message_id, timeout) logger.debug(f"[SendAPI] 收到适配器响应: {response}") return response except Exception as e: logger.error(f"[SendAPI] 发送适配器命令时出错: {e}") traceback.print_exc() return {"status": "error", "message": f"发送适配器命令时出错: {e!s}"}