diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index 41654971c..784521b9d 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -11,7 +11,7 @@ from src.chat.chatter_manager import ChatterManager from src.chat.energy_system import energy_manager from src.common.logger import get_logger from src.config.config import global_config -from src.plugin_system.apis.chat_api import get_chat_manager +from src.chat.message_receive.chat_stream import get_chat_manager if TYPE_CHECKING: from src.common.data_models.message_manager_data_model import StreamContext diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 0bd809d4a..0919ed90c 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -286,7 +286,7 @@ class MessageManager: except Exception as e: logger.error(f"清理不活跃聊天流时发生错误: {e}") - async def _check_and_handle_interruption(self, chat_stream: ChatStream | None = None, message: DatabaseMessages | None = None): + async def _check_and_handle_interruption(self, chat_stream: "ChatStream | None" = None, message: DatabaseMessages | None = None): """检查并处理消息打断 - 通过取消 stream_loop_task 实现""" if not global_config.chat.interruption_enabled or not chat_stream or not message: return @@ -371,7 +371,7 @@ class MessageManager: else: logger.debug(f"聊天流 {chat_stream.stream_id} 未触发打断,打断概率: {interruption_probability:.2f}") - async def _trigger_reprocess(self, chat_stream: ChatStream): + async def _trigger_reprocess(self, chat_stream: "ChatStream"): """重新处理聊天流的核心逻辑 - 重新创建 stream_loop 任务""" try: stream_id = chat_stream.stream_id diff --git a/src/chat/message_receive/message.py b/src/chat/message_receive/message.py deleted file mode 100644 index 8baf1705c..000000000 --- a/src/chat/message_receive/message.py +++ /dev/null @@ -1,299 +0,0 @@ -import time -from abc import ABCMeta, abstractmethod -from dataclasses import dataclass -from typing import Optional, TYPE_CHECKING - -import urllib3 -from rich.traceback import install - -from src.chat.utils.self_voice_cache import consume_self_voice_text -from src.chat.utils.utils_image import get_image_manager -from src.chat.utils.utils_voice import get_voice_text -from src.common.data_models.database_data_model import DatabaseMessages -from src.common.logger import get_logger -from src.config.config import global_config - -if TYPE_CHECKING: - from src.chat.message_receive.chat_stream import ChatStream - -install(extra_lines=3) - - -logger = get_logger("chat_message") - -# 禁用SSL警告 -urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - -# 这个类是消息数据类,用于存储和管理消息数据。 -# 它定义了消息的属性,包括群组ID、用户ID、消息ID、原始消息内容、纯文本内容和时间戳。 -# 它还定义了两个辅助属性:keywords用于提取消息的关键词,is_plain_text用于判断消息是否为纯文本。 - - -@dataclass -class Message(MessageBase, metaclass=ABCMeta): - chat_stream: Optional["ChatStream"] = None - reply: Optional["Message"] = None - processed_plain_text: str = "" - memorized_times: int = 0 - - def __init__( - self, - message_id: str, - chat_stream: "ChatStream", - user_info: UserInfo, - message_segment: Seg | None = None, - timestamp: float | None = None, - reply: Optional["DatabaseMessages"] = None, - processed_plain_text: str = "", - ): - # 使用传入的时间戳或当前时间 - current_timestamp = timestamp if timestamp is not None else round(time.time(), 3) - # 构造基础消息信息 - message_info = BaseMessageInfo( - platform=chat_stream.platform, - message_id=message_id, - time=current_timestamp, - group_info=chat_stream.group_info, - user_info=user_info, - ) - - # 调用父类初始化 - super().__init__(message_info=message_info, message_segment=message_segment, raw_message=None) # type: ignore - - self.chat_stream = chat_stream - # 文本处理相关属性 - self.processed_plain_text = processed_plain_text - - # 回复消息 - self.reply = reply - - async def _process_message_segments(self, segment: Seg) -> str: - # sourcery skip: remove-unnecessary-else, swap-if-else-branches - """递归处理消息段,转换为文字描述 - - Args: - segment: 要处理的消息段 - - Returns: - str: 处理后的文本 - """ - if segment.type == "seglist": - # 处理消息段列表 - segments_text = [] - for seg in segment.data: - processed = await self._process_message_segments(seg) # type: ignore - if processed: - segments_text.append(processed) - return " ".join(segments_text) - else: - # 处理单个消息段 - return await self._process_single_segment(segment) # type: ignore - - @abstractmethod - async def _process_single_segment(self, segment): - pass - - -@dataclass - -# MessageRecv 类已被完全移除,现在统一使用 DatabaseMessages -# 如需从消息字典创建 DatabaseMessages,请使用: -# from src.chat.message_receive.message_processor import process_message_from_dict -# -# 迁移完成日期: 2025-10-31 - - -@dataclass -class MessageProcessBase(Message): - """消息处理基类,用于处理中和发送中的消息""" - - def __init__( - self, - message_id: str, - chat_stream: "ChatStream", - bot_user_info: UserInfo, - message_segment: Seg | None = None, - reply: Optional["DatabaseMessages"] = None, - thinking_start_time: float = 0, - timestamp: float | None = None, - ): - # 调用父类初始化,传递时间戳 - super().__init__( - message_id=message_id, - timestamp=timestamp, - chat_stream=chat_stream, - user_info=bot_user_info, - message_segment=message_segment, - reply=reply, - ) - - # 处理状态相关属性 - self.thinking_start_time = thinking_start_time - self.thinking_time = 0 - - def update_thinking_time(self) -> float: - """更新思考时间""" - self.thinking_time = round(time.time() - self.thinking_start_time, 2) - return self.thinking_time - - async def _process_single_segment(self, seg: Seg) -> str | None: - """处理单个消息段 - - Args: - seg: 要处理的消息段 - - Returns: - str: 处理后的文本 - """ - try: - if seg.type == "text": - return seg.data # type: ignore - elif seg.type == "image": - # 如果是base64图片数据 - if isinstance(seg.data, str): - return await get_image_manager().get_image_description(seg.data) - return "[图片,网卡了加载不出来]" - elif seg.type == "emoji": - if isinstance(seg.data, str): - return await get_image_manager().get_emoji_tag(seg.data) - return "[表情,网卡了加载不出来]" - elif seg.type == "voice": - # 检查消息是否由机器人自己发送 - # self.message_info 来自 MessageBase,指当前消息的信息 - if self.message_info and self.message_info.user_info and str(self.message_info.user_info.user_id) == str(global_config.bot.qq_account): - logger.info(f"检测到机器人自身发送的语音消息 (User ID: {self.message_info.user_info.user_id}),尝试从缓存获取文本。") - if isinstance(seg.data, str): - cached_text = consume_self_voice_text(seg.data) - if cached_text: - logger.info(f"成功从缓存中获取语音文本: '{cached_text[:70]}...'") - return f"[语音:{cached_text}]" - else: - logger.warning("机器人自身语音消息缓存未命中,将回退到标准语音识别。") - - # 标准语音识别流程 (也作为缓存未命中的后备方案) - if isinstance(seg.data, str): - return await get_voice_text(seg.data) - return "[发了一段语音,网卡了加载不出来]" - elif seg.type == "at": - # 处理at消息,格式为"昵称:QQ号" - if isinstance(seg.data, str) and ":" in seg.data: - nickname, qq_id = seg.data.split(":", 1) - return f"@{nickname}" - return f"@{seg.data}" if isinstance(seg.data, str) else "@未知用户" - elif seg.type == "reply": - # 处理回复消息段 - if self.reply: - # 检查 reply 对象是否有必要的属性 - if hasattr(self.reply, "processed_plain_text") and self.reply.processed_plain_text: - # DatabaseMessages 使用 user_info 而不是 message_info.user_info - user_nickname = self.reply.user_info.user_nickname if self.reply.user_info else "未知用户" - user_id = self.reply.user_info.user_id if self.reply.user_info else "" - return f"[回复<{user_nickname}({user_id})> 的消息:{self.reply.processed_plain_text}]" - else: - # reply 对象存在但没有 processed_plain_text,返回简化的回复标识 - logger.debug(f"reply 消息段没有 processed_plain_text 属性,message_id: {getattr(self.reply, 'message_id', 'unknown')}") - return "[回复消息]" - else: - # 没有 reply 对象,但有 reply 消息段(可能是机器人自己发送的消息) - # 这种情况下 seg.data 应该包含被回复消息的 message_id - if isinstance(seg.data, str): - logger.debug(f"处理 reply 消息段,但 self.reply 为 None,reply_to message_id: {seg.data}") - return f"[回复消息 {seg.data}]" - return None - else: - return f"[{seg.type}:{seg.data!s}]" - except Exception as e: - logger.error(f"处理消息段失败: {e!s}, 类型: {seg.type}, 数据: {seg.data}") - return f"[处理失败的{seg.type}消息]" - - def _generate_detailed_text(self) -> str: - """生成详细文本,包含时间和用户信息""" - # time_str = time.strftime("%m-%d %H:%M:%S", time.localtime(self.message_info.time)) - timestamp = self.message_info.time - user_info = self.message_info.user_info - - name = f"<{self.message_info.platform}:{user_info.user_id}:{user_info.user_nickname}:{user_info.user_cardname}>" # type: ignore - return f"[{timestamp}],{name} 说:{self.processed_plain_text}\n" - - -@dataclass -class MessageSending(MessageProcessBase): - """发送状态的消息类""" - - def __init__( - self, - message_id: str, - chat_stream: "ChatStream", - bot_user_info: UserInfo, - sender_info: UserInfo | None, # 用来记录发送者信息 - message_segment: Seg, - display_message: str = "", - reply: Optional["DatabaseMessages"] = None, - is_head: bool = False, - is_emoji: bool = False, - thinking_start_time: float = 0, - apply_set_reply_logic: bool = False, - reply_to: str | None = None, - ): - # 调用父类初始化 - super().__init__( - message_id=message_id, - chat_stream=chat_stream, - bot_user_info=bot_user_info, - message_segment=message_segment, - reply=reply, - thinking_start_time=thinking_start_time, - ) - - # 发送状态特有属性 - self.sender_info = sender_info - # 从 DatabaseMessages 获取 message_id - if reply: - self.reply_to_message_id = reply.message_id - else: - self.reply_to_message_id = None - self.is_head = is_head - self.is_emoji = is_emoji - self.apply_set_reply_logic = apply_set_reply_logic - - self.reply_to = reply_to - - # 用于显示发送内容与显示不一致的情况 - self.display_message = display_message - - self.interest_value = 0.0 - - def build_reply(self): - """设置回复消息""" - if self.reply: - # 从 DatabaseMessages 获取 message_id - message_id = self.reply.message_id - - if message_id: - self.reply_to_message_id = message_id - self.message_segment = Seg( - type="seglist", - data=[ - Seg(type="reply", data=message_id), # type: ignore - self.message_segment, - ], - ) - - async def process(self) -> None: - """处理消息内容,生成纯文本和详细文本""" - if self.message_segment: - self.processed_plain_text = await self._process_message_segments(self.message_segment) - - def to_dict(self): - ret = super().to_dict() - if self.chat_stream and self.chat_stream.user_info: - ret["message_info"]["user_info"] = self.chat_stream.user_info.to_dict() - return ret - - def is_private_message(self) -> bool: - """判断是否为私聊消息""" - return self.message_info.group_info is None or self.message_info.group_info.group_id is None - - -# message_recv_from_dict 和 message_from_db_dict 函数已被移除 -# 请使用: from src.chat.message_receive.message_processor import process_message_from_dict diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index 8e65245c7..0ffc1d8da 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -13,8 +13,6 @@ from src.common.database.core import get_db_session from src.common.database.core.models import Images, Messages from src.common.logger import get_logger -from .message import MessageSending - if TYPE_CHECKING: from src.chat.message_receive.chat_stream import ChatStream @@ -73,7 +71,7 @@ class MessageStorageBatcher: Args: message_data: 包含消息对象和chat_stream的字典 { - 'message': DatabaseMessages | MessageSending, + 'message': DatabaseMessages, 'chat_stream': ChatStream } """ @@ -153,150 +151,61 @@ class MessageStorageBatcher: async def _prepare_message_object(self, message, chat_stream): """准备消息对象(从原 store_message 逻辑提取)""" try: - # 过滤敏感信息的正则模式 pattern = r".*?|.*?|.*?" - # 如果是 DatabaseMessages,直接使用它的字段 - if isinstance(message, DatabaseMessages): - processed_plain_text = message.processed_plain_text - if processed_plain_text: - processed_plain_text = await MessageStorage.replace_image_descriptions(processed_plain_text) - safe_processed_plain_text = processed_plain_text or "" - filtered_processed_plain_text = re.sub(pattern, "", safe_processed_plain_text, flags=re.DOTALL) - else: - filtered_processed_plain_text = "" + if not isinstance(message, DatabaseMessages): + logger.error("MessageStorageBatcher expects DatabaseMessages instances") + return None - display_message = message.display_message or message.processed_plain_text or "" - filtered_display_message = re.sub(pattern, "", display_message, flags=re.DOTALL) + processed_plain_text = message.processed_plain_text or "" + if processed_plain_text: + processed_plain_text = await MessageStorage.replace_image_descriptions(processed_plain_text) + filtered_processed_plain_text = re.sub( + pattern, processed_plain_text or "", flags=re.DOTALL + ) - msg_id = message.message_id - msg_time = message.time - chat_id = message.chat_id - reply_to = "" - is_mentioned = message.is_mentioned - interest_value = message.interest_value or 0.0 - priority_mode = "" - priority_info_json = None - is_emoji = message.is_emoji or False - is_picid = message.is_picid or False - is_notify = message.is_notify or False - is_command = message.is_command or False - is_public_notice = message.is_public_notice or False - notice_type = message.notice_type - # 序列化actions列表为JSON字符串 - actions = orjson.dumps(message.actions).decode("utf-8") if message.actions else None - should_reply = message.should_reply - should_act = message.should_act - additional_config = message.additional_config - # 确保关键词字段是字符串格式(如果不是,则序列化) - key_words = MessageStorage._serialize_keywords(message.key_words) - key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite) - memorized_times = 0 + display_message = message.display_message or message.processed_plain_text or "" + filtered_display_message = re.sub(pattern, display_message, flags=re.DOTALL) - user_platform = message.user_info.platform if message.user_info else "" - user_id = message.user_info.user_id if message.user_info else "" - user_nickname = message.user_info.user_nickname if message.user_info else "" - user_cardname = message.user_info.user_cardname if message.user_info else None + msg_id = message.message_id + msg_time = message.time + chat_id = message.chat_id + reply_to = message.reply_to or "" + is_mentioned = message.is_mentioned + interest_value = message.interest_value or 0.0 + priority_mode = message.priority_mode + priority_info_json = message.priority_info + is_emoji = message.is_emoji or False + is_picid = message.is_picid or False + is_notify = message.is_notify or False + is_command = message.is_command or False + is_public_notice = message.is_public_notice or False + notice_type = message.notice_type + actions = orjson.dumps(message.actions).decode("utf-8") if message.actions else None + should_reply = message.should_reply + should_act = message.should_act + additional_config = message.additional_config + key_words = MessageStorage._serialize_keywords(message.key_words) + key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite) + memorized_times = getattr(message, 'memorized_times', 0) - chat_info_stream_id = message.chat_info.stream_id if message.chat_info else "" - chat_info_platform = message.chat_info.platform if message.chat_info else "" - chat_info_create_time = message.chat_info.create_time if message.chat_info else 0.0 - chat_info_last_active_time = message.chat_info.last_active_time if message.chat_info else 0.0 - chat_info_user_platform = message.chat_info.user_info.platform if message.chat_info and message.chat_info.user_info else "" - chat_info_user_id = message.chat_info.user_info.user_id if message.chat_info and message.chat_info.user_info else "" - chat_info_user_nickname = message.chat_info.user_info.user_nickname if message.chat_info and message.chat_info.user_info else "" - chat_info_user_cardname = message.chat_info.user_info.user_cardname if message.chat_info and message.chat_info.user_info else None - chat_info_group_platform = message.group_info.group_platform if message.group_info else None - chat_info_group_id = message.group_info.group_id if message.group_info else None - chat_info_group_name = message.group_info.group_name if message.group_info else None + user_platform = message.user_info.platform if message.user_info else "" + user_id = message.user_info.user_id if message.user_info else "" + user_nickname = message.user_info.user_nickname if message.user_info else "" + user_cardname = message.user_info.user_cardname if message.user_info else None - else: - # MessageSending 处理逻辑 - processed_plain_text = message.processed_plain_text + chat_info_stream_id = message.chat_info.stream_id if message.chat_info else "" + chat_info_platform = message.chat_info.platform if message.chat_info else "" + chat_info_create_time = message.chat_info.create_time if message.chat_info else 0.0 + chat_info_last_active_time = message.chat_info.last_active_time if message.chat_info else 0.0 + chat_info_user_platform = message.chat_info.user_info.platform if message.chat_info and message.chat_info.user_info else "" + chat_info_user_id = message.chat_info.user_info.user_id if message.chat_info and message.chat_info.user_info else "" + chat_info_user_nickname = message.chat_info.user_info.user_nickname if message.chat_info and message.chat_info.user_info else "" + chat_info_user_cardname = message.chat_info.user_info.user_cardname if message.chat_info and message.chat_info.user_info else None + chat_info_group_platform = message.group_info.group_platform if message.group_info else None + chat_info_group_id = message.group_info.group_id if message.group_info else None + chat_info_group_name = message.group_info.group_name if message.group_info else None - if processed_plain_text: - processed_plain_text = await MessageStorage.replace_image_descriptions(processed_plain_text) - safe_processed_plain_text = processed_plain_text or "" - filtered_processed_plain_text = re.sub(pattern, "", safe_processed_plain_text, flags=re.DOTALL) - else: - filtered_processed_plain_text = "" - - if isinstance(message, MessageSending): - display_message = message.display_message - if display_message: - filtered_display_message = re.sub(pattern, "", display_message, flags=re.DOTALL) - else: - filtered_display_message = re.sub(pattern, "", (message.processed_plain_text or ""), flags=re.DOTALL) - interest_value = 0 - is_mentioned = False - reply_to = message.reply_to - priority_mode = "" - priority_info = {} - is_emoji = False - is_picid = False - is_notify = False - is_command = False - is_public_notice = False - notice_type = None - actions = None - should_reply = None - should_act = None - additional_config = None - key_words = "" - key_words_lite = "" - else: - filtered_display_message = "" - interest_value = message.interest_value - is_mentioned = message.is_mentioned - reply_to = "" - priority_mode = message.priority_mode - priority_info = message.priority_info - is_emoji = message.is_emoji - is_picid = message.is_picid - is_notify = message.is_notify - is_command = message.is_command - is_public_notice = getattr(message, "is_public_notice", False) - notice_type = getattr(message, "notice_type", None) - # 序列化actions列表为JSON字符串 - actions = orjson.dumps(getattr(message, "actions", None)).decode("utf-8") if getattr(message, "actions", None) else None - should_reply = getattr(message, "should_reply", None) - should_act = getattr(message, "should_act", None) - additional_config = getattr(message, "additional_config", None) - key_words = MessageStorage._serialize_keywords(message.key_words) - key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite) - - chat_info_dict = chat_stream.to_dict() - user_info_dict = message.message_info.user_info.to_dict() - - msg_id = message.message_info.message_id - msg_time = float(message.message_info.time or time.time()) - chat_id = chat_stream.stream_id - memorized_times = message.memorized_times - - group_info_from_chat = chat_info_dict.get("group_info") or {} - user_info_from_chat = chat_info_dict.get("user_info") or {} - - priority_info_json = orjson.dumps(priority_info).decode("utf-8") if priority_info else None - - user_platform = user_info_dict.get("platform") - user_id = user_info_dict.get("user_id") - # 将机器人自己的user_id标记为"SELF",增强对自我身份的识别 - user_nickname = user_info_dict.get("user_nickname") - user_cardname = user_info_dict.get("user_cardname") - - chat_info_stream_id = chat_info_dict.get("stream_id") - chat_info_platform = chat_info_dict.get("platform") - chat_info_create_time = float(chat_info_dict.get("create_time", 0.0)) - chat_info_last_active_time = float(chat_info_dict.get("last_active_time", 0.0)) - chat_info_user_platform = user_info_from_chat.get("platform") - chat_info_user_id = user_info_from_chat.get("user_id") - chat_info_user_nickname = user_info_from_chat.get("user_nickname") - chat_info_user_cardname = user_info_from_chat.get("user_cardname") - chat_info_group_platform = group_info_from_chat.get("platform") - chat_info_group_id = group_info_from_chat.get("group_id") - chat_info_group_name = group_info_from_chat.get("group_name") - - # 创建消息对象 return Messages( message_id=msg_id, time=msg_time, @@ -481,216 +390,34 @@ class MessageStorage: return [] @staticmethod - async def store_message(message: DatabaseMessages | MessageSending, chat_stream: "ChatStream", use_batch: bool = True) -> None: + async def store_message(message: DatabaseMessages, chat_stream: "ChatStream", use_batch: bool = True) -> None: """ 存储消息到数据库 Args: message: 消息对象 chat_stream: 聊天流对象 - use_batch: 是否使用批处理(默认True,推荐)。设为False时立即写入数据库。 + use_batch: 是否使用批处理,默认True,设置为False时直接写入数据库 """ - # 使用批处理器(推荐) if use_batch: batcher = get_message_storage_batcher() - await batcher.add_message({ - "message": message, - "chat_stream": chat_stream - }) + await batcher.add_message({"message": message, "chat_stream": chat_stream}) return - # 直接写入模式(保留用于特殊场景) try: - # 过滤敏感信息的正则模式 - pattern = r".*?|.*?|.*?" + # 直接存储消息(非批处理模式) + batcher = MessageStorageBatcher() + message_obj = await batcher._prepare_message_object(message, chat_stream) + if message_obj is None: + return - # 如果是 DatabaseMessages,直接使用它的字段 - if isinstance(message, DatabaseMessages): - processed_plain_text = message.processed_plain_text - if processed_plain_text: - processed_plain_text = await MessageStorage.replace_image_descriptions(processed_plain_text) - safe_processed_plain_text = processed_plain_text or "" - filtered_processed_plain_text = re.sub(pattern, "", safe_processed_plain_text, flags=re.DOTALL) - else: - filtered_processed_plain_text = "" - - display_message = message.display_message or message.processed_plain_text or "" - filtered_display_message = re.sub(pattern, "", display_message, flags=re.DOTALL) - - # 直接从 DatabaseMessages 获取所有字段 - msg_id = message.message_id - msg_time = message.time - chat_id = message.chat_id - reply_to = "" # DatabaseMessages 没有 reply_to 字段 - is_mentioned = message.is_mentioned - interest_value = message.interest_value or 0.0 - priority_mode = "" # DatabaseMessages 没有 priority_mode - priority_info_json = None # DatabaseMessages 没有 priority_info - is_emoji = message.is_emoji or False - is_picid = message.is_picid or False - is_notify = message.is_notify or False - is_command = message.is_command or False - key_words = "" # DatabaseMessages 没有 key_words - key_words_lite = "" - memorized_times = 0 # DatabaseMessages 没有 memorized_times - - # 使用 DatabaseMessages 中的嵌套对象信息 - user_platform = message.user_info.platform if message.user_info else "" - user_id = message.user_info.user_id if message.user_info else "" - user_nickname = message.user_info.user_nickname if message.user_info else "" - user_cardname = message.user_info.user_cardname if message.user_info else None - - chat_info_stream_id = message.chat_info.stream_id if message.chat_info else "" - chat_info_platform = message.chat_info.platform if message.chat_info else "" - chat_info_create_time = message.chat_info.create_time if message.chat_info else 0.0 - chat_info_last_active_time = message.chat_info.last_active_time if message.chat_info else 0.0 - chat_info_user_platform = message.chat_info.user_info.platform if message.chat_info and message.chat_info.user_info else "" - chat_info_user_id = message.chat_info.user_info.user_id if message.chat_info and message.chat_info.user_info else "" - chat_info_user_nickname = message.chat_info.user_info.user_nickname if message.chat_info and message.chat_info.user_info else "" - chat_info_user_cardname = message.chat_info.user_info.user_cardname if message.chat_info and message.chat_info.user_info else None - chat_info_group_platform = message.group_info.group_platform if message.group_info else None - chat_info_group_id = message.group_info.group_id if message.group_info else None - chat_info_group_name = message.group_info.group_name if message.group_info else None - - else: - # MessageSending 处理逻辑 - processed_plain_text = message.processed_plain_text - - if processed_plain_text: - processed_plain_text = await MessageStorage.replace_image_descriptions(processed_plain_text) - # 增加对None的防御性处理 - safe_processed_plain_text = processed_plain_text or "" - filtered_processed_plain_text = re.sub(pattern, "", safe_processed_plain_text, flags=re.DOTALL) - else: - filtered_processed_plain_text = "" - - if isinstance(message, MessageSending): - display_message = message.display_message - if display_message: - filtered_display_message = re.sub(pattern, "", display_message, flags=re.DOTALL) - else: - # 如果没有设置display_message,使用processed_plain_text作为显示消息 - filtered_display_message = ( - re.sub(pattern, "", (message.processed_plain_text or ""), flags=re.DOTALL) - ) - interest_value = 0 - is_mentioned = False - reply_to = message.reply_to - priority_mode = "" - priority_info = {} - is_emoji = False - is_picid = False - is_notify = False - is_command = False - is_public_notice = False - notice_type = None - actions = None - should_reply = False - should_act = False - key_words = "" - key_words_lite = "" - else: - filtered_display_message = "" - interest_value = message.interest_value - is_mentioned = message.is_mentioned - reply_to = "" - priority_mode = message.priority_mode - priority_info = message.priority_info - is_emoji = message.is_emoji - is_picid = message.is_picid - is_notify = message.is_notify - is_command = message.is_command - is_public_notice = getattr(message, "is_public_notice", False) - notice_type = getattr(message, "notice_type", None) - # 序列化actions列表为JSON字符串 - actions = orjson.dumps(getattr(message, "actions", None)).decode("utf-8") if getattr(message, "actions", None) else None - should_reply = getattr(message, "should_reply", False) - should_act = getattr(message, "should_act", False) - # 序列化关键词列表为JSON字符串 - key_words = MessageStorage._serialize_keywords(message.key_words) - key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite) - - chat_info_dict = chat_stream.to_dict() - user_info_dict = message.message_info.user_info.to_dict() # type: ignore - - # message_id 现在是 TextField,直接使用字符串值 - msg_id = message.message_info.message_id - msg_time = float(message.message_info.time or time.time()) - chat_id = chat_stream.stream_id - memorized_times = message.memorized_times - - # 安全地获取 group_info, 如果为 None 则视为空字典 - group_info_from_chat = chat_info_dict.get("group_info") or {} - # 安全地获取 user_info, 如果为 None 则视为空字典 (以防万一) - user_info_from_chat = chat_info_dict.get("user_info") or {} - - # 将priority_info字典序列化为JSON字符串,以便存储到数据库的Text字段 - priority_info_json = orjson.dumps(priority_info).decode("utf-8") if priority_info else None - - user_platform = user_info_dict.get("platform") - user_id = user_info_dict.get("user_id") - user_nickname = user_info_dict.get("user_nickname") - user_cardname = user_info_dict.get("user_cardname") - - chat_info_stream_id = chat_info_dict.get("stream_id") - chat_info_platform = chat_info_dict.get("platform") - chat_info_create_time = float(chat_info_dict.get("create_time", 0.0)) - chat_info_last_active_time = float(chat_info_dict.get("last_active_time", 0.0)) - chat_info_user_platform = user_info_from_chat.get("platform") - chat_info_user_id = user_info_from_chat.get("user_id") - chat_info_user_nickname = user_info_from_chat.get("user_nickname") - chat_info_user_cardname = user_info_from_chat.get("user_cardname") - chat_info_group_platform = group_info_from_chat.get("platform") - chat_info_group_id = group_info_from_chat.get("group_id") - chat_info_group_name = group_info_from_chat.get("group_name") - - # 获取数据库会话 - new_message = Messages( - message_id=msg_id, - time=msg_time, - chat_id=chat_id, - reply_to=reply_to, - is_mentioned=is_mentioned, - chat_info_stream_id=chat_info_stream_id, - chat_info_platform=chat_info_platform, - chat_info_user_platform=chat_info_user_platform, - chat_info_user_id=chat_info_user_id, - chat_info_user_nickname=chat_info_user_nickname, - chat_info_user_cardname=chat_info_user_cardname, - chat_info_group_platform=chat_info_group_platform, - chat_info_group_id=chat_info_group_id, - chat_info_group_name=chat_info_group_name, - chat_info_create_time=chat_info_create_time, - chat_info_last_active_time=chat_info_last_active_time, - user_platform=user_platform, - user_id=user_id, - user_nickname=user_nickname, - user_cardname=user_cardname, - processed_plain_text=filtered_processed_plain_text, - display_message=filtered_display_message, - memorized_times=memorized_times, - interest_value=interest_value, - priority_mode=priority_mode, - priority_info=priority_info_json, - is_emoji=is_emoji, - is_picid=is_picid, - is_notify=is_notify, - is_command=is_command, - is_public_notice=is_public_notice, - notice_type=notice_type, - actions=actions, - should_reply=should_reply, - should_act=should_act, - key_words=key_words, - key_words_lite=key_words_lite, - ) async with get_db_session() as session: - session.add(new_message) + session.add(message_obj) await session.commit() except Exception: logger.exception("存储消息失败") - logger.error(f"消息:{message}") + logger.error(f"消息: {message}") traceback.print_exc() @staticmethod diff --git a/src/chat/message_receive/uni_message_sender.py b/src/chat/message_receive/uni_message_sender.py index 9d72504c4..99f5246e6 100644 --- a/src/chat/message_receive/uni_message_sender.py +++ b/src/chat/message_receive/uni_message_sender.py @@ -1,71 +1,61 @@ -""" -统一消息发送器 +"""统一消息发送器""" -重构说明(2025-11): -- 使用 CoreSinkManager 发送消息,而不是直接通过 WS 连接 -- MessageServer 仅作为与旧适配器的兼容层 -- 所有发送的消息都通过 CoreSinkManager.send_outgoing() 路由到适配器 -""" +from __future__ import annotations import asyncio import traceback -import time -import uuid -from typing import Any, cast +from typing import TYPE_CHECKING from rich.traceback import install from mofox_bus import MessageEnvelope -from src.chat.message_receive.message import MessageSending +from src.chat.message_receive.message_processor import process_message_from_dict from src.chat.message_receive.storage import MessageStorage from src.chat.utils.utils import calculate_typing_time, truncate_message +from src.common.data_models.database_data_model import DatabaseMessages from src.common.logger import get_logger +from src.config.config import global_config + +if TYPE_CHECKING: + from src.chat.message_receive.chat_stream import ChatStream install(extra_lines=3) logger = get_logger("sender") -async def send_message(message: MessageSending, show_log=True) -> bool: - """ - 合并后的消息发送函数 - - 重构后使用 CoreSinkManager 发送消息,而不是直接调用 MessageServer - - Args: - message: 要发送的消息 - show_log: 是否显示日志 - - Returns: - bool: 是否发送成功 - """ - message_preview = truncate_message(message.processed_plain_text, max_length=120) +async def send_envelope( + envelope: MessageEnvelope, + chat_stream: "ChatStream" | None = None, + db_message: DatabaseMessages | None = None, + show_log: bool = True, +) -> bool: + """发送消息""" + message_preview = truncate_message( + (db_message.processed_plain_text if db_message else str(envelope.get("message_segment", ""))), + max_length=120, + ) try: - # 将 MessageSending 转换为 MessageEnvelope - envelope = _message_sending_to_envelope(message) - - # 通过 CoreSinkManager 发送 from src.common.core_sink_manager import get_core_sink_manager - + manager = get_core_sink_manager() await manager.send_outgoing(envelope) - - if show_log: - logger.info(f"已将消息 '{message_preview}' 发往平台'{message.message_info.platform}'") - # 触发 AFTER_SEND 事件 + if show_log: + logger.info(f"已将消息 '{message_preview}' 发送到平台'{envelope.get('platform')}'") + try: from src.plugin_system.base.component_types import EventType from src.plugin_system.core.event_manager import event_manager - if message.chat_stream: + if chat_stream: event_manager.emit_event( EventType.AFTER_SEND, permission_group="SYSTEM", - stream_id=message.chat_stream.stream_id, - message=message, + stream_id=chat_stream.stream_id, + message=db_message or envelope, ) except Exception as event_error: logger.error(f"触发 AFTER_SEND 事件时出错: {event_error}", exc_info=True) @@ -73,201 +63,83 @@ async def send_message(message: MessageSending, show_log=True) -> bool: return True except Exception as e: - logger.error(f"发送消息 '{message_preview}' 发往平台'{message.message_info.platform}' 失败: {e!s}") + logger.error(f"发送消息 '{message_preview}' 到平台'{envelope.get('platform')}' 失败: {e!s}") traceback.print_exc() - raise e - - -def _message_sending_to_envelope(message: MessageSending) -> MessageEnvelope: - """ - 将 MessageSending 转换为 MessageEnvelope - - Args: - message: MessageSending 对象 - - Returns: - MessageEnvelope: 消息信封 - """ - # 构建消息信息 - message_info: dict[str, Any] = { - "message_id": message.message_info.message_id, - "time": message.message_info.time or time.time(), - "platform": message.message_info.platform, - "user_info": { - "user_id": message.message_info.user_info.user_id, - "user_nickname": message.message_info.user_info.user_nickname, - "platform": message.message_info.user_info.platform, - } if message.message_info.user_info else None, - } - - # 添加群组信息(如果有) - if message.chat_stream and message.chat_stream.group_info: - message_info["group_info"] = { - "group_id": message.chat_stream.group_info.group_id, - "group_name": message.chat_stream.group_info.group_name, - "platform": message.chat_stream.group_info.group_platform, - } - - # 构建消息段 - message_segment: dict[str, Any] - if message.message_segment: - message_segment = { - "type": message.message_segment.type, - "data": message.message_segment.data, - } - else: - # 默认为文本消息 - message_segment = { - "type": "text", - "data": message.processed_plain_text or "", - } - - # 添加回复信息(如果有) - if message.reply_to: - message_segment["reply_to"] = message.reply_to - - # 构建消息信封 - envelope = cast(MessageEnvelope, { - "id": str(uuid.uuid4()), - "direction": "outgoing", - "platform": message.message_info.platform, - "message_info": message_info, - "message_segment": message_segment, - }) - - return envelope + raise class HeartFCSender: - """管理消息的注册、即时处理、发送和存储,并跟踪思考状态。""" - - def __init__(self): - self.storage = MessageStorage() + """发送消息并负责存储、上下文更新等后续处理.""" async def send_message( - self, message: MessageSending, typing=False, set_reply=False, storage_message=True, show_log=True - ): - """ - 处理、发送并存储一条消息。 - - 参数: - message: MessageSending 对象,待发送的消息。 - typing: 是否模拟打字等待。 - - 用法: - - typing=True 时,发送前会有打字等待。 - """ - if not message.chat_stream: + self, + envelope: MessageEnvelope, + chat_stream: "ChatStream", + *, + typing: bool = False, + storage_message: bool = True, + show_log: bool = True, + thinking_start_time: float = 0.0, + display_message: str | None = None, + ) -> bool: + if not chat_stream: logger.error("消息缺少 chat_stream,无法发送") raise ValueError("消息缺少 chat_stream,无法发送") - if not message.message_info or not message.message_info.message_id: - logger.error("消息缺少 message_info 或 message_id,无法发送") - raise ValueError("消息缺少 message_info 或 message_id,无法发送") - - chat_id = message.chat_stream.stream_id - message_id = message.message_info.message_id try: - if set_reply: - message.build_reply() - logger.debug(f"[{chat_id}] 选择回复引用消息: {message.processed_plain_text[:20]}...") + db_message = await process_message_from_dict( + message_dict=envelope, + stream_id=chat_stream.stream_id, + platform=chat_stream.platform, + ) - await message.process() + # 使用调用方指定的展示文本 + if display_message: + db_message.display_message = display_message + if db_message.processed_plain_text is None: + db_message.processed_plain_text = "" + # 填充基础字段,确保上下文和存储一致 + db_message.is_read = True + db_message.should_reply = False + db_message.should_act = False + if db_message.interest_value is None: + db_message.interest_value = 0.5 + + db_message.chat_info.create_time = chat_stream.create_time + db_message.chat_info.last_active_time = chat_stream.last_active_time + + # 可选的打字机等待 if typing: typing_time = calculate_typing_time( - input_string=message.processed_plain_text, - thinking_start_time=message.thinking_start_time, - is_emoji=message.is_emoji, + input_string=db_message.processed_plain_text or "", + thinking_start_time=thinking_start_time, + is_emoji=bool(getattr(db_message, "is_emoji", False)), ) await asyncio.sleep(typing_time) - sent_msg = await send_message(message, show_log=show_log) - if not sent_msg: - return False + await send_envelope(envelope, chat_stream=chat_stream, db_message=db_message, show_log=show_log) if storage_message: - await self.storage.store_message(message, message.chat_stream) + await MessageStorage.store_message(db_message, chat_stream) - # 修复Send API消息不入流上下文的问题 - # 将Send API发送的消息也添加到流上下文中,确保后续对话可以引用 + # 将发送的消息写入上下文历史 try: - # 将MessageSending转换为DatabaseMessages - db_message = await self._convert_to_database_message(message) - if db_message and message.chat_stream.context: - context = message.chat_stream.context - - # 应用历史消息长度限制 - from src.config.config import global_config + if chat_stream.context: + context = chat_stream.context max_context_size = getattr(global_config.chat, "max_context_size", 40) if len(context.history_messages) >= max_context_size: - # 移除最旧的历史消息以保持长度限制 - removed_count = 1 - context.history_messages = context.history_messages[removed_count:] - logger.debug(f"[{chat_id}] Send API添加前移除了 {removed_count} 条历史消息以保持上下文大小限制") + context.history_messages = context.history_messages[1:] + logger.debug(f"[{chat_stream.stream_id}] Send API发送前移除 1 条历史消息以控制上下文大小") context.history_messages.append(db_message) - logger.debug(f"[{chat_id}] Send API消息已添加到流上下文: {message_id}") + logger.debug(f"[{chat_stream.stream_id}] Send API消息已写入上下文: {db_message.message_id}") except Exception as context_error: - logger.warning(f"[{chat_id}] 将Send API消息添加到流上下文失败: {context_error}") + logger.warning(f"[{chat_stream.stream_id}] 将消息写入上下文失败: {context_error}") - return sent_msg + return True except Exception as e: - logger.error(f"[{chat_id}] 处理或存储消息 {message_id} 时出错: {e}") - raise e - - async def _convert_to_database_message(self, message: MessageSending): - """将MessageSending对象转换为DatabaseMessages对象 - - Args: - message: MessageSending对象 - - Returns: - DatabaseMessages: 转换后的数据库消息对象,如果转换失败则返回None - """ - try: - from src.common.data_models.database_data_model import DatabaseMessages - - # 构建用户信息 - Send API发送的消息,bot是发送者 - # bot_user_info 存储在 message_info.user_info 中,而不是单独的 bot_user_info 属性 - bot_user_info = message.message_info.user_info - - # 构建聊天信息 - chat_info = message.message_info - chat_stream = message.chat_stream - - # 获取群组信息 - group_id = None - group_name = None - if chat_stream and chat_stream.group_info: - group_id = chat_stream.group_info.group_id - group_name = chat_stream.group_info.group_name - - # 创建DatabaseMessages对象 - db_message = DatabaseMessages( - message_id=message.message_info.message_id, - time=chat_info.time or 0.0, - user_id=bot_user_info.user_id, - user_nickname=bot_user_info.user_nickname, - user_cardname=bot_user_info.user_nickname, # 使用nickname作为cardname - user_platform=chat_info.platform or "", - chat_info_group_id=group_id, - chat_info_group_name=group_name, - chat_info_group_platform=chat_info.platform if group_id else None, - chat_info_platform=chat_info.platform or "", - processed_plain_text=message.processed_plain_text or "", - display_message=message.display_message or "", - is_read=True, # 新消息标记为已读 - interest_value=0.5, # 默认兴趣值 - should_reply=False, # 自己发送的消息不需要回复 - should_act=False, # 自己发送的消息不需要执行动作 - is_mentioned=False, # 自己发送的消息默认不提及 - ) - - return db_message - - except Exception as e: - logger.error(f"转换MessageSending到DatabaseMessages失败: {e}", exc_info=True) - return None + logger.error(f"[{chat_stream.stream_id}] 发送或存储消息时出错: {e}") + raise diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index baebbcfa5..c1334d08c 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -8,11 +8,13 @@ import random import re import time import traceback +import uuid from datetime import datetime, timedelta from typing import Any, Literal, TYPE_CHECKING from src.chat.express.expression_selector import expression_selector -from src.chat.message_receive.message import MessageSending, Seg, UserInfo +from mofox_bus import MessageEnvelope +from src.chat.message_receive.message import Seg, UserInfo from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.utils.chat_message_builder import ( build_readable_messages, @@ -1770,8 +1772,8 @@ class DefaultReplyer: thinking_start_time: float, display_message: str, anchor_message: DatabaseMessages | None = None, - ) -> MessageSending: - """构建单个发送消息""" + ) -> MessageEnvelope: + """构造单条发送消息的信封""" bot_user_info = UserInfo( user_id=str(global_config.bot.qq_account), @@ -1779,29 +1781,44 @@ class DefaultReplyer: platform=self.chat_stream.platform, ) - # 从 DatabaseMessages 获取 sender_info 并转换为 UserInfo - sender_info = None - if anchor_message and anchor_message.user_info: - db_user_info = anchor_message.user_info - sender_info = UserInfo( - platform=db_user_info.platform, - user_id=db_user_info.user_id, - user_nickname=db_user_info.user_nickname, - user_cardname=db_user_info.user_cardname, - ) + base_segment = {"type": message_segment.type, "data": message_segment.data} + if reply_to and anchor_message and anchor_message.message_id: + segment_payload = { + "type": "seglist", + "data": [ + {"type": "reply", "data": anchor_message.message_id}, + base_segment, + ], + } + else: + segment_payload = base_segment - return MessageSending( - message_id=message_id, # 使用片段的唯一ID - chat_stream=self.chat_stream, - bot_user_info=bot_user_info, - sender_info=sender_info, - message_segment=message_segment, - reply=anchor_message, # 回复原始锚点 - is_head=reply_to, - is_emoji=is_emoji, - thinking_start_time=thinking_start_time, # 传递原始思考开始时间 - display_message=display_message, - ) + timestamp = thinking_start_time or time.time() + message_info = { + "message_id": message_id, + "time": timestamp, + "platform": self.chat_stream.platform, + "user_info": { + "user_id": bot_user_info.user_id, + "user_nickname": bot_user_info.user_nickname, + "platform": bot_user_info.platform, + }, + } + + if self.chat_stream.group_info: + message_info["group_info"] = { + "group_id": self.chat_stream.group_info.group_id, + "group_name": self.chat_stream.group_info.group_name, + "platform": self.chat_stream.group_info.group_platform, + } + + return { + "id": str(uuid.uuid4()), + "direction": "outgoing", + "platform": self.chat_stream.platform, + "message_info": message_info, + "message_segment": segment_payload, + } async def llm_generate_content(self, prompt: str): with Timer("LLM生成", {}): # 内部计时器,可选保留 diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index 30217d79b..4c37690c8 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -8,7 +8,6 @@ from typing import Any import numpy as np import rjieba -from mofox_bus import UserInfo # MessageRecv 已被移除,现在使用 DatabaseMessages from src.common.logger import get_logger @@ -16,7 +15,7 @@ from src.common.message_repository import count_messages, find_messages from src.config.config import global_config, model_config from src.llm_models.utils_model import LLMRequest from src.person_info.person_info import PersonInfoManager, get_person_info_manager - +from src.common.data_models.database_data_model import DatabaseUserInfo from .typo_generator import ChineseTypoGenerator logger = get_logger("chat_utils") @@ -154,7 +153,7 @@ async def get_recent_group_speaker(chat_stream_id: str, sender, limit: int = 12) who_chat_in_group = [] for msg_db_data in recent_messages: - user_info = UserInfo.from_dict( + user_info = DatabaseUserInfo.from_dict( { "platform": msg_db_data["user_platform"], "user_id": msg_db_data["user_id"], diff --git a/src/main.py b/src/main.py index 7721e9c44..6dbbc4ae1 100644 --- a/src/main.py +++ b/src/main.py @@ -12,7 +12,7 @@ from typing import Any from rich.traceback import install from src.chat.emoji_system.emoji_manager import get_emoji_manager -from chat.message_receive.message_handler import get_message_handler, shutdown_message_handler +from src.chat.message_receive.message_handler import get_message_handler, shutdown_message_handler from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask from src.common.core_sink_manager import ( CoreSinkManager, diff --git a/src/plugin_system/__init__.py b/src/plugin_system/__init__.py index 3a8c92966..aaca76746 100644 --- a/src/plugin_system/__init__.py +++ b/src/plugin_system/__init__.py @@ -40,7 +40,6 @@ from .base import ( ConfigField, EventHandlerInfo, EventType, - MaiMessages, PluginInfo, # 新增的增强命令系统 PlusCommand, @@ -77,8 +76,6 @@ __all__ = [ # noqa: RUF022 "ConfigField", "EventHandlerInfo", "EventType", - # 消息 - "MaiMessages", # 工具函数 "PluginInfo", # 增强命令系统 diff --git a/src/plugin_system/apis/chat_api.py b/src/plugin_system/apis/chat_api.py index 71926384f..265ebc45a 100644 --- a/src/plugin_system/apis/chat_api.py +++ b/src/plugin_system/apis/chat_api.py @@ -336,3 +336,8 @@ def get_stream_info(chat_stream: "ChatStream") -> dict[str, Any]: def get_streams_summary() -> dict[str, int]: """获取聊天流统计摘要的便捷函数""" return ChatManager.get_streams_summary() + +def get_chat_manager(): + """获取聊天管理器实例的便捷函数""" + from src.chat.message_receive.chat_stream import get_chat_manager + return get_chat_manager() diff --git a/src/plugin_system/apis/send_api.py b/src/plugin_system/apis/send_api.py index fb8fac8f8..612893c21 100644 --- a/src/plugin_system/apis/send_api.py +++ b/src/plugin_system/apis/send_api.py @@ -89,16 +89,16 @@ async def file_to_stream( import asyncio import time import traceback +import uuid from typing import TYPE_CHECKING, Any -from mofox_bus import Seg, UserInfo - +from mofox_bus import MessageEnvelope +from src.common.data_models.database_data_model import DatabaseUserInfo if TYPE_CHECKING: from src.common.data_models.database_data_model import DatabaseMessages # 导入依赖 from src.chat.message_receive.chat_stream import ChatStream, get_chat_manager -from src.chat.message_receive.message import MessageSending from src.chat.message_receive.uni_message_sender import HeartFCSender from src.common.logger import get_logger from src.config.config import global_config @@ -183,6 +183,45 @@ async def wait_adapter_response(request_id: str, timeout: float = 30.0) -> dict: return {"status": "error", "message": str(e)} +def _build_message_envelope( + *, + message_id: str, + target_stream: "ChatStream", + bot_user_info: DatabaseUserInfo, + message_segment: dict[str, Any], + timestamp: float, +) -> MessageEnvelope: + """构建发送的 MessageEnvelope 数据结构""" + message_info: dict[str, Any] = { + "message_id": message_id, + "time": timestamp, + "platform": target_stream.platform, + "user_info": { + "user_id": bot_user_info.user_id, + "user_nickname": bot_user_info.user_nickname, + "user_cardname": getattr(bot_user_info, "user_cardname", None), + "platform": bot_user_info.platform, + }, + } + + if target_stream.group_info: + message_info["group_info"] = { + "group_id": target_stream.group_info.group_id, + "group_name": target_stream.group_info.group_name, + "platform": target_stream.group_info.group_platform, + } + + return { + "id": str(uuid.uuid4()), + "direction": "outgoing", + "platform": target_stream.platform, + "message_info": message_info, + "message_segment": message_segment, + } + + + + # ============================================================================= # 内部实现函数(不暴露给外部) # ============================================================================= @@ -200,56 +239,34 @@ async def _send_to_target( storage_message: bool = True, show_log: bool = True, ) -> bool: - """向指定目标发送消息的内部实现 - - Args: - message_type: 消息类型,如"text"、"image"、"emoji"等 - content: 消息内容 - stream_id: 目标流ID - display_message: 显示消息 - typing: 是否模拟打字等待。 - reply_to: 回复消息,格式为"发送者:消息内容" - storage_message: 是否存储消息到数据库 - show_log: 发送是否显示日志 - - Returns: - bool: 是否发送成功 - """ + """向指定目标发送消息的内部实现""" try: if reply_to: - logger.warning("[SendAPI] 在0.10.0, reply_to 参数已弃用,请使用 reply_to_message 参数") + logger.warning("[SendAPI] 自 0.10.0 起 reply_to 已弃用,请使用 reply_to_message") if show_log: - logger.debug(f"[SendAPI] 发送{message_type}消息到 {stream_id}") + logger.debug(f"[SendAPI] 发送 {message_type} 消息到 {stream_id}") - # 查找目标聊天流 target_stream = await get_chat_manager().get_stream(stream_id) if not target_stream: logger.error(f"[SendAPI] 未找到聊天流: {stream_id}") return False - # 创建发送器 heart_fc_sender = HeartFCSender() - - # 生成消息ID current_time = time.time() message_id = f"send_api_{int(current_time * 1000)}" - # 构建机器人用户信息 - bot_user_info = UserInfo( + bot_user_info = DatabaseUserInfo( user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=target_stream.platform, ) - # 创建消息段 - message_segment = Seg(type=message_type, data=content) # type: ignore - - # 处理回复消息 + anchor_message = None + reply_to_platform_id = None if reply_to: - # 优先使用 reply_to 字符串构建 anchor_message - # 解析 "发送者(ID)" 格式 import re + match = re.match(r"(.+)\((\d+)\)", reply_to) if match: sender_name, sender_id = match.groups() @@ -257,69 +274,64 @@ async def _send_to_target( "user_nickname": sender_name, "user_id": sender_id, "chat_info_platform": target_stream.platform, - "message_id": "temp_reply_id", # 临时ID - "time": time.time() + "message_id": "temp_reply_id", + "time": time.time(), } anchor_message = message_dict_to_db_message(message_dict=temp_message_dict) - else: - anchor_message = None - reply_to_platform_id = f"{target_stream.platform}:{sender_id}" if anchor_message else None - + if anchor_message: + reply_to_platform_id = f"{target_stream.platform}:{sender_id}" elif reply_to_message: anchor_message = message_dict_to_db_message(message_dict=reply_to_message) if anchor_message: - # DatabaseMessages 不需要 update_chat_stream,它是纯数据对象 - reply_to_platform_id = ( - f"{anchor_message.chat_info.platform}:{anchor_message.user_info.user_id}" - ) - else: - reply_to_platform_id = None - else: - anchor_message = None - reply_to_platform_id = None + reply_to_platform_id = f"{anchor_message.chat_info.platform}:{anchor_message.user_info.user_id}" - # 构建发送消息对象 - bot_message = MessageSending( + base_segment: dict[str, Any] = {"type": message_type, "data": content} + message_segment: dict[str, Any] + + if set_reply and anchor_message and anchor_message.message_id: + message_segment = { + "type": "seglist", + "data": [ + {"type": "reply", "data": anchor_message.message_id}, + base_segment, + ], + } + else: + message_segment = base_segment + + if reply_to_platform_id: + message_segment["reply_to"] = reply_to_platform_id + + envelope = _build_message_envelope( message_id=message_id, - chat_stream=target_stream, + target_stream=target_stream, bot_user_info=bot_user_info, - sender_info=target_stream.user_info, message_segment=message_segment, - display_message=display_message, - reply=anchor_message, - is_head=True, - is_emoji=(message_type == "emoji"), - thinking_start_time=current_time, - reply_to=reply_to_platform_id, + timestamp=current_time, ) - # 发送消息 sent_msg = await heart_fc_sender.send_message( - bot_message, + envelope, + chat_stream=target_stream, typing=typing, - set_reply=set_reply, storage_message=storage_message, show_log=show_log, + thinking_start_time=current_time, + display_message=display_message or (content if isinstance(content, str) else ""), ) if sent_msg: logger.debug(f"[SendAPI] 成功发送消息到 {stream_id}") return True - else: - logger.error("[SendAPI] 发送消息失败") - return False + + logger.error("[SendAPI] 发送消息失败") + return False except Exception as e: logger.error(f"[SendAPI] 发送消息时出错: {e}") traceback.print_exc() return False - -# ============================================================================= -# 公共API函数 - 预定义类型的发送函数 -# ============================================================================= - - async def text_to_stream( text: str, stream_id: str, @@ -460,53 +472,27 @@ async def adapter_command_to_stream( timeout: float = 30.0, storage_message: bool = False, ) -> dict: - """向适配器发送命令并获取返回值 - - 雅诺狐的耳朵特别软 - - Args: - action (str): 适配器命令动作,如"get_group_list"、"get_friend_list"等 - params (dict): 命令参数字典,包含命令所需的参数 - platform (Optional[str]): 目标平台标识,可选,用于多平台支持 - stream_id (Optional[str]): 聊天流ID,可选,如果不提供则自动生成临时ID - timeout (float): 超时时间(秒),默认30.0秒 - storage_message (bool): 是否存储消息到数据库,默认False - - Returns: - dict: 适配器返回的响应,包含以下可能的状态: - - 成功: {"status": "ok", "data": {...}, "message": "..."} - - 失败: {"status": "failed", "message": "错误信息"} - - 错误: {"status": "error", "message": "错误信息"} - - Raises: - ValueError: 当stream_id和platform都未提供时抛出 - """ + """向适配器发送命令并获取返回值""" if not stream_id and not platform: - raise ValueError("必须提供stream_id或platform参数") + raise ValueError("必须提供stream_id或platform") try: - logger.debug(f"[SendAPI] 向适配器发送命令: {action}") + logger.debug(f"[SendAPI] 准备发送适配器命令: {action}") - # 如果没有提供stream_id,则生成一个临时的 if stream_id is None: - import uuid - stream_id = f"adapter_temp_{uuid.uuid4().hex[:8]}" logger.debug(f"[SendAPI] 自动生成临时stream_id: {stream_id}") - # 查找目标聊天流 target_stream = await get_chat_manager().get_stream(stream_id) if not target_stream: - # 如果是自动生成的stream_id且找不到聊天流,创建一个临时的虚拟流 if stream_id.startswith("adapter_temp_"): - logger.debug(f"[SendAPI] 创建临时虚拟聊天流: {stream_id}") + logger.debug(f"[SendAPI] 创建临时聊天流: {stream_id}") - # 创建临时的用户信息和聊天流 if not platform: logger.error("[SendAPI] 创建临时聊天流失败: platform 未提供") return {"status": "error", "message": "platform 未提供"} - temp_user_info = UserInfo(user_id="system", user_nickname="System", platform=platform) + temp_user_info = DatabaseUserInfo(user_id="system", user_nickname="System", platform=platform) temp_chat_stream = ChatStream( stream_id=stream_id, platform=platform, user_info=temp_user_info, group_info=None @@ -517,21 +503,17 @@ async def adapter_command_to_stream( logger.error(f"[SendAPI] 未找到聊天流: {stream_id}") return {"status": "error", "message": f"未找到聊天流: {stream_id}"} - # 创建发送器 heart_fc_sender = HeartFCSender() - # 生成消息ID current_time = time.time() message_id = f"adapter_cmd_{int(current_time * 1000)}" - # 构建机器人用户信息 - bot_user_info = UserInfo( + bot_user_info = DatabaseUserInfo( user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=target_stream.platform, ) - # 构建适配器命令数据 adapter_command_data = { "action": action, "params": params, @@ -539,27 +521,24 @@ async def adapter_command_to_stream( "request_id": message_id, } - # 创建消息段 - message_segment = Seg(type="adapter_command", data=adapter_command_data) # type: ignore + message_segment = {"type": "adapter_command", "data": adapter_command_data} - # 构建发送消息对象 - bot_message = MessageSending( + envelope = _build_message_envelope( message_id=message_id, - chat_stream=target_stream, + target_stream=target_stream, bot_user_info=bot_user_info, - sender_info=target_stream.user_info, message_segment=message_segment, - display_message=f"适配器命令: {action}", - reply=None, - is_head=True, - is_emoji=False, - thinking_start_time=current_time, - reply_to=None, + timestamp=current_time, ) - # 发送消息 sent_msg = await heart_fc_sender.send_message( - bot_message, typing=False, set_reply=False, storage_message=storage_message + envelope, + chat_stream=target_stream, + typing=False, + storage_message=storage_message, + show_log=True, + thinking_start_time=current_time, + display_message=f"发送适配器命令: {action}", ) if not sent_msg: @@ -568,7 +547,6 @@ async def adapter_command_to_stream( logger.debug("[SendAPI] 已发送适配器命令,等待响应...") - # 等待适配器响应 response = await wait_adapter_response(message_id, timeout) logger.debug(f"[SendAPI] 收到适配器响应: {response}") @@ -579,3 +557,4 @@ async def adapter_command_to_stream( logger.error(f"[SendAPI] 发送适配器命令时出错: {e}") traceback.print_exc() return {"status": "error", "message": f"发送适配器命令时出错: {e!s}"} + diff --git a/src/plugin_system/base/__init__.py b/src/plugin_system/base/__init__.py index 56ba1f7e1..487701149 100644 --- a/src/plugin_system/base/__init__.py +++ b/src/plugin_system/base/__init__.py @@ -24,7 +24,6 @@ from .component_types import ( ComponentType, EventHandlerInfo, EventType, - MaiMessages, PluginInfo, PlusCommandInfo, PythonDependency, @@ -55,7 +54,6 @@ __all__ = [ "ConfigField", "EventHandlerInfo", "EventType", - "MaiMessages", "PluginInfo", "PluginMetadata", # 增强命令系统 diff --git a/src/plugin_system/base/component_types.py b/src/plugin_system/base/component_types.py index 47970d1cf..becbe1b63 100644 --- a/src/plugin_system/base/component_types.py +++ b/src/plugin_system/base/component_types.py @@ -33,9 +33,6 @@ class InjectionRule: ] and self.target_content is None: raise ValueError(f"'{self.injection_type.value}'类型的注入规则必须提供 'target_content'。") - -from mofox_bus import Seg - from src.llm_models.payload_content.tool_option import ToolCall as ToolCall from src.llm_models.payload_content.tool_option import ToolParamType as ToolParamType @@ -410,56 +407,6 @@ class PluginInfo: return requirements -@dataclass -class MaiMessages: - """MaiM插件消息""" - - message_segments: list[Seg] = field(default_factory=list) - """消息段列表,支持多段消息""" - - message_base_info: dict[str, Any] = field(default_factory=dict) - """消息基本信息,包含平台,用户信息等数据""" - - plain_text: str = "" - """纯文本消息内容""" - - raw_message: str | None = None - """原始消息内容""" - - is_group_message: bool = False - """是否为群组消息""" - - is_private_message: bool = False - """是否为私聊消息""" - - stream_id: str | None = None - """流ID,用于标识消息流""" - - llm_prompt: str | None = None - """LLM提示词""" - - llm_response_content: str | None = None - """LLM响应内容""" - - llm_response_reasoning: str | None = None - """LLM响应推理内容""" - - llm_response_model: str | None = None - """LLM响应模型名称""" - - llm_response_tool_call: list[ToolCall] | None = None - """LLM使用的工具调用""" - - action_usage: list[str] | None = None - """使用的Action""" - - additional_data: dict[Any, Any] = field(default_factory=dict) - """附加数据,可以存储额外信息""" - - def __post_init__(self): - if self.message_segments is None: - self.message_segments = [] - @dataclass class RouterInfo(ComponentInfo): """路由组件信息""" diff --git a/src/plugins/built_in/NEW_napcat_adapter/__init__.py b/src/plugins/built_in/NEW_napcat_adapter/__init__.py index d7f61b2f2..9de6c1088 100644 --- a/src/plugins/built_in/NEW_napcat_adapter/__init__.py +++ b/src/plugins/built_in/NEW_napcat_adapter/__init__.py @@ -1,7 +1,7 @@ from src.plugin_system.base.plugin_metadata import PluginMetadata __plugin_meta__ = PluginMetadata( - name="napcat_plugin", + name="napcat_adapter_plugin", description="基于OneBot 11协议的NapCat QQ协议插件,提供完整的QQ机器人API接口,使用现有adapter连接", usage="该插件提供 `napcat_tool` tool。", version="1.0.0", diff --git a/src/plugins/built_in/NEW_napcat_adapter/plugin.py b/src/plugins/built_in/NEW_napcat_adapter/plugin.py index 186bd6341..8a539bf69 100644 --- a/src/plugins/built_in/NEW_napcat_adapter/plugin.py +++ b/src/plugins/built_in/NEW_napcat_adapter/plugin.py @@ -129,8 +129,6 @@ class NapcatAdapter(BaseAdapter): future = self._response_pool[echo] if not future.done(): future.set_result(raw) - # API 响应不需要转换为 MessageEnvelope,返回空信封 - return self._create_empty_envelope() # 消息事件 if post_type == "message": @@ -147,7 +145,6 @@ class NapcatAdapter(BaseAdapter): # 未知事件类型 else: logger.warning(f"未知的事件类型: {post_type}") - return self._create_empty_envelope() # type: ignore[return-value] async def _send_platform_message(self, envelope: MessageEnvelope) -> None: # type: ignore[override] """ diff --git a/src/plugins/built_in/maizone_refactored/services/content_service.py b/src/plugins/built_in/maizone_refactored/services/content_service.py index bab11db14..a0fed0917 100644 --- a/src/plugins/built_in/maizone_refactored/services/content_service.py +++ b/src/plugins/built_in/maizone_refactored/services/content_service.py @@ -10,12 +10,12 @@ from collections.abc import Callable import aiohttp import filetype -from mofox_bus import UserInfo from src.chat.message_receive.chat_stream import get_chat_manager from src.common.logger import get_logger from src.llm_models.utils_model import LLMRequest from src.plugin_system.apis import config_api, generator_api, llm_api +from src.common.data_models.database_data_model import DatabaseUserInfo # 导入旧的工具函数,我们稍后会考虑是否也需要重构它 from ..utils.history_utils import get_send_history @@ -123,7 +123,7 @@ class ContentService: bot_qq = str(config_api.get_global_config("bot.qq_account")) bot_nickname = config_api.get_global_config("bot.nickname") - bot_user_info = UserInfo(platform=bot_platform, user_id=bot_qq, user_nickname=bot_nickname) + bot_user_info = DatabaseUserInfo(platform=bot_platform, user_id=bot_qq, user_nickname=bot_nickname) chat_stream = await chat_manager.get_or_create_stream(platform=bot_platform, user_info=bot_user_info) @@ -184,7 +184,7 @@ class ContentService: bot_qq = str(config_api.get_global_config("bot.qq_account")) bot_nickname = config_api.get_global_config("bot.nickname") - bot_user_info = UserInfo(platform=bot_platform, user_id=bot_qq, user_nickname=bot_nickname) + bot_user_info = DatabaseUserInfo(platform=bot_platform, user_id=bot_qq, user_nickname=bot_nickname) chat_stream = await chat_manager.get_or_create_stream(platform=bot_platform, user_info=bot_user_info) diff --git a/src/plugins/built_in/napcat_adapter_plugin/plugin.py b/src/plugins/built_in/napcat_adapter_plugin/plugin.py index 921f4619a..2437f0162 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/plugin.py +++ b/src/plugins/built_in/napcat_adapter_plugin/plugin.py @@ -231,7 +231,7 @@ class NapcatAdapterPlugin(BasePlugin): dependencies: ClassVar[List[str]] = [] # 插件依赖列表 python_dependencies: ClassVar[List[str]] = [] # Python包依赖列表 config_file_name: str = "config.toml" # 配置文件名 - + @property def enable_plugin(self) -> bool: """通过配置文件动态控制插件启用状态""" diff --git a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py index 7824aa3a0..b7e7b2c25 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py +++ b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_handler.py @@ -6,7 +6,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import websockets as Server -from mofox_bus import ( +from maim_message import ( BaseMessageInfo, FormatInfo, GroupInfo, diff --git a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_sending.py b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_sending.py index 02b1a0a12..b64db620e 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_sending.py +++ b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/message_sending.py @@ -1,6 +1,6 @@ import asyncio -from mofox_bus import MessageBase, Router +from maim_message import MessageBase, Router from src.common.logger import get_logger from src.plugin_system.apis import config_api diff --git a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/notice_handler.py b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/notice_handler.py index 7e64556aa..1f6bf104e 100644 --- a/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/notice_handler.py +++ b/src/plugins/built_in/napcat_adapter_plugin/src/recv_handler/notice_handler.py @@ -4,7 +4,7 @@ import time from typing import ClassVar, Optional, Tuple import websockets as Server -from mofox_bus import BaseMessageInfo, FormatInfo, GroupInfo, MessageBase, Seg, UserInfo +from maim_message import BaseMessageInfo, FormatInfo, GroupInfo, MessageBase, Seg, UserInfo from src.common.logger import get_logger from src.plugin_system.apis import config_api