From 569d584732f7099b5384780a0991dcc7e1b5e375 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 20 Apr 2025 16:23:26 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=BA=A6=E9=BA=A6=E8=BF=98=E6=B2=A1?= =?UTF-8?q?=E7=AD=89=E5=88=B0=E8=87=AA=E5=B7=B1=E6=B6=88=E6=81=AF=E5=8F=91?= =?UTF-8?q?=E5=87=BA=E5=8E=BB=E5=B0=B1=E5=BC=80=E5=A7=8B=E4=B8=8B=E4=B8=80?= =?UTF-8?q?=E6=AC=A1=E6=80=9D=E8=80=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chat_module/heartFC_chat/messagesender.py | 39 +++-- .../chat_module/heartFC_chat/pf_chatting.py | 135 ++++++------------ 2 files changed, 62 insertions(+), 112 deletions(-) diff --git a/src/plugins/chat_module/heartFC_chat/messagesender.py b/src/plugins/chat_module/heartFC_chat/messagesender.py index 1ba624c65..6fab730fa 100644 --- a/src/plugins/chat_module/heartFC_chat/messagesender.py +++ b/src/plugins/chat_module/heartFC_chat/messagesender.py @@ -55,23 +55,12 @@ class MessageSender: ) -> None: """发送消息""" - typing_time = calculate_typing_time( - input_string=message.processed_plain_text, - thinking_start_time=message.thinking_start_time, - is_emoji=message.is_emoji, - ) - logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") - await asyncio.sleep(typing_time) - logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") - message_json = message.to_dict() message_preview = truncate_message(message.processed_plain_text) try: end_point = global_config.api_urls.get(message.message_info.platform, None) if end_point: - # logger.info(f"发送消息到{end_point}") - # logger.info(message_json) try: await global_api.send_message_rest(end_point, message_json) except Exception as e: @@ -173,6 +162,11 @@ class MessageManager: container = self.get_container(chat_stream.stream_id) container.add_message(message) + def check_if_sending_message_exist(self,chat_id,thinking_id): + container = self.get_container(chat_id) + if container.has_messages(): + + async def process_chat_messages(self, chat_id: str): """处理聊天流消息""" container = self.get_container(chat_id) @@ -205,21 +199,22 @@ class MessageManager: start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id ) - # 暂时禁用,因为没有anchor_message - # if ( - # message_earliest.is_head - # and (thinking_messages_count > 3 or thinking_messages_length > 200) - # and not message_earliest.is_private_message() # 避免在私聊时插入reply - # ): - # logger.debug(f"距离原始消息太长,设置回复消息{message_earliest.processed_plain_text}") - # message_earliest.set_reply() - await message_earliest.process() # 获取 MessageSender 的单例实例并发送消息 - await MessageSender().send_message(message_earliest) - + typing_time = calculate_typing_time( + input_string=message_earliest.processed_plain_text, + thinking_start_time=message_earliest.thinking_start_time, + is_emoji=message_earliest.is_emoji, + ) + logger.trace(f"\n{message_earliest.processed_plain_text},{typing_time},计算输入时间结束\n") + await asyncio.sleep(typing_time) + logger.debug(f"\n{message_earliest.processed_plain_text},{typing_time},等待输入时间结束\n") + + await self.storage.store_message(message_earliest, message_earliest.chat_stream) + + await MessageSender().send_message(message_earliest) container.remove_message(message_earliest) diff --git a/src/plugins/chat_module/heartFC_chat/pf_chatting.py b/src/plugins/chat_module/heartFC_chat/pf_chatting.py index 80b78838d..33de6dda1 100644 --- a/src/plugins/chat_module/heartFC_chat/pf_chatting.py +++ b/src/plugins/chat_module/heartFC_chat/pf_chatting.py @@ -5,6 +5,7 @@ from typing import List, Optional, Dict, Any, TYPE_CHECKING import json from src.plugins.chat.message import (MessageRecv, BaseMessageInfo, MessageThinking, MessageSending) +from src.plugins.chat.message import MessageSet, Seg # Local import needed after move from src.plugins.chat.chat_stream import ChatStream from src.plugins.chat.message import UserInfo from src.heart_flow.heartflow import heartflow, SubHeartflow @@ -54,9 +55,9 @@ PLANNER_TOOL_DEFINITION = [ class PFChatting: """ - Manages a continuous Plan-Filter-Check (now Plan-Replier-Sender) loop - for generating replies within a specific chat stream, controlled by a timer. - The loop runs as long as the timer > 0. + 管理一个连续的Plan-Filter-Check (现在改为Plan-Replier-Sender)循环 + 用于在特定聊天流中生成回复,由计时器控制。 + 只要计时器>0,循环就会继续。 """ def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFC_Controller"): @@ -162,7 +163,7 @@ class PFChatting: self._loop_timer = max(0, new_timer_value) # Log less frequently, e.g., every 10 seconds or significant change? # if self._trigger_count_this_activation % 5 == 0: - logger.info(f"{log_prefix} 麦麦现在想聊{self._loop_timer:.1f}秒") + # logger.info(f"{log_prefix} 麦麦现在想聊{self._loop_timer:.1f}秒") # Start the loop if it wasn't active and timer is positive if not self._loop_active and self._loop_timer > 0: @@ -230,7 +231,7 @@ class PFChatting: reasoning = planner_result.get("reasoning", "Planner did not provide reasoning.") emoji_query = planner_result.get("emoji_query", "") current_mind = planner_result.get("current_mind", "[Mind unavailable]") - send_emoji_from_tools = planner_result.get("send_emoji_from_tools", "") # Emoji from tools + # send_emoji_from_tools = planner_result.get("send_emoji_from_tools", "") # Emoji from tools observed_messages = planner_result.get("observed_messages", []) llm_error = planner_result.get("llm_error", False) @@ -260,7 +261,7 @@ class PFChatting: anchor_message=anchor_message, thinking_id=thinking_id, current_mind=current_mind, - send_emoji=send_emoji_from_tools, # Pass tool emoji query + # send_emoji=send_emoji_from_tools, # Pass tool emoji query ) except Exception as e_replier: logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}") @@ -269,17 +270,13 @@ class PFChatting: if replier_result: # --- Sender Work --- # try: - # Pass emoji query from PLANNER if planner decided text+emoji - # If planner just said text_reply, use emoji from TOOLS passed via replier_result - final_emoji_query = emoji_query if emoji_query else replier_result.get("send_emoji", "") - await self._sender( thinking_id=thinking_id, anchor_message=anchor_message, - response_set=replier_result["response_set"], - send_emoji=final_emoji_query # Use planner's or tool's emoji query + response_set=replier_result, + send_emoji=emoji_query ) - logger.info(f"{log_prefix} 循环: 发送器完成成功.") + # logger.info(f"{log_prefix} 循环: 发送器完成成功.") except Exception as e_sender: logger.error(f"{log_prefix} 循环: 发送器失败: {e_sender}") # _sender should handle cleanup, but double check @@ -287,7 +284,6 @@ class PFChatting: else: logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.") self._cleanup_thinking_message(thinking_id) - elif action == "emoji_reply": logger.info(f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}") action_taken_this_cycle = True @@ -369,7 +365,7 @@ class PFChatting: observed_messages: List[dict] = [] tool_result_info = {} get_mid_memory_id = [] - send_emoji_from_tools = "" # Emoji suggested by tools + # send_emoji_from_tools = "" # Emoji suggested by tools current_mind: Optional[str] = None llm_error = False # Flag for LLM failure @@ -381,7 +377,7 @@ class PFChatting: "reasoning": "SubHeartflow not available", "emoji_query": "", "current_mind": None, - "send_emoji_from_tools": "", + # "send_emoji_from_tools": "", "observed_messages": [], "llm_error": True, } @@ -415,9 +411,9 @@ class PFChatting: logger.debug(f"{log_prefix}[Planner] 规划前工具结果: {tool_result_info}") # Extract memory IDs and potential emoji query from tools get_mid_memory_id = [mem["content"] for mem in tool_result_info.get("mid_chat_mem", []) if "content" in mem] - send_emoji_from_tools = next((item["content"] for item in tool_result_info.get("send_emoji", []) if "content" in item), "") - if send_emoji_from_tools: - logger.info(f"{log_prefix}[Planner] 工具建议表情: '{send_emoji_from_tools}'") + # send_emoji_from_tools = next((item["content"] for item in tool_result_info.get("send_emoji", []) if "content" in item), "") + # if send_emoji_from_tools: + # logger.info(f"{log_prefix}[Planner] 工具建议表情: '{send_emoji_from_tools}'") except Exception as e_tool: logger.error(f"{log_prefix}[Planner] 规划前工具使用失败: {e_tool}") @@ -470,7 +466,7 @@ class PFChatting: reasoning = arguments.get("reasoning", "未提供理由") # Planner explicitly provides emoji query if action is emoji_reply or text_reply wants emoji emoji_query = arguments.get("emoji_query", "") - logger.info( + logger.debug( f"{log_prefix}[Planner] LLM 决策: {action}, 理由: {reasoning}, EmojiQuery: '{emoji_query}'" ) except json.JSONDecodeError as json_e: @@ -512,7 +508,7 @@ class PFChatting: "reasoning": reasoning, "emoji_query": emoji_query, # Explicit query from Planner/LLM "current_mind": current_mind, - "send_emoji_from_tools": send_emoji_from_tools, # Emoji suggested by tools (used as fallback) + # "send_emoji_from_tools": send_emoji_from_tools, # Emoji suggested by tools (used as fallback) "observed_messages": observed_messages, "llm_error": llm_error, } @@ -522,9 +518,6 @@ class PFChatting: 重构观察到的最后一条消息作为回复的锚点, 如果重构失败或观察为空,则创建一个占位符。 """ - if not self.chat_stream: - logger.error(f"{self._get_log_prefix()} 无法获取锚点消息: ChatStream 不可用.") - return None try: last_msg_dict = None @@ -533,7 +526,9 @@ class PFChatting: if last_msg_dict: try: - anchor_message = MessageRecv(last_msg_dict, chat_stream=self.chat_stream) + # anchor_message = MessageRecv(last_msg_dict, chat_stream=self.chat_stream) + anchor_message = MessageRecv(last_msg_dict) # 移除 chat_stream 参数 + anchor_message.update_chat_stream(self.chat_stream) # 添加 update_chat_stream 调用 if not ( anchor_message and anchor_message.message_info @@ -587,6 +582,7 @@ class PFChatting: except Exception as e: logger.error(f"{log_prefix} Error cleaning up thinking message {thinking_id}: {e}") + # --- 发送器 (Sender) --- # async def _sender( self, thinking_id: str, @@ -597,55 +593,27 @@ class PFChatting: """ 发送器 (Sender): 使用本类的方法发送生成的回复。 处理相关的操作,如发送表情和更新关系。 - Raises exception on failure to signal the loop. """ log_prefix = self._get_log_prefix() - if not response_set: - logger.error(f"{log_prefix}[Sender-{thinking_id}] Called with empty response_set.") - self._cleanup_thinking_message(thinking_id) - raise ValueError("Sender called with no response_set") first_bot_msg: Optional[MessageSending] = None - send_success = False - try: - # --- Send the main text response (using moved method) --- # - logger.debug(f"{log_prefix}[Sender-{thinking_id}] Sending response messages...") - first_bot_msg = await self._send_response_messages(anchor_message, response_set, thinking_id) + # 尝试发送回复消息 + first_bot_msg = await self._send_response_messages(anchor_message, response_set, thinking_id) + if first_bot_msg: + # --- 处理关联表情(如果指定) --- # + if send_emoji: + logger.info(f"{log_prefix}[Sender-{thinking_id}] 正在发送关联表情: '{send_emoji}'") + # 优先使用first_bot_msg作为锚点,否则回退到原始锚点 + emoji_anchor = first_bot_msg if first_bot_msg else anchor_message + await self._handle_emoji(emoji_anchor, response_set, send_emoji) - if first_bot_msg: - send_success = True - logger.info(f"{log_prefix}[Sender-{thinking_id}] Successfully sent reply.") - - # --- Handle associated emoji (if specified) using moved method --- # - if send_emoji: - logger.info(f"{log_prefix}[Sender-{thinking_id}] Sending associated emoji: '{send_emoji}'") - try: - # Use first_bot_msg as anchor if available, otherwise fallback to original anchor - emoji_anchor = first_bot_msg if first_bot_msg else anchor_message - await self._handle_emoji(emoji_anchor, response_set, send_emoji) - except Exception as e_emoji: - logger.error(f"{log_prefix}[Sender-{thinking_id}] Failed to send associated emoji: {e_emoji}") - - # --- Update relationship (using moved method) --- # - try: - await self._update_relationship(anchor_message, response_set) - # logger.debug(f"{log_prefix}[Sender-{thinking_id}] Updated relationship.") - except Exception as e_rel: - logger.error(f"{log_prefix}[Sender-{thinking_id}] Failed to update relationship: {e_rel}") - - else: - send_success = False - logger.warning(f"{log_prefix}[Sender-{thinking_id}] Failed to send reply (_send_response_messages returned None). Thinking message {thinking_id} likely removed.") - # No cleanup needed here, as _send_response_messages returning None implies it's handled/gone. - raise RuntimeError("Sending reply failed, _send_response_messages returned None.") - - except Exception as e: - logger.error(f"{log_prefix}[Sender-{thinking_id}] Error during sending process: {e}") - logger.error(traceback.format_exc()) - if not send_success: - # Ensure cleanup if error happened before _send_response_messages or during post-send actions - self._cleanup_thinking_message(thinking_id) - raise + # --- 更新关系状态 --- # + await self._update_relationship(anchor_message, response_set) + + else: + # logger.warning(f"{log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。") + # 无需清理,因为_send_response_messages返回None意味着已处理/已删除 + raise RuntimeError("发送回复失败,_send_response_messages返回None") async def shutdown(self): """ @@ -724,12 +692,9 @@ class PFChatting: # --- 回复器 (Replier) 的定义 --- # async def _replier_work( self, - observed_messages: List[dict], # Added observed_messages back, potentially useful context for GPT anchor_message: MessageRecv, thinking_id: str, - current_mind: Optional[str], # Pass current mind for context - send_emoji: str, # Emoji query from tools - ) -> Optional[Dict[str, Any]]: + ) -> Optional[List[str]]: """ 回复器 (Replier): 核心逻辑用于生成回复。 """ @@ -753,10 +718,7 @@ class PFChatting: # --- 准备并返回结果 --- # logger.info(f"{log_prefix}[Replier-{thinking_id}] 成功生成了回复集: {' '.join(response_set)[:50]}...") - return { - "response_set": response_set, - "send_emoji": send_emoji, # Pass through the emoji query from tools/planner - } + return response_set except Exception as e: logger.error(f"{log_prefix}[Replier-{thinking_id}] Unexpected error in replier_work: {e}") @@ -796,17 +758,15 @@ class PFChatting: self, anchor_message: Optional[MessageRecv], response_set: List[str], thinking_id: str ) -> Optional[MessageSending]: """发送回复消息 (尝试锚定到 anchor_message)""" - from src.plugins.chat.message import MessageSet, Seg # Local import needed after move - if not anchor_message or not anchor_message.chat_stream: logger.error(f"{self._get_log_prefix()} 无法发送回复,缺少有效的锚点消息或聊天流。") return None chat = anchor_message.chat_stream - # Access MessageManager via controller container = self.heartfc_controller.MessageManager().get_container(chat.stream_id) thinking_message = None - # Use container.remove_message directly if possible, otherwise iterate + + # 移除思考消息 for msg in container.messages[:]: # Iterate over a copy if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: thinking_message = msg @@ -816,7 +776,7 @@ class PFChatting: if not thinking_message: stream_name = chat_manager.get_stream_name(chat.stream_id) or chat.stream_id # 获取流名称 - logger.warning(f"[{stream_name}] 未找到对应的思考消息 {thinking_id},可能已超时被移除") + logger.warning(f"[{stream_name}] {thinking_id},思考太久了,超时被移除") return None thinking_start_time = thinking_message.thinking_start_time @@ -847,14 +807,9 @@ class PFChatting: first_bot_msg = bot_message message_set.add_message(bot_message) - if message_set.messages: # 确保有消息才添加 - # Access MessageManager via controller - self.heartfc_controller.MessageManager().add_message(message_set) - return first_bot_msg - else: - stream_name = chat_manager.get_stream_name(chat.stream_id) or chat.stream_id # 获取流名称 - logger.warning(f"[{stream_name}] 没有生成有效的回复消息集,无法发送。") - return None + + self.heartfc_controller.MessageManager().add_message(message_set) + return first_bot_msg async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""): """处理表情包 (尝试锚定到 anchor_message)"""