From 2eecd746af4bc799cd8fef4e17401253ddd8995c Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Wed, 23 Apr 2025 00:41:46 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=9F=BA=E4=BA=8E=E4=B8=8D?= =?UTF-8?q?=E5=90=8C=E5=BF=83=E6=B5=81=E5=8D=95=E7=8B=AC=E7=9A=84=E5=8F=91?= =?UTF-8?q?=E9=80=81=E5=99=A8=E5=AE=9E=E4=BE=8B=EF=BC=8C=E5=8F=8D=E6=AD=A3?= =?UTF-8?q?=E8=83=BD=E8=B7=91=EF=BC=8C=E4=BD=86=E6=88=91=E4=B9=9F=E4=B8=8D?= =?UTF-8?q?=E7=9F=A5=E9=81=93=E8=83=BD=E4=B8=8D=E8=83=BD=E8=B7=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 睡觉 --- src/do_tool/tool_use.py | 16 +- src/heart_flow/heartflow.py | 32 +- src/heart_flow/sub_heartflow.py | 122 +++++-- src/main.py | 8 +- src/plugins/chat/__init__.py | 2 +- src/plugins/chat/message.py | 2 + src/plugins/chat/message_sender.py | 343 ++++++++++++++++++ src/plugins/chat/messagesender.py | 291 --------------- src/plugins/heartFC_chat/heartFC_chat.py | 130 +++---- src/plugins/heartFC_chat/heartFC_generator.py | 10 +- .../heartFC_chat/heartflow_message_sender.py | 241 ------------ .../heartFC_chat/heartflow_prompt_builder.py | 20 +- src/plugins/heartFC_chat/normal_chat.py | 20 +- .../heartFC_chat/normal_chat_generator.py | 3 +- 14 files changed, 568 insertions(+), 672 deletions(-) create mode 100644 src/plugins/chat/message_sender.py delete mode 100644 src/plugins/chat/messagesender.py delete mode 100644 src/plugins/heartFC_chat/heartflow_message_sender.py diff --git a/src/do_tool/tool_use.py b/src/do_tool/tool_use.py index 46716acb0..997ba7ee5 100644 --- a/src/do_tool/tool_use.py +++ b/src/do_tool/tool_use.py @@ -3,10 +3,11 @@ from src.config.config import global_config import json from src.common.logger import get_module_logger, TOOL_USE_STYLE_CONFIG, LogConfig from src.do_tool.tool_can_use import get_all_tool_definitions, get_tool_instance -from src.heart_flow.sub_heartflow import SubHeartflow import traceback from src.plugins.person_info.relationship_manager import relationship_manager from src.plugins.chat.utils import parse_text_timestamps +from src.plugins.chat.chat_stream import ChatStream +from src.heart_flow.observation import ChattingObservation tool_use_config = LogConfig( # 使用消息发送专用样式 @@ -23,7 +24,7 @@ class ToolUser: ) @staticmethod - async def _build_tool_prompt(message_txt: str, subheartflow: SubHeartflow = None): + async def _build_tool_prompt(message_txt: str, chat_stream: ChatStream = None, observation: ChattingObservation = None): """构建工具使用的提示词 Args: @@ -34,8 +35,8 @@ class ToolUser: str: 构建好的提示词 """ - if subheartflow: - mid_memory_info = subheartflow.observations[0].mid_memory_info + if observation: + mid_memory_info = observation.mid_memory_info # print(f"intol111111111111111111111111111111111222222222222mid_memory_info:{mid_memory_info}") # 这些信息应该从调用者传入,而不是从self获取 @@ -102,14 +103,14 @@ class ToolUser: logger.error(f"执行工具调用时发生错误: {str(e)}") return None - async def use_tool(self, message_txt: str, sub_heartflow: SubHeartflow = None): + async def use_tool(self, message_txt: str, chat_stream: ChatStream = None, observation: ChattingObservation = None): """使用工具辅助思考,判断是否需要额外信息 Args: message_txt: 用户消息文本 sender_name: 发送者名称 chat_stream: 聊天流对象 - sub_heartflow: 子心流对象(可选) + observation: 观察对象(可选) Returns: dict: 工具使用结果,包含结构化的信息 @@ -118,7 +119,8 @@ class ToolUser: # 构建提示词 prompt = await self._build_tool_prompt( message_txt=message_txt, - subheartflow=sub_heartflow, + chat_stream=chat_stream, + observation=observation, ) # 定义可用工具 diff --git a/src/heart_flow/heartflow.py b/src/heart_flow/heartflow.py index 7afd0d067..7277d980a 100644 --- a/src/heart_flow/heartflow.py +++ b/src/heart_flow/heartflow.py @@ -20,7 +20,6 @@ from src.plugins.heartFC_chat.heartFC_generator import ResponseGenerator from src.do_tool.tool_use import ToolUser from src.plugins.chat.emoji_manager import emoji_manager # Module instance from src.plugins.person_info.relationship_manager import relationship_manager # Module instance -from src.plugins.heartFC_chat.heartflow_message_sender import MessageManager # --- End imports --- heartflow_config = LogConfig( @@ -66,9 +65,9 @@ LOG_INTERVAL_SECONDS = 3 # 日志记录间隔 (例如:3秒) - 保持与 inter # --- 新增:状态更新常量 --- STATE_UPDATE_INTERVAL_SECONDS = 30 # 状态更新检查间隔(秒) -FIVE_MINUTES = 3 * 60 -FIFTEEN_MINUTES = 6 * 60 -TWENTY_MINUTES = 9 * 60 +FIVE_MINUTES = 1 * 60 +FIFTEEN_MINUTES = 5 * 60 +TWENTY_MINUTES = 10 * 60 # --- 结束新增常量 --- @@ -111,7 +110,7 @@ class MaiState(enum.Enum): class MaiStateInfo: def __init__(self): - # 使用枚举类型初始化状态,默认为不在线 + # 使用枚举类型初始化状态,默认为正常聊天 self.mai_status: MaiState = MaiState.OFFLINE self.mai_status_history = [] # 历史状态,包含 状态,最后时间 self.last_status_change_time: float = time.time() # 新增:状态最后改变时间 @@ -157,7 +156,6 @@ class Heartflow: self.tool_user_instance = ToolUser() self.emoji_manager_instance = emoji_manager # Module instance self.relationship_manager_instance = relationship_manager # Module instance - self.message_manager_instance = MessageManager() # Instantiate the message manager # --- End moved dependencies --- # --- Background Task Management --- @@ -463,7 +461,13 @@ class Heartflow: else: logger.warning("[Heartflow] 跳过创建状态更新任务: 任务已在运行或存在。") - + # --- 新增:在启动时根据初始状态激活子心流 --- + if self.current_state.mai_status != MaiState.OFFLINE: + logger.info(f"[Heartflow] 初始状态为 {self.current_state.mai_status.value},执行初始子心流激活检查。") + # 使用 create_task 确保它不会阻塞 heartflow_start_working 的完成 + # 传递当前状态给激活函数,以便它知道激活的限制 + asyncio.create_task(self._activate_random_subflows_to_chat(self.current_state.mai_status)) + # --- 结束新增逻辑 --- @staticmethod async def _update_current_state(): @@ -646,12 +650,24 @@ class Heartflow: return list(self._subheartflows.keys()) async def _stop_subheartflow(self, subheartflow_id: Any, reason: str): - """停止并移除指定的子心流""" + """停止并移除指定的子心流,确保 HeartFChatting 被关闭""" if subheartflow_id in self._subheartflows: subheartflow = self._subheartflows[subheartflow_id] stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id logger.info(f"[Heartflow Limits] 停止子心流 {stream_name}. 原因: {reason}") + # --- 新增:在取消任务和删除前,先设置状态为 ABSENT 以关闭 HeartFChatting --- + try: + if subheartflow.chat_state.chat_status != ChatState.ABSENT: + logger.debug(f"[Heartflow Limits] 将子心流 {stream_name} 状态设置为 ABSENT 以确保资源释放...") + await subheartflow.set_chat_state(ChatState.ABSENT) # 调用异步方法 + else: + logger.debug(f"[Heartflow Limits] 子心流 {stream_name} 已经是 ABSENT 状态。") + except Exception as e: + logger.error(f"[Heartflow Limits] 在停止子心流 {stream_name} 时设置状态为 ABSENT 出错: {e}") + # 即使出错,仍继续尝试停止任务和移除 + # --- 结束新增逻辑 --- + # 标记停止并取消任务 subheartflow.should_stop = True task_to_cancel = subheartflow.task diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 9d86561b2..fce62399d 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -16,11 +16,13 @@ from ..plugins.utils.prompt_builder import Prompt, global_prompt_manager from src.plugins.chat.message import MessageRecv from src.plugins.chat.chat_stream import chat_manager import math +from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting # Type hinting for circular dependency if TYPE_CHECKING: from .heartflow import Heartflow, MaiState # Import Heartflow for type hinting from .sub_heartflow import ChatState # Keep ChatState here too? + from src.plugins.heartFC_chat.heartFC_chat import HeartFChatting # <-- Add for type hint # 定义常量 (从 interest.py 移动过来) MAX_INTEREST = 15.0 @@ -230,15 +232,38 @@ class InterestChatting: class SubHeartflow: def __init__(self, subheartflow_id, parent_heartflow: 'Heartflow'): + """子心流初始化函数 + + Args: + subheartflow_id: 子心流唯一标识符 + parent_heartflow: 父级心流实例 + """ + # 基础属性 self.subheartflow_id = subheartflow_id self.parent_heartflow = parent_heartflow - - self.current_mind = "你什么也没想" - self.past_mind = [] - self.chat_state: ChatStateInfo = ChatStateInfo() - - self.interest_chatting = InterestChatting(state_change_callback=self.set_chat_state) - + self.bot_name = global_config.BOT_NICKNAME # 机器人昵称 + + # 思维状态相关 + self.current_mind = "你什么也没想" # 当前想法 + self.past_mind = [] # 历史想法记录 + self.main_heartflow_info = "" # 主心流信息 + + # 聊天状态管理 + self.chat_state: ChatStateInfo = ChatStateInfo() # 聊天状态信息 + self.interest_chatting = InterestChatting(state_change_callback=self.set_chat_state) # 兴趣聊天系统 + + # 活动状态管理 + self.last_active_time = time.time() # 最后活跃时间 + self.is_active = False # 是否活跃标志 + self.should_stop = False # 停止标志 + self.task: Optional[asyncio.Task] = None # 后台任务 + self.heart_fc_instance: Optional['HeartFChatting'] = None # <-- Add instance variable + + # 观察和知识系统 + self.observations: List[ChattingObservation] = [] # 观察列表 + self.running_knowledges = [] # 运行中的知识 + + # LLM模型配置 self.llm_model = LLMRequest( model=global_config.llm_sub_heartflow, temperature=global_config.llm_sub_heartflow["temp"], @@ -246,33 +271,19 @@ class SubHeartflow: request_type="sub_heart_flow", ) - self.main_heartflow_info = "" - - self.last_active_time = time.time() - self.should_stop = False - self.task: Optional[asyncio.Task] = None - - self.is_active = False - - self.observations: List[ChattingObservation] = [] - - self.running_knowledges = [] - - self.bot_name = global_config.BOT_NICKNAME - logger.info(f"SubHeartflow {self.subheartflow_id} created with initial state: {self.chat_state.chat_status.value}") - - def set_chat_state(self, new_state: 'ChatState'): - """更新sub_heartflow的聊天状态""" + async def set_chat_state(self, new_state: 'ChatState'): + """更新sub_heartflow的聊天状态,并管理 HeartFChatting 实例""" current_state = self.chat_state.chat_status if current_state == new_state: + logger.trace(f"[{self.subheartflow_id}] State already {current_state.value}, no change.") return # No change needed log_prefix = f"[{chat_manager.get_stream_name(self.subheartflow_id) or self.subheartflow_id}]" + current_mai_state = self.parent_heartflow.current_state.mai_status - # --- Limit Check before entering CHAT state --- # + # --- Entering CHAT state --- if new_state == ChatState.CHAT: - current_mai_state = self.parent_heartflow.current_state.mai_status normal_limit = current_mai_state.get_normal_chat_max_num() current_chat_count = self.parent_heartflow.count_subflows_by_state(ChatState.CHAT) @@ -281,15 +292,61 @@ class SubHeartflow: return # Block the state transition else: logger.debug(f"{log_prefix} 允许从 {current_state.value} 转换到 CHAT (上限: {normal_limit}, 当前: {current_chat_count})" ) + # If transitioning out of FOCUSED, shut down HeartFChatting first + if current_state == ChatState.FOCUSED and self.heart_fc_instance: + logger.info(f"{log_prefix} 从 FOCUSED 转换到 CHAT,正在关闭 HeartFChatting...") + await self.heart_fc_instance.shutdown() + self.heart_fc_instance = None - # 如果检查通过或目标状态不是CHAT,则进行状态变更 + # --- Entering FOCUSED state --- + elif new_state == ChatState.FOCUSED: + focused_limit = current_mai_state.get_focused_chat_max_num() + current_focused_count = self.parent_heartflow.count_subflows_by_state(ChatState.FOCUSED) + + if current_focused_count >= focused_limit: + logger.debug(f"{log_prefix} 拒绝从 {current_state.value} 转换到 FOCUSED。原因:FOCUSED 状态已达上限 ({focused_limit})。当前数量: {current_focused_count}") + return # Block the state transition + else: + logger.debug(f"{log_prefix} 允许从 {current_state.value} 转换到 FOCUSED (上限: {focused_limit}, 当前: {current_focused_count})" ) + if not self.heart_fc_instance: + logger.info(f"{log_prefix} 状态转为 FOCUSED,创建并初始化 HeartFChatting 实例...") + try: + self.heart_fc_instance = HeartFChatting( + chat_id=self.subheartflow_id, + gpt_instance=self.parent_heartflow.gpt_instance, + tool_user_instance=self.parent_heartflow.tool_user_instance, + emoji_manager_instance=self.parent_heartflow.emoji_manager_instance, + ) + # Initialize and potentially start the loop via add_time + if await self.heart_fc_instance._initialize(): + # Give it an initial time boost to start the loop + await self.heart_fc_instance.add_time() + logger.info(f"{log_prefix} HeartFChatting 实例已创建并启动。") + else: + logger.error(f"{log_prefix} HeartFChatting 实例初始化失败,状态回滚到 {current_state.value}") + self.heart_fc_instance = None + return # Prevent state change if HeartFChatting fails to init + except Exception as e: + logger.error(f"{log_prefix} 创建 HeartFChatting 实例时出错: {e}") + logger.error(traceback.format_exc()) + self.heart_fc_instance = None + return # Prevent state change on error + + else: + logger.warning(f"{log_prefix} 尝试进入 FOCUSED 状态,但 HeartFChatting 实例已存在。") + + # --- Entering ABSENT state (or any state other than FOCUSED) --- + elif current_state == ChatState.FOCUSED and self.heart_fc_instance: + logger.info(f"{log_prefix} 从 FOCUSED 转换到 {new_state.value},正在关闭 HeartFChatting...") + await self.heart_fc_instance.shutdown() + self.heart_fc_instance = None + + + # --- Update state and timestamp if transition is allowed --- # 更新状态必须放在所有检查和操作之后 self.chat_state.chat_status = new_state - # 状态变更时更新最后活跃时间 - self.last_active_time = time.time() + self.last_active_time = time.time() logger.info(f"{log_prefix} 聊天状态从 {current_state.value} 变更为 {new_state.value}") - # TODO: 考虑从FOCUSED状态转出时是否需要停止HeartFChatting - # 这部分逻辑可能更适合放在Heartflow的_stop_subheartflow或HeartFCController的循环中处理 async def subheartflow_start_working(self): while True: @@ -297,7 +354,8 @@ class SubHeartflow: logger.info(f"子心流 {self.subheartflow_id} 被标记为停止,正在退出后台任务...") break - await asyncio.sleep(global_config.sub_heart_flow_update_interval) + # await asyncio.sleep(global_config.sub_heart_flow_update_interval) + await asyncio.sleep(10) async def ensure_observed(self): observation = self._get_primary_observation() diff --git a/src/main.py b/src/main.py index ae851551a..75ab2ae72 100644 --- a/src/main.py +++ b/src/main.py @@ -9,7 +9,7 @@ from .plugins.willing.willing_manager import willing_manager from .plugins.chat.chat_stream import chat_manager from .heart_flow.heartflow import heartflow from .plugins.memory_system.Hippocampus import HippocampusManager -from .plugins.chat.messagesender import message_manager +from .plugins.chat.message_sender import message_manager from .plugins.storage.storage import MessageStorage from .config.config import global_config from .plugins.chat.bot import chat_bot @@ -101,9 +101,9 @@ class MainSystem: logger.success("个体特征初始化成功") try: - # 启动 Heartflow 的 MessageManager (负责消息发送/排队) - await heartflow.message_manager_instance.start() - logger.success("心流消息管理器启动成功") + # 启动全局消息管理器 (负责消息发送/排队) + await message_manager.start() + logger.success("全局消息管理器启动成功") # 启动心流系统主循环 asyncio.create_task(heartflow.heartflow_start_working()) diff --git a/src/plugins/chat/__init__.py b/src/plugins/chat/__init__.py index a68caaf1c..8d9aa1f8e 100644 --- a/src/plugins/chat/__init__.py +++ b/src/plugins/chat/__init__.py @@ -1,7 +1,7 @@ from .emoji_manager import emoji_manager from ..person_info.relationship_manager import relationship_manager from .chat_stream import chat_manager -from .messagesender import message_manager +from .message_sender import message_manager from ..storage.storage import MessageStorage diff --git a/src/plugins/chat/message.py b/src/plugins/chat/message.py index b7afa8179..2ba645f95 100644 --- a/src/plugins/chat/message.py +++ b/src/plugins/chat/message.py @@ -290,6 +290,7 @@ class MessageSending(MessageProcessBase): is_head: bool = False, is_emoji: bool = False, thinking_start_time: float = 0, + apply_set_reply_logic: bool = False, ): # 调用父类初始化 super().__init__( @@ -306,6 +307,7 @@ class MessageSending(MessageProcessBase): self.reply_to_message_id = reply.message_info.message_id if reply else None self.is_head = is_head self.is_emoji = is_emoji + self.apply_set_reply_logic = apply_set_reply_logic def set_reply(self, reply: Optional["MessageRecv"] = None) -> None: """设置回复消息""" diff --git a/src/plugins/chat/message_sender.py b/src/plugins/chat/message_sender.py new file mode 100644 index 000000000..23c08a0fc --- /dev/null +++ b/src/plugins/chat/message_sender.py @@ -0,0 +1,343 @@ +# src/plugins/chat/message_sender.py +import asyncio +import time +from typing import Dict, List, Optional, Union + +from src.common.logger import get_module_logger +# from ...common.database import db # 数据库依赖似乎不需要了,注释掉 +from ..message.api import global_api +from .message import MessageSending, MessageThinking, MessageSet + +from ..storage.storage import MessageStorage +from ...config.config import global_config +from .utils import truncate_message, calculate_typing_time, count_messages_between + +from src.common.logger import LogConfig, SENDER_STYLE_CONFIG + +# 定义日志配置 +sender_config = LogConfig( + # 使用消息发送专用样式 + console_format=SENDER_STYLE_CONFIG["console_format"], + file_format=SENDER_STYLE_CONFIG["file_format"], +) + +logger = get_module_logger("msg_sender", config=sender_config) + + +class MessageSender: + """发送器 (不再是单例)""" + + def __init__(self): + self.message_interval = (0.5, 1) # 消息间隔时间范围(秒) + self.last_send_time = 0 + self._current_bot = None + + def set_bot(self, bot): + """设置当前bot实例""" + pass + + async def send_via_ws(self, message: MessageSending) -> None: + """通过 WebSocket 发送消息""" + try: + await global_api.send_message(message) + except Exception as e: + logger.error(f"WS发送失败: {e}") + raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e + + async def send_message( + self, + message: MessageSending, + ) -> None: + """发送消息(核心发送逻辑)""" + + # --- 添加计算打字和延迟的逻辑 (从 heartflow_message_sender 移动并调整) --- + typing_time = calculate_typing_time( + input_string=message.processed_plain_text, + thinking_start_time=message.thinking_start_time, + is_emoji=message.is_emoji, + ) + # logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") # 减少日志 + await asyncio.sleep(typing_time) + # logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") # 减少日志 + # --- 结束打字延迟 --- + + message_json = message.to_dict() + message_preview = truncate_message(message.processed_plain_text) + + try: + end_point = global_config.api_urls.get(message.message_info.platform, None) + if end_point: + try: + await global_api.send_message_rest(end_point, message_json) + except Exception as e: + logger.error(f"REST发送失败: {str(e)}") + logger.info(f"[{message.chat_stream.stream_id}] 尝试使用WS发送") + await self.send_via_ws(message) + else: + await self.send_via_ws(message) + logger.success(f"发送消息 '{message_preview}' 成功") # 调整日志格式 + except Exception as e: + logger.error(f"发送消息 '{message_preview}' 失败: {str(e)}") + + +class MessageContainer: + """单个聊天流的发送/思考消息容器""" + + def __init__(self, chat_id: str, max_size: int = 100): + self.chat_id = chat_id + self.max_size = max_size + self.messages: List[Union[MessageThinking, MessageSending]] = [] # 明确类型 + self.last_send_time = 0 + self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - 从旧 sender 合并 + + def count_thinking_messages(self) -> int: + """计算当前容器中思考消息的数量""" + return sum(1 for msg in self.messages if isinstance(msg, MessageThinking)) + + def get_timeout_sending_messages(self) -> List[MessageSending]: + """获取所有超时的MessageSending对象(思考时间超过20秒),按thinking_start_time排序 - 从旧 sender 合并""" + current_time = time.time() + timeout_messages = [] + + for msg in self.messages: + # 只检查 MessageSending 类型 + if isinstance(msg, MessageSending): + # 确保 thinking_start_time 有效 + if msg.thinking_start_time and current_time - msg.thinking_start_time > self.thinking_wait_timeout: + timeout_messages.append(msg) + + # 按thinking_start_time排序,时间早的在前面 + timeout_messages.sort(key=lambda x: x.thinking_start_time) + return timeout_messages + + def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]: + """获取thinking_start_time最早的消息对象""" + if not self.messages: + return None + earliest_time = float("inf") + earliest_message = None + for msg in self.messages: + # 确保消息有 thinking_start_time 属性 + msg_time = getattr(msg, 'thinking_start_time', float('inf')) + if msg_time < earliest_time: + earliest_time = msg_time + earliest_message = msg + return earliest_message + + def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: + """添加消息到队列""" + if isinstance(message, MessageSet): + for single_message in message.messages: + self.messages.append(single_message) + else: + self.messages.append(message) + + def remove_message(self, message_to_remove: Union[MessageThinking, MessageSending]) -> bool: + """移除指定的消息对象,如果消息存在则返回True,否则返回False""" + try: + initial_len = len(self.messages) + # 使用列表推导式或 filter 创建新列表,排除要删除的元素 + # self.messages = [msg for msg in self.messages if msg is not message_to_remove] + # 或者直接 remove (如果确定对象唯一性) + if message_to_remove in self.messages: + self.messages.remove(message_to_remove) + return True + # logger.debug(f"Removed message {getattr(message_to_remove, 'message_info', {}).get('message_id', 'UNKNOWN')}. Old len: {initial_len}, New len: {len(self.messages)}") + # return len(self.messages) < initial_len + return False + + except Exception as e: + logger.exception(f"移除消息时发生错误: {e}") + return False + + def has_messages(self) -> bool: + """检查是否有待发送的消息""" + return bool(self.messages) + + def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]: + """获取所有消息""" + return list(self.messages) # 返回副本 + + +class MessageManager: + """管理所有聊天流的消息容器 (不再是单例)""" + + def __init__(self): + self.containers: Dict[str, MessageContainer] = {} + self.storage = MessageStorage() # 添加 storage 实例 + self._running = True # 处理器运行状态 + self._container_lock = asyncio.Lock() # 保护 containers 字典的锁 + # self.message_sender = MessageSender() # 创建发送器实例 (改为全局实例) + + async def start(self): + """启动后台处理器任务。""" + # 检查是否已有任务在运行,避免重复启动 + if hasattr(self, '_processor_task') and not self._processor_task.done(): + logger.warning("Processor task already running.") + return + self._processor_task = asyncio.create_task(self._start_processor_loop()) + logger.info("MessageManager processor task started.") + + def stop(self): + """停止后台处理器任务。""" + self._running = False + if hasattr(self, '_processor_task') and not self._processor_task.done(): + self._processor_task.cancel() + logger.info("MessageManager processor task stopping.") + else: + logger.info("MessageManager processor task not running or already stopped.") + + + async def get_container(self, chat_id: str) -> MessageContainer: + """获取或创建聊天流的消息容器 (异步,使用锁)""" + async with self._container_lock: + if chat_id not in self.containers: + self.containers[chat_id] = MessageContainer(chat_id) + return self.containers[chat_id] + + async def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: + """添加消息到对应容器""" + chat_stream = message.chat_stream + if not chat_stream: + logger.error("消息缺少 chat_stream,无法添加到容器") + return # 或者抛出异常 + container = await self.get_container(chat_stream.stream_id) + container.add_message(message) + + def check_if_sending_message_exist(self, chat_id, thinking_id): + """检查指定聊天流的容器中是否存在具有特定 thinking_id 的 MessageSending 消息 或 emoji 消息""" + # 这个方法现在是非异步的,因为它只读取数据 + container = self.containers.get(chat_id) # 直接 get,因为读取不需要锁 + if container and container.has_messages(): + for message in container.get_all_messages(): + if isinstance(message, MessageSending): + msg_id = getattr(message.message_info, 'message_id', None) + # 检查 message_id 是否匹配 thinking_id 或以 "me" 开头 (emoji) + if msg_id == thinking_id or (msg_id and msg_id.startswith("me")): + # logger.debug(f"检查到存在相同thinking_id或emoji的消息: {msg_id} for {thinking_id}") + return True + return False + + async def _handle_sending_message(self, container: MessageContainer, message: MessageSending): + """处理单个 MessageSending 消息 (包含 set_reply 逻辑)""" + try: + _ = message.update_thinking_time() # 更新思考时间 + thinking_start_time = message.thinking_start_time + now_time = time.time() + thinking_messages_count, thinking_messages_length = count_messages_between( + start_time=thinking_start_time, end_time=now_time, stream_id=message.chat_stream.stream_id + ) + + # --- 条件应用 set_reply 逻辑 --- + if ( + message.apply_set_reply_logic # 检查标记 + and message.is_head + and (thinking_messages_count > 4 or thinking_messages_length > 250) + and not message.is_private_message() + ): + logger.debug(f"[{message.chat_stream.stream_id}] 应用 set_reply 逻辑: {message.processed_plain_text[:20]}...") + message.set_reply() + # --- 结束条件 set_reply --- + + await message.process() # 预处理消息内容 + + # 使用全局 message_sender 实例 + await message_sender.send_message(message) + await self.storage.store_message(message, message.chat_stream) + + # 移除消息要在发送 *之后* + container.remove_message(message) + # logger.debug(f"[{message.chat_stream.stream_id}] Sent and removed message: {message.message_info.message_id}") + + except Exception as e: + logger.error(f"[{message.chat_stream.stream_id}] 处理发送消息 {getattr(message.message_info, 'message_id', 'N/A')} 时出错: {e}") + logger.exception("详细错误信息:") + # 考虑是否移除出错的消息,防止无限循环 + removed = container.remove_message(message) + if removed: + logger.warning(f"[{message.chat_stream.stream_id}] 已移除处理出错的消息。") + + + async def _process_chat_messages(self, chat_id: str): + """处理单个聊天流消息 (合并后的逻辑)""" + container = await self.get_container(chat_id) # 获取容器是异步的了 + + if container.has_messages(): + message_earliest = container.get_earliest_message() + + if not message_earliest: # 如果最早消息为空,则退出 + return + + if isinstance(message_earliest, MessageThinking): + # --- 处理思考消息 (来自旧 sender) --- + message_earliest.update_thinking_time() + thinking_time = message_earliest.thinking_time + # 减少控制台刷新频率或只在时间显著变化时打印 + if int(thinking_time) % 5 == 0: # 每5秒打印一次 + print( + f"消息 {message_earliest.message_info.message_id} 正在思考中,已思考 {int(thinking_time)} 秒\r", + end="", + flush=True, + ) + + # 检查是否超时 + if thinking_time > global_config.thinking_timeout: + logger.warning(f"[{chat_id}] 消息思考超时 ({thinking_time:.1f}秒),移除消息 {message_earliest.message_info.message_id}") + container.remove_message(message_earliest) + print() # 超时后换行,避免覆盖下一条日志 + + elif isinstance(message_earliest, MessageSending): + # --- 处理发送消息 --- + await self._handle_sending_message(container, message_earliest) + + # --- 处理超时发送消息 (来自旧 sender) --- + # 在处理完最早的消息后,检查是否有超时的发送消息 + timeout_sending_messages = container.get_timeout_sending_messages() + if timeout_sending_messages: + logger.debug(f"[{chat_id}] 发现 {len(timeout_sending_messages)} 条超时的发送消息") + for msg in timeout_sending_messages: + # 确保不是刚刚处理过的最早消息 (虽然理论上应该已被移除,但以防万一) + if msg is message_earliest: + continue + logger.info(f"[{chat_id}] 处理超时发送消息: {msg.message_info.message_id}") + await self._handle_sending_message(container, msg) # 复用处理逻辑 + + # 清理空容器 (可选) + # async with self._container_lock: + # if not container.has_messages() and chat_id in self.containers: + # logger.debug(f"[{chat_id}] 容器已空,准备移除。") + # del self.containers[chat_id] + + + async def _start_processor_loop(self): + """消息处理器主循环""" + while self._running: + tasks = [] + # 使用异步锁保护迭代器创建过程 + async with self._container_lock: + # 创建 keys 的快照以安全迭代 + chat_ids = list(self.containers.keys()) + + for chat_id in chat_ids: + # 为每个 chat_id 创建一个处理任务 + tasks.append(asyncio.create_task(self._process_chat_messages(chat_id))) + + if tasks: + try: + # 等待当前批次的所有任务完成 + await asyncio.gather(*tasks) + except Exception as e: + logger.error(f"消息处理循环 gather 出错: {e}") + + # 等待一小段时间,避免CPU空转 + try: + await asyncio.sleep(0.1) # 稍微降低轮询频率 + except asyncio.CancelledError: + logger.info("Processor loop sleep cancelled.") + break # 退出循环 + logger.info("MessageManager processor loop finished.") + +# --- 创建全局实例 --- +message_manager = MessageManager() +message_sender = MessageSender() +# --- 结束全局实例 --- \ No newline at end of file diff --git a/src/plugins/chat/messagesender.py b/src/plugins/chat/messagesender.py deleted file mode 100644 index 376a167e1..000000000 --- a/src/plugins/chat/messagesender.py +++ /dev/null @@ -1,291 +0,0 @@ -import asyncio -import time -from typing import Dict, List, Optional, Union - -from src.common.logger import get_module_logger -from ...common.database import db -from ..message.api import global_api -from .message import MessageSending, MessageThinking, MessageSet - -from ..storage.storage import MessageStorage -from ...config.config import global_config -from .utils import truncate_message, calculate_typing_time, count_messages_between - -from src.common.logger import LogConfig, SENDER_STYLE_CONFIG - -# 定义日志配置 -sender_config = LogConfig( - # 使用消息发送专用样式 - console_format=SENDER_STYLE_CONFIG["console_format"], - file_format=SENDER_STYLE_CONFIG["file_format"], -) - -logger = get_module_logger("msg_sender", config=sender_config) - - -class MessageSender: - """发送器""" - - def __init__(self): - self.message_interval = (0.5, 1) # 消息间隔时间范围(秒) - self.last_send_time = 0 - self._current_bot = None - - def set_bot(self, bot): - """设置当前bot实例""" - pass - - @staticmethod - def get_recalled_messages(stream_id: str) -> list: - """获取所有撤回的消息""" - recalled_messages = [] - - recalled_messages = list(db.recalled_messages.find({"stream_id": stream_id}, {"message_id": 1})) - # 按thinking_start_time排序,时间早的在前面 - return recalled_messages - - @staticmethod - async def send_via_ws(message: MessageSending) -> None: - try: - await global_api.send_message(message) - except Exception as e: - raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e - - async def send_message( - self, - message: MessageSending, - ) -> None: - """发送消息""" - - if isinstance(message, MessageSending): - recalled_messages = self.get_recalled_messages(message.chat_stream.stream_id) - is_recalled = False - for recalled_message in recalled_messages: - if message.reply_to_message_id == recalled_message["message_id"]: - is_recalled = True - logger.warning(f"消息“{message.processed_plain_text}”已被撤回,不发送") - break - if not is_recalled: - # print(message.processed_plain_text + str(message.is_emoji)) - typing_time = calculate_typing_time( - input_string=message.processed_plain_text, - thinking_start_time=message.thinking_start_time, - is_emoji=message.is_emoji, - ) - logger.trace(f"{message.processed_plain_text},{typing_time},计算输入时间结束") - await asyncio.sleep(typing_time) - logger.trace(f"{message.processed_plain_text},{typing_time},等待输入时间结束") - - message_json = message.to_dict() - - message_preview = truncate_message(message.processed_plain_text) - try: - end_point = global_config.api_urls.get(message.message_info.platform, None) - if end_point: - # logger.info(f"发送消息到{end_point}") - # logger.info(message_json) - try: - await global_api.send_message_rest(end_point, message_json) - except Exception as e: - logger.error(f"REST方式发送失败,出现错误: {str(e)}") - logger.info("尝试使用ws发送") - await self.send_via_ws(message) - else: - await self.send_via_ws(message) - logger.success(f"发送消息“{message_preview}”成功") - except Exception as e: - logger.error(f"发送消息“{message_preview}”失败: {str(e)}") - - -class MessageContainer: - """单个聊天流的发送/思考消息容器""" - - def __init__(self, chat_id: str, max_size: int = 100): - self.chat_id = chat_id - self.max_size = max_size - self.messages = [] - self.last_send_time = 0 - self.thinking_wait_timeout = 20 # 思考等待超时时间(秒) - - def get_timeout_messages(self) -> List[MessageSending]: - """获取所有超时的Message_Sending对象(思考时间超过20秒),按thinking_start_time排序""" - current_time = time.time() - timeout_messages = [] - - for msg in self.messages: - if isinstance(msg, MessageSending): - if current_time - msg.thinking_start_time > self.thinking_wait_timeout: - timeout_messages.append(msg) - - # 按thinking_start_time排序,时间早的在前面 - timeout_messages.sort(key=lambda x: x.thinking_start_time) - - return timeout_messages - - def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]: - """获取thinking_start_time最早的消息对象""" - if not self.messages: - return None - earliest_time = float("inf") - earliest_message = None - for msg in self.messages: - msg_time = msg.thinking_start_time - if msg_time < earliest_time: - earliest_time = msg_time - earliest_message = msg - return earliest_message - - def add_message(self, message: Union[MessageThinking, MessageSending]) -> None: - """添加消息到队列""" - if isinstance(message, MessageSet): - for single_message in message.messages: - self.messages.append(single_message) - else: - self.messages.append(message) - - def remove_message(self, message: Union[MessageThinking, MessageSending]) -> bool: - """移除消息,如果消息存在则返回True,否则返回False""" - try: - if message in self.messages: - self.messages.remove(message) - return True - return False - except Exception: - logger.exception("移除消息时发生错误") - return False - - def has_messages(self) -> bool: - """检查是否有待发送的消息""" - return bool(self.messages) - - def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]: - """获取所有消息""" - return list(self.messages) - - -class MessageManager: - """管理所有聊天流的消息容器""" - - def __init__(self): - self.containers: Dict[str, MessageContainer] = {} # chat_id -> MessageContainer - self.storage = MessageStorage() - self._running = True - - def get_container(self, chat_id: str) -> MessageContainer: - """获取或创建聊天流的消息容器""" - if chat_id not in self.containers: - self.containers[chat_id] = MessageContainer(chat_id) - return self.containers[chat_id] - - def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: - chat_stream = message.chat_stream - if not chat_stream: - raise ValueError("无法找到对应的聊天流") - container = self.get_container(chat_stream.stream_id) - container.add_message(message) - - async def process_chat_messages(self, chat_id: str): - """处理聊天流消息""" - container = self.get_container(chat_id) - if container.has_messages(): - # print(f"处理有message的容器chat_id: {chat_id}") - message_earliest = container.get_earliest_message() - - if isinstance(message_earliest, MessageThinking): - """取得了思考消息""" - message_earliest.update_thinking_time() - thinking_time = message_earliest.thinking_time - # print(thinking_time) - print( - f"消息正在思考中,已思考{int(thinking_time)}秒\r", - end="", - flush=True, - ) - - # 检查是否超时 - if thinking_time > global_config.thinking_timeout: - logger.warning(f"消息思考超时({thinking_time}秒),移除该消息") - container.remove_message(message_earliest) - - else: - """取得了发送消息""" - thinking_time = message_earliest.update_thinking_time() - thinking_start_time = message_earliest.thinking_start_time - now_time = time.time() - thinking_messages_count, thinking_messages_length = count_messages_between( - start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id - ) - # print(thinking_time) - # print(thinking_messages_count) - # print(thinking_messages_length) - - if ( - message_earliest.is_head - and (thinking_messages_count > 4 or thinking_messages_length > 250) - and not message_earliest.is_private_message() # 避免在私聊时插入reply - ): - logger.debug(f"设置回复消息{message_earliest.processed_plain_text}") - message_earliest.set_reply() - - await message_earliest.process() - - # print(f"message_earliest.thinking_start_tim22222e:{message_earliest.thinking_start_time}") - - await message_sender.send_message(message_earliest) - - await self.storage.store_message(message_earliest, message_earliest.chat_stream) - - container.remove_message(message_earliest) - - message_timeout = container.get_timeout_messages() - if message_timeout: - logger.debug(f"发现{len(message_timeout)}条超时消息") - for msg in message_timeout: - if msg == message_earliest: - continue - - try: - thinking_time = msg.update_thinking_time() - thinking_start_time = msg.thinking_start_time - now_time = time.time() - thinking_messages_count, thinking_messages_length = count_messages_between( - start_time=thinking_start_time, end_time=now_time, stream_id=msg.chat_stream.stream_id - ) - # print(thinking_time) - # print(thinking_messages_count) - # print(thinking_messages_length) - if ( - msg.is_head - and (thinking_messages_count > 4 or thinking_messages_length > 250) - and not msg.is_private_message() # 避免在私聊时插入reply - ): - logger.debug(f"设置回复消息{msg.processed_plain_text}") - msg.set_reply() - - await msg.process() - - await message_sender.send_message(msg) - - await self.storage.store_message(msg, msg.chat_stream) - - if not container.remove_message(msg): - logger.warning("尝试删除不存在的消息") - except Exception: - logger.exception("处理超时消息时发生错误") - continue - - async def start_processor(self): - """启动消息处理器""" - while self._running: - await asyncio.sleep(1) - tasks = [] - for chat_id in self.containers.keys(): - tasks.append(self.process_chat_messages(chat_id)) - - await asyncio.gather(*tasks) - - -# 创建全局消息管理器实例 -message_manager = MessageManager() -# 创建全局发送器实例 -message_sender = MessageSender() diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 3fec14d36..e79ff7d82 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -7,7 +7,6 @@ from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinki from src.plugins.chat.message import MessageSet, Seg # Local import needed after move from src.plugins.chat.chat_stream import ChatStream from src.plugins.chat.message import UserInfo -from src.heart_flow.heartflow import heartflow, SubHeartflow from src.plugins.chat.chat_stream import chat_manager from src.common.logger import get_module_logger, LogConfig, PFC_STYLE_CONFIG # 引入 DEFAULT_CONFIG from src.plugins.models.utils_model import LLMRequest @@ -18,7 +17,7 @@ from src.plugins.utils.timer_calculater import Timer # <--- Import Timer from .heartFC_generator import ResponseGenerator # Assuming this is the type for gpt from src.do_tool.tool_use import ToolUser from src.plugins.chat.emoji_manager import EmojiManager # Assuming this is the type -from .heartflow_message_sender import MessageManager # Assuming this is the type +from ..chat.message_sender import message_manager # <-- Import the global manager # --- End import --- @@ -37,7 +36,8 @@ logger = get_module_logger("HeartFCLoop", config=interest_log_config) # Logger if TYPE_CHECKING: # Keep this if HeartFCController methods are still needed elsewhere, # but the instance variable will be removed from HeartFChatting - from .heartFC_controler import HeartFCController + # from .heartFC_controler import HeartFCController + from src.heart_flow.heartflow import SubHeartflow, heartflow # <-- 同时导入 heartflow 实例用于类型检查 PLANNER_TOOL_DEFINITION = [ { @@ -76,54 +76,51 @@ class HeartFChatting: def __init__(self, chat_id: str, - # --- Explicit Dependencies --- - gpt_instance: ResponseGenerator, - tool_user_instance: ToolUser, - emoji_manager_instance: EmojiManager, - message_manager_instance: MessageManager - # --- End Explicit Dependencies --- + # 显式依赖注入 + gpt_instance: ResponseGenerator, # 文本回复生成器 + tool_user_instance: ToolUser, # 工具使用实例 + emoji_manager_instance: EmojiManager, # 表情管理实例 ): """ - 初始化HeartFChatting实例。 - - Args: - chat_id: The identifier for the chat stream (e.g., stream_id). - gpt_instance: The ResponseGenerator instance for generating text replies. - tool_user_instance: The ToolUser instance for using tools. - emoji_manager_instance: The EmojiManager instance for handling emojis. - message_manager_instance: The MessageManager instance for sending/managing messages. + HeartFChatting 初始化函数 + + 参数: + chat_id: 聊天流唯一标识符(如stream_id) + gpt_instance: 文本回复生成器实例 + tool_user_instance: 工具使用实例 + emoji_manager_instance: 表情管理实例 """ - self.stream_id: str = chat_id - self.chat_stream: Optional[ChatStream] = None - self.sub_hf: Optional[SubHeartflow] = None - self._initialized = False - self._init_lock = asyncio.Lock() # Ensure initialization happens only once - self._processing_lock = asyncio.Lock() # 确保只有一个 Plan-Replier-Sender 周期在运行 - self._timer_lock = asyncio.Lock() # 用于安全更新计时器 + # 基础属性 + self.stream_id: str = chat_id # 聊天流ID + self.chat_stream: Optional[ChatStream] = None # 关联的聊天流 + self.sub_hf: Optional[SubHeartflow] = None # 关联的子心流 + + # 初始化状态控制 + self._initialized = False # 是否已初始化标志 + self._init_lock = asyncio.Lock() # 初始化锁(确保只初始化一次) + self._processing_lock = asyncio.Lock() # 处理锁(确保单次Plan-Replier-Sender周期) + self._timer_lock = asyncio.Lock() # 计时器锁(安全更新计时器) - # --- Store Dependencies --- - self.gpt_instance = gpt_instance - self.tool_user = tool_user_instance - self.emoji_manager = emoji_manager_instance - self.message_manager = message_manager_instance - # --- End Store Dependencies --- + # 依赖注入存储 + self.gpt_instance = gpt_instance # 文本回复生成器 + self.tool_user = tool_user_instance # 工具使用实例 + self.emoji_manager = emoji_manager_instance # 表情管理实例 - - # Access LLM config through global_config or pass if needed + # LLM规划器配置 self.planner_llm = LLMRequest( model=global_config.llm_normal, temperature=global_config.llm_normal["temp"], max_tokens=1000, - request_type="action_planning", + request_type="action_planning", # 用于动作规划 ) - # Internal state for loop control - self._loop_timer: float = 0.0 # Remaining time for the loop in seconds - self._loop_active: bool = False # Is the loop currently running? - self._loop_task: Optional[asyncio.Task] = None # Stores the main loop task - self._trigger_count_this_activation: int = 0 # Counts triggers within an active period + # 循环控制内部状态 + self._loop_timer: float = 0.0 # 循环剩余时间(秒) + self._loop_active: bool = False # 循环是否正在运行 + self._loop_task: Optional[asyncio.Task] = None # 主循环任务 + self._trigger_count_this_activation: int = 0 # 当前激活周期内的触发计数 self._initial_duration: float = INITIAL_DURATION # 首次触发增加的时间 - self._last_added_duration: float = self._initial_duration # <--- 新增:存储上次增加的时间 + self._last_added_duration: float = self._initial_duration # 上次增加的时间 def _get_log_prefix(self) -> str: """获取日志前缀,包含可读的流名称""" @@ -146,6 +143,8 @@ class HeartFChatting: logger.error(f"{log_prefix} 获取ChatStream失败。") return False + # <-- 在这里导入 heartflow 实例 + from src.heart_flow.heartflow import heartflow self.sub_hf = heartflow.get_subheartflow(self.stream_id) if not self.sub_hf: logger.warning(f"{log_prefix} 获取SubHeartflow失败。一些功能可能受限。") @@ -245,7 +244,7 @@ class HeartFChatting: cycle_timers = {} # <--- Initialize timers dict for this cycle # Access MessageManager directly - if self.message_manager.check_if_sending_message_exist(self.stream_id, thinking_id): + if message_manager.check_if_sending_message_exist(self.stream_id, thinking_id): # logger.info(f"{log_prefix} HeartFChatting: 11111111111111111111111111111111麦麦还在发消息,等会再规划") await asyncio.sleep(1) continue @@ -318,7 +317,7 @@ class HeartFChatting: ) except Exception as e_replier: logger.error(f"{log_prefix} 循环: 回复器工作失败: {e_replier}") - self._cleanup_thinking_message(thinking_id) + # self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call if replier_result: # --- Sender Work --- # @@ -334,10 +333,10 @@ class HeartFChatting: except Exception as e_sender: logger.error(f"{log_prefix} 循环: 发送器失败: {e_sender}") # _sender should handle cleanup, but double check - # self._cleanup_thinking_message(thinking_id) + # self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call else: logger.warning(f"{log_prefix} 循环: 回复器未产生结果. 跳过发送.") - self._cleanup_thinking_message(thinking_id) + # self._cleanup_thinking_message(thinking_id) <-- Remove cleanup call elif action == "emoji_reply": logger.info( f"{log_prefix} HeartFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}" @@ -652,16 +651,25 @@ class HeartFChatting: logger.error(traceback.format_exc()) return None - def _cleanup_thinking_message(self, thinking_id: str): - """Safely removes the thinking message.""" - log_prefix = self._get_log_prefix() - try: - # Access MessageManager directly - container = self.message_manager.get_container(self.stream_id) - container.remove_message(thinking_id, msg_type=MessageThinking) - logger.debug(f"{log_prefix} Cleaned up thinking message {thinking_id}.") - except Exception as e: - logger.error(f"{log_prefix} Error cleaning up thinking message {thinking_id}: {e}") + # def _cleanup_thinking_message(self, thinking_id: str): + # """Safely removes the thinking message.""" + # log_prefix = self._get_log_prefix() + # try: + # # Access MessageManager directly + # container = await message_manager.get_container(self.stream_id) + # # container.remove_message(thinking_id, msg_type=MessageThinking) # Need to find the message object first + # found_msg = None + # for msg in container.get_all_messages(): + # if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: + # found_msg = msg + # break + # if found_msg: + # container.remove_message(found_msg) + # logger.debug(f"{log_prefix} Cleaned up thinking message {thinking_id}.") + # else: + # logger.warning(f"{log_prefix} Could not find thinking message {thinking_id} to cleanup.") + # except Exception as e: + # logger.error(f"{log_prefix} Error cleaning up thinking message {thinking_id}: {e}") # --- 发送器 (Sender) --- # async def _sender( @@ -774,10 +782,10 @@ class HeartFChatting: # Ensure generate_response has access to current_mind if it's crucial context # Access gpt_instance directly response_set = await self.gpt_instance.generate_response( - self.sub_hf, - reason, - anchor_message, # Pass anchor_message positionally (matches 'message' parameter) - thinking_id, # Pass thinking_id positionally + current_mind_info=self.sub_hf.current_mind, + reason=reason, + message=anchor_message, # Pass anchor_message positionally (matches 'message' parameter) + thinking_id=thinking_id, # Pass thinking_id positionally ) if not response_set: @@ -818,7 +826,7 @@ class HeartFChatting: thinking_start_time=thinking_time_point, ) # Access MessageManager directly - self.message_manager.add_message(thinking_message) + await message_manager.add_message(thinking_message) return thinking_id async def _send_response_messages( @@ -831,7 +839,7 @@ class HeartFChatting: chat = anchor_message.chat_stream # Access MessageManager directly - container = self.message_manager.get_container(chat.stream_id) + container = await message_manager.get_container(chat.stream_id) thinking_message = None # 移除思考消息 @@ -875,7 +883,7 @@ class HeartFChatting: message_set.add_message(bot_message) # Access MessageManager directly - self.message_manager.add_message(message_set) + await message_manager.add_message(message_set) return first_bot_msg async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""): @@ -917,4 +925,4 @@ class HeartFChatting: is_emoji=True, ) # Access MessageManager directly - self.message_manager.add_message(bot_message) + await message_manager.add_message(bot_message) diff --git a/src/plugins/heartFC_chat/heartFC_generator.py b/src/plugins/heartFC_chat/heartFC_generator.py index 7e1a26b16..b05764fd2 100644 --- a/src/plugins/heartFC_chat/heartFC_generator.py +++ b/src/plugins/heartFC_chat/heartFC_generator.py @@ -11,7 +11,6 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from ..utils.timer_calculater import Timer from src.plugins.moods.moods import MoodManager -from src.heart_flow.sub_heartflow import SubHeartflow # 定义日志配置 llm_config = LogConfig( # 使用消息发送专用样式 @@ -39,7 +38,7 @@ class ResponseGenerator: async def generate_response( self, - sub_hf: SubHeartflow, + current_mind_info: str, reason: str, message: MessageRecv, thinking_id: str, @@ -56,7 +55,7 @@ class ResponseGenerator: current_model = self.model_normal current_model.temperature = global_config.llm_normal["temp"] * arousal_multiplier # 激活度越高,温度越高 model_response = await self._generate_response_with_model( - sub_hf, reason, message, current_model, thinking_id + current_mind_info, reason, message, current_model, thinking_id ) if model_response: @@ -71,7 +70,7 @@ class ResponseGenerator: return None async def _generate_response_with_model( - self, sub_hf: SubHeartflow, reason: str, message: MessageRecv, model: LLMRequest, thinking_id: str + self, current_mind_info: str, reason: str, message: MessageRecv, model: LLMRequest, thinking_id: str ) -> str: sender_name = "" @@ -84,9 +83,10 @@ class ResponseGenerator: prompt = await prompt_builder.build_prompt( build_mode="focus", reason=reason, + current_mind_info=current_mind_info, message_txt=message.processed_plain_text, sender_name=sender_name, - subheartflow=sub_hf + chat_stream=message.chat_stream ) logger.info(f"构建prompt时间: {t_build_prompt.human_readable}") diff --git a/src/plugins/heartFC_chat/heartflow_message_sender.py b/src/plugins/heartFC_chat/heartflow_message_sender.py deleted file mode 100644 index dd051da97..000000000 --- a/src/plugins/heartFC_chat/heartflow_message_sender.py +++ /dev/null @@ -1,241 +0,0 @@ -import asyncio -import time -from typing import Dict, List, Optional, Union - -from src.common.logger import get_module_logger -from ..message.api import global_api -from ..chat.message import MessageSending, MessageThinking, MessageSet -from ..storage.storage import MessageStorage -from ...config.config import global_config -from ..chat.utils import truncate_message, calculate_typing_time, count_messages_between - -from src.common.logger import LogConfig, SENDER_STYLE_CONFIG - -# 定义日志配置 -sender_config = LogConfig( - # 使用消息发送专用样式 - console_format=SENDER_STYLE_CONFIG["console_format"], - file_format=SENDER_STYLE_CONFIG["file_format"], -) - -logger = get_module_logger("msg_sender", config=sender_config) - - -class MessageSender: - """发送器""" - - _instance = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super(MessageSender, cls).__new__(cls, *args, **kwargs) - return cls._instance - - def __init__(self): - # 确保 __init__ 只被调用一次 - if not hasattr(self, "_initialized"): - self.message_interval = (0.5, 1) # 消息间隔时间范围(秒) - self.last_send_time = 0 - self._current_bot = None - self._initialized = True - - def set_bot(self, bot): - """设置当前bot实例""" - pass - - async def send_via_ws(self, message: MessageSending) -> None: - try: - await global_api.send_message(message) - except Exception as e: - raise ValueError(f"未找到平台:{message.message_info.platform} 的url配置,请检查配置文件") from e - - async def send_message( - self, - message: MessageSending, - ) -> None: - """发送消息""" - - message_json = message.to_dict() - - message_preview = truncate_message(message.processed_plain_text) - try: - end_point = global_config.api_urls.get(message.message_info.platform, None) - if end_point: - try: - await global_api.send_message_rest(end_point, message_json) - except Exception as e: - logger.error(f"REST方式发送失败,出现错误: {str(e)}") - logger.info("尝试使用ws发送") - await self.send_via_ws(message) - else: - await self.send_via_ws(message) - logger.success(f"发送消息 {message_preview} 成功") - except Exception as e: - logger.error(f"发送消息 {message_preview} 失败: {str(e)}") - - -class MessageContainer: - """单个聊天流的发送/思考消息容器""" - - def __init__(self, chat_id: str, max_size: int = 100): - self.chat_id = chat_id - self.max_size = max_size - self.messages = [] - self.last_send_time = 0 - - def count_thinking_messages(self) -> int: - """计算当前容器中思考消息的数量""" - return sum(1 for msg in self.messages if isinstance(msg, MessageThinking)) - - def get_earliest_message(self) -> Optional[Union[MessageThinking, MessageSending]]: - """获取thinking_start_time最早的消息对象""" - if not self.messages: - return None - earliest_time = float("inf") - earliest_message = None - for msg in self.messages: - msg_time = msg.thinking_start_time - if msg_time < earliest_time: - earliest_time = msg_time - earliest_message = msg - return earliest_message - - def add_message(self, message: Union[MessageThinking, MessageSending]) -> None: - """添加消息到队列""" - if isinstance(message, MessageSet): - for single_message in message.messages: - self.messages.append(single_message) - else: - self.messages.append(message) - - def remove_message(self, message: Union[MessageThinking, MessageSending]) -> bool: - """移除消息,如果消息存在则返回True,否则返回False""" - try: - if message in self.messages: - self.messages.remove(message) - return True - return False - except Exception: - logger.exception("移除消息时发生错误") - return False - - def has_messages(self) -> bool: - """检查是否有待发送的消息""" - return bool(self.messages) - - def get_all_messages(self) -> List[Union[MessageSending, MessageThinking]]: - """获取所有消息""" - return list(self.messages) - - -class MessageManager: - """管理所有聊天流的消息容器""" - - _instance = None - _lock = asyncio.Lock() - - def __init__(self): - if MessageManager._instance is not None: - raise Exception("This class is a singleton!") - else: - self.containers: Dict[str, MessageContainer] = {} - self._container_lock = asyncio.Lock() - self.running = True - MessageManager._instance = self - - async def start(self): - """Starts the background processor task.""" - asyncio.create_task(self.start_processor()) - logger.info("MessageManager processor task started.") - - def get_container(self, chat_id: str) -> MessageContainer: - """获取或创建聊天流的消息容器""" - if chat_id not in self.containers: - self.containers[chat_id] = MessageContainer(chat_id) - return self.containers[chat_id] - - def add_message(self, message: Union[MessageThinking, MessageSending, MessageSet]) -> None: - chat_stream = message.chat_stream - if not chat_stream: - raise ValueError("无法找到对应的聊天流") - container = self.get_container(chat_stream.stream_id) - container.add_message(message) - - def check_if_sending_message_exist(self, chat_id, thinking_id): - """检查指定聊天流的容器中是否存在具有特定 thinking_id 的 MessageSending 消息""" - container = self.get_container(chat_id) - if container.has_messages(): - for message in container.get_all_messages(): - # 首先确保是 MessageSending 类型 - if isinstance(message, MessageSending): - # 然后再访问 message_info.message_id - # 检查 message_id 是否匹配 thinking_id 或以 "me" 开头 - if message.message_info.message_id == thinking_id or message.message_info.message_id[:2] == "me": - # print(f"检查到存在相同thinking_id的消息: {message.message_info.message_id}???{thinking_id}") - - return True - return False - - async def process_chat_messages(self, chat_id: str): - """处理聊天流消息""" - container = self.get_container(chat_id) - if container.has_messages(): - # print(f"处理有message的容器chat_id: {chat_id}") - message_earliest = container.get_earliest_message() - - if isinstance(message_earliest, MessageThinking): - """取得了思考消息""" - message_earliest.update_thinking_time() - thinking_time = message_earliest.thinking_time - # print(thinking_time) - print( - f"消息正在思考中,已思考{int(thinking_time)}秒\r", - end="", - flush=True, - ) - - # 检查是否超时 - if thinking_time > global_config.thinking_timeout: - logger.warning(f"消息思考超时({thinking_time}秒),移除该消息") - container.remove_message(message_earliest) - - else: - """取得了发送消息""" - thinking_time = message_earliest.update_thinking_time() - thinking_start_time = message_earliest.thinking_start_time - now_time = time.time() - thinking_messages_count, thinking_messages_length = count_messages_between( - start_time=thinking_start_time, end_time=now_time, stream_id=message_earliest.chat_stream.stream_id - ) - - await message_earliest.process() - - # 获取 MessageSender 的单例实例并发送消息 - typing_time = calculate_typing_time( - input_string=message_earliest.processed_plain_text, - thinking_start_time=message_earliest.thinking_start_time, - is_emoji=message_earliest.is_emoji, - ) - await asyncio.sleep(typing_time) - - await MessageSender().send_message(message_earliest) - await self.storage.store_message(message_earliest, message_earliest.chat_stream) - - container.remove_message(message_earliest) - - async def start_processor(self): - """启动消息处理器""" - while self.running: - await asyncio.sleep(1) - tasks = [] - for chat_id in list(self.containers.keys()): # 使用 list 复制 key,防止在迭代时修改字典 - tasks.append(self.process_chat_messages(chat_id)) - - if tasks: # 仅在有任务时执行 gather - await asyncio.gather(*tasks) - - -# # 创建全局消息管理器实例 # 已改为单例模式 -# message_manager = MessageManager() -# # 创建全局发送器实例 # 已改为单例模式 -# message_sender = MessageSender() diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index 5647925e0..15623a494 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -1,6 +1,5 @@ import random from typing import Optional -from src.heart_flow.sub_heartflow import SubHeartflow from ...config.config import global_config from ..chat.chat_stream import chat_manager from src.common.logger import get_module_logger @@ -82,23 +81,20 @@ class PromptBuilder: async def build_prompt( - self, build_mode,reason, message_txt: str, sender_name: str = "某人",subheartflow: SubHeartflow =None + self, build_mode,reason,current_mind_info, message_txt: str, sender_name: str = "某人",chat_stream=None ) -> tuple[str, str]: - - chat_stream = chat_manager.get_stream(subheartflow.subheartflow_id) - + if build_mode == "normal": - return await self._build_prompt_normal(chat_stream, message_txt, sender_name, subheartflow) + return await self._build_prompt_normal(chat_stream, message_txt, sender_name) elif build_mode == "focus": - return await self._build_prompt_focus(reason, chat_stream, message_txt, sender_name, subheartflow) + return await self._build_prompt_focus(reason, current_mind_info, chat_stream, message_txt, sender_name) async def _build_prompt_focus( - self, reason, chat_stream, message_txt: str, sender_name: str = "某人", subheartflow: SubHeartflow =None + self, reason, current_mind_info, chat_stream, message_txt: str, sender_name: str = "某人" ) -> tuple[str, str]: - current_mind_info = subheartflow.current_mind individuality = Individuality.get_instance() prompt_personality = individuality.get_prompt(type="personality", x_person=2, level=1) @@ -107,7 +103,6 @@ class PromptBuilder: # 日程构建 # schedule_prompt = f'''你现在正在做的事情是:{bot_schedule.get_current_num_task(num = 1,time_info = False)}''' - chat_stream = chat_manager.get_stream(subheartflow.subheartflow_id) if chat_stream.group_info: chat_in_group = True else: @@ -186,7 +181,7 @@ class PromptBuilder: async def _build_prompt_normal( - self, chat_stream, message_txt: str, sender_name: str = "某人", subheartflow=None + self, chat_stream, message_txt: str, sender_name: str = "某人" ) -> tuple[str, str]: # 开始构建prompt prompt_personality = "你" @@ -209,7 +204,7 @@ class PromptBuilder: (chat_stream.user_info.platform, chat_stream.user_info.user_id, chat_stream.user_info.user_nickname) ] who_chat_in_group += get_recent_group_speaker( - subheartflow.subheartflow_id, + chat_stream.stream_id, (chat_stream.user_info.platform, chat_stream.user_info.user_id), limit=global_config.MAX_CONTEXT_SIZE, ) @@ -249,7 +244,6 @@ class PromptBuilder: # schedule_prompt = f"""你现在正在做的事情是:{bot_schedule.get_current_num_task(num=1, time_info=False)}""" # 获取聊天上下文 - chat_stream = chat_manager.get_stream(subheartflow.subheartflow_id) if chat_stream.group_info: chat_in_group = True else: diff --git a/src/plugins/heartFC_chat/normal_chat.py b/src/plugins/heartFC_chat/normal_chat.py index 9e1f39608..f5a9e4c3a 100644 --- a/src/plugins/heartFC_chat/normal_chat.py +++ b/src/plugins/heartFC_chat/normal_chat.py @@ -9,7 +9,7 @@ from ...config.config import global_config from ..chat.emoji_manager import emoji_manager from .normal_chat_generator import ResponseGenerator from ..chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet -from ..chat.messagesender import message_manager +from ..chat.message_sender import message_manager from ..storage.storage import MessageStorage from ..chat.utils import is_mentioned_bot_in_message from ..chat.utils_image import image_path_to_base64 @@ -96,18 +96,18 @@ class NormalChat: @staticmethod async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> MessageSending: """发送回复消息""" - container = message_manager.get_container(chat.stream_id) + container = await message_manager.get_container(chat.stream_id) thinking_message = None - for msg in container.messages: + for msg in container.messages[:]: if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: thinking_message = msg container.messages.remove(msg) break if not thinking_message: - logger.warning("未找到对应的思考消息,可能已超时被移除") - return + logger.warning(f"[{chat.stream_id}] 未找到对应的思考消息 {thinking_id},可能已超时被移除") + return None thinking_start_time = thinking_message.thinking_start_time message_set = MessageSet(chat, thinking_id) @@ -130,12 +130,14 @@ class NormalChat: is_head=not mark_head, is_emoji=False, thinking_start_time=thinking_start_time, + apply_set_reply_logic=True ) if not mark_head: mark_head = True first_bot_msg = bot_message message_set.add_message(bot_message) - message_manager.add_message(message_set) + + await message_manager.add_message(message_set) return first_bot_msg @@ -164,8 +166,9 @@ class NormalChat: reply=message, is_head=False, is_emoji=True, + apply_set_reply_logic=True ) - message_manager.add_message(bot_message) + await message_manager.add_message(bot_message) async def _update_relationship(self, message: MessageRecv, response_set): """更新关系情绪""" @@ -328,12 +331,13 @@ class NormalChat: if not response_set: logger.info(f"[{chat.stream_id}] 模型未生成回复内容") # 如果模型未生成回复,移除思考消息 - container = message_manager.get_container(chat.stream_id) + container = await message_manager.get_container(chat.stream_id) # thinking_message = None for msg in container.messages[:]: # Iterate over a copy if isinstance(msg, MessageThinking) and msg.message_info.message_id == thinking_id: # thinking_message = msg container.messages.remove(msg) + # container.remove_message(msg) # 直接移除 logger.debug(f"[{chat.stream_id}] 已移除未产生回复的思考消息 {thinking_id}") break return # 不发送回复 diff --git a/src/plugins/heartFC_chat/normal_chat_generator.py b/src/plugins/heartFC_chat/normal_chat_generator.py index 18c550a13..2fb07d318 100644 --- a/src/plugins/heartFC_chat/normal_chat_generator.py +++ b/src/plugins/heartFC_chat/normal_chat_generator.py @@ -86,9 +86,10 @@ class ResponseGenerator: prompt = await prompt_builder.build_prompt( build_mode="normal", reason= "", + current_mind_info="", message_txt=message.processed_plain_text, sender_name=sender_name, - subheartflow=sub_hf, + chat_stream=message.chat_stream, ) logger.info(f"构建prompt时间: {t_build_prompt.human_readable}")