Files
Mofox-Core/src/plugin_system/apis/send_api.py
tt-P607 103a6edaf3 feat(emoji): 增强消息上下文下的表情选择并修复存储用户信息的问题
该提交对表情操作及相关消息发送 API 进行了重大改进。

现在,表情选择逻辑更加具备上下文感知能力。LLM 提示现在包括机器人的待发送回复内容,使其能够根据对话历史和自身回复选择更合适的表情。用于上下文的近期聊天记录也有所增加。

此外,修复了插件发送 API (`send_api`) 中的一个错误。之前,当机器人发送消息时,消息在数据库中存储的是接收者的用户信息,而非机器人的信息。本次提交通过显式传递并使用机器人的用户信息进行存储,确保消息历史准确反映发送者。

其他更改包括:
- 重构表情操作的激活逻辑以提高清晰度。
- 改进 LLM 选择的表情描述匹配算法。- 为配置访问添加必要的类型安全检查。
2025-11-28 15:24:08 +08:00

578 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
发送API模块
专门负责发送各种类型的消息采用标准Python包设计模式
使用方式:
from src.plugin_system.apis import send_api
# 方式1直接使用stream_id推荐
await send_api.text_to_stream("hello", stream_id)
await send_api.emoji_to_stream(emoji_base64, stream_id)
await send_api.custom_to_stream("video", video_data, stream_id)
# 方式2使用群聊/私聊指定函数
await send_api.text_to_group("hello", "123456")
await send_api.text_to_user("hello", "987654")
# 方式3使用通用custom_message函数
await send_api.custom_message("video", video_data, "123456", True)
# 方式4向适配器发送命令并获取返回值
response = await send_api.adapter_command_to_stream(
"get_group_list", {}, stream_id
)
if response["status"] == "ok":
group_list = response.get("data", [])
"""
from pathlib import Path
async def file_to_stream(
file_path: str,
stream_id: str,
file_name: str | None = None,
storage_message: bool = True,
set_reply: bool = True
) -> bool:
"""向指定流发送文件
Args:
file_path: 文件的本地路径
stream_id: 聊天流ID
file_name: 发送到对方时显示的文件名,如果为 None 则使用原始文件名
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
target_stream = await get_chat_manager().get_stream(stream_id)
if not target_stream:
logger.error(f"[SendAPI] 未找到聊天流: {stream_id}")
return False
if not file_name:
file_name = Path(file_path).name
params = {
"file": file_path,
"name": file_name,
}
action = ""
if target_stream.group_info and target_stream.group_info.group_id:
action = "upload_group_file"
params["group_id"] = target_stream.group_info.group_id
elif target_stream.user_info and target_stream.user_info.user_id:
action = "upload_private_file"
params["user_id"] = target_stream.user_info.user_id
else:
logger.error(f"[SendAPI] 无法确定文件发送目标: {stream_id}")
return False
response = await adapter_command_to_stream(
action=action,
params=params,
stream_id=stream_id,
timeout=300.0 # 文件上传可能需要更长时间
)
if response.get("status") == "ok":
logger.info(f"文件 {file_name} 已成功发送到 {stream_id}")
return True
else:
logger.error(f"文件 {file_name} 发送到 {stream_id} 失败: {response.get('message')}")
return False
import asyncio
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Any
from mofox_wire import MessageEnvelope
from src.common.data_models.database_data_model import DatabaseUserInfo
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages
# 导入依赖
from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager
from src.chat.message_receive.uni_message_sender import HeartFCSender
from src.common.logger import get_logger
from src.config.config import global_config
# 日志记录器
logger = get_logger("send_api")
# 适配器命令响应等待池
_adapter_response_pool: dict[str, asyncio.Future] = {}
def message_dict_to_db_message(message_dict: dict[str, Any]) -> "DatabaseMessages | None":
"""从消息字典构建 DatabaseMessages 对象
Args:
message_dict: 消息字典或 DatabaseMessages 对象
Returns:
Optional[DatabaseMessages]: 构建的消息对象如果构建失败则返回None
"""
from src.common.data_models.database_data_model import DatabaseMessages
# 如果已经是 DatabaseMessages直接返回
if isinstance(message_dict, DatabaseMessages):
return message_dict
# 从字典提取信息
user_platform = message_dict.get("user_platform", "")
user_id = message_dict.get("user_id", "")
user_nickname = message_dict.get("user_nickname", "")
user_cardname = message_dict.get("user_cardname", "")
chat_info_group_id = message_dict.get("chat_info_group_id")
chat_info_group_platform = message_dict.get("chat_info_group_platform", "")
chat_info_group_name = message_dict.get("chat_info_group_name", "")
chat_info_platform = message_dict.get("chat_info_platform", "")
message_id = message_dict.get("message_id") or message_dict.get("chat_info_message_id") or message_dict.get("id")
time_val = message_dict.get("time", time.time())
additional_config = message_dict.get("additional_config")
processed_plain_text = message_dict.get("processed_plain_text", "")
# DatabaseMessages 使用扁平参数构造
db_message = DatabaseMessages(
message_id=message_id or "temp_reply_id",
time=time_val,
user_id=user_id,
user_nickname=user_nickname,
user_cardname=user_cardname,
user_platform=user_platform,
chat_info_group_id=chat_info_group_id,
chat_info_group_name=chat_info_group_name,
chat_info_group_platform=chat_info_group_platform,
chat_info_platform=chat_info_platform,
processed_plain_text=processed_plain_text,
additional_config=additional_config
)
logger.info(f"[SendAPI] 构建回复消息对象,发送者: {user_nickname}")
return db_message
def put_adapter_response(request_id: str, response_data: dict) -> None:
"""将适配器响应放入响应池"""
if request_id in _adapter_response_pool:
future = _adapter_response_pool.pop(request_id)
if not future.done():
future.set_result(response_data)
async def wait_adapter_response(request_id: str, timeout: float = 30.0) -> dict:
"""等待适配器响应"""
future = asyncio.Future()
_adapter_response_pool[request_id] = future
try:
response = await asyncio.wait_for(future, timeout=timeout)
return response
except asyncio.TimeoutError:
_adapter_response_pool.pop(request_id, None)
return {"status": "error", "message": "timeout"}
except Exception as e:
_adapter_response_pool.pop(request_id, None)
return {"status": "error", "message": str(e)}
def _build_message_envelope(
*,
message_id: str,
target_stream: "ChatStream",
bot_user_info: DatabaseUserInfo,
message_segment: dict[str, Any],
timestamp: float,
) -> MessageEnvelope:
"""构建发送的 MessageEnvelope 数据结构"""
# 这里的 user_info 决定了消息要发给谁,所以在私聊场景下必须是目标用户
target_user_info = target_stream.user_info or bot_user_info
message_info: dict[str, Any] = {
"message_id": message_id,
"time": timestamp,
"platform": target_stream.platform,
"user_info": {
"user_id": target_user_info.user_id,
"user_nickname": target_user_info.user_nickname,
"user_cardname": getattr(target_user_info, "user_cardname", None),
"platform": target_user_info.platform,
},
}
if target_stream.group_info:
message_info["group_info"] = {
"group_id": target_stream.group_info.group_id,
"group_name": target_stream.group_info.group_name,
"platform": target_stream.group_info.platform,
}
return { # type: ignore
"id": str(uuid.uuid4()),
"direction": "outgoing",
"platform": target_stream.platform,
"message_info": message_info,
"message_segment": message_segment,
}
# =============================================================================
# 内部实现函数(不暴露给外部)
# =============================================================================
async def _send_to_target(
message_type: str,
content: str | dict,
stream_id: str,
display_message: str = "",
typing: bool = False,
reply_to: str = "",
set_reply: bool = False,
reply_to_message: dict[str, Any] | None = None,
storage_message: bool = True,
show_log: bool = True,
) -> bool:
"""向指定目标发送消息的内部实现"""
try:
if reply_to:
logger.warning("[SendAPI] 自 0.10.0 起 reply_to 已弃用,请使用 reply_to_message")
if show_log:
logger.debug(f"[SendAPI] 发送 {message_type} 消息到 {stream_id}")
target_stream = await get_chat_manager().get_stream(stream_id)
if not target_stream:
logger.error(f"[SendAPI] 未找到聊天流: {stream_id}")
return False
heart_fc_sender = HeartFCSender()
current_time = time.time()
message_id = f"send_api_{int(current_time * 1000)}"
bot_config = global_config.bot
if not bot_config:
logger.error("机器人配置丢失,无法构建机器人用户信息")
return False
bot_user_info = DatabaseUserInfo(
user_id=str(bot_config.qq_account),
user_nickname=bot_config.nickname,
platform=target_stream.platform,
)
anchor_message = None
reply_to_platform_id = None
if reply_to:
import re
match = re.match(r"(.+)\((\d+)\)", reply_to)
if match:
sender_name, sender_id = match.groups()
temp_message_dict = {
"user_nickname": sender_name,
"user_id": sender_id,
"chat_info_platform": target_stream.platform,
"message_id": "temp_reply_id",
"time": time.time(),
}
anchor_message = message_dict_to_db_message(message_dict=temp_message_dict)
if anchor_message:
reply_to_platform_id = f"{target_stream.platform}:{sender_id}"
elif reply_to_message:
anchor_message = message_dict_to_db_message(message_dict=reply_to_message)
if anchor_message:
reply_to_platform_id = f"{anchor_message.chat_info.platform}:{anchor_message.user_info.user_id}"
base_segment: dict[str, Any] = {"type": message_type, "data": content}
message_segment: dict[str, Any]
if set_reply and anchor_message and anchor_message.message_id:
message_segment = {
"type": "seglist",
"data": [
{"type": "reply", "data": anchor_message.message_id},
base_segment,
],
}
else:
message_segment = base_segment
if reply_to_platform_id:
message_segment["reply_to"] = reply_to_platform_id
envelope = _build_message_envelope(
message_id=message_id,
target_stream=target_stream,
bot_user_info=bot_user_info,
message_segment=message_segment,
timestamp=current_time,
)
# Use readable display text so binary/base64 payloads are not stored directly
display_message_for_db = display_message or ""
if not display_message_for_db:
if message_type in {"emoji", "image", "voice", "video", "file"}:
# Leave empty to keep processed_plain_text (e.g., generated descriptions) instead of raw payloads
display_message_for_db = ""
elif isinstance(content, str):
display_message_for_db = content
sent_msg = await heart_fc_sender.send_message(
envelope,
chat_stream=target_stream,
typing=typing,
storage_message=storage_message,
show_log=show_log,
thinking_start_time=current_time,
display_message=display_message_for_db,
storage_user_info=bot_user_info,
)
if sent_msg:
logger.debug(f"[SendAPI] 成功发送消息到 {stream_id}")
return True
logger.error("[SendAPI] 发送消息失败")
return False
except Exception as e:
logger.error(f"[SendAPI] 发送消息时出错: {e}")
traceback.print_exc()
return False
async def text_to_stream(
text: str,
stream_id: str,
typing: bool = False,
reply_to: str = "",
reply_to_message: dict[str, Any] | None = None,
set_reply: bool = True,
storage_message: bool = True,
) -> bool:
"""向指定流发送文本消息
Args:
text: 要发送的文本内容
stream_id: 聊天流ID
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target(
"text",
text,
stream_id,
"",
typing,
reply_to,
set_reply=set_reply,
reply_to_message=reply_to_message,
storage_message=storage_message,
)
async def emoji_to_stream(
emoji_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = True
) -> bool:
"""向指定流发送表情包
Args:
emoji_base64: 表情包的base64编码
stream_id: 聊天流ID
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target(
"emoji", emoji_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply
)
async def image_to_stream(
image_base64: str, stream_id: str, storage_message: bool = True, set_reply: bool = True
) -> bool:
"""向指定流发送图片
Args:
image_base64: 图片的base64编码
stream_id: 聊天流ID
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target(
"image", image_base64, stream_id, "", typing=False, storage_message=storage_message, set_reply=set_reply
)
async def command_to_stream(
command: str | dict,
stream_id: str,
storage_message: bool = True,
display_message: str = "",
set_reply: bool = True,
) -> bool:
"""向指定流发送命令
Args:
command: 命令
stream_id: 聊天流ID
storage_message: 是否存储消息到数据库
Returns:
bool: 是否发送成功
"""
return await _send_to_target(
"command", command, stream_id, display_message, typing=False, storage_message=storage_message
)
async def custom_to_stream(
message_type: str,
content: str | dict,
stream_id: str,
display_message: str = "",
typing: bool = False,
reply_to: str = "",
reply_to_message: dict[str, Any] | None = None,
set_reply: bool = True,
storage_message: bool = True,
show_log: bool = True,
) -> bool:
"""向指定流发送自定义类型消息
Args:
message_type: 消息类型,如"text""image""emoji""video""file"
content: 消息内容通常是base64编码或文本
stream_id: 聊天流ID
display_message: 显示消息
typing: 是否显示正在输入
reply_to: 回复消息,格式为"发送者:消息内容"
storage_message: 是否存储消息到数据库
show_log: 是否显示日志
Returns:
bool: 是否发送成功
"""
return await _send_to_target(
message_type=message_type,
content=content,
stream_id=stream_id,
display_message=display_message,
typing=typing,
reply_to=reply_to,
reply_to_message=reply_to_message,
set_reply=set_reply,
storage_message=storage_message,
show_log=show_log,
)
async def adapter_command_to_stream(
action: str,
params: dict,
platform: str | None = "qq",
stream_id: str | None = None,
timeout: float = 30.0,
storage_message: bool = False,
) -> dict:
"""向适配器发送命令并获取返回值"""
if not stream_id and not platform:
raise ValueError("必须提供stream_id或platform")
try:
logger.debug(f"[SendAPI] 准备发送适配器命令: {action}")
if stream_id is None:
stream_id = f"adapter_temp_{uuid.uuid4().hex[:8]}"
logger.debug(f"[SendAPI] 自动生成临时stream_id: {stream_id}")
target_stream = await get_chat_manager().get_stream(stream_id)
if not target_stream:
if stream_id.startswith("adapter_temp_"):
logger.debug(f"[SendAPI] 创建临时聊天流: {stream_id}")
if not platform:
logger.error("[SendAPI] 创建临时聊天流失败: platform 未提供")
return {"status": "error", "message": "platform 未提供"}
temp_user_info = DatabaseUserInfo(user_id="system", user_nickname="System", platform=platform)
temp_chat_stream = ChatStream(
stream_id=stream_id, platform=platform, user_info=temp_user_info, group_info=None
)
target_stream = temp_chat_stream
else:
logger.error(f"[SendAPI] 未找到聊天流: {stream_id}")
return {"status": "error", "message": f"未找到聊天流: {stream_id}"}
heart_fc_sender = HeartFCSender()
current_time = time.time()
message_id = f"adapter_cmd_{int(current_time * 1000)}"
bot_user_info = DatabaseUserInfo(
user_id=str(global_config.bot.qq_account),
user_nickname=global_config.bot.nickname,
platform=target_stream.platform,
)
adapter_command_data = {
"action": action,
"params": params,
"timeout": timeout,
"request_id": message_id,
}
message_segment = {"type": "adapter_command", "data": adapter_command_data}
envelope = _build_message_envelope(
message_id=message_id,
target_stream=target_stream,
bot_user_info=bot_user_info,
message_segment=message_segment,
timestamp=current_time,
)
sent_msg = await heart_fc_sender.send_message(
envelope,
chat_stream=target_stream,
typing=False,
storage_message=storage_message,
show_log=True,
thinking_start_time=current_time,
display_message=f"发送适配器命令: {action}",
)
if not sent_msg:
logger.error("[SendAPI] 发送适配器命令失败")
return {"status": "error", "message": "发送适配器命令失败"}
logger.debug("[SendAPI] 已发送适配器命令,等待响应...")
response = await wait_adapter_response(message_id, timeout)
logger.debug(f"[SendAPI] 收到适配器响应: {response}")
return response
except Exception as e:
logger.error(f"[SendAPI] 发送适配器命令时出错: {e}")
traceback.print_exc()
return {"status": "error", "message": f"发送适配器命令时出错: {e!s}"}