From 2c8343b23a411227a0c763027567aa292e357a09 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Fri, 25 Apr 2025 23:45:59 +0800 Subject: [PATCH] =?UTF-8?q?better=EF=BC=9A=E6=9B=B4=E7=AE=80=E6=B4=81?= =?UTF-8?q?=E7=9A=84=E5=8F=91=E9=80=81=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/heartFC_chat/heartFC_chat.py | 175 +++++++++++---------- src/plugins/heartFC_chat/heartFC_sender.py | 161 +++++++++++++++++++ 2 files changed, 256 insertions(+), 80 deletions(-) create mode 100644 src/plugins/heartFC_chat/heartFC_sender.py diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index ba6be7eb1..b9c6209ca 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -16,7 +16,6 @@ from src.plugins.chat.utils_image import image_path_to_base64 # Local import ne from src.plugins.utils.timer_calculator import Timer # <--- Import Timer from src.plugins.heartFC_chat.heartFC_generator import HeartFCGenerator from src.do_tool.tool_use import ToolUser -from ..chat.message_sender import message_manager # <-- Import the global manager from src.plugins.emoji_system.emoji_manager import emoji_manager from src.plugins.utils.json_utils import process_llm_tool_response # 导入新的JSON工具 from src.heart_flow.sub_mind import SubMind @@ -25,6 +24,7 @@ from src.plugins.heartFC_chat.heartflow_prompt_builder import global_prompt_mana import contextlib from src.plugins.utils.chat_message_builder import num_new_messages_since from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo +from .heartFC_sender import HeartFCSender # --- End import --- @@ -181,6 +181,7 @@ class HeartFChatting: # 依赖注入存储 self.gpt_instance = HeartFCGenerator() # 文本回复生成器 self.tool_user = ToolUser() # 工具使用实例 + self.heart_fc_sender = HeartFCSender() # LLM规划器配置 self.planner_llm = LLMRequest( @@ -301,11 +302,6 @@ class HeartFChatting: # 防止循环过快消耗资源 await self._handle_cycle_delay(action_taken, loop_cycle_start_time, self.log_prefix) - # 等待直到所有消息都发送完成 - with Timer("发送消息", cycle_timers): - while await self._should_skip_cycle(thinking_id): - await asyncio.sleep(0.2) - # 完成当前循环并保存历史 self._current_cycle.complete_cycle() self._cycle_history.append(self._current_cycle) @@ -593,10 +589,6 @@ class HeartFChatting: await asyncio.sleep(1.5) - async def _should_skip_cycle(self, thinking_id: str) -> bool: - """检查是否应该跳过当前循环周期""" - return message_manager.check_if_sending_message_exist(self.stream_id, thinking_id) - async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str): """记录循环周期的计时器结果""" if cycle_timers: @@ -806,26 +798,40 @@ class HeartFChatting: send_emoji: str, # Emoji query decided by planner or tools ): """ - 发送器 (Sender): 使用本类的方法发送生成的回复。 + 发送器 (Sender): 使用 HeartFCSender 实例发送生成的回复。 处理相关的操作,如发送表情和更新关系。 """ - logger.info(f"{self.log_prefix}开始发送回复") + logger.info(f"{self.log_prefix}开始发送回复 (使用 HeartFCSender)") first_bot_msg: Optional[MessageSending] = None - # 尝试发送回复消息 - first_bot_msg = await self._send_response_messages(anchor_message, response_set, thinking_id) - if first_bot_msg: - # --- 处理关联表情(如果指定) --- # - if send_emoji: - logger.info(f"{self.log_prefix}正在发送关联表情: '{send_emoji}'") - # 优先使用first_bot_msg作为锚点,否则回退到原始锚点 - emoji_anchor = first_bot_msg if first_bot_msg else anchor_message - await self._handle_emoji(emoji_anchor, response_set, send_emoji) + try: + # _send_response_messages 现在将使用 self.sender 内部处理注册和发送 + # 它需要负责创建 MessageThinking 和 MessageSending 对象 + # 并调用 self.sender.register_thinking 和 self.sender.type_and_send_message + first_bot_msg = await self._send_response_messages( + anchor_message=anchor_message, + response_set=response_set, + thinking_id=thinking_id + ) - else: - # logger.warning(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。") - # 无需清理,因为_send_response_messages返回None意味着已处理/已删除 - raise RuntimeError("发送回复失败,_send_response_messages返回None") + if first_bot_msg: + # --- 处理关联表情(如果指定) --- # + if send_emoji: + logger.info(f"{self.log_prefix}正在发送关联表情: '{send_emoji}'") + # 优先使用 first_bot_msg 作为锚点,否则回退到原始锚点 + emoji_anchor = first_bot_msg + await self._handle_emoji(emoji_anchor, response_set, send_emoji) + else: + # 如果 _send_response_messages 返回 None,表示在发送前就失败或没有消息可发送 + logger.warning(f"{self.log_prefix}[Sender-{thinking_id}] 未能发送任何回复消息 (_send_response_messages 返回 None)。") + # 这里可能不需要抛出异常,取决于 _send_response_messages 的具体实现 + + except Exception as e: + # 异常现在由 type_and_send_message 内部处理日志,这里只记录发送流程失败 + logger.error(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复过程中遇到错误: {e}") + # 思考状态应已在 type_and_send_message 的 finally 块中清理 + # 可以选择重新抛出或根据业务逻辑处理 + # raise RuntimeError(f"发送回复失败: {e}") from e async def shutdown(self): """优雅关闭HeartFChatting实例,取消活动循环任务""" @@ -959,99 +965,103 @@ class HeartFChatting: thinking_start_time=thinking_time_point, ) # Access MessageManager directly - await message_manager.add_message(thinking_message) + await self.heart_fc_sender.register_thinking(thinking_message) return thinking_id async def _send_response_messages( self, anchor_message: Optional[MessageRecv], response_set: List[str], thinking_id: str ) -> Optional[MessageSending]: - """发送回复消息 (尝试锚定到 anchor_message)""" + """发送回复消息 (尝试锚定到 anchor_message),使用 HeartFCSender""" if not anchor_message or not anchor_message.chat_stream: logger.error(f"{self.log_prefix} 无法发送回复,缺少有效的锚点消息或聊天流。") return None - # 记录锚点消息ID - if self._current_cycle and anchor_message: - self._current_cycle.set_response_info( - response_text=response_set, anchor_message_id=anchor_message.message_info.message_id - ) - chat = anchor_message.chat_stream - container = await message_manager.get_container(chat.stream_id) - thinking_message = None + chat_id = chat.stream_id + stream_name = chat_manager.get_stream_name(chat_id) or chat_id # 获取流名称用于日志 - # 移除思考消息 - for msg in container.messages[:]: # Iterate over a copy - if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: - thinking_message = msg - container.messages.remove(msg) # Remove the message directly here - # logger.debug(f"{self.log_prefix} Removed thinking message {thinking_id} via iteration.") - break + # 检查思考过程是否仍在进行,并获取开始时间 + thinking_start_time = await self.heart_fc_sender.get_thinking_start_time(chat_id, thinking_id) - if not thinking_message: - stream_name = chat_manager.get_stream_name(chat.stream_id) or chat.stream_id # 获取流名称 - logger.warning(f"[{stream_name}] {thinking_id},思考太久了,超时被移除") + if thinking_start_time is None: + logger.warning(f"[{stream_name}] {thinking_id} 思考过程未找到或已结束,无法发送回复。") return None - thinking_start_time = thinking_message.thinking_start_time - message_set = MessageSet(chat, thinking_id) + # 记录锚点消息ID和回复文本(在发送前记录) + self._current_cycle.set_response_info( + response_text=response_set, anchor_message_id=anchor_message.message_info.message_id + ) + mark_head = False - first_bot_msg = None - reply_message_ids = [] # 用于记录所有回复消息的ID + first_bot_msg: Optional[MessageSending] = None + reply_message_ids = [] # 记录实际发送的消息ID bot_user_info = UserInfo( user_id=global_config.BOT_QQ, user_nickname=global_config.BOT_NICKNAME, platform=anchor_message.message_info.platform, ) - for msg_text in response_set: + + for i, msg_text in enumerate(response_set): + # 为每个消息片段生成唯一ID + part_message_id = f"{thinking_id}_{i}" message_segment = Seg(type="text", data=msg_text) bot_message = MessageSending( - message_id=thinking_id, # 使用 thinking_id 作为批次标识 + message_id=part_message_id, # 使用片段的唯一ID chat_stream=chat, bot_user_info=bot_user_info, - sender_info=anchor_message.message_info.user_info, # 发送给锚点消息的用户 + sender_info=anchor_message.message_info.user_info, message_segment=message_segment, - reply=anchor_message, # 回复锚点消息 + reply=anchor_message, # 回复原始锚点 is_head=not mark_head, is_emoji=False, - thinking_start_time=thinking_start_time, + thinking_start_time=thinking_start_time, # 传递原始思考开始时间 ) - if not mark_head: - mark_head = True - first_bot_msg = bot_message - message_set.add_message(bot_message) - reply_message_ids.append(bot_message.message_info.message_id) + try: - # 记录回复消息ID列表 - if self._current_cycle: - self._current_cycle.set_response_info(reply_message_ids=reply_message_ids) + if not mark_head: + mark_head = True + first_bot_msg = bot_message # 保存第一个成功发送的消息对象 + await self.heart_fc_sender.type_and_send_message(bot_message, type = False) + else: + await self.heart_fc_sender.type_and_send_message(bot_message, type = True) - # Access MessageManager directly - await message_manager.add_message(message_set) - return first_bot_msg + reply_message_ids.append(part_message_id) # 记录我们生成的ID + + except Exception as e: + logger.error(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复片段 {i} ({part_message_id}) 时失败: {e}") + # 这里可以选择是继续发送下一个片段还是中止 + + # 在尝试发送完所有片段后,完成原始的 thinking_id 状态 + try: + await self.heart_fc_sender.complete_thinking(chat_id, thinking_id) + except Exception as e: + logger.error(f"{self.log_prefix}[Sender-{thinking_id}] 完成思考状态 {thinking_id} 时出错: {e}") + + self._current_cycle.set_response_info( + response_text=response_set, # 保留原始文本 + anchor_message_id=anchor_message.message_info.message_id, # 保留锚点ID + reply_message_ids=reply_message_ids # 添加实际发送的ID列表 + ) + + + return first_bot_msg # 返回第一个成功发送的消息对象 async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""): - """处理表情包 (尝试锚定到 anchor_message)""" + """处理表情包 (尝试锚定到 anchor_message),使用 HeartFCSender""" if not anchor_message or not anchor_message.chat_stream: logger.error(f"{self.log_prefix} 无法处理表情包,缺少有效的锚点消息或聊天流。") return chat = anchor_message.chat_stream - if send_emoji: - emoji_raw = await emoji_manager.get_emoji_for_text(send_emoji) - else: - emoji_text_source = "".join(response_set) if response_set else "" - emoji_raw = await emoji_manager.get_emoji_for_text(emoji_text_source) + emoji_raw = await emoji_manager.get_emoji_for_text(send_emoji) if emoji_raw: emoji_path, description = emoji_raw - # 记录表情信息 - if self._current_cycle: - self._current_cycle.set_response_info(emoji_info=f"表情: {description}, 路径: {emoji_path}") + emoji_cq = image_path_to_base64(emoji_path) - thinking_time_point = round(time.time(), 2) + thinking_time_point = round(time.time(), 2) # 用于唯一ID message_segment = Seg(type="emoji", data=emoji_cq) bot_user_info = UserInfo( user_id=global_config.BOT_QQ, @@ -1059,17 +1069,22 @@ class HeartFChatting: platform=anchor_message.message_info.platform, ) bot_message = MessageSending( - message_id="me" + str(thinking_time_point), # 使用不同的 ID 前缀? + message_id="me" + str(thinking_time_point), # 表情消息的唯一ID chat_stream=chat, bot_user_info=bot_user_info, sender_info=anchor_message.message_info.user_info, message_segment=message_segment, - reply=anchor_message, # 回复锚点消息 - is_head=False, + reply=anchor_message, # 回复原始锚点 + is_head=False, # 表情通常不是头部消息 is_emoji=True, + # 不需要 thinking_start_time ) - # Access MessageManager directly - await message_manager.add_message(bot_message) + + try: + await self.heart_fc_sender.send_and_store(bot_message) + except Exception as e: + logger.error(f"{self.log_prefix} 发送表情包 {bot_message.message_info.message_id} 时失败: {e}") + def get_cycle_history(self, last_n: Optional[int] = None) -> List[Dict[str, Any]]: """获取循环历史记录 diff --git a/src/plugins/heartFC_chat/heartFC_sender.py b/src/plugins/heartFC_chat/heartFC_sender.py new file mode 100644 index 000000000..000496dd9 --- /dev/null +++ b/src/plugins/heartFC_chat/heartFC_sender.py @@ -0,0 +1,161 @@ +# src/plugins/heartFC_chat/heartFC_sender.py +import asyncio # 重新导入 asyncio +import time +from typing import Dict, List, Optional, Union # 重新导入类型 + +from src.common.logger import get_module_logger +from ..message.api import global_api +from ..chat.message import MessageSending, MessageThinking # 只保留 MessageSending 和 MessageThinking +from ..storage.storage import MessageStorage +from ..chat.utils import truncate_message +from src.common.logger import LogConfig, SENDER_STYLE_CONFIG +from src.plugins.chat.utils import calculate_typing_time + +# 定义日志配置 +sender_config = LogConfig( + # 使用消息发送专用样式 + console_format=SENDER_STYLE_CONFIG["console_format"], + file_format=SENDER_STYLE_CONFIG["file_format"], +) + +logger = get_module_logger("msg_sender", config=sender_config) + + +class HeartFCSender: + """管理消息的注册、即时处理、发送和存储,并跟踪思考状态。""" + + def __init__(self): + self.storage = MessageStorage() + # 用于存储活跃的思考消息 + self.thinking_messages: Dict[str, Dict[str, MessageThinking]] = {} + self._thinking_lock = asyncio.Lock() # 保护 thinking_messages 的锁 + + async def send_message(self, message: MessageSending) -> None: + """合并后的消息发送函数,包含WS发送和日志记录""" + message_preview = truncate_message(message.processed_plain_text) + + try: + # 直接调用API发送消息 + await global_api.send_message(message) + logger.success(f"发送消息 '{message_preview}' 成功") + + except Exception as e: + logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") + if not message.message_info.platform: + raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e + raise e # 重新抛出其他异常 + + async def register_thinking(self, thinking_message: MessageThinking): + """注册一个思考中的消息。""" + if not thinking_message.chat_stream or not thinking_message.message_info.message_id: + logger.error("无法注册缺少 chat_stream 或 message_id 的思考消息") + return + + chat_id = thinking_message.chat_stream.stream_id + message_id = thinking_message.message_info.message_id + + async with self._thinking_lock: + if chat_id not in self.thinking_messages: + self.thinking_messages[chat_id] = {} + if message_id in self.thinking_messages[chat_id]: + logger.warning(f"[{chat_id}] 尝试注册已存在的思考消息 ID: {message_id}") + self.thinking_messages[chat_id][message_id] = thinking_message + logger.debug(f"[{chat_id}] Registered thinking message: {message_id}") + + async def complete_thinking(self, chat_id: str, message_id: str): + """完成并移除一个思考中的消息记录。""" + async with self._thinking_lock: + if chat_id in self.thinking_messages and message_id in self.thinking_messages[chat_id]: + del self.thinking_messages[chat_id][message_id] + logger.debug(f"[{chat_id}] Completed thinking message: {message_id}") + if not self.thinking_messages[chat_id]: + del self.thinking_messages[chat_id] + logger.debug(f"[{chat_id}] Removed empty thinking message container.") + + def is_thinking(self, chat_id: str, message_id: str) -> bool: + """检查指定的消息 ID 是否当前正处于思考状态。""" + return chat_id in self.thinking_messages and message_id in self.thinking_messages[chat_id] + + async def get_thinking_start_time(self, chat_id: str, message_id: str) -> Optional[float]: + """获取已注册思考消息的开始时间。""" + async with self._thinking_lock: + thinking_message = self.thinking_messages.get(chat_id, {}).get(message_id) + return thinking_message.thinking_start_time if thinking_message else None + + async def type_and_send_message(self, message: MessageSending, type = False): + """ + 立即处理、发送并存储单个 MessageSending 消息。 + 调用此方法前,应先调用 register_thinking 注册对应的思考消息。 + 此方法执行后会调用 complete_thinking 清理思考状态。 + """ + if not message.chat_stream: + logger.error("消息缺少 chat_stream,无法发送") + return + if not message.message_info or not message.message_info.message_id: + logger.error("消息缺少 message_info 或 message_id,无法发送") + return + + chat_id = message.chat_stream.stream_id + message_id = message.message_info.message_id + + try: + _ = message.update_thinking_time() + + # --- 条件应用 set_reply 逻辑 --- + if ( + message.apply_set_reply_logic + and message.is_head + and not message.is_private_message() + ): + logger.debug(f"[{chat_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}...") + message.set_reply() + # --- 结束条件 set_reply --- + + await message.process() + + if type: + typing_time = calculate_typing_time( + input_string=message.processed_plain_text, + thinking_start_time=message.thinking_start_time, + is_emoji=message.is_emoji, + ) + await asyncio.sleep(typing_time) + + + await self.send_message(message) + await self.storage.store_message(message, message.chat_stream) + + except Exception as e: + logger.error( + f"[{chat_id}] 处理或存储消息 {message_id} 时出错: {e}" + ) + raise e + finally: + await self.complete_thinking(chat_id, message_id) + + async def send_and_store(self, message: MessageSending): + """处理、发送并存储单个消息,不涉及思考状态管理。""" + if not message.chat_stream: + logger.error(f"[{message.message_info.platform or 'UnknownPlatform'}] 消息缺少 chat_stream,无法发送") + return + if not message.message_info or not message.message_info.message_id: + logger.error(f"[{message.chat_stream.stream_id if message.chat_stream else 'UnknownStream'}] 消息缺少 message_info 或 message_id,无法发送") + return + + chat_id = message.chat_stream.stream_id + message_id = message.message_info.message_id # 获取消息ID用于日志 + + try: + await message.process() + + await asyncio.sleep(0.5) + + await self.send_message(message) # 使用现有的发送方法 + await self.storage.store_message(message, message.chat_stream) # 使用现有的存储方法 + + except Exception as e: + logger.error( + f"[{chat_id}] 处理或存储消息 {message_id} 时出错: {e}" + ) + # 重新抛出异常,让调用者知道失败了 + raise e