fix: 麦麦还没等到自己消息发出去就开始下一次思考

This commit is contained in:
SengokuCola
2025-04-20 16:23:26 +08:00
parent bf4ecb583b
commit 569d584732
2 changed files with 62 additions and 112 deletions

View File

@@ -55,23 +55,12 @@ class MessageSender:
) -> None:
"""发送消息"""
typing_time = calculate_typing_time(
input_string=message.processed_plain_text,
thinking_start_time=message.thinking_start_time,
is_emoji=message.is_emoji,
)
logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束")
await asyncio.sleep(typing_time)
logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束")
message_json = message.to_dict()
message_preview = truncate_message(message.processed_plain_text)
try:
end_point = global_config.api_urls.get(message.message_info.platform, None)
if end_point:
# logger.info(f"发送消息到{end_point}")
# logger.info(message_json)
try:
await global_api.send_message_rest(end_point, message_json)
except Exception as e:
@@ -173,6 +162,11 @@ class MessageManager:
container = self.get_container(chat_stream.stream_id)
container.add_message(message)
def check_if_sending_message_exist(self,chat_id,thinking_id):
container = self.get_container(chat_id)
if container.has_messages():
async def process_chat_messages(self, chat_id: str):
"""处理聊天流消息"""
container = self.get_container(chat_id)
@@ -205,21 +199,22 @@ class MessageManager:
start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id
)
# 暂时禁用因为没有anchor_message
# if (
# message_earliest.is_head
# and (thinking_messages_count > 3 or thinking_messages_length > 200)
# and not message_earliest.is_private_message() # 避免在私聊时插入reply
# ):
# logger.debug(f"距离原始消息太长,设置回复消息{message_earliest.processed_plain_text}")
# message_earliest.set_reply()
await message_earliest.process()
# 获取 MessageSender 的单例实例并发送消息
await MessageSender().send_message(message_earliest)
typing_time = calculate_typing_time(
input_string=message_earliest.processed_plain_text,
thinking_start_time=message_earliest.thinking_start_time,
is_emoji=message_earliest.is_emoji,
)
logger.trace(f"\n{message_earliest.processed_plain_text},{typing_time},计算输入时间结束\n")
await asyncio.sleep(typing_time)
logger.debug(f"\n{message_earliest.processed_plain_text},{typing_time},等待输入时间结束\n")
await self.storage.store_message(message_earliest, message_earliest.chat_stream)
await MessageSender().send_message(message_earliest)
container.remove_message(message_earliest)

View File

@@ -5,6 +5,7 @@ from typing import List, Optional, Dict, Any, TYPE_CHECKING
import json
from src.plugins.chat.message import (MessageRecv, BaseMessageInfo, MessageThinking,
MessageSending)
from src.plugins.chat.message import MessageSet, Seg # Local import needed after move
from src.plugins.chat.chat_stream import ChatStream
from src.plugins.chat.message import UserInfo
from src.heart_flow.heartflow import heartflow, SubHeartflow
@@ -54,9 +55,9 @@ PLANNER_TOOL_DEFINITION = [
class PFChatting:
"""
Manages a continuous Plan-Filter-Check (now Plan-Replier-Sender) loop
for generating replies within a specific chat stream, controlled by a timer.
The loop runs as long as the timer > 0.
管理一个连续的Plan-Filter-Check (现在改为Plan-Replier-Sender)循环
用于在特定聊天流中生成回复,由计时器控制。
只要计时器>0循环就会继续。
"""
def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFC_Controller"):
@@ -162,7 +163,7 @@ class PFChatting:
self._loop_timer = max(0, new_timer_value)
# Log less frequently, e.g., every 10 seconds or significant change?
# if self._trigger_count_this_activation % 5 == 0:
logger.info(f"{log_prefix} 麦麦现在想聊{self._loop_timer:.1f}")
# logger.info(f"{log_prefix} 麦麦现在想聊{self._loop_timer:.1f}秒")
# Start the loop if it wasn't active and timer is positive
if not self._loop_active and self._loop_timer > 0:
@@ -230,7 +231,7 @@ class PFChatting:
reasoning = planner_result.get("reasoning", "Planner did not provide reasoning.")
emoji_query = planner_result.get("emoji_query", "")
current_mind = planner_result.get("current_mind", "[Mind unavailable]")
send_emoji_from_tools = planner_result.get("send_emoji_from_tools", "") # Emoji from tools
# send_emoji_from_tools = planner_result.get("send_emoji_from_tools", "") # Emoji from tools
observed_messages = planner_result.get("observed_messages", [])
llm_error = planner_result.get("llm_error", False)
@@ -260,7 +261,7 @@ class PFChatting:
anchor_message=anchor_message,
thinking_id=thinking_id,
current_mind=current_mind,
send_emoji=send_emoji_from_tools, # Pass tool emoji query
# send_emoji=send_emoji_from_tools, # Pass tool emoji query
)
except Exception as e_replier:
logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}")
@@ -269,17 +270,13 @@ class PFChatting:
if replier_result:
# --- Sender Work --- #
try:
# Pass emoji query from PLANNER if planner decided text+emoji
# If planner just said text_reply, use emoji from TOOLS passed via replier_result
final_emoji_query = emoji_query if emoji_query else replier_result.get("send_emoji", "")
await self._sender(
thinking_id=thinking_id,
anchor_message=anchor_message,
response_set=replier_result["response_set"],
send_emoji=final_emoji_query # Use planner's or tool's emoji query
response_set=replier_result,
send_emoji=emoji_query
)
logger.info(f"{log_prefix} 循环: 发送器完成成功.")
# logger.info(f"{log_prefix} 循环: 发送器完成成功.")
except Exception as e_sender:
logger.error(f"{log_prefix} 循环: 发送器失败: {e_sender}")
# _sender should handle cleanup, but double check
@@ -287,7 +284,6 @@ class PFChatting:
else:
logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.")
self._cleanup_thinking_message(thinking_id)
elif action == "emoji_reply":
logger.info(f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}")
action_taken_this_cycle = True
@@ -369,7 +365,7 @@ class PFChatting:
observed_messages: List[dict] = []
tool_result_info = {}
get_mid_memory_id = []
send_emoji_from_tools = "" # Emoji suggested by tools
# send_emoji_from_tools = "" # Emoji suggested by tools
current_mind: Optional[str] = None
llm_error = False # Flag for LLM failure
@@ -381,7 +377,7 @@ class PFChatting:
"reasoning": "SubHeartflow not available",
"emoji_query": "",
"current_mind": None,
"send_emoji_from_tools": "",
# "send_emoji_from_tools": "",
"observed_messages": [],
"llm_error": True,
}
@@ -415,9 +411,9 @@ class PFChatting:
logger.debug(f"{log_prefix}[Planner] 规划前工具结果: {tool_result_info}")
# Extract memory IDs and potential emoji query from tools
get_mid_memory_id = [mem["content"] for mem in tool_result_info.get("mid_chat_mem", []) if "content" in mem]
send_emoji_from_tools = next((item["content"] for item in tool_result_info.get("send_emoji", []) if "content" in item), "")
if send_emoji_from_tools:
logger.info(f"{log_prefix}[Planner] 工具建议表情: '{send_emoji_from_tools}'")
# send_emoji_from_tools = next((item["content"] for item in tool_result_info.get("send_emoji", []) if "content" in item), "")
# if send_emoji_from_tools:
# logger.info(f"{log_prefix}[Planner] 工具建议表情: '{send_emoji_from_tools}'")
except Exception as e_tool:
logger.error(f"{log_prefix}[Planner] 规划前工具使用失败: {e_tool}")
@@ -470,7 +466,7 @@ class PFChatting:
reasoning = arguments.get("reasoning", "未提供理由")
# Planner explicitly provides emoji query if action is emoji_reply or text_reply wants emoji
emoji_query = arguments.get("emoji_query", "")
logger.info(
logger.debug(
f"{log_prefix}[Planner] LLM 决策: {action}, 理由: {reasoning}, EmojiQuery: '{emoji_query}'"
)
except json.JSONDecodeError as json_e:
@@ -512,7 +508,7 @@ class PFChatting:
"reasoning": reasoning,
"emoji_query": emoji_query, # Explicit query from Planner/LLM
"current_mind": current_mind,
"send_emoji_from_tools": send_emoji_from_tools, # Emoji suggested by tools (used as fallback)
# "send_emoji_from_tools": send_emoji_from_tools, # Emoji suggested by tools (used as fallback)
"observed_messages": observed_messages,
"llm_error": llm_error,
}
@@ -522,9 +518,6 @@ class PFChatting:
重构观察到的最后一条消息作为回复的锚点,
如果重构失败或观察为空,则创建一个占位符。
"""
if not self.chat_stream:
logger.error(f"{self._get_log_prefix()} 无法获取锚点消息: ChatStream 不可用.")
return None
try:
last_msg_dict = None
@@ -533,7 +526,9 @@ class PFChatting:
if last_msg_dict:
try:
anchor_message = MessageRecv(last_msg_dict, chat_stream=self.chat_stream)
# anchor_message = MessageRecv(last_msg_dict, chat_stream=self.chat_stream)
anchor_message = MessageRecv(last_msg_dict) # 移除 chat_stream 参数
anchor_message.update_chat_stream(self.chat_stream) # 添加 update_chat_stream 调用
if not (
anchor_message
and anchor_message.message_info
@@ -587,6 +582,7 @@ class PFChatting:
except Exception as e:
logger.error(f"{log_prefix} Error cleaning up thinking message {thinking_id}: {e}")
# --- 发送器 (Sender) --- #
async def _sender(
self,
thinking_id: str,
@@ -597,55 +593,27 @@ class PFChatting:
"""
发送器 (Sender): 使用本类的方法发送生成的回复。
处理相关的操作,如发送表情和更新关系。
Raises exception on failure to signal the loop.
"""
log_prefix = self._get_log_prefix()
if not response_set:
logger.error(f"{log_prefix}[Sender-{thinking_id}] Called with empty response_set.")
self._cleanup_thinking_message(thinking_id)
raise ValueError("Sender called with no response_set")
first_bot_msg: Optional[MessageSending] = None
send_success = False
try:
# --- Send the main text response (using moved method) --- #
logger.debug(f"{log_prefix}[Sender-{thinking_id}] Sending response messages...")
first_bot_msg = await self._send_response_messages(anchor_message, response_set, thinking_id)
# 尝试发送回复消息
first_bot_msg = await self._send_response_messages(anchor_message, response_set, thinking_id)
if first_bot_msg:
# --- 处理关联表情(如果指定) --- #
if send_emoji:
logger.info(f"{log_prefix}[Sender-{thinking_id}] 正在发送关联表情: '{send_emoji}'")
# 优先使用first_bot_msg作为锚点否则回退到原始锚点
emoji_anchor = first_bot_msg if first_bot_msg else anchor_message
await self._handle_emoji(emoji_anchor, response_set, send_emoji)
if first_bot_msg:
send_success = True
logger.info(f"{log_prefix}[Sender-{thinking_id}] Successfully sent reply.")
# --- Handle associated emoji (if specified) using moved method --- #
if send_emoji:
logger.info(f"{log_prefix}[Sender-{thinking_id}] Sending associated emoji: '{send_emoji}'")
try:
# Use first_bot_msg as anchor if available, otherwise fallback to original anchor
emoji_anchor = first_bot_msg if first_bot_msg else anchor_message
await self._handle_emoji(emoji_anchor, response_set, send_emoji)
except Exception as e_emoji:
logger.error(f"{log_prefix}[Sender-{thinking_id}] Failed to send associated emoji: {e_emoji}")
# --- Update relationship (using moved method) --- #
try:
await self._update_relationship(anchor_message, response_set)
# logger.debug(f"{log_prefix}[Sender-{thinking_id}] Updated relationship.")
except Exception as e_rel:
logger.error(f"{log_prefix}[Sender-{thinking_id}] Failed to update relationship: {e_rel}")
else:
send_success = False
logger.warning(f"{log_prefix}[Sender-{thinking_id}] Failed to send reply (_send_response_messages returned None). Thinking message {thinking_id} likely removed.")
# No cleanup needed here, as _send_response_messages returning None implies it's handled/gone.
raise RuntimeError("Sending reply failed, _send_response_messages returned None.")
except Exception as e:
logger.error(f"{log_prefix}[Sender-{thinking_id}] Error during sending process: {e}")
logger.error(traceback.format_exc())
if not send_success:
# Ensure cleanup if error happened before _send_response_messages or during post-send actions
self._cleanup_thinking_message(thinking_id)
raise
# --- 更新关系状态 --- #
await self._update_relationship(anchor_message, response_set)
else:
# logger.warning(f"{log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。")
# 无需清理因为_send_response_messages返回None意味着已处理/已删除
raise RuntimeError("发送回复失败_send_response_messages返回None")
async def shutdown(self):
"""
@@ -724,12 +692,9 @@ class PFChatting:
# --- 回复器 (Replier) 的定义 --- #
async def _replier_work(
self,
observed_messages: List[dict], # Added observed_messages back, potentially useful context for GPT
anchor_message: MessageRecv,
thinking_id: str,
current_mind: Optional[str], # Pass current mind for context
send_emoji: str, # Emoji query from tools
) -> Optional[Dict[str, Any]]:
) -> Optional[List[str]]:
"""
回复器 (Replier): 核心逻辑用于生成回复。
"""
@@ -753,10 +718,7 @@ class PFChatting:
# --- 准备并返回结果 --- #
logger.info(f"{log_prefix}[Replier-{thinking_id}] 成功生成了回复集: {' '.join(response_set)[:50]}...")
return {
"response_set": response_set,
"send_emoji": send_emoji, # Pass through the emoji query from tools/planner
}
return response_set
except Exception as e:
logger.error(f"{log_prefix}[Replier-{thinking_id}] Unexpected error in replier_work: {e}")
@@ -796,17 +758,15 @@ class PFChatting:
self, anchor_message: Optional[MessageRecv], response_set: List[str], thinking_id: str
) -> Optional[MessageSending]:
"""发送回复消息 (尝试锚定到 anchor_message)"""
from src.plugins.chat.message import MessageSet, Seg # Local import needed after move
if not anchor_message or not anchor_message.chat_stream:
logger.error(f"{self._get_log_prefix()} 无法发送回复,缺少有效的锚点消息或聊天流。")
return None
chat = anchor_message.chat_stream
# Access MessageManager via controller
container = self.heartfc_controller.MessageManager().get_container(chat.stream_id)
thinking_message = None
# Use container.remove_message directly if possible, otherwise iterate
# 移除思考消息
for msg in container.messages[:]: # Iterate over a copy
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
thinking_message = msg
@@ -816,7 +776,7 @@ class PFChatting:
if not thinking_message:
stream_name = chat_manager.get_stream_name(chat.stream_id) or chat.stream_id # 获取流名称
logger.warning(f"[{stream_name}] 未找到对应的思考消息 {thinking_id}可能已超时被移除")
logger.warning(f"[{stream_name}] {thinking_id}思考太久了,超时被移除")
return None
thinking_start_time = thinking_message.thinking_start_time
@@ -847,14 +807,9 @@ class PFChatting:
first_bot_msg = bot_message
message_set.add_message(bot_message)
if message_set.messages: # 确保有消息才添加
# Access MessageManager via controller
self.heartfc_controller.MessageManager().add_message(message_set)
return first_bot_msg
else:
stream_name = chat_manager.get_stream_name(chat.stream_id) or chat.stream_id # 获取流名称
logger.warning(f"[{stream_name}] 没有生成有效的回复消息集,无法发送。")
return None
self.heartfc_controller.MessageManager().add_message(message_set)
return first_bot_msg
async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""):
"""处理表情包 (尝试锚定到 anchor_message)"""