better:更简洁的发送机制

This commit is contained in:
SengokuCola
2025-04-25 23:45:59 +08:00
parent 6471f5e227
commit 2c8343b23a
2 changed files with 256 additions and 80 deletions

View File

@@ -16,7 +16,6 @@ from src.plugins.chat.utils_image import image_path_to_base64 # Local import ne
from src.plugins.utils.timer_calculator import Timer # <--- Import Timer
from src.plugins.heartFC_chat.heartFC_generator import HeartFCGenerator
from src.do_tool.tool_use import ToolUser
from ..chat.message_sender import message_manager # <-- Import the global manager
from src.plugins.emoji_system.emoji_manager import emoji_manager
from src.plugins.utils.json_utils import process_llm_tool_response # 导入新的JSON工具
from src.heart_flow.sub_mind import SubMind
@@ -25,6 +24,7 @@ from src.plugins.heartFC_chat.heartflow_prompt_builder import global_prompt_mana
import contextlib
from src.plugins.utils.chat_message_builder import num_new_messages_since
from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo
from .heartFC_sender import HeartFCSender
# --- End import ---
@@ -181,6 +181,7 @@ class HeartFChatting:
# 依赖注入存储
self.gpt_instance = HeartFCGenerator() # 文本回复生成器
self.tool_user = ToolUser() # 工具使用实例
self.heart_fc_sender = HeartFCSender()
# LLM规划器配置
self.planner_llm = LLMRequest(
@@ -301,11 +302,6 @@ class HeartFChatting:
# 防止循环过快消耗资源
await self._handle_cycle_delay(action_taken, loop_cycle_start_time, self.log_prefix)
# 等待直到所有消息都发送完成
with Timer("发送消息", cycle_timers):
while await self._should_skip_cycle(thinking_id):
await asyncio.sleep(0.2)
# 完成当前循环并保存历史
self._current_cycle.complete_cycle()
self._cycle_history.append(self._current_cycle)
@@ -593,10 +589,6 @@ class HeartFChatting:
await asyncio.sleep(1.5)
async def _should_skip_cycle(self, thinking_id: str) -> bool:
"""检查是否应该跳过当前循环周期"""
return message_manager.check_if_sending_message_exist(self.stream_id, thinking_id)
async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str):
"""记录循环周期的计时器结果"""
if cycle_timers:
@@ -806,26 +798,40 @@ class HeartFChatting:
send_emoji: str, # Emoji query decided by planner or tools
):
"""
发送器 (Sender): 使用本类的方法发送生成的回复。
发送器 (Sender): 使用 HeartFCSender 实例发送生成的回复。
处理相关的操作,如发送表情和更新关系。
"""
logger.info(f"{self.log_prefix}开始发送回复")
logger.info(f"{self.log_prefix}开始发送回复 (使用 HeartFCSender)")
first_bot_msg: Optional[MessageSending] = None
# 尝试发送回复消息
first_bot_msg = await self._send_response_messages(anchor_message, response_set, thinking_id)
try:
# _send_response_messages 现在将使用 self.sender 内部处理注册和发送
# 它需要负责创建 MessageThinking 和 MessageSending 对象
# 并调用 self.sender.register_thinking 和 self.sender.type_and_send_message
first_bot_msg = await self._send_response_messages(
anchor_message=anchor_message,
response_set=response_set,
thinking_id=thinking_id
)
if first_bot_msg:
# --- 处理关联表情(如果指定) --- #
if send_emoji:
logger.info(f"{self.log_prefix}正在发送关联表情: '{send_emoji}'")
# 优先使用 first_bot_msg 作为锚点,否则回退到原始锚点
emoji_anchor = first_bot_msg if first_bot_msg else anchor_message
emoji_anchor = first_bot_msg
await self._handle_emoji(emoji_anchor, response_set, send_emoji)
else:
# logger.warning(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复失败(_send_response_messages返回None)。思考消息{thinking_id}可能已被移除。")
# 无需清理,因为_send_response_messages返回None意味着已处理/已删除
raise RuntimeError("发送回复失败,_send_response_messages返回None")
# 如果 _send_response_messages 返回 None,表示在发送前就失败或没有消息可发送
logger.warning(f"{self.log_prefix}[Sender-{thinking_id}] 未能发送任何回复消息 (_send_response_messages 返回 None)。")
# 这里可能不需要抛出异常,取决于 _send_response_messages 的具体实现
except Exception as e:
# 异常现在由 type_and_send_message 内部处理日志,这里只记录发送流程失败
logger.error(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复过程中遇到错误: {e}")
# 思考状态应已在 type_and_send_message 的 finally 块中清理
# 可以选择重新抛出或根据业务逻辑处理
# raise RuntimeError(f"发送回复失败: {e}") from e
async def shutdown(self):
"""优雅关闭HeartFChatting实例取消活动循环任务"""
@@ -959,99 +965,103 @@ class HeartFChatting:
thinking_start_time=thinking_time_point,
)
# Access MessageManager directly
await message_manager.add_message(thinking_message)
await self.heart_fc_sender.register_thinking(thinking_message)
return thinking_id
async def _send_response_messages(
self, anchor_message: Optional[MessageRecv], response_set: List[str], thinking_id: str
) -> Optional[MessageSending]:
"""发送回复消息 (尝试锚定到 anchor_message)"""
"""发送回复消息 (尝试锚定到 anchor_message),使用 HeartFCSender"""
if not anchor_message or not anchor_message.chat_stream:
logger.error(f"{self.log_prefix} 无法发送回复,缺少有效的锚点消息或聊天流。")
return None
# 记录锚点消息ID
if self._current_cycle and anchor_message:
chat = anchor_message.chat_stream
chat_id = chat.stream_id
stream_name = chat_manager.get_stream_name(chat_id) or chat_id # 获取流名称用于日志
# 检查思考过程是否仍在进行,并获取开始时间
thinking_start_time = await self.heart_fc_sender.get_thinking_start_time(chat_id, thinking_id)
if thinking_start_time is None:
logger.warning(f"[{stream_name}] {thinking_id} 思考过程未找到或已结束,无法发送回复。")
return None
# 记录锚点消息ID和回复文本在发送前记录
self._current_cycle.set_response_info(
response_text=response_set, anchor_message_id=anchor_message.message_info.message_id
)
chat = anchor_message.chat_stream
container = await message_manager.get_container(chat.stream_id)
thinking_message = None
# 移除思考消息
for msg in container.messages[:]: # Iterate over a copy
if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id:
thinking_message = msg
container.messages.remove(msg) # Remove the message directly here
# logger.debug(f"{self.log_prefix} Removed thinking message {thinking_id} via iteration.")
break
if not thinking_message:
stream_name = chat_manager.get_stream_name(chat.stream_id) or chat.stream_id # 获取流名称
logger.warning(f"[{stream_name}] {thinking_id},思考太久了,超时被移除")
return None
thinking_start_time = thinking_message.thinking_start_time
message_set = MessageSet(chat, thinking_id)
mark_head = False
first_bot_msg = None
reply_message_ids = [] # 用于记录所有回复消息ID
first_bot_msg: Optional[MessageSending] = None
reply_message_ids = [] # 记录实际发送的消息ID
bot_user_info = UserInfo(
user_id=global_config.BOT_QQ,
user_nickname=global_config.BOT_NICKNAME,
platform=anchor_message.message_info.platform,
)
for msg_text in response_set:
for i, msg_text in enumerate(response_set):
# 为每个消息片段生成唯一ID
part_message_id = f"{thinking_id}_{i}"
message_segment = Seg(type="text", data=msg_text)
bot_message = MessageSending(
message_id=thinking_id, # 使用 thinking_id 作为批次标识
message_id=part_message_id, # 使用片段的唯一ID
chat_stream=chat,
bot_user_info=bot_user_info,
sender_info=anchor_message.message_info.user_info, # 发送给锚点消息的用户
sender_info=anchor_message.message_info.user_info,
message_segment=message_segment,
reply=anchor_message, # 回复锚点消息
reply=anchor_message, # 回复原始锚点
is_head=not mark_head,
is_emoji=False,
thinking_start_time=thinking_start_time,
thinking_start_time=thinking_start_time, # 传递原始思考开始时间
)
try:
if not mark_head:
mark_head = True
first_bot_msg = bot_message
message_set.add_message(bot_message)
reply_message_ids.append(bot_message.message_info.message_id)
first_bot_msg = bot_message # 保存第一个成功发送的消息对象
await self.heart_fc_sender.type_and_send_message(bot_message, type = False)
else:
await self.heart_fc_sender.type_and_send_message(bot_message, type = True)
# 记录回复消息ID列表
if self._current_cycle:
self._current_cycle.set_response_info(reply_message_ids=reply_message_ids)
reply_message_ids.append(part_message_id) # 记录我们生成的ID
# Access MessageManager directly
await message_manager.add_message(message_set)
return first_bot_msg
except Exception as e:
logger.error(f"{self.log_prefix}[Sender-{thinking_id}] 发送回复片段 {i} ({part_message_id}) 时失败: {e}")
# 这里可以选择是继续发送下一个片段还是中止
# 在尝试发送完所有片段后,完成原始的 thinking_id 状态
try:
await self.heart_fc_sender.complete_thinking(chat_id, thinking_id)
except Exception as e:
logger.error(f"{self.log_prefix}[Sender-{thinking_id}] 完成思考状态 {thinking_id} 时出错: {e}")
self._current_cycle.set_response_info(
response_text=response_set, # 保留原始文本
anchor_message_id=anchor_message.message_info.message_id, # 保留锚点ID
reply_message_ids=reply_message_ids # 添加实际发送的ID列表
)
return first_bot_msg # 返回第一个成功发送的消息对象
async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""):
"""处理表情包 (尝试锚定到 anchor_message)"""
"""处理表情包 (尝试锚定到 anchor_message),使用 HeartFCSender"""
if not anchor_message or not anchor_message.chat_stream:
logger.error(f"{self.log_prefix} 无法处理表情包,缺少有效的锚点消息或聊天流。")
return
chat = anchor_message.chat_stream
if send_emoji:
emoji_raw = await emoji_manager.get_emoji_for_text(send_emoji)
else:
emoji_text_source = "".join(response_set) if response_set else ""
emoji_raw = await emoji_manager.get_emoji_for_text(emoji_text_source)
if emoji_raw:
emoji_path, description = emoji_raw
# 记录表情信息
if self._current_cycle:
self._current_cycle.set_response_info(emoji_info=f"表情: {description}, 路径: {emoji_path}")
emoji_cq = image_path_to_base64(emoji_path)
thinking_time_point = round(time.time(), 2)
thinking_time_point = round(time.time(), 2) # 用于唯一ID
message_segment = Seg(type="emoji", data=emoji_cq)
bot_user_info = UserInfo(
user_id=global_config.BOT_QQ,
@@ -1059,17 +1069,22 @@ class HeartFChatting:
platform=anchor_message.message_info.platform,
)
bot_message = MessageSending(
message_id="me" + str(thinking_time_point), # 使用不同的 ID 前缀?
message_id="me" + str(thinking_time_point), # 表情消息的唯一ID
chat_stream=chat,
bot_user_info=bot_user_info,
sender_info=anchor_message.message_info.user_info,
message_segment=message_segment,
reply=anchor_message, # 回复锚点消息
is_head=False,
reply=anchor_message, # 回复原始锚点
is_head=False, # 表情通常不是头部消息
is_emoji=True,
# 不需要 thinking_start_time
)
# Access MessageManager directly
await message_manager.add_message(bot_message)
try:
await self.heart_fc_sender.send_and_store(bot_message)
except Exception as e:
logger.error(f"{self.log_prefix} 发送表情包 {bot_message.message_info.message_id} 时失败: {e}")
def get_cycle_history(self, last_n: Optional[int] = None) -> List[Dict[str, Any]]:
"""获取循环历史记录

View File

@@ -0,0 +1,161 @@
# src/plugins/heartFC_chat/heartFC_sender.py
import asyncio # 重新导入 asyncio
import time
from typing import Dict, List, Optional, Union # 重新导入类型
from src.common.logger import get_module_logger
from ..message.api import global_api
from ..chat.message import MessageSending, MessageThinking # 只保留 MessageSending 和 MessageThinking
from ..storage.storage import MessageStorage
from ..chat.utils import truncate_message
from src.common.logger import LogConfig, SENDER_STYLE_CONFIG
from src.plugins.chat.utils import calculate_typing_time
# 定义日志配置
sender_config = LogConfig(
# 使用消息发送专用样式
console_format=SENDER_STYLE_CONFIG["console_format"],
file_format=SENDER_STYLE_CONFIG["file_format"],
)
logger = get_module_logger("msg_sender", config=sender_config)
class HeartFCSender:
"""管理消息的注册、即时处理、发送和存储,并跟踪思考状态。"""
def __init__(self):
self.storage = MessageStorage()
# 用于存储活跃的思考消息
self.thinking_messages: Dict[str, Dict[str, MessageThinking]] = {}
self._thinking_lock = asyncio.Lock() # 保护 thinking_messages 的锁
async def send_message(self, message: MessageSending) -> None:
"""合并后的消息发送函数包含WS发送和日志记录"""
message_preview = truncate_message(message.processed_plain_text)
try:
# 直接调用API发送消息
await global_api.send_message(message)
logger.success(f"发送消息 '{message_preview}' 成功")
except Exception as e:
logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}")
if not message.message_info.platform:
raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置请检查配置文件") from e
raise e # 重新抛出其他异常
async def register_thinking(self, thinking_message: MessageThinking):
"""注册一个思考中的消息。"""
if not thinking_message.chat_stream or not thinking_message.message_info.message_id:
logger.error("无法注册缺少 chat_stream 或 message_id 的思考消息")
return
chat_id = thinking_message.chat_stream.stream_id
message_id = thinking_message.message_info.message_id
async with self._thinking_lock:
if chat_id not in self.thinking_messages:
self.thinking_messages[chat_id] = {}
if message_id in self.thinking_messages[chat_id]:
logger.warning(f"[{chat_id}] 尝试注册已存在的思考消息 ID: {message_id}")
self.thinking_messages[chat_id][message_id] = thinking_message
logger.debug(f"[{chat_id}] Registered thinking message: {message_id}")
async def complete_thinking(self, chat_id: str, message_id: str):
"""完成并移除一个思考中的消息记录。"""
async with self._thinking_lock:
if chat_id in self.thinking_messages and message_id in self.thinking_messages[chat_id]:
del self.thinking_messages[chat_id][message_id]
logger.debug(f"[{chat_id}] Completed thinking message: {message_id}")
if not self.thinking_messages[chat_id]:
del self.thinking_messages[chat_id]
logger.debug(f"[{chat_id}] Removed empty thinking message container.")
def is_thinking(self, chat_id: str, message_id: str) -> bool:
"""检查指定的消息 ID 是否当前正处于思考状态。"""
return chat_id in self.thinking_messages and message_id in self.thinking_messages[chat_id]
async def get_thinking_start_time(self, chat_id: str, message_id: str) -> Optional[float]:
"""获取已注册思考消息的开始时间。"""
async with self._thinking_lock:
thinking_message = self.thinking_messages.get(chat_id, {}).get(message_id)
return thinking_message.thinking_start_time if thinking_message else None
async def type_and_send_message(self, message: MessageSending, type = False):
"""
立即处理、发送并存储单个 MessageSending 消息。
调用此方法前,应先调用 register_thinking 注册对应的思考消息。
此方法执行后会调用 complete_thinking 清理思考状态。
"""
if not message.chat_stream:
logger.error("消息缺少 chat_stream无法发送")
return
if not message.message_info or not message.message_info.message_id:
logger.error("消息缺少 message_info 或 message_id无法发送")
return
chat_id = message.chat_stream.stream_id
message_id = message.message_info.message_id
try:
_ = message.update_thinking_time()
# --- 条件应用 set_reply 逻辑 ---
if (
message.apply_set_reply_logic
and message.is_head
and not message.is_private_message()
):
logger.debug(f"[{chat_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}...")
message.set_reply()
# --- 结束条件 set_reply ---
await message.process()
if type:
typing_time = calculate_typing_time(
input_string=message.processed_plain_text,
thinking_start_time=message.thinking_start_time,
is_emoji=message.is_emoji,
)
await asyncio.sleep(typing_time)
await self.send_message(message)
await self.storage.store_message(message, message.chat_stream)
except Exception as e:
logger.error(
f"[{chat_id}] 处理或存储消息 {message_id} 时出错: {e}"
)
raise e
finally:
await self.complete_thinking(chat_id, message_id)
async def send_and_store(self, message: MessageSending):
"""处理、发送并存储单个消息,不涉及思考状态管理。"""
if not message.chat_stream:
logger.error(f"[{message.message_info.platform or 'UnknownPlatform'}] 消息缺少 chat_stream无法发送")
return
if not message.message_info or not message.message_info.message_id:
logger.error(f"[{message.chat_stream.stream_id if message.chat_stream else 'UnknownStream'}] 消息缺少 message_info 或 message_id无法发送")
return
chat_id = message.chat_stream.stream_id
message_id = message.message_info.message_id # 获取消息ID用于日志
try:
await message.process()
await asyncio.sleep(0.5)
await self.send_message(message) # 使用现有的发送方法
await self.storage.store_message(message, message.chat_stream) # 使用现有的存储方法
except Exception as e:
logger.error(
f"[{chat_id}] 处理或存储消息 {message_id} 时出错: {e}"
)
# 重新抛出异常,让调用者知道失败了
raise e