From bcf295905e50a4face6f7e46decfb83feb4a9a00 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Tue, 22 Apr 2025 23:16:57 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E6=9B=B4=E6=94=B9=E5=BF=83?= =?UTF-8?q?=E6=B5=81=E8=BF=90=E8=A1=8C=E7=BB=93=E6=9E=84=EF=BC=8C=E5=8F=8D?= =?UTF-8?q?=E6=AD=A3=E8=83=BD=E8=B7=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/README.md | 34 +-- src/heart_flow/heartflow.py | 165 +++++++------ src/heart_flow/sub_heartflow.py | 2 +- src/main.py | 20 +- src/plugins/heartFC_chat/heartFC_chat.py | 182 ++++++++------- src/plugins/heartFC_chat/heartFC_controler.py | 218 ------------------ src/plugins/heartFC_chat/heartFC_generator.py | 12 +- .../heartFC_chat/heartflow_message_sender.py | 28 +-- .../heartFC_chat/heartflow_processor.py | 4 +- .../heartFC_chat/heartflow_prompt_builder.py | 23 +- src/plugins/heartFC_chat/normal_chat.py | 35 ++- .../heartFC_chat/normal_chat_generator.py | 11 +- src/plugins/person_info/person_info.py | 2 +- 13 files changed, 280 insertions(+), 456 deletions(-) delete mode 100644 src/plugins/heartFC_chat/heartFC_controler.py diff --git a/src/heart_flow/README.md b/src/heart_flow/README.md index 6da1c34d6..dc00a9ff9 100644 --- a/src/heart_flow/README.md +++ b/src/heart_flow/README.md @@ -58,23 +58,23 @@ subheartflow.add_observation(observation) 2. 需要合理配置更新间隔以平衡性能和响应速度 3. 观察系统会限制消息处理数量以避免过载 -# PFChatting 与主动回复流程说明 (V2) +# HeartFChatting 与主动回复流程说明 (V2) -本文档描述了 `PFChatting` 类及其在 `heartFC_controler` 模块中实现的主动、基于兴趣的回复流程。 +本文档描述了 `HeartFChatting` 类及其在 `heartFC_controler` 模块中实现的主动、基于兴趣的回复流程。 -## 1. `PFChatting` 类概述 +## 1. `HeartFChatting` 类概述 * **目标**: 管理特定聊天流 (`stream_id`) 的主动回复逻辑,使其行为更像人类的自然交流。 -* **创建时机**: 当 `HeartFC_Chat` 的兴趣监控任务 (`_interest_monitor_loop`) 检测到某个聊天流的兴趣度 (`InterestChatting`) 达到了触发回复评估的条件 (`should_evaluate_reply`) 时,会为该 `stream_id` 获取或创建唯一的 `PFChatting` 实例 (`_get_or_create_heartFC_chat`)。 +* **创建时机**: 当 `HeartFC_Chat` 的兴趣监控任务 (`_interest_monitor_loop`) 检测到某个聊天流的兴趣度 (`InterestChatting`) 达到了触发回复评估的条件 (`should_evaluate_reply`) 时,会为该 `stream_id` 获取或创建唯一的 `HeartFChatting` 实例 (`_get_or_create_heartFC_chat`)。 * **持有**: * 对应的 `sub_heartflow` 实例引用 (通过 `heartflow.get_subheartflow(stream_id)`)。 * 对应的 `chat_stream` 实例引用。 * 对 `HeartFC_Chat` 单例的引用 (用于调用发送消息、处理表情等辅助方法)。 -* **初始化**: `PFChatting` 实例在创建后会执行异步初始化 (`_initialize`),这可能包括加载必要的上下文或历史信息(*待确认是否实现了读取历史消息*)。 +* **初始化**: `HeartFChatting` 实例在创建后会执行异步初始化 (`_initialize`),这可能包括加载必要的上下文或历史信息(*待确认是否实现了读取历史消息*)。 ## 2. 核心回复流程 (由 `HeartFC_Chat` 触发) -当 `HeartFC_Chat` 调用 `PFChatting` 实例的方法 (例如 `add_time`) 时,会启动内部的回复决策与执行流程: +当 `HeartFC_Chat` 调用 `HeartFChatting` 实例的方法 (例如 `add_time`) 时,会启动内部的回复决策与执行流程: 1. **规划 (Planner):** * **输入**: 从关联的 `sub_heartflow` 获取观察结果、思考链、记忆片段等上下文信息。 @@ -100,7 +100,7 @@ subheartflow.add_observation(observation) * **处理**: 如果检查结果认为回复不合适,则该回复将被**抛弃**。 4. **发送协调:** - * **执行**: 如果 Checker 通过,`PFChatting` 会调用 `HeartFC_Chat` 实例提供的发送接口: + * **执行**: 如果 Checker 通过,`HeartFChatting` 会调用 `HeartFC_Chat` 实例提供的发送接口: * `_create_thinking_message`: 通知 `MessageManager` 显示"正在思考"状态。 * `_send_response_messages`: 将最终的回复文本交给 `MessageManager` 进行排队和发送。 * `_handle_emoji`: 如果需要发送表情包,调用此方法处理表情包的获取和发送。 @@ -109,20 +109,20 @@ subheartflow.add_observation(observation) ## 3. 与其他模块的交互 * **`HeartFC_Chat`**: - * 创建、管理和触发 `PFChatting` 实例。 - * 提供发送消息 (`_send_response_messages`)、处理表情 (`_handle_emoji`)、创建思考消息 (`_create_thinking_message`) 的接口给 `PFChatting` 调用。 + * 创建、管理和触发 `HeartFChatting` 实例。 + * 提供发送消息 (`_send_response_messages`)、处理表情 (`_handle_emoji`)、创建思考消息 (`_create_thinking_message`) 的接口给 `HeartFChatting` 调用。 * 运行兴趣监控循环 (`_interest_monitor_loop`)。 * **`InterestManager` / `InterestChatting`**: * `InterestManager` 存储每个 `stream_id` 的 `InterestChatting` 实例。 * `InterestChatting` 负责计算兴趣衰减和回复概率。 - * `HeartFC_Chat` 查询 `InterestChatting.should_evaluate_reply()` 来决定是否触发 `PFChatting`。 + * `HeartFC_Chat` 查询 `InterestChatting.should_evaluate_reply()` 来决定是否触发 `HeartFChatting`。 * **`heartflow` / `sub_heartflow`**: - * `PFChatting` 从对应的 `sub_heartflow` 获取进行规划所需的核心上下文信息 (观察、思考链等)。 + * `HeartFChatting` 从对应的 `sub_heartflow` 获取进行规划所需的核心上下文信息 (观察、思考链等)。 * **`MessageManager` / `MessageSender`**: * 接收来自 `HeartFC_Chat` 的发送请求 (思考消息、文本消息、表情包消息)。 * 管理消息队列 (`MessageContainer`),处理消息发送间隔和实际发送 (`MessageSender`)。 * **`ResponseGenerator` (`gpt`)**: - * 被 `PFChatting` 的 Replier 部分调用,用于生成回复文本。 + * 被 `HeartFChatting` 的 Replier 部分调用,用于生成回复文本。 * **`MessageStorage`**: * 存储所有接收和发送的消息。 * **`HippocampusManager`**: @@ -131,24 +131,24 @@ subheartflow.add_observation(observation) ## 4. 原有问题与状态更新 1. **每个 `pfchating` 是否对应一个 `chat_stream`,是否是唯一的?** - * **是**。`HeartFC_Chat._get_or_create_heartFC_chat` 确保了每个 `stream_id` 只有一个 `PFChatting` 实例。 (已确认) + * **是**。`HeartFC_Chat._get_or_create_heartFC_chat` 确保了每个 `stream_id` 只有一个 `HeartFChatting` 实例。 (已确认) 2. **`observe_text` 传入进来是纯 str,是不是应该传进来 message 构成的 list?** - * **机制已改变**。当前的触发机制是基于 `InterestManager` 的概率判断。`PFChatting` 启动后,应从其关联的 `sub_heartflow` 获取更丰富的上下文信息,而非简单的 `observe_text`。 + * **机制已改变**。当前的触发机制是基于 `InterestManager` 的概率判断。`HeartFChatting` 启动后,应从其关联的 `sub_heartflow` 获取更丰富的上下文信息,而非简单的 `observe_text`。 3. **检查失败的回复应该怎么处理?** * **暂定:抛弃**。这是当前 Checker 逻辑的基础设定。 4. **如何比较相似度?** * **待实现**。Checker 需要具体的算法来比较候选回复与新消息的相似度。 5. **Planner 怎么写?** - * **待实现**。这是 `PFChatting` 的核心决策逻辑,需要结合 `sub_heartflow` 的输出、LLM 工具调用和个性化配置来设计。 + * **待实现**。这是 `HeartFChatting` 的核心决策逻辑,需要结合 `sub_heartflow` 的输出、LLM 工具调用和个性化配置来设计。 ## 6. 未来优化点 * 实现 Checker 中的相似度比较算法。 * 详细设计并实现 Planner 的决策逻辑,包括 LLM 工具调用和个性化。 -* 确认并完善 `PFChatting._initialize()` 中的历史消息加载逻辑。 +* 确认并完善 `HeartFChatting._initialize()` 中的历史消息加载逻辑。 * 探索更优的检查失败回复处理策略(例如:重新规划、修改回复等)。 -* 优化 `PFChatting` 与 `sub_heartflow` 的信息交互。 +* 优化 `HeartFChatting` 与 `sub_heartflow` 的信息交互。 diff --git a/src/heart_flow/heartflow.py b/src/heart_flow/heartflow.py index f2cd3f457..7afd0d067 100644 --- a/src/heart_flow/heartflow.py +++ b/src/heart_flow/heartflow.py @@ -15,6 +15,13 @@ import enum import os # 新增 import json # 新增 from src.plugins.chat.chat_stream import chat_manager # 新增 +# --- Add imports for merged dependencies --- +from src.plugins.heartFC_chat.heartFC_generator import ResponseGenerator +from src.do_tool.tool_use import ToolUser +from src.plugins.chat.emoji_manager import emoji_manager # Module instance +from src.plugins.person_info.relationship_manager import relationship_manager # Module instance +from src.plugins.heartFC_chat.heartflow_message_sender import MessageManager +# --- End imports --- heartflow_config = LogConfig( # 使用海马体专用样式 @@ -26,6 +33,7 @@ logger = get_module_logger("heartflow", config=heartflow_config) # Type hinting for circular dependency if TYPE_CHECKING: from src.heart_flow.sub_heartflow import SubHeartflow, ChatState # Keep SubHeartflow here too + # from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # No longer needed def init_prompt(): prompt = "" @@ -143,14 +151,23 @@ class Heartflow: self._subheartflows: Dict[Any, 'SubHeartflow'] = {} # Update type hint - # --- 新增:日志和清理相关属性 (从 InterestManager 移动) --- + # --- Dependencies moved from HeartFCController --- + self.gpt_instance = ResponseGenerator() + self.mood_manager = MoodManager.get_instance() # Note: MaiStateInfo also has one, consider consolidating later if needed + self.tool_user_instance = ToolUser() + self.emoji_manager_instance = emoji_manager # Module instance + self.relationship_manager_instance = relationship_manager # Module instance + self.message_manager_instance = MessageManager() # Instantiate the message manager + # --- End moved dependencies --- + + # --- Background Task Management --- self._history_log_file_path = os.path.join(LOG_DIRECTORY, HISTORY_LOG_FILENAME) self._ensure_log_directory() # 初始化时确保目录存在 self._cleanup_task: Optional[asyncio.Task] = None self._logging_task: Optional[asyncio.Task] = None self._state_update_task: Optional[asyncio.Task] = None # 新增:状态更新任务 # 注意:衰减任务 (_decay_task) 不再需要,衰减在 SubHeartflow 的 InterestChatting 内部处理 - # --- 结束新增属性 --- + # --- End moved dependencies --- def _ensure_log_directory(self): # 新增方法 (从 InterestManager 移动) """确保日志目录存在""" @@ -315,6 +332,38 @@ class Heartflow: # --- 如果没有确定 next_state (即没有触发任何切换规则) --- # # logger.debug(f"[Heartflow State] 状态未改变,保持 {current_status.value}") # 减少日志噪音 + # --- Integrated Interest Evaluation Logic (formerly in Controller loop) --- + if self.current_state.mai_status != MaiState.OFFLINE: + try: + # Use snapshot for safe iteration + subflows_snapshot = list(self._subheartflows.values()) + evaluated_count = 0 + promoted_count = 0 + + for sub_hf in subflows_snapshot: + # Double-check if subflow still exists and is in CHAT state + if sub_hf.subheartflow_id in self._subheartflows and sub_hf.chat_state.chat_status == ChatState.CHAT: + evaluated_count += 1 + if sub_hf.should_evaluate_reply(): + stream_name = chat_manager.get_stream_name(sub_hf.subheartflow_id) or sub_hf.subheartflow_id + log_prefix = f"[{stream_name}]" + logger.info(f"{log_prefix} 兴趣概率触发,尝试将状态从 CHAT 提升到 FOCUSED") + # set_chat_state handles limit checks and HeartFChatting creation internally + await sub_hf.set_chat_state(ChatState.FOCUSED) + # Check if state actually changed (set_chat_state might block due to limits) + if sub_hf.chat_state.chat_status == ChatState.FOCUSED: + promoted_count += 1 + # else: # No need to log every non-trigger event + # logger.trace(f"[{sub_hf.subheartflow_id}] In CHAT state, but should_evaluate_reply returned False.") + + if evaluated_count > 0: + logger.debug(f"[Heartflow Interest Eval] Evaluated {evaluated_count} CHAT flows. Promoted {promoted_count} to FOCUSED.") + + except Exception as e: + logger.error(f"[Heartflow] 兴趣评估任务出错: {e}") + logger.error(traceback.format_exc()) + # --- End Integrated Interest Evaluation --- + except Exception as e: logger.error(f"[Heartflow] 状态更新任务出错: {e}") logger.error(traceback.format_exc()) @@ -549,7 +598,7 @@ class Heartflow: for _, flow in items_snapshot: # Check if flow still exists in the main dict in case it was removed concurrently if flow.subheartflow_id in self._subheartflows and flow.chat_state.chat_status == target_state: - count += 1 + count += 1 return count # --- End helper method --- # @@ -559,42 +608,25 @@ class Heartflow: 创建本身不受限,因为初始状态是ABSENT。 限制将在状态转换时检查。 """ - # --- 移除创建前的限制检查 --- # - # current_mai_state = self.current_state.mai_status - # normal_limit = current_mai_state.get_normal_chat_max_num() - # focused_limit = current_mai_state.get_focused_chat_max_num() - # total_limit = normal_limit + focused_limit - # current_active_count = 0 - # items_snapshot = list(self._subheartflows.items()) - # for _, flow in items_snapshot: - # if flow.chat_state.chat_status == ChatState.CHAT or flow.chat_state.chat_status == ChatState.FOCUSED: - # current_active_count += 1 - # if current_active_count >= total_limit and total_limit > 0: - # stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id - # logger.warning(f"[Heartflow Create] Skipped due to limit...") - # return None - # --- 结束移除 --- # existing_subheartflow = self._subheartflows.get(subheartflow_id) if existing_subheartflow: return existing_subheartflow - logger.info(f"[Heartflow] 尝试创建新的 subheartflow: {subheartflow_id}") + # logger.info(f"[Heartflow] 尝试创建新的 subheartflow: {subheartflow_id}") try: - # --- Pass 'self' (Heartflow instance) to SubHeartflow constructor --- # subheartflow = SubHeartflow(subheartflow_id, self) # 创建并初始化观察对象 - logger.debug(f"[Heartflow] 为 {subheartflow_id} 创建 observation") + observation = ChattingObservation(subheartflow_id) await observation.initialize() subheartflow.add_observation(observation) - logger.debug(f"[Heartflow] 为 {subheartflow_id} 添加 observation 成功") + # 创建并存储后台任务 (SubHeartflow 自己的后台任务) subheartflow.task = asyncio.create_task(subheartflow.subheartflow_start_working()) - logger.debug(f"[Heartflow] 为 {subheartflow_id} 创建后台任务成功") - + logger.debug(f"[Heartflow] 为 {subheartflow_id} 创建后台任务成功,添加 observation 成功") # 添加到管理字典 self._subheartflows[subheartflow_id] = subheartflow logger.info(f"[Heartflow] 添加 subheartflow {subheartflow_id} 成功") @@ -628,10 +660,11 @@ class Heartflow: logger.debug(f"[Heartflow Limits] 已取消子心流 {stream_name} 的后台任务") # TODO: Ensure controller.stop_heartFC_chat is called if needed - from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # Local import to avoid cycle - controller = HeartFCController.get_instance() - if controller and controller.is_heartFC_chat_active(subheartflow_id): - await controller.stop_heartFC_chat(subheartflow_id) + # This is now handled by subheartflow.set_chat_state(ChatState.ABSENT) called in _stop_subheartflow + # from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # Local import to avoid cycle + # controller = HeartFCController.get_instance() + # if controller and controller.is_heartFC_chat_active(subheartflow_id): + # await controller.stop_heartFC_chat(subheartflow_id) # 从字典移除 del self._subheartflows[subheartflow_id] @@ -684,8 +717,8 @@ class Heartflow: focused_flows = [] items_snapshot_after_normal = list(self._subheartflows.items()) for flow_id, flow in items_snapshot_after_normal: - if flow_id not in self._subheartflows: continue # Double check - if flow.chat_state.chat_status == ChatState.FOCUSED: + if flow_id not in self._subheartflows: continue # Double check + if flow.chat_state.chat_status == ChatState.FOCUSED: focused_flows.append((flow_id, flow.last_active_time)) # 检查 Focused (FOCUSED) 限制 @@ -745,50 +778,50 @@ class Heartflow: """当主状态变为 OFFLINE 时,停止所有子心流的活动并设置为 ABSENT""" logger.info("[Heartflow Deactivate] 开始停用所有子心流...") try: - from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # 本地导入避免循环依赖 - controller = HeartFCController.get_instance() - except ImportError: - logger.warning("[Heartflow Deactivate] 无法导入 HeartFCController,将跳过停止 heartFC_chat。") - controller = None - except Exception as e: - logger.error(f"[Heartflow Deactivate] 获取 HeartFCController 实例时出错: {e}") - controller = None + # TODO: Ensure controller.stop_heartFC_chat is called if needed + # This is now handled by subheartflow.set_chat_state(ChatState.ABSENT) called in _stop_subheartflow + # from src.plugins.heartFC_chat.heartFC_controler import HeartFCController # Local import to avoid cycle + # controller = HeartFCController.get_instance() + # if controller and controller.is_heartFC_chat_active(flow_id): + # await controller.stop_heartFC_chat(flow_id) - # 使用 ID 快照进行迭代 - flow_ids_snapshot = list(self._subheartflows.keys()) - deactivated_count = 0 + # 使用 ID 快照进行迭代 + flow_ids_snapshot = list(self._subheartflows.keys()) + deactivated_count = 0 - for flow_id in flow_ids_snapshot: - subflow = self._subheartflows.get(flow_id) - if not subflow: - continue # Subflow 可能在迭代过程中被清理 + for flow_id in flow_ids_snapshot: + subflow = self._subheartflows.get(flow_id) + if not subflow: + continue # Subflow 可能在迭代过程中被清理 - stream_name = chat_manager.get_stream_name(flow_id) or flow_id + stream_name = chat_manager.get_stream_name(flow_id) or flow_id - try: - # 停止相关聊天进程 (例如 pf_chat) - if controller: + try: + # 停止相关聊天进程 (例如 pf_chat) # TODO: 确认是否有 reason_chat 需要停止,并添加相应逻辑 - if controller.is_heartFC_chat_active(flow_id): - logger.debug(f"[Heartflow Deactivate] 正在停止子心流 {stream_name} 的 heartFC_chat。") - await controller.stop_heartFC_chat(flow_id) + # if controller: + # if controller.is_heartFC_chat_active(flow_id): + # logger.debug(f"[Heartflow Deactivate] 正在停止子心流 {stream_name} 的 heartFC_chat。") + # await controller.stop_heartFC_chat(flow_id) - # 设置状态为 ABSENT - if subflow.chat_state.chat_status != ChatState.ABSENT: - logger.debug(f"[Heartflow Deactivate] 正在将子心流 {stream_name} 状态设置为 ABSENT。") - # 调用 set_chat_state,它会处理日志和状态更新 - subflow.set_chat_state(ChatState.ABSENT) - deactivated_count += 1 - else: - # 如果已经是 ABSENT,则无需再次设置,但记录一下检查 - logger.trace(f"[Heartflow Deactivate] 子心流 {stream_name} 已处于 ABSENT 状态。") + # 设置状态为 ABSENT + if subflow.chat_state.chat_status != ChatState.ABSENT: + logger.debug(f"[Heartflow Deactivate] 正在将子心流 {stream_name} 状态设置为 ABSENT。") + # 调用 set_chat_state,它会处理日志和状态更新 + subflow.set_chat_state(ChatState.ABSENT) + deactivated_count += 1 + else: + # 如果已经是 ABSENT,则无需再次设置,但记录一下检查 + logger.trace(f"[Heartflow Deactivate] 子心流 {stream_name} 已处于 ABSENT 状态。") - except Exception as e: - logger.error(f"[Heartflow Deactivate] 停用子心流 {stream_name} 时出错: {e}") - logger.error(traceback.format_exc()) + except Exception as e: + logger.error(f"[Heartflow Deactivate] 停用子心流 {stream_name} 时出错: {e}") + logger.error(traceback.format_exc()) - logger.info(f"[Heartflow Deactivate] 完成停用,共将 {deactivated_count} 个子心流设置为 ABSENT 状态 (不包括已是 ABSENT 的)。") - # --- 结束新增方法 --- # + logger.info(f"[Heartflow Deactivate] 完成停用,共将 {deactivated_count} 个子心流设置为 ABSENT 状态 (不包括已是 ABSENT 的)。") + except Exception as e: + logger.error(f"[Heartflow Deactivate] 停用所有子心流时出错: {e}") + logger.error(traceback.format_exc()) init_prompt() # 创建一个全局的管理器实例 diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index ad9c11497..9d86561b2 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -288,7 +288,7 @@ class SubHeartflow: self.last_active_time = time.time() logger.info(f"{log_prefix} 聊天状态从 {current_state.value} 变更为 {new_state.value}") - # TODO: 考虑从FOCUSED状态转出时是否需要停止PFChatting + # TODO: 考虑从FOCUSED状态转出时是否需要停止HeartFChatting # 这部分逻辑可能更适合放在Heartflow的_stop_subheartflow或HeartFCController的循环中处理 async def subheartflow_start_working(self): diff --git a/src/main.py b/src/main.py index fa18dcc0d..ae851551a 100644 --- a/src/main.py +++ b/src/main.py @@ -17,7 +17,6 @@ from .common.logger import get_module_logger from .plugins.remote import heartbeat_thread # noqa: F401 from .individuality.individuality import Individuality from .common.server import global_server -from .plugins.heartFC_chat.heartFC_controler import HeartFCController logger = get_module_logger("main") @@ -67,11 +66,6 @@ class MainSystem: # 启动愿望管理器 await willing_manager.async_task_starter() - # 启动消息处理器 - if not self._message_manager_started: - asyncio.create_task(message_manager.start_processor()) - self._message_manager_started = True - # 初始化聊天管理器 await chat_manager._initialize() asyncio.create_task(chat_manager._auto_save_task()) @@ -107,18 +101,14 @@ class MainSystem: logger.success("个体特征初始化成功") try: - # 启动心流系统 + # 启动 Heartflow 的 MessageManager (负责消息发送/排队) + await heartflow.message_manager_instance.start() + logger.success("心流消息管理器启动成功") + + # 启动心流系统主循环 asyncio.create_task(heartflow.heartflow_start_working()) logger.success("心流系统启动成功") - # 初始化并独立启动 HeartFCController - heartfc_chat_instance = HeartFCController.get_instance() - if heartfc_chat_instance: - await heartfc_chat_instance.start() - logger.success("HeartFC_Chat 模块独立启动成功") - else: - logger.error("获取 HeartFC_Chat 实例失败,无法启动。") - init_time = int(1000 * (time.time() - init_start_time)) logger.success(f"初始化完成,神经元放电{init_time}次") except Exception as e: diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index d2695c6ad..3fec14d36 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -14,6 +14,13 @@ from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config from src.plugins.chat.utils_image import image_path_to_base64 # Local import needed after move from src.plugins.utils.timer_calculater import Timer # <--- Import Timer +# --- Import necessary dependencies directly --- +from .heartFC_generator import ResponseGenerator # Assuming this is the type for gpt +from src.do_tool.tool_use import ToolUser +from src.plugins.chat.emoji_manager import EmojiManager # Assuming this is the type +from .heartflow_message_sender import MessageManager # Assuming this is the type +# --- End import --- + INITIAL_DURATION = 60.0 @@ -23,11 +30,13 @@ interest_log_config = LogConfig( console_format=PFC_STYLE_CONFIG["console_format"], # 使用默认控制台格式 file_format=PFC_STYLE_CONFIG["file_format"], # 使用默认文件格式 ) -logger = get_module_logger("PFCLoop", config=interest_log_config) # Logger Name Changed +logger = get_module_logger("HeartFCLoop", config=interest_log_config) # Logger Name Changed # Forward declaration for type hinting if TYPE_CHECKING: + # Keep this if HeartFCController methods are still needed elsewhere, + # but the instance variable will be removed from HeartFChatting from .heartFC_controler import HeartFCController PLANNER_TOOL_DEFINITION = [ @@ -57,22 +66,33 @@ PLANNER_TOOL_DEFINITION = [ ] -class PFChatting: +class HeartFChatting: """ - 管理一个连续的Plan-Filter-Check (现在改为Plan-Replier-Sender)循环 + 管理一个连续的Plan-Replier-Sender循环 用于在特定聊天流中生成回复,由计时器控制。 只要计时器>0,循环就会继续。 + 现在由其关联的 SubHeartflow 管理生命周期。 """ - def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFCController"): + def __init__(self, + chat_id: str, + # --- Explicit Dependencies --- + gpt_instance: ResponseGenerator, + tool_user_instance: ToolUser, + emoji_manager_instance: EmojiManager, + message_manager_instance: MessageManager + # --- End Explicit Dependencies --- + ): """ - 初始化PFChatting实例。 + 初始化HeartFChatting实例。 Args: chat_id: The identifier for the chat stream (e.g., stream_id). - heartfc_controller_instance: 访问共享资源和方法的主HeartFCController实例。 + gpt_instance: The ResponseGenerator instance for generating text replies. + tool_user_instance: The ToolUser instance for using tools. + emoji_manager_instance: The EmojiManager instance for handling emojis. + message_manager_instance: The MessageManager instance for sending/managing messages. """ - self.heartfc_controller = heartfc_controller_instance # Store the controller instance self.stream_id: str = chat_id self.chat_stream: Optional[ChatStream] = None self.sub_hf: Optional[SubHeartflow] = None @@ -81,7 +101,15 @@ class PFChatting: self._processing_lock = asyncio.Lock() # 确保只有一个 Plan-Replier-Sender 周期在运行 self._timer_lock = asyncio.Lock() # 用于安全更新计时器 - # Access LLM config through the controller + # --- Store Dependencies --- + self.gpt_instance = gpt_instance + self.tool_user = tool_user_instance + self.emoji_manager = emoji_manager_instance + self.message_manager = message_manager_instance + # --- End Store Dependencies --- + + + # Access LLM config through global_config or pass if needed self.planner_llm = LLMRequest( model=global_config.llm_normal, temperature=global_config.llm_normal["temp"], @@ -123,7 +151,7 @@ class PFChatting: logger.warning(f"{log_prefix} 获取SubHeartflow失败。一些功能可能受限。") self._initialized = True - logger.info(f"麦麦感觉到了,激发了PFChatting{log_prefix} 初始化成功。") + logger.info(f"麦麦感觉到了,激发了HeartFChatting{log_prefix} 初始化成功。") return True except Exception as e: logger.error(f"{log_prefix} 初始化失败: {e}") @@ -187,23 +215,22 @@ class PFChatting: try: exception = task.exception() if exception: - logger.error(f"{log_prefix} PFChatting: 麦麦脱离了聊天(异常): {exception}") + logger.error(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天(异常): {exception}") logger.error(traceback.format_exc()) # Log full traceback for exceptions else: - logger.debug(f"{log_prefix} PFChatting: 麦麦脱离了聊天 (正常完成)") + logger.debug(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天 (正常完成)") except asyncio.CancelledError: - logger.info(f"{log_prefix} PFChatting: 麦麦脱离了聊天(任务取消)") + logger.info(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天(任务取消)") finally: self._loop_active = False self._loop_task = None self._last_added_duration = self._initial_duration self._trigger_count_this_activation = 0 if self._processing_lock.locked(): - logger.warning(f"{log_prefix} PFChatting: 处理锁在循环结束时仍被锁定,强制释放。") + logger.warning(f"{log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") self._processing_lock.release() - # Remove instance from controller's dict? Only if it's truly done. - # Consider if loop can be restarted vs instance destroyed. - # asyncio.create_task(self.heartfc_controller._remove_heartFC_chat_instance(self.stream_id)) # Example cleanup + # Instance removal is now handled by SubHeartflow + # asyncio.create_task(self.heartfc_controller._remove_heartFC_chat_instance(self.stream_id)) # Removed async def _run_pf_loop(self): """ @@ -211,25 +238,26 @@ class PFChatting: 管理每个循环周期的处理锁 """ log_prefix = self._get_log_prefix() - logger.info(f"{log_prefix} PFChatting: 麦麦打算好好聊聊 (定时器: {self._loop_timer:.1f}s)") + logger.info(f"{log_prefix} HeartFChatting: 麦麦打算好好聊聊 (定时器: {self._loop_timer:.1f}s)") try: thinking_id = "" while True: cycle_timers = {} # <--- Initialize timers dict for this cycle - if self.heartfc_controller.MessageManager().check_if_sending_message_exist(self.stream_id, thinking_id): - # logger.info(f"{log_prefix} PFChatting: 11111111111111111111111111111111麦麦还在发消息,等会再规划") + # Access MessageManager directly + if self.message_manager.check_if_sending_message_exist(self.stream_id, thinking_id): + # logger.info(f"{log_prefix} HeartFChatting: 11111111111111111111111111111111麦麦还在发消息,等会再规划") await asyncio.sleep(1) continue else: - # logger.info(f"{log_prefix} PFChatting: 11111111111111111111111111111111麦麦不发消息了,开始规划") + # logger.info(f"{log_prefix} HeartFChatting: 11111111111111111111111111111111麦麦不发消息了,开始规划") pass async with self._timer_lock: current_timer = self._loop_timer if current_timer <= 0: logger.info( - f"{log_prefix} PFChatting: 聊太久了,麦麦打算休息一下 (计时器为 {current_timer:.1f}s)。退出PFChatting。" + f"{log_prefix} HeartFChatting: 聊太久了,麦麦打算休息一下 (计时器为 {current_timer:.1f}s)。退出HeartFChatting。" ) break @@ -244,7 +272,7 @@ class PFChatting: # Use try_acquire pattern or timeout? await self._processing_lock.acquire() acquired_lock = True - # logger.debug(f"{log_prefix} PFChatting: 循环获取到处理锁") + # logger.debug(f"{log_prefix} HeartFChatting: 循环获取到处理锁") # 在规划前记录数据库时间戳 planner_start_db_time = time.time() @@ -268,7 +296,7 @@ class PFChatting: # Continue to timer decrement and sleep elif action == "text_reply": - logger.debug(f"{log_prefix} PFChatting: 麦麦决定回复文本. 理由: {reasoning}") + logger.debug(f"{log_prefix} HeartFChatting: 麦麦决定回复文本. 理由: {reasoning}") action_taken_this_cycle = True anchor_message = await self._get_anchor_message(observed_messages) if not anchor_message: @@ -312,7 +340,7 @@ class PFChatting: self._cleanup_thinking_message(thinking_id) elif action == "emoji_reply": logger.info( - f"{log_prefix} PFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}" + f"{log_prefix} HeartFChatting: 麦麦决定回复表情 ('{emoji_query}'). 理由: {reasoning}" ) action_taken_this_cycle = True anchor = await self._get_anchor_message(observed_messages) @@ -328,10 +356,10 @@ class PFChatting: action_taken_this_cycle = True # 即使发送失败,Planner 也决策了动作 elif action == "no_reply": - logger.info(f"{log_prefix} PFChatting: 麦麦决定不回复. 原因: {reasoning}") + logger.info(f"{log_prefix} HeartFChatting: 麦麦决定不回复. 原因: {reasoning}") action_taken_this_cycle = False # 标记为未执行动作 # --- 新增:等待新消息 --- - logger.debug(f"{log_prefix} PFChatting: 开始等待新消息 (自 {planner_start_db_time})...") + logger.debug(f"{log_prefix} HeartFChatting: 开始等待新消息 (自 {planner_start_db_time})...") observation = None if self.sub_hf: observation = self.sub_hf._get_primary_observation() @@ -343,18 +371,18 @@ class PFChatting: # 检查计时器是否耗尽 async with self._timer_lock: if self._loop_timer <= 0: - logger.info(f"{log_prefix} PFChatting: 等待新消息时计时器耗尽。") + logger.info(f"{log_prefix} HeartFChatting: 等待新消息时计时器耗尽。") break # 计时器耗尽,退出等待 # 检查是否有新消息 has_new = await observation.has_new_messages_since(planner_start_db_time) if has_new: - logger.info(f"{log_prefix} PFChatting: 检测到新消息,结束等待。") + logger.info(f"{log_prefix} HeartFChatting: 检测到新消息,结束等待。") break # 收到新消息,退出等待 # 检查等待是否超时(例如,防止无限等待) if time.monotonic() - wait_start_time > 60: # 等待60秒示例 - logger.warning(f"{log_prefix} PFChatting: 等待新消息超时(60秒)。") + logger.warning(f"{log_prefix} HeartFChatting: 等待新消息超时(60秒)。") break # 超时退出 # 等待一段时间再检查 @@ -364,16 +392,16 @@ class PFChatting: logger.info(f"{log_prefix} 等待新消息的 sleep 被中断。") raise # 重新抛出取消错误,以便外层循环处理 else: - logger.warning(f"{log_prefix} PFChatting: 无法获取 Observation 实例,无法等待新消息。") + logger.warning(f"{log_prefix} HeartFChatting: 无法获取 Observation 实例,无法等待新消息。") # --- 等待结束 --- elif action == "error": # Action specifically set to error by planner - logger.error(f"{log_prefix} PFChatting: Planner返回错误状态. 原因: {reasoning}") + logger.error(f"{log_prefix} HeartFChatting: Planner返回错误状态. 原因: {reasoning}") action_taken_this_cycle = False else: # Unknown action from planner logger.warning( - f"{log_prefix} PFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}" + f"{log_prefix} HeartFChatting: Planner返回未知动作 '{action}'. 原因: {reasoning}" ) action_taken_this_cycle = False @@ -404,14 +432,14 @@ class PFChatting: finally: if acquired_lock: self._processing_lock.release() - logger.trace(f"{log_prefix} 循环释放了处理锁.") + # logger.trace(f"{log_prefix} 循环释放了处理锁.") # Reduce noise async with self._timer_lock: self._loop_timer -= cycle_duration # Log timer decrement less aggressively if cycle_duration > 0.1 or not action_taken_this_cycle: logger.debug( - f"{log_prefix} PFChatting: 周期耗时 {cycle_duration:.2f}s. 剩余时间: {self._loop_timer:.1f}s." + f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s. 剩余时间: {self._loop_timer:.1f}s." ) # --- Delay --- # @@ -431,13 +459,13 @@ class PFChatting: break except asyncio.CancelledError: - logger.info(f"{log_prefix} PFChatting: 麦麦的聊天主循环被取消了") + logger.info(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环被取消了") except Exception as e_loop_outer: - logger.error(f"{log_prefix} PFChatting: 麦麦的聊天主循环意外出错: {e_loop_outer}") + logger.error(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环意外出错: {e_loop_outer}") logger.error(traceback.format_exc()) finally: # State reset is primarily handled by _handle_loop_completion callback - logger.info(f"{log_prefix} PFChatting: 麦麦的聊天主循环结束。") + logger.info(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环结束。") async def _planner(self) -> Dict[str, Any]: """ @@ -451,19 +479,32 @@ class PFChatting: current_mind: Optional[str] = None llm_error = False # Flag for LLM failure + # --- Ensure SubHeartflow is available --- + if not self.sub_hf: + # Attempt to re-fetch if missing (might happen if initialization order changes) + self.sub_hf = heartflow.get_subheartflow(self.stream_id) + if not self.sub_hf: + logger.error(f"{log_prefix}[Planner] SubHeartflow is not available. Cannot proceed.") + return {"action": "error", "reasoning": "SubHeartflow unavailable", "llm_error": True, "observed_messages": []} + + try: + # Access observation via self.sub_hf observation = self.sub_hf._get_primary_observation() await observation.observe() observed_messages = observation.talking_message observed_messages_str = observation.talking_message_str except Exception as e: logger.error(f"{log_prefix}[Planner] 获取观察信息时出错: {e}") + # Handle error gracefully, maybe return an error state + observed_messages_str = "[Error getting observation]" + # Consider returning error here if observation is critical # --- 结束获取观察信息 --- # # --- (Moved from _replier_work) 1. 思考前使用工具 --- # try: - # Access tool_user via controller - tool_result = await self.heartfc_controller.tool_user.use_tool( + # Access tool_user directly + tool_result = await self.tool_user.use_tool( message_txt=observed_messages_str, sub_heartflow=self.sub_hf ) if tool_result.get("used_tools", False): @@ -580,28 +621,6 @@ class PFChatting: """ try: - # last_msg_dict = None - # if observed_messages: - # last_msg_dict = observed_messages[-1] - - # if last_msg_dict: - # try: - # anchor_message = MessageRecv(last_msg_dict) # 移除 chat_stream 参数 - # anchor_message.update_chat_stream(self.chat_stream) # 添加 update_chat_stream 调用 - # if not ( - # anchor_message - # and anchor_message.message_info - # and anchor_message.message_info.message_id - # and anchor_message.message_info.user_info - # ): - # raise ValueError("重构的 MessageRecv 缺少必要信息.") - # # logger.debug(f"{self._get_log_prefix()} 重构的锚点消息: ID={anchor_message.message_info.message_id}") - # return anchor_message - # except Exception as e_reconstruct: - # logger.warning( - # f"{self._get_log_prefix()} 从观察到的消息重构 MessageRecv 失败: {e_reconstruct}. 创建占位符." - # ) - # --- Create Placeholder --- # placeholder_id = f"mid_pf_{int(time.time() * 1000)}" @@ -637,8 +656,8 @@ class PFChatting: """Safely removes the thinking message.""" log_prefix = self._get_log_prefix() try: - # Access MessageManager via controller - container = self.heartfc_controller.MessageManager().get_container(self.stream_id) + # Access MessageManager directly + container = self.message_manager.get_container(self.stream_id) container.remove_message(thinking_id, msg_type=MessageThinking) logger.debug(f"{log_prefix} Cleaned up thinking message {thinking_id}.") except Exception as e: @@ -676,10 +695,10 @@ class PFChatting: async def shutdown(self): """ - Gracefully shuts down the PFChatting instance by cancelling the active loop task. + Gracefully shuts down the HeartFChatting instance by cancelling the active loop task. """ log_prefix = self._get_log_prefix() - logger.info(f"{log_prefix} Shutting down PFChatting...") + logger.info(f"{log_prefix} Shutting down HeartFChatting...") if self._loop_task and not self._loop_task.done(): logger.info(f"{log_prefix} Cancelling active PF loop task.") self._loop_task.cancel() @@ -699,7 +718,7 @@ class PFChatting: if self._processing_lock.locked(): logger.warning(f"{log_prefix} Releasing processing lock during shutdown.") self._processing_lock.release() - logger.info(f"{log_prefix} PFChatting shutdown complete.") + logger.info(f"{log_prefix} HeartFChatting shutdown complete.") async def _build_planner_prompt(self, observed_messages_str: str, current_mind: Optional[str]) -> str: """构建 Planner LLM 的提示词""" @@ -749,12 +768,13 @@ class PFChatting: response_set: Optional[List[str]] = None try: # --- Generate Response with LLM --- # - # Access gpt instance via controller - gpt_instance = self.heartfc_controller.gpt + # Access gpt instance directly # logger.debug(f"{log_prefix}[Replier-{thinking_id}] Calling LLM to generate response...") # Ensure generate_response has access to current_mind if it's crucial context - response_set = await gpt_instance.generate_response( + # Access gpt_instance directly + response_set = await self.gpt_instance.generate_response( + self.sub_hf, reason, anchor_message, # Pass anchor_message positionally (matches 'message' parameter) thinking_id, # Pass thinking_id positionally @@ -797,8 +817,8 @@ class PFChatting: reply=anchor_message, # 回复的是锚点消息 thinking_start_time=thinking_time_point, ) - # Access MessageManager via controller - self.heartfc_controller.MessageManager().add_message(thinking_message) + # Access MessageManager directly + self.message_manager.add_message(thinking_message) return thinking_id async def _send_response_messages( @@ -810,7 +830,8 @@ class PFChatting: return None chat = anchor_message.chat_stream - container = self.heartfc_controller.MessageManager().get_container(chat.stream_id) + # Access MessageManager directly + container = self.message_manager.get_container(chat.stream_id) thinking_message = None # 移除思考消息 @@ -853,7 +874,8 @@ class PFChatting: first_bot_msg = bot_message message_set.add_message(bot_message) - self.heartfc_controller.MessageManager().add_message(message_set) + # Access MessageManager directly + self.message_manager.add_message(message_set) return first_bot_msg async def _handle_emoji(self, anchor_message: Optional[MessageRecv], response_set: List[str], send_emoji: str = ""): @@ -864,13 +886,15 @@ class PFChatting: return chat = anchor_message.chat_stream - # Access emoji_manager via controller - emoji_manager_instance = self.heartfc_controller.emoji_manager + # Access emoji_manager directly + # emoji_manager_instance = self.heartfc_controller.emoji_manager # Removed if send_emoji: - emoji_raw = await emoji_manager_instance.get_emoji_for_text(send_emoji) + # Use self.emoji_manager directly + emoji_raw = await self.emoji_manager.get_emoji_for_text(send_emoji) else: emoji_text_source = "".join(response_set) if response_set else "" - emoji_raw = await emoji_manager_instance.get_emoji_for_text(emoji_text_source) + # Use self.emoji_manager directly + emoji_raw = await self.emoji_manager.get_emoji_for_text(emoji_text_source) if emoji_raw: emoji_path, _description = emoji_raw @@ -892,5 +916,5 @@ class PFChatting: is_head=False, is_emoji=True, ) - # Access MessageManager via controller - self.heartfc_controller.MessageManager().add_message(bot_message) + # Access MessageManager directly + self.message_manager.add_message(bot_message) diff --git a/src/plugins/heartFC_chat/heartFC_controler.py b/src/plugins/heartFC_chat/heartFC_controler.py deleted file mode 100644 index 521eb0d94..000000000 --- a/src/plugins/heartFC_chat/heartFC_controler.py +++ /dev/null @@ -1,218 +0,0 @@ -import traceback -from typing import Optional, Dict -import asyncio -import threading # 导入 threading -from ..moods.moods import MoodManager -from ..chat.emoji_manager import emoji_manager -from .heartFC_generator import ResponseGenerator -from .heartflow_message_sender import MessageManager -from src.heart_flow.heartflow import heartflow, MaiState -from src.heart_flow.sub_heartflow import SubHeartflow, ChatState -from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig -from src.plugins.person_info.relationship_manager import relationship_manager -from src.do_tool.tool_use import ToolUser -from src.plugins.chat.chat_stream import chat_manager -from .heartFC_chat import PFChatting - - -# 定义日志配置 -chat_config = LogConfig( - console_format=CHAT_STYLE_CONFIG["console_format"], - file_format=CHAT_STYLE_CONFIG["file_format"], -) - -logger = get_module_logger("HeartFCController", config=chat_config) - -# 检测群聊兴趣的间隔时间 -INTEREST_MONITOR_INTERVAL_SECONDS = 1 - - -class HeartFCController: - _instance: Optional['HeartFCController'] = None - _lock = threading.Lock() # 用于保证 get_instance 线程安全 - - def __init__(self): - # __init__ 现在只会在 get_instance 首次创建实例时调用一次 - # 因此不再需要 _initialized 标志 - - # 检查是否已被初始化,防止意外重入 (虽然理论上不太可能) - # hasattr 检查通常比标志位稍慢,但在这里作为额外的安全措施 - if hasattr(self, 'gpt'): - logger.warning("HeartFCController __init__ 被意外再次调用。") - return - - logger.debug("初始化 HeartFCController 单例实例...") # 更新日志信息 - self.gpt = ResponseGenerator() - self.mood_manager = MoodManager.get_instance() - self.tool_user = ToolUser() - self._interest_monitor_task: Optional[asyncio.Task] = None - - self.heartflow = heartflow - - self.heartFC_chat_instances: Dict[str, PFChatting] = {} - self._heartFC_chat_lock = asyncio.Lock() - self.emoji_manager = emoji_manager - self.relationship_manager = relationship_manager - - self.MessageManager = MessageManager - logger.info("HeartFCController 单例初始化完成。") - - @classmethod - def get_instance(cls): - """获取 HeartFCController 的单例实例。线程安全。""" - # Double-checked locking - if cls._instance is None: - with cls._lock: - if cls._instance is None: - logger.info("HeartFCController 实例不存在,正在创建...") - # 创建实例,这将自动调用 __init__ 一次 - cls._instance = cls() - logger.info("HeartFCController 实例已创建并初始化。") - # else: # 不需要这个 else 日志,否则每次获取都会打印 - # logger.debug("返回已存在的 HeartFCController 实例。") - return cls._instance - - # --- 新增:检查 PFChatting 状态的方法 --- # - def is_heartFC_chat_active(self, stream_id: str) -> bool: - """检查指定 stream_id 的 PFChatting 循环是否处于活动状态。""" - # 注意:这里直接访问字典,不加锁,因为读取通常是安全的, - # 并且 PFChatting 实例的 _loop_active 状态由其自身的异步循环管理。 - # 如果需要更强的保证,可以在访问 pf_instance 前获取 _heartFC_chat_lock - pf_instance = self.heartFC_chat_instances.get(stream_id) - if pf_instance and pf_instance._loop_active: # 直接检查 PFChatting 实例的 _loop_active 属性 - return True - return False - - # --- 结束新增 --- # - - async def start(self): - """启动异步任务,如回复启动器""" - logger.debug("HeartFCController 正在启动异步任务...") - self._initialize_monitor_task() - logger.info("HeartFCController 异步任务启动完成") - - def _initialize_monitor_task(self): - """启动后台兴趣监控任务,可以检查兴趣是否足以开启心流对话""" - if self._interest_monitor_task is None or self._interest_monitor_task.done(): - try: - loop = asyncio.get_running_loop() - self._interest_monitor_task = loop.create_task(self._response_control_loop()) - except RuntimeError: - logger.error("创建兴趣监控任务失败:没有运行中的事件循环。") - raise - else: - logger.warning("跳过兴趣监控任务创建:任务已存在或正在运行。") - - # --- Added PFChatting Instance Manager --- - async def _get_or_create_heartFC_chat(self, stream_id: str) -> Optional[PFChatting]: - """获取现有PFChatting实例或创建新实例。""" - async with self._heartFC_chat_lock: - if stream_id not in self.heartFC_chat_instances: - logger.info(f"为流 {stream_id} 创建新的PFChatting实例") - # 传递 self (HeartFCController 实例) 进行依赖注入 - instance = PFChatting(stream_id, self) - # 执行异步初始化 - if not await instance._initialize(): - logger.error(f"为流 {stream_id} 初始化PFChatting失败") - return None - self.heartFC_chat_instances[stream_id] = instance - return self.heartFC_chat_instances[stream_id] - - async def stop_heartFC_chat(self, stream_id: str): - """尝试停止并清理指定 stream_id 的 PFChatting 实例。""" - async with self._heartFC_chat_lock: - pf_instance = self.heartFC_chat_instances.pop(stream_id, None) # 从字典中移除 - if pf_instance: - stream_name = chat_manager.get_stream_name(stream_id) or stream_id - logger.info(f"[{stream_name}] 正在停止 PFChatting 实例...") - try: - await pf_instance.shutdown() # 调用实例的 shutdown 方法 - logger.info(f"[{stream_name}] PFChatting 实例已停止。") - except Exception as e: - logger.error(f"[{stream_name}] 停止 PFChatting 实例时出错: {e}") - # else: - # logger.debug(f"[{stream_name}] 没有找到需要停止的 PFChatting 实例。") - - async def _response_control_loop(self): - """后台任务,定期检查兴趣度变化并触发回复""" - logger.info("兴趣监控循环开始...") - while True: - await asyncio.sleep(INTEREST_MONITOR_INTERVAL_SECONDS) - - try: - global_mai_state = self.heartflow.current_state.mai_status - - active_stream_ids = list(self.heartflow.get_all_subheartflows_streams_ids()) - for stream_id in active_stream_ids: - stream_name = chat_manager.get_stream_name(stream_id) or stream_id - sub_hf = self.heartflow.get_subheartflow(stream_id) - if not sub_hf: - continue - - current_chat_state = sub_hf.chat_state.chat_status - log_prefix = f"[{stream_name}]" - - if global_mai_state == MaiState.OFFLINE: - if current_chat_state == ChatState.FOCUSED: - logger.warning(f"{log_prefix} Global state is OFFLINE, but SubHeartflow is FOCUSED. Stopping PFChatting.") - await self.stop_heartFC_chat(stream_id) - continue - - # --- 只有在全局状态允许时才执行以下逻辑 --- # - if current_chat_state == ChatState.CHAT: - should_evaluate = False - try: - should_evaluate = sub_hf.should_evaluate_reply() - except Exception as e: - logger.error(f"检查回复概率时出错 流 {stream_name}: {e}") - logger.error(traceback.format_exc()) - - if should_evaluate: - # --- Limit Check before entering FOCUSED state --- # - focused_limit = global_mai_state.get_focused_chat_max_num() - current_focused_count = self.heartflow.count_subflows_by_state(ChatState.FOCUSED) - - if current_focused_count >= focused_limit: - logger.debug(f"{log_prefix} 拒绝从 CHAT 转换到 FOCUSED。原因:FOCUSED 状态已达上限 ({focused_limit})。当前数量: {current_focused_count}") - # Do not change state, continue to next stream or cycle - else: - logger.info(f"{log_prefix} 兴趣概率触发,将状态从 CHAT 提升到 FOCUSED (全局状态: {global_mai_state.value}, 上限: {focused_limit}, 当前: {current_focused_count})") - sub_hf.set_chat_state(ChatState.FOCUSED) - # --- End Limit Check --- # - - elif current_chat_state == ChatState.FOCUSED: - # logger.debug(f"[{stream_name}] State FOCUSED, triggering HFC (全局状态: {global_mai_state.value})...") - await self._trigger_hfc(sub_hf) - - except asyncio.CancelledError: - logger.info("兴趣监控循环已取消。") - break - except Exception as e: - logger.error(f"兴趣监控循环错误: {e}") - logger.error(traceback.format_exc()) - await asyncio.sleep(5) - - async def _trigger_hfc(self, sub_hf: SubHeartflow): - """仅当 SubHeartflow 状态为 FOCUSED 时,触发 PFChatting 的激活或时间增加。""" - stream_id = sub_hf.subheartflow_id - stream_name = chat_manager.get_stream_name(stream_id) or stream_id # 获取流名称 - - # 首先检查状态 - if sub_hf.chat_state.chat_status != ChatState.FOCUSED: - logger.warning(f"[{stream_name}] 尝试在非 FOCUSED 状态 ({sub_hf.chat_state.chat_status.value}) 下触发 HFC。已跳过。") - return - - # 移除内部状态修改逻辑 - # chat_state = sub_hf.chat_state - # if chat_state == ChatState.ABSENT: - # chat_state = ChatState.CHAT - # elif chat_state == ChatState.CHAT: - # chat_state = ChatState.FOCUSED - - # 状态已经是 FOCUSED,直接获取或创建 PFChatting 并添加时间 - # logger.debug(f"[{stream_name}] Triggering PFChatting add_time in FOCUSED state.") # Debug log - pf_instance = await self._get_or_create_heartFC_chat(stream_id) - if pf_instance: # 确保实例成功获取或创建 - await pf_instance.add_time() # 注意:这里不再需要 create_task,因为 add_time 内部会处理任务创建 - else: - logger.error(f"[{stream_name}] 无法获取或创建 PFChatting 实例以触发 HFC。") diff --git a/src/plugins/heartFC_chat/heartFC_generator.py b/src/plugins/heartFC_chat/heartFC_generator.py index d3d5c86eb..7e1a26b16 100644 --- a/src/plugins/heartFC_chat/heartFC_generator.py +++ b/src/plugins/heartFC_chat/heartFC_generator.py @@ -11,7 +11,7 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from ..utils.timer_calculater import Timer from src.plugins.moods.moods import MoodManager - +from src.heart_flow.sub_heartflow import SubHeartflow # 定义日志配置 llm_config = LogConfig( # 使用消息发送专用样式 @@ -39,6 +39,7 @@ class ResponseGenerator: async def generate_response( self, + sub_hf: SubHeartflow, reason: str, message: MessageRecv, thinking_id: str, @@ -55,7 +56,7 @@ class ResponseGenerator: current_model = self.model_normal current_model.temperature = global_config.llm_normal["temp"] * arousal_multiplier # 激活度越高,温度越高 model_response = await self._generate_response_with_model( - reason, message, current_model, thinking_id + sub_hf, reason, message, current_model, thinking_id ) if model_response: @@ -70,7 +71,7 @@ class ResponseGenerator: return None async def _generate_response_with_model( - self, reason: str, message: MessageRecv, model: LLMRequest, thinking_id: str + self, sub_hf: SubHeartflow, reason: str, message: MessageRecv, model: LLMRequest, thinking_id: str ) -> str: sender_name = "" @@ -78,15 +79,14 @@ class ResponseGenerator: sender_name = f"<{message.chat_stream.user_info.platform}:{message.chat_stream.user_info.user_id}:{message.chat_stream.user_info.user_nickname}:{message.chat_stream.user_info.user_cardname}>" - # 构建prompt + with Timer() as t_build_prompt: prompt = await prompt_builder.build_prompt( build_mode="focus", reason=reason, - chat_stream=message.chat_stream, message_txt=message.processed_plain_text, sender_name=sender_name, - stream_id=message.chat_stream.stream_id, + subheartflow=sub_hf ) logger.info(f"构建prompt时间: {t_build_prompt.human_readable}") diff --git a/src/plugins/heartFC_chat/heartflow_message_sender.py b/src/plugins/heartFC_chat/heartflow_message_sender.py index ea8f45831..dd051da97 100644 --- a/src/plugins/heartFC_chat/heartflow_message_sender.py +++ b/src/plugins/heartFC_chat/heartflow_message_sender.py @@ -132,21 +132,21 @@ class MessageManager: """管理所有聊天流的消息容器""" _instance = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super(MessageManager, cls).__new__(cls, *args, **kwargs) - return cls._instance + _lock = asyncio.Lock() def __init__(self): - # 确保 __init__ 只被调用一次 - if not hasattr(self, "_initialized"): - self.containers: Dict[str, MessageContainer] = {} # chat_id -> MessageContainer - self.storage = MessageStorage() - self._running = True - self._initialized = True - # 在实例首次创建时启动消息处理器 - asyncio.create_task(self.start_processor()) + if MessageManager._instance is not None: + raise Exception("This class is a singleton!") + else: + self.containers: Dict[str, MessageContainer] = {} + self._container_lock = asyncio.Lock() + self.running = True + MessageManager._instance = self + + async def start(self): + """Starts the background processor task.""" + asyncio.create_task(self.start_processor()) + logger.info("MessageManager processor task started.") def get_container(self, chat_id: str) -> MessageContainer: """获取或创建聊天流的消息容器""" @@ -225,7 +225,7 @@ class MessageManager: async def start_processor(self): """启动消息处理器""" - while self._running: + while self.running: await asyncio.sleep(1) tasks = [] for chat_id in list(self.containers.keys()): # 使用 list 复制 key,防止在迭代时修改字典 diff --git a/src/plugins/heartFC_chat/heartflow_processor.py b/src/plugins/heartFC_chat/heartflow_processor.py index 53f064044..3d3909bc4 100644 --- a/src/plugins/heartFC_chat/heartflow_processor.py +++ b/src/plugins/heartFC_chat/heartflow_processor.py @@ -12,7 +12,7 @@ from ..chat.chat_stream import chat_manager from ..chat.message_buffer import message_buffer from ..utils.timer_calculater import Timer from src.plugins.person_info.relationship_manager import relationship_manager -from .normal_chat import ReasoningChat +from .normal_chat import NormalChat # 定义日志配置 processor_config = LogConfig( @@ -25,7 +25,7 @@ logger = get_module_logger("heartflow_processor", config=processor_config) class HeartFCProcessor: def __init__(self): self.storage = MessageStorage() - self.normal_chat = ReasoningChat.get_instance() + self.normal_chat = NormalChat.get_instance() async def process_message(self, message_data: str) -> None: """处理接收到的原始消息数据,完成消息解析、缓冲、过滤、存储、兴趣度计算与更新等核心流程。 diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index 2ce13e7c3..5647925e0 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -1,11 +1,10 @@ import random from typing import Optional +from src.heart_flow.sub_heartflow import SubHeartflow from ...config.config import global_config -from ..chat.utils import get_recent_group_detailed_plain_text from ..chat.chat_stream import chat_manager from src.common.logger import get_module_logger from ...individuality.individuality import Individuality -from src.heart_flow.heartflow import heartflow from src.plugins.utils.prompt_builder import Prompt, global_prompt_manager from src.plugins.utils.chat_message_builder import build_readable_messages, get_raw_msg_before_timestamp_with_chat from src.plugins.person_info.relationship_manager import relationship_manager @@ -83,21 +82,23 @@ class PromptBuilder: async def build_prompt( - self, build_mode,reason, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None + self, build_mode,reason, message_txt: str, sender_name: str = "某人",subheartflow: SubHeartflow =None ) -> tuple[str, str]: + chat_stream = chat_manager.get_stream(subheartflow.subheartflow_id) + if build_mode == "normal": - return await self._build_prompt_normal(chat_stream, message_txt, sender_name, stream_id) + return await self._build_prompt_normal(chat_stream, message_txt, sender_name, subheartflow) elif build_mode == "focus": - return await self._build_prompt_focus(reason, chat_stream, message_txt, sender_name, stream_id) + return await self._build_prompt_focus(reason, chat_stream, message_txt, sender_name, subheartflow) async def _build_prompt_focus( - self, reason, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None + self, reason, chat_stream, message_txt: str, sender_name: str = "某人", subheartflow: SubHeartflow =None ) -> tuple[str, str]: - current_mind_info = heartflow.get_subheartflow(stream_id).current_mind + current_mind_info = subheartflow.current_mind individuality = Individuality.get_instance() prompt_personality = individuality.get_prompt(type="personality", x_person=2, level=1) @@ -106,7 +107,7 @@ class PromptBuilder: # 日程构建 # schedule_prompt = f'''你现在正在做的事情是:{bot_schedule.get_current_num_task(num = 1,time_info = False)}''' - chat_stream = chat_manager.get_stream(stream_id) + chat_stream = chat_manager.get_stream(subheartflow.subheartflow_id) if chat_stream.group_info: chat_in_group = True else: @@ -185,7 +186,7 @@ class PromptBuilder: async def _build_prompt_normal( - self, chat_stream, message_txt: str, sender_name: str = "某人", stream_id: Optional[int] = None + self, chat_stream, message_txt: str, sender_name: str = "某人", subheartflow=None ) -> tuple[str, str]: # 开始构建prompt prompt_personality = "你" @@ -208,7 +209,7 @@ class PromptBuilder: (chat_stream.user_info.platform, chat_stream.user_info.user_id, chat_stream.user_info.user_nickname) ] who_chat_in_group += get_recent_group_speaker( - stream_id, + subheartflow.subheartflow_id, (chat_stream.user_info.platform, chat_stream.user_info.user_id), limit=global_config.MAX_CONTEXT_SIZE, ) @@ -248,7 +249,7 @@ class PromptBuilder: # schedule_prompt = f"""你现在正在做的事情是:{bot_schedule.get_current_num_task(num=1, time_info=False)}""" # 获取聊天上下文 - chat_stream = chat_manager.get_stream(stream_id) + chat_stream = chat_manager.get_stream(subheartflow.subheartflow_id) if chat_stream.group_info: chat_in_group = True else: diff --git a/src/plugins/heartFC_chat/normal_chat.py b/src/plugins/heartFC_chat/normal_chat.py index 74f999ef6..9e1f39608 100644 --- a/src/plugins/heartFC_chat/normal_chat.py +++ b/src/plugins/heartFC_chat/normal_chat.py @@ -22,8 +22,7 @@ from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from src.plugins.utils.timer_calculater import Timer from src.heart_flow.heartflow import heartflow from src.heart_flow.sub_heartflow import ChatState -from .heartFC_controler import HeartFCController - +from src.heart_flow.heartflow import heartflow # 定义日志配置 chat_config = LogConfig( console_format=CHAT_STYLE_CONFIG["console_format"], @@ -33,7 +32,7 @@ chat_config = LogConfig( logger = get_module_logger("normal_chat", config=chat_config) -class ReasoningChat: +class NormalChat: _instance = None _lock = threading.Lock() _initialized = False @@ -53,21 +52,21 @@ class ReasoningChat: with self.__class__._lock: # 使用类锁确保线程安全 if self._initialized: return - logger.info("正在初始化 ReasoningChat 单例...") # 添加日志 + logger.info("正在初始化 NormalChat 单例...") # 添加日志 self.storage = MessageStorage() self.gpt = ResponseGenerator() self.mood_manager = MoodManager.get_instance() # 用于存储每个 chat stream 的兴趣监控任务 self._interest_monitoring_tasks: Dict[str, asyncio.Task] = {} self._initialized = True - logger.info("ReasoningChat 单例初始化完成。") # 添加日志 + logger.info("NormalChat 单例初始化完成。") # 添加日志 @classmethod def get_instance(cls): - """获取 ReasoningChat 的单例实例。""" + """获取 NormalChat 的单例实例。""" if cls._instance is None: # 如果实例还未创建(理论上应该在 main 中初始化,但作为备用) - logger.warning("ReasoningChat 实例在首次 get_instance 时创建。") + logger.warning("NormalChat 实例在首次 get_instance 时创建。") cls() # 调用构造函数来创建实例 return cls._instance @@ -180,14 +179,8 @@ class ReasoningChat: async def _find_interested_message(self, chat: ChatStream) -> None: # 此函数设计为后台任务,轮询指定 chat 的兴趣消息。 # 它通常由外部代码在 chat 流活跃时启动。 - controller = HeartFCController.get_instance() # 获取控制器实例 stream_id = chat.stream_id # 获取 stream_id - if not controller: - logger.error(f"无法获取 HeartFCController 实例,无法检查 PFChatting 状态。stream: {stream_id}") - # 在没有控制器的情况下可能需要决定是继续处理还是完全停止?这里暂时假设继续 - pass # 或者 return? - # logger.info(f"[{stream_id}] 兴趣消息监控任务启动。") # 减少日志 while True: await asyncio.sleep(1) # 每秒检查一次 @@ -220,19 +213,18 @@ class ReasoningChat: continue # 处理下一条消息 # --- 结束状态检查 --- # - # --- 检查 PFChatting 是否活跃 (保持原有逻辑) --- # - pf_active = False - if controller: - pf_active = controller.is_heartFC_chat_active(stream_id) + # --- 检查 HeartFChatting 是否活跃 (改为检查 SubHeartflow 状态) --- # + is_focused = subheartflow.chat_state.chat_status == ChatState.FOCUSED - if pf_active: + if is_focused: # New check: If the subflow is focused, NormalChat shouldn't process removed_item = interest_dict.pop(msg_id, None) if removed_item: - logger.debug(f"[{stream_name}] PFChatting 活跃,已跳过并移除兴趣消息 {msg_id}") + # logger.debug(f"[{stream_name}] SubHeartflow 处于 FOCUSED 状态,已跳过并移除 NormalChat 兴趣消息 {msg_id}") # Reduce noise + pass continue # --- 结束检查 --- # - # 只有当状态为 CHAT 且 PFChatting 不活跃时才执行以下处理逻辑 + # 只有当状态为 CHAT 且 HeartFChatting 不活跃 (即 Subflow 不是 FOCUSED) 时才执行以下处理逻辑 try: await self.normal_normal_chat( message=message, @@ -318,9 +310,12 @@ class ReasoningChat: info_catcher.catch_decide_to_response(message) # 生成回复 + sub_hf = heartflow.get_subheartflow(stream_id) + try: with Timer("生成回复", timing_results): response_set = await self.gpt.generate_response( + sub_hf=sub_hf, message=message, thinking_id=thinking_id, ) diff --git a/src/plugins/heartFC_chat/normal_chat_generator.py b/src/plugins/heartFC_chat/normal_chat_generator.py index e9ddcba0c..18c550a13 100644 --- a/src/plugins/heartFC_chat/normal_chat_generator.py +++ b/src/plugins/heartFC_chat/normal_chat_generator.py @@ -8,7 +8,7 @@ from ..chat.utils import process_llm_response from ..utils.timer_calculater import Timer from src.common.logger import get_module_logger, LogConfig, LLM_STYLE_CONFIG from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager - +from src.heart_flow.sub_heartflow import SubHeartflow # 定义日志配置 llm_config = LogConfig( # 使用消息发送专用样式 @@ -40,7 +40,7 @@ class ResponseGenerator: self.current_model_type = "r1" # 默认使用 R1 self.current_model_name = "unknown model" - async def generate_response(self, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]: + async def generate_response(self, sub_hf: SubHeartflow, message: MessageThinking, thinking_id: str) -> Optional[Union[str, List[str]]]: """根据当前模型类型选择对应的生成函数""" # 从global_config中获取模型概率值并选择模型 if random.random() < global_config.model_reasoning_probability: @@ -54,7 +54,7 @@ class ResponseGenerator: f"{self.current_model_type}思考:{message.processed_plain_text[:30] + '...' if len(message.processed_plain_text) > 30 else message.processed_plain_text}" ) # noqa: E501 - model_response = await self._generate_response_with_model(message, current_model, thinking_id) + model_response = await self._generate_response_with_model(sub_hf, message, current_model, thinking_id) # print(f"raw_content: {model_response}") @@ -67,7 +67,7 @@ class ResponseGenerator: logger.info(f"{self.current_model_type}思考,失败") return None - async def _generate_response_with_model(self, message: MessageThinking, model: LLMRequest, thinking_id: str): + async def _generate_response_with_model(self, sub_hf: SubHeartflow, message: MessageThinking, model: LLMRequest, thinking_id: str): info_catcher = info_catcher_manager.get_info_catcher(thinking_id) if message.chat_stream.user_info.user_cardname and message.chat_stream.user_info.user_nickname: @@ -86,10 +86,9 @@ class ResponseGenerator: prompt = await prompt_builder.build_prompt( build_mode="normal", reason= "", - chat_stream=message.chat_stream, message_txt=message.processed_plain_text, sender_name=sender_name, - stream_id=message.chat_stream.stream_id, + subheartflow=sub_hf, ) logger.info(f"构建prompt时间: {t_build_prompt.human_readable}") diff --git a/src/plugins/person_info/person_info.py b/src/plugins/person_info/person_info.py index d903213f4..fc9b47c0a 100644 --- a/src/plugins/person_info/person_info.py +++ b/src/plugins/person_info/person_info.py @@ -200,7 +200,7 @@ class PersonInfoManager: }""" # logger.debug(f"取名提示词:{qv_name_prompt}") response = await self.qv_name_llm.generate_response(qv_name_prompt) - logger.debug(f"取名提示词:{qv_name_prompt}\n取名回复:{response}") + logger.trace(f"取名提示词:{qv_name_prompt}\n取名回复:{response}") result = self._extract_json_from_text(response[0]) if not result["nickname"]: