feat(chat): 通过动作参数实现专用的 @用户 功能

此提交引入了一种在回复中提及(@)用户的稳健机制。此功能不再由 LLM 直接生成“@”文本,而是通过 `reply` 和 `respond` 动作中新增加的 `at_user_id` 参数来处理。

主要变化包括:
- **核心动作:** 在 `ReplyAction` 和 `RespondAction` 中添加可选的 `at_user_id` 参数,用于指定要提及的用户。
- **提示工程:** 更新了 `default_generator` 提示,以指导 LLM 如何使用新的 `at_user_id` 参数,而不是生成“@”文本。
- **动作管理器:** `ChatterActionManager` 现在会检查动作数据中的 `at_user_id`。如果存在,它会使用新的 `SEND_AT_MESSAGE` 命令,确保提及格式正确并发送。
- **发送 API:** 在 `send_api` 中引入新的 `at_user_to_stream` 函数,用于处理发送独立的 `@` 段落。
- **类型安全与重构:** 改进了类型提示跨多个文件(`mofox_wire` 类型、`cast` 等)进行了修改,并增加了对 `global_config` 是否存在的检查,以防止运行时可能出现的 `NoneType` 错误,从而提高了整体代码的稳定性。
This commit is contained in:
tt-P607
2025-11-28 02:54:47 +08:00
parent e7cd20e3f9
commit e5117720c6
8 changed files with 257 additions and 152 deletions

View File

@@ -63,7 +63,7 @@ class CoreActionsPlugin(BasePlugin):
"""返回插件包含的组件列表"""
# --- 根据配置注册组件 ---
components: ClassVar = []
components = []
# 注册 reply 动作
if self.get_config("components.enable_reply", True):

View File

@@ -37,6 +37,7 @@ class ReplyAction(BaseAction):
"target_message_id": "要回复的目标消息ID必需来自未读消息的 <m...> 标签)",
"content": "回复的具体内容可选由LLM生成",
"should_quote_reply": "是否引用原消息可选true/false默认false。群聊中回复较早消息或需要明确指向时使用true",
"at_user_id": "需要@的用户的QQ号可选string。如果需要在回复中@某个用户,请提供此参数。",
}
# 动作使用场景
@@ -47,6 +48,7 @@ class ReplyAction(BaseAction):
"私聊场景必须使用此动作(不支持 respond",
"群聊中需要明确回应某个特定用户或问题时使用",
"关注单条消息的具体内容和上下文细节",
"如果回复时需要@某个用户,请在参数中提供'at_user_id'",
]
# 关联类型
@@ -83,6 +85,7 @@ class RespondAction(BaseAction):
# 动作参数定义
action_parameters: ClassVar = {
"content": "回复的具体内容可选由LLM生成",
"at_user_id": "需要@的用户的QQ号可选string。如果需要在回复中@某个用户,请提供此参数。",
}
# 动作使用场景
@@ -93,6 +96,7 @@ class RespondAction(BaseAction):
"关注对话流程、话题走向和整体氛围",
"适合群聊中的自然对话流,无需精确指向特定消息",
"可以同时回应多个话题或参与者",
"如果回复时需要@某个用户,请在参数中提供'at_user_id'",
]
# 关联类型

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import base64
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
import uuid
from mofox_wire import MessageBuilder
@@ -224,7 +224,7 @@ class MessageHandler:
if not messages:
logger.warning("转发消息内容为空或获取失败")
return None
return await self.handle_forward_message(messages)
return await self.handle_forward_message(cast(list, messages))
case RealMessageType.json:
return await self._handle_json_message(segment)
case RealMessageType.file:
@@ -331,10 +331,13 @@ class MessageHandler:
{"type": seg.get("type", "text"), "data": seg.get("data", "")} for seg in reply_segments
] or [{"type": "text", "data": "[无法获取被引用的消息]"}]
return {
"type": "seglist",
"data": [{"type": "text", "data": prefix_text}, *brief_segments, {"type": "text", "data": suffix_text}],
}
return cast(
SegPayload,
{
"type": "seglist",
"data": [{"type": "text", "data": prefix_text}, *brief_segments, {"type": "text", "data": suffix_text}],
},
)
async def _handle_record_message(self, segment: dict) -> SegPayload | None:
"""处理语音消息"""
@@ -380,14 +383,17 @@ class MessageHandler:
video_base64 = base64.b64encode(video_data).decode("utf-8")
logger.debug(f"视频文件大小: {len(video_data) / (1024 * 1024):.2f} MB")
return {
"type": "video",
"data": {
"base64": video_base64,
"filename": Path(file_path).name,
"size_mb": len(video_data) / (1024 * 1024),
return cast(
SegPayload,
{
"type": "video",
"data": {
"base64": video_base64,
"filename": Path(file_path).name,
"size_mb": len(video_data) / (1024 * 1024),
},
},
}
)
elif video_url:
# URL下载处理
from ..video_handler import get_video_downloader
@@ -401,15 +407,18 @@ class MessageHandler:
video_base64 = base64.b64encode(download_result["data"]).decode("utf-8")
logger.debug(f"视频下载成功,大小: {len(download_result['data']) / (1024 * 1024):.2f} MB")
return {
"type": "video",
"data": {
"base64": video_base64,
"filename": download_result.get("filename", "video.mp4"),
"size_mb": len(download_result["data"]) / (1024 * 1024),
"url": video_url,
return cast(
SegPayload,
{
"type": "video",
"data": {
"base64": video_base64,
"filename": download_result.get("filename", "video.mp4"),
"size_mb": len(download_result["data"]) / (1024 * 1024),
"url": video_url,
},
},
}
)
else:
logger.warning("既没有有效的本地文件路径也没有有效的视频URL")
return None
@@ -454,14 +463,14 @@ class MessageHandler:
processed_message = handled_message
forward_hint = {"type": "text", "data": "这是一条转发消息:\n"}
return {"type": "seglist", "data": [forward_hint, processed_message]}
return cast(SegPayload, {"type": "seglist", "data": [forward_hint, processed_message]})
async def _recursive_parse_image_seg(self, seg_data: SegPayload, to_image: bool) -> SegPayload:
# sourcery skip: merge-else-if-into-elif
if seg_data.get("type") == "seglist":
new_seg_list = []
for i_seg in seg_data.get("data", []):
parsed_seg = await self._recursive_parse_image_seg(i_seg, to_image)
parsed_seg = await self._recursive_parse_image_seg(cast(SegPayload, i_seg), to_image)
new_seg_list.append(parsed_seg)
return {"type": "seglist", "data": new_seg_list}
@@ -469,7 +478,7 @@ class MessageHandler:
if seg_data.get("type") == "image":
image_url = seg_data.get("data")
try:
encoded_image = await get_image_base64(image_url)
encoded_image = await get_image_base64(cast(str, image_url))
except Exception as e:
logger.error(f"图片处理失败: {str(e)}")
return {"type": "text", "data": "[图片]"}
@@ -477,7 +486,7 @@ class MessageHandler:
if seg_data.get("type") == "emoji":
image_url = seg_data.get("data")
try:
encoded_image = await get_image_base64(image_url)
encoded_image = await get_image_base64(cast(str, image_url))
except Exception as e:
logger.error(f"图片处理失败: {str(e)}")
return {"type": "text", "data": "[表情包]"}
@@ -492,7 +501,7 @@ class MessageHandler:
logger.debug(f"不处理类型: {seg_data.get('type')}")
return seg_data
async def _handle_forward_message(self, message_list: list, layer: int) -> Tuple[SegPayload | None, int]:
async def _handle_forward_message(self, message_list: list, layer: int) -> Tuple[Optional[SegPayload], int]:
# sourcery skip: low-code-quality
"""
递归处理实际转发消息
@@ -530,7 +539,7 @@ class MessageHandler:
continue
contents = sub_message_data.get("content")
seg_data, count = await self._handle_forward_message(contents, layer + 1)
if seg_data is None:
if not seg_data:
continue
image_count += count
head_tip: SegPayload = {
@@ -595,7 +604,7 @@ class MessageHandler:
"id": file_id,
}
return {"type": "file", "data": file_data}
return cast(SegPayload, {"type": "file", "data": file_data})
async def _handle_json_message(self, segment: dict) -> SegPayload | None:
"""
@@ -623,7 +632,7 @@ class MessageHandler:
# 从回声消息中提取文件信息
file_info = self._extract_file_info_from_echo(nested_data)
if file_info:
return {"type": "file", "data": file_info}
return cast(SegPayload, {"type": "file", "data": file_info})
# 检查是否是QQ小程序分享消息
if "app" in nested_data and "com.tencent.miniapp" in str(nested_data.get("app", "")):

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import random
import time
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast, TypedDict
from mofox_wire import MessageEnvelope, SegPayload, GroupInfoPayload, UserInfoPayload, MessageInfoPayload
from src.common.logger import get_logger
@@ -19,6 +19,12 @@ if TYPE_CHECKING:
from ....plugin import NapcatAdapter
class AtPayload(TypedDict, total=False):
"""@ 消息段数据"""
user_id: str
class SendHandler:
"""负责向 Napcat 发送消息"""
@@ -41,10 +47,11 @@ class SendHandler:
return
message_segment = envelope.get("message_segment")
segment: SegPayload
if isinstance(message_segment, list):
segment: SegPayload = {"type": "seglist", "data": message_segment}
segment = {"type": "seglist", "data": message_segment}
else:
segment = message_segment or {}
segment = message_segment or {} # type: ignore
if segment:
seg_type = segment.get("type")
@@ -66,11 +73,12 @@ class SendHandler:
处理普通消息发送
"""
logger.info("处理普通信息中")
message_info: MessageInfoPayload = envelope.get("message_info", {})
message_segment: SegPayload = envelope.get("message_segment", {}) # type: ignore[assignment]
message_info: MessageInfoPayload = envelope.get("message_info", {}) or {}
message_segment: Union[SegPayload, List[SegPayload]] = envelope.get("message_segment") or cast(SegPayload, {})
seg_data: SegPayload
if isinstance(message_segment, list):
seg_data: SegPayload = {"type": "seglist", "data": message_segment}
seg_data = {"type": "seglist", "data": message_segment}
else:
seg_data = message_segment
@@ -81,7 +89,9 @@ class SendHandler:
id_name: Optional[str] = None
processed_message: list = []
try:
processed_message = await self.handle_seg_recursive(seg_data, user_info or {})
processed_message = await self.handle_seg_recursive(
seg_data, cast(UserInfoPayload, user_info if user_info is not None else {})
)
except Exception as e:
logger.error(f"处理消息时发生错误: {e}")
return None
@@ -123,10 +133,10 @@ class SendHandler:
处理命令类
"""
logger.debug("处理命令中")
message_info: Dict[str, Any] = envelope.get("message_info", {})
group_info: Optional[Dict[str, Any]] = message_info.get("group_info")
segment: SegPayload = envelope.get("message_segment", {}) # type: ignore[assignment]
seg_data: Dict[str, Any] = segment.get("data", {}) if isinstance(segment, dict) else {}
message_info: MessageInfoPayload = envelope.get("message_info", {}) or {}
group_info: Optional[GroupInfoPayload] = message_info.get("group_info")
segment: SegPayload = envelope.get("message_segment", {}) # type: ignore
seg_data: Dict[str, Any] = segment.get("data", {}) if isinstance(segment, dict) else {} # type: ignore
command_name: Optional[str] = seg_data.get("name")
try:
args = seg_data.get("args", {})
@@ -147,10 +157,10 @@ class SendHandler:
command, args_dict = self.handle_ai_voice_send_command(args, group_info)
elif command_name == CommandType.SET_EMOJI_LIKE.name:
command, args_dict = self.handle_set_emoji_like_command(args)
elif command_name == CommandType.SEND_AT_MESSAGE.name:
command, args_dict = self.handle_at_message_command(args, group_info)
elif command_name == CommandType.SEND_LIKE.name:
command, args_dict = self.handle_send_like_command(args)
elif command_name == CommandType.SEND_AT_MESSAGE.name:
command, args_dict = self.handle_at_message_command(args, group_info)
else:
logger.error(f"未知命令: {command_name}")
return
@@ -176,8 +186,8 @@ class SendHandler:
处理适配器命令类 - 用于直接向Napcat发送命令并返回结果
"""
logger.info("处理适配器命令中")
segment: SegPayload = envelope.get("message_segment", {}) # type: ignore[assignment]
seg_data: Dict[str, Any] = segment.get("data", {}) if isinstance(segment, dict) else {}
segment: SegPayload = envelope.get("message_segment", {}) # type: ignore
seg_data: Dict[str, Any] = segment.get("data", {}) if isinstance(segment, dict) else {} # type: ignore
try:
action = seg_data.get("action")
@@ -245,6 +255,9 @@ class SendHandler:
if not text:
return payload
new_payload = self.build_payload(payload, self.handle_text_message(str(text)), False)
elif seg_type == "at":
at_data: AtPayload = seg.get("data", {}) # type: ignore
new_payload = self.build_payload(payload, self.handle_at_message(at_data), False)
elif seg_type == "face":
logger.warning("MoFox-Bot 发送了qq原生表情暂时不支持")
elif seg_type == "image":
@@ -299,50 +312,21 @@ class SendHandler:
payload.append(addon)
return payload
async def handle_reply_message(self, message_id: str, user_info: UserInfoPayload) -> dict | list:
async def handle_reply_message(self, message_id: str, user_info: UserInfoPayload) -> dict:
"""处理回复消息"""
logger.debug(f"开始处理回复消息消息ID: {message_id}")
reply_seg = {"type": "reply", "data": {"id": message_id}}
# 检查是否启用引用艾特功能
if not config_api.get_plugin_config(self.plugin_config, "features.enable_reply_at", False):
logger.info("引用艾特功能未启用,仅发送普通回复")
return reply_seg
try:
msg_info_response = await self.send_message_to_napcat("get_msg", {"message_id": message_id})
logger.debug(f"获取消息 {message_id} 的详情响应: {msg_info_response}")
replied_user_id = None
if msg_info_response and msg_info_response.get("status") == "ok":
sender_info = msg_info_response.get("data", {}).get("sender")
if sender_info:
replied_user_id = sender_info.get("user_id")
if not replied_user_id:
logger.warning(f"无法获取消息 {message_id} 的发送者信息,跳过 @")
logger.debug(f"最终返回的回复段: {reply_seg}")
return reply_seg
if random.random() < config_api.get_plugin_config(self.plugin_config, "features.reply_at_rate", 0.5):
at_seg = {"type": "at", "data": {"qq": str(replied_user_id)}}
text_seg = {"type": "text", "data": {"text": " "}}
result_seg = [reply_seg, at_seg, text_seg]
logger.debug(f"最终返回的回复段: {result_seg}")
return result_seg
except Exception as e:
logger.error(f"处理引用回复并尝试@时出错: {e}")
logger.debug(f"最终返回的回复段: {reply_seg}")
return reply_seg
logger.debug(f"最终返回的回复段: {reply_seg}")
return reply_seg
def handle_text_message(self, message: str) -> dict:
"""处理文本消息"""
return {"type": "text", "data": {"text": message}}
def handle_at_message(self, at_data: AtPayload) -> dict:
"""处理@消息"""
user_id = at_data.get("user_id")
return {"type": "at", "data": {"qq": str(user_id)}}
def handle_image_message(self, encoded_image: str) -> dict:
"""处理图片消息"""
return {
@@ -370,14 +354,8 @@ class SendHandler:
def handle_voice_message(self, encoded_voice: str) -> dict:
"""处理语音消息"""
use_tts = False
if self.plugin_config:
use_tts = config_api.get_plugin_config(self.plugin_config, "voice.use_tts", False)
if not use_tts:
logger.warning("未启用语音消息处理")
return {}
if not encoded_voice:
logger.warning("接收到空的语音消息,跳过处理")
return {}
return {
"type": "record",
@@ -416,7 +394,7 @@ class SendHandler:
"""处理删除消息命令"""
return "delete_msg", {"message_id": args["message_id"]}
def handle_ban_command(self, args: Dict[str, Any], group_info: Optional[Dict[str, Any]]) -> tuple[str, Dict[str, Any]]:
def handle_ban_command(self, args: Dict[str, Any], group_info: Optional[GroupInfoPayload]) -> tuple[str, Dict[str, Any]]:
"""处理封禁命令"""
duration: int = int(args["duration"])
user_id: int = int(args["qq_id"])
@@ -436,7 +414,7 @@ class SendHandler:
},
)
def handle_whole_ban_command(self, args: Dict[str, Any], group_info: Optional[Dict[str, Any]]) -> tuple[str, Dict[str, Any]]:
def handle_whole_ban_command(self, args: Dict[str, Any], group_info: Optional[GroupInfoPayload]) -> tuple[str, Dict[str, Any]]:
"""处理全体禁言命令"""
enable = args["enable"]
assert isinstance(enable, bool), "enable参数必须是布尔值"
@@ -451,7 +429,7 @@ class SendHandler:
},
)
def handle_kick_command(self, args: Dict[str, Any], group_info: Optional[Dict[str, Any]]) -> tuple[str, Dict[str, Any]]:
def handle_kick_command(self, args: Dict[str, Any], group_info: Optional[GroupInfoPayload]) -> tuple[str, Dict[str, Any]]:
"""处理群成员踢出命令"""
user_id: int = int(args["qq_id"])
group_id: int = int(group_info["group_id"]) if group_info and group_info.get("group_id") else 0
@@ -468,7 +446,7 @@ class SendHandler:
},
)
def handle_poke_command(self, args: Dict[str, Any], group_info: Optional[Dict[str, Any]]) -> tuple[str, Dict[str, Any]]:
def handle_poke_command(self, args: Dict[str, Any], group_info: Optional[GroupInfoPayload]) -> tuple[str, Dict[str, Any]]:
"""处理戳一戳命令"""
user_id: int = int(args["qq_id"])
group_id: Optional[int] = None
@@ -515,31 +493,7 @@ class SendHandler:
{"user_id": user_id, "times": times},
)
def handle_at_message_command(self, args: Dict[str, Any], group_info: Optional[Dict[str, Any]]) -> tuple[str, Dict[str, Any]]:
"""处理艾特并发送消息命令"""
at_user_id = args.get("qq_id")
text = args.get("text")
if not at_user_id or not text:
raise ValueError("艾特消息命令缺少 qq_id 或 text 参数")
if not group_info or not group_info.get("group_id"):
raise ValueError("艾特消息命令必须在群聊上下文中使用")
message_payload = [
{"type": "at", "data": {"qq": str(at_user_id)}},
{"type": "text", "data": {"text": " " + str(text)}},
]
return (
"send_group_msg",
{
"group_id": group_info["group_id"],
"message": message_payload,
},
)
def handle_ai_voice_send_command(self, args: Dict[str, Any], group_info: Optional[Dict[str, Any]]) -> tuple[str, Dict[str, Any]]:
def handle_ai_voice_send_command(self, args: Dict[str, Any], group_info: Optional[GroupInfoPayload]) -> tuple[str, Dict[str, Any]]:
"""
处理AI语音发送命令的逻辑。
并返回 NapCat 兼容的 (action, params) 元组。
@@ -565,6 +519,30 @@ class SendHandler:
},
)
def handle_at_message_command(self, args: Dict[str, Any], group_info: Optional[GroupInfoPayload]) -> tuple[str, Dict[str, Any]]:
"""处理艾特并发送消息命令"""
at_user_id = args.get("qq_id")
text = args.get("text")
if not at_user_id or not text:
raise ValueError("艾特消息命令缺少 qq_id 或 text 参数")
if not group_info or not group_info.get("group_id"):
raise ValueError("艾特消息命令必须在群聊上下文中使用")
message_payload = [
{"type": "at", "data": {"qq": str(at_user_id)}},
{"type": "text", "data": {"text": " " + str(text)}},
]
return (
"send_group_msg",
{
"group_id": group_info["group_id"],
"message": message_payload,
},
)
async def send_message_to_napcat(self, action: str, params: dict, timeout: float = 20.0) -> dict:
"""通过 adapter API 发送到 napcat"""
try: