From 3763a0ed9e0334a2551b374659d054ba0f2777a1 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 26 Apr 2025 19:03:36 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat=EF=BC=9A=E6=8F=90=E4=BE=9B=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E8=AE=A9HFC=E7=BB=93=E6=9D=9F=EF=BC=8C=E5=BD=93?= =?UTF-8?q?=E7=AD=89=E5=BE=85=E8=BF=87=E4=B9=85no=5Freply=EF=BC=8C?= =?UTF-8?q?=E4=BC=9A=E5=9B=9E=E5=88=B0ABSENT=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/logger.py | 2 +- src/heart_flow/README.md | 3 +- src/heart_flow/background_tasks.py | 25 ---- src/heart_flow/sub_heartflow.py | 28 +++- src/heart_flow/subheartflow_manager.py | 175 +++++++++++------------ src/plugins/heartFC_chat/heartFC_chat.py | 121 ++++++++++++++-- 6 files changed, 214 insertions(+), 140 deletions(-) diff --git a/src/common/logger.py b/src/common/logger.py index 6ab3505df..176d4629c 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -337,7 +337,7 @@ REMOTE_STYLE_CONFIG = { "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 远程 | {message}", }, "simple": { - "console_format": "{time:MM-DD HH:mm} | 远程 | {message}", + "console_format": "{time:MM-DD HH:mm} | 远程| {message}", "file_format": "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | 远程 | {message}", }, } diff --git a/src/heart_flow/README.md b/src/heart_flow/README.md index a2540ebf1..4ccb662dd 100644 --- a/src/heart_flow/README.md +++ b/src/heart_flow/README.md @@ -61,6 +61,7 @@ c HeartFChatting工作方式 c.5.2.2 通过 HeartFCSender 直接发送匹配查询 (emoji_query) 的表情。 c.5.3 如果决策是 no_reply: c.5.3.1 进入等待状态,直到检测到新消息或超时。 + c.5.3.2 同时,增加内部连续不回复计数器。如果该计数器达到预设阈值(例如 5 次),则调用初始化时由 `SubHeartflowManager` 提供的回调函数。此回调函数会通知 `SubHeartflowManager` 请求将对应的 `SubHeartflow` 状态转换为 `ABSENT`。如果执行了其他动作(如 `text_reply` 或 `emoji_reply`),则此计数器会被重置。 c.6 循环结束后,记录周期信息 (CycleInfo),并根据情况进行短暂休眠,防止CPU空转。 @@ -152,7 +153,7 @@ c HeartFChatting工作方式 - **状态转换机制** (由 `SubHeartflowManager` 驱动): - **激活 `CHAT`**: 当 `Heartflow` 状态从 `OFFLINE` 变为允许聊天的状态时,`SubHeartflowManager` 会根据限制(通过 `self.mai_state_info` 获取),选择部分 `ABSENT` 状态的子心流,**检查当前 CHAT 状态数量是否达到上限**,如果未达上限,则调用其 `change_chat_state` 方法将其转换为 `CHAT`。此外,`evaluate_and_transition_subflows_by_llm` 方法也会根据 LLM 的判断,在未达上限时将 `ABSENT` 状态的子心流激活为 `CHAT`。 - **激活 `FOCUSED`**: `SubHeartflowManager` 会定期评估处于 `CHAT` 状态的子心流的兴趣度 (`InterestChatting.start_hfc_probability`),若满足条件且**检查当前 FOCUSED 状态数量未达上限**(通过 `self.mai_state_info` 获取限制),则调用 `change_chat_state` 将其提升为 `FOCUSED`。 - - **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`) 或 LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。 + - **停用/回退**: `SubHeartflowManager` 可能因 `Heartflow` 状态变化、达到数量限制、长时间不活跃、随机概率 (`randomly_deactivate_subflows`)、LLM 评估 (`evaluate_and_transition_subflows_by_llm` 判断 `CHAT` 状态子心流应休眠) 或收到来自 `HeartFChatting` 的连续不回复回调信号 (`request_absent_transition`) 等原因,调用 `change_chat_state` 将子心流状态设置为 `ABSENT` 或从 `FOCUSED` 回退到 `CHAT`。当子心流进入 `ABSENT` 状态后,如果持续一小时不活跃,才会被后台清理任务删除。 - **注意**: `change_chat_state` 方法本身只负责执行状态转换和管理内部聊天实例(`NormalChatInstance`/`HeartFlowChatInstance`),不再进行限额检查。限额检查的责任完全由调用方(即 `SubHeartflowManager` 中的相关方法,这些方法会使用内部存储的 `mai_state_info` 来获取限制)承担。 ## 3. 聊天实例详解 (Chat Instances Explained) diff --git a/src/heart_flow/background_tasks.py b/src/heart_flow/background_tasks.py index 7ae4b62f9..076f441c9 100644 --- a/src/heart_flow/background_tasks.py +++ b/src/heart_flow/background_tasks.py @@ -112,15 +112,6 @@ class BackgroundTaskManager: f"兴趣评估任务已启动 间隔:{self.interest_eval_interval}s", "_interest_eval_task", ), - # 新增随机停用任务配置 - ( - self._random_deactivation_task, - self._run_random_deactivation_cycle, - "hf_random_deactivation", - "debug", # 设为debug,避免过多日志 - f"随机停用任务已启动 间隔:{self.random_deactivation_interval}s", - "_random_deactivation_task", - ), ] # 统一启动所有任务 @@ -264,10 +255,6 @@ class BackgroundTaskManager: # --- 结束新增 --- - # --- 新增随机停用工作函数 --- - async def _perform_random_deactivation_work(self): - """执行一轮子心流随机停用检查。""" - await self.subheartflow_manager.randomly_deactivate_subflows() # --- 结束新增 --- @@ -300,15 +287,3 @@ class BackgroundTaskManager: task_func=self._perform_interest_eval_work, ) - # --- 结束新增 --- - - # --- 新增随机停用任务运行器 --- - async def _run_random_deactivation_cycle(self): - """运行随机停用循环。""" - await self._run_periodic_loop( - task_name="Random Deactivation", - interval=self.random_deactivation_interval, - task_func=self._perform_random_deactivation_work, - ) - - # --- 结束新增 --- diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index efd0ea1ed..fdc6bac7d 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -2,7 +2,7 @@ from .observation import Observation, ChattingObservation import asyncio from src.config.config import global_config import time -from typing import Optional, List, Dict, Tuple +from typing import Optional, List, Dict, Tuple, TYPE_CHECKING, Callable, Coroutine import traceback from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402 import random @@ -15,6 +15,11 @@ from src.heart_flow.mai_state_manager import MaiStateInfo from src.heart_flow.chat_state_info import ChatState, ChatStateInfo from src.heart_flow.sub_mind import SubMind +# # --- REMOVE: Conditional import --- # +# if TYPE_CHECKING: +# from src.heart_flow.subheartflow_manager import SubHeartflowManager +# # --- END REMOVE --- # + # 定义常量 (从 interest.py 移动过来) MAX_INTEREST = 15.0 @@ -234,16 +239,23 @@ class InterestChatting: class SubHeartflow: - def __init__(self, subheartflow_id, mai_states: MaiStateInfo): + def __init__( + self, + subheartflow_id, + mai_states: MaiStateInfo, + hfc_no_reply_callback: Callable[[], Coroutine[None, None, None]] + ): """子心流初始化函数 Args: subheartflow_id: 子心流唯一标识符 - parent_heartflow: 父级心流实例 + mai_states: 麦麦状态信息实例 + hfc_no_reply_callback: HFChatting 连续不回复时触发的回调 """ # 基础属性,两个值是一样的 self.subheartflow_id = subheartflow_id self.chat_id = subheartflow_id + self.hfc_no_reply_callback = hfc_no_reply_callback # 麦麦的状态 self.mai_states = mai_states @@ -364,11 +376,17 @@ class SubHeartflow: # 如果实例不存在,则创建并启动 logger.info(f"{log_prefix} 麦麦准备开始专注聊天 (创建新实例)...") try: + # 创建 HeartFChatting 实例,并传递 从构造函数传入的 回调函数 self.heart_fc_instance = HeartFChatting( - chat_id=self.chat_id, sub_mind=self.sub_mind, observations=self.observations + chat_id=self.subheartflow_id, + sub_mind=self.sub_mind, + observations=self.observations, # 传递所有观察者 + on_consecutive_no_reply_callback=self.hfc_no_reply_callback # <-- Use stored callback ) + + # 初始化并启动 HeartFChatting if await self.heart_fc_instance._initialize(): - await self.heart_fc_instance.start() # 初始化成功后启动循环 + await self.heart_fc_instance.start() logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") return True else: diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index 62d9e2f7b..a8403d4f8 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -3,6 +3,7 @@ import time import random from typing import Dict, Any, Optional, List import json # 导入 json 模块 +import functools # <-- 新增导入 # 导入日志模块 from src.common.logger import get_module_logger, LogConfig, SUBHEARTFLOW_MANAGER_STYLE_CONFIG @@ -77,8 +78,17 @@ class SubHeartflowManager: return subflow try: - # 初始化子心流, 传入存储的 mai_state_info - new_subflow = SubHeartflow(subheartflow_id, self.mai_state_info) + # --- 使用 functools.partial 创建 HFC 回调 --- # + # 将 manager 的 _handle_hfc_no_reply 方法与当前的 subheartflow_id 绑定 + hfc_callback = functools.partial(self._handle_hfc_no_reply, subheartflow_id) + # --- 结束创建回调 --- # + + # 初始化子心流, 传入 mai_state_info 和 partial 创建的回调 + new_subflow = SubHeartflow( + subheartflow_id, + self.mai_state_info, + hfc_callback # <-- 传递 partial 创建的回调 + ) # 异步初始化 await new_subflow.initialize() @@ -285,74 +295,11 @@ class SubHeartflowManager: ) and final_subflow.chat_state.chat_status == ChatState.FOCUSED: current_focused_count += 1 - async def randomly_deactivate_subflows(self, deactivation_probability: float = 0.1): - """以一定概率将 FOCUSED 或 CHAT 状态的子心流回退到 ABSENT 状态。""" - log_prefix_manager = "[子心流管理器-随机停用]" - logger.debug(f"{log_prefix_manager} 开始随机停用检查... (概率: {deactivation_probability:.0%})") - - # 使用快照安全遍历 - subflows_snapshot = list(self.subheartflows.values()) - deactivated_count = 0 - - try: - for sub_hf in subflows_snapshot: - flow_id = sub_hf.subheartflow_id - stream_name = chat_manager.get_stream_name(flow_id) or flow_id - log_prefix_flow = f"[{stream_name}]" - current_state = sub_hf.chat_state.chat_status - - # 只处理 FOCUSED 或 CHAT 状态 - if current_state not in [ChatState.FOCUSED, ChatState.CHAT]: - continue - - # 检查随机概率 - if random.random() < deactivation_probability: - logger.info( - f"{log_prefix_manager} {log_prefix_flow} 随机触发停用 (从 {current_state.value}) -> ABSENT" - ) - - # 获取当前实例以检查最新状态 - current_subflow = self.subheartflows.get(flow_id) - if not current_subflow or current_subflow.chat_state.chat_status != current_state: - logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试停用时状态已改变或实例消失,跳过。") - continue - - # --- 状态设置 --- # - # 注意:这里传递的状态数量是 *停用前* 的状态数量 - await current_subflow.change_chat_state(ChatState.ABSENT) - - # --- 状态验证 (可选) --- - final_subflow = self.subheartflows.get(flow_id) - if final_subflow: - final_state = final_subflow.chat_state.chat_status - if final_state == ChatState.ABSENT: - logger.debug( - f"{log_prefix_manager} {log_prefix_flow} 成功从 {current_state.value} 停用到 ABSENT 状态" - ) - deactivated_count += 1 - else: - logger.warning( - f"{log_prefix_manager} {log_prefix_flow} 尝试停用到 ABSENT 后状态仍为 {final_state.value}" - ) - else: - logger.warning(f"{log_prefix_manager} {log_prefix_flow} 停用后验证时子心流 {flow_id} 消失") - - except Exception as e: - logger.error(f"{log_prefix_manager} 随机停用周期出错: {e}", exc_info=True) - - if deactivated_count > 0: - logger.info(f"{log_prefix_manager} 随机停用周期结束, 成功停用 {deactivated_count} 个子心流。") - else: - logger.debug(f"{log_prefix_manager} 随机停用周期结束, 未停用任何子心流。") - async def evaluate_and_transition_subflows_by_llm(self): """ 使用LLM评估每个子心流的状态,并根据LLM的判断执行状态转换(ABSENT <-> CHAT)。 注意:此函数包含对假设的LLM函数的调用。 """ - log_prefix = "[LLM状态评估]" - logger.info(f"{log_prefix} 开始基于LLM评估子心流状态...") - # 获取当前状态和限制,用于CHAT激活检查 current_mai_state = self.mai_state_info.get_current_state() chat_limit = current_mai_state.get_normal_chat_max_num() @@ -366,33 +313,27 @@ class SubHeartflowManager: current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT) if not subflows_snapshot: - logger.info(f"{log_prefix} 当前没有子心流需要评估。") + logger.info("当前没有子心流需要评估。") return for sub_hf in subflows_snapshot: flow_id = sub_hf.subheartflow_id stream_name = chat_manager.get_stream_name(flow_id) or flow_id + log_prefix = f"[{stream_name}]" current_subflow_state = sub_hf.chat_state.chat_status - # --- 获取观察内容 --- - # 从 sub_hf.observations 获取 ChattingObservation 并提取信息 + _observation_summary = "没有可用的观察信息。" # 默认值 - try: - # 检查 observations 列表是否存在且不为空 - # 假设第一个观察者是 ChattingObservation - first_observation = sub_hf.observations[0] - if isinstance(first_observation, ChattingObservation): - # 组合中期记忆和当前聊天内容 - current_chat = first_observation.talking_message_str or "当前无聊天内容。" - combined_summary = f"当前聊天内容:\n{current_chat}" - else: - logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。") + first_observation = sub_hf.observations[0] + if isinstance(first_observation, ChattingObservation): + # 组合中期记忆和当前聊天内容 + current_chat = first_observation.talking_message_str or "当前无聊天内容。" + combined_summary = f"当前聊天内容:\n{current_chat}" + else: + logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。") + - except Exception as e: - logger.warning(f"{log_prefix} [{stream_name}] 获取观察信息失败: {e}", exc_info=True) - # 保留默认值或错误信息 - combined_summary = f"获取观察信息时出错: {e}" # --- 获取麦麦状态 --- mai_state_description = f"麦麦当前状态: {current_mai_state.value}。" @@ -414,7 +355,7 @@ class SubHeartflowManager: # 调用LLM评估 should_activate = await self._llm_evaluate_state_transition(prompt) if should_activate is None: # 处理解析失败或意外情况 - logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果,跳过。") + logger.warning(f"{log_prefix}LLM评估返回无效结果,跳过。") continue if should_activate: @@ -423,17 +364,19 @@ class SubHeartflowManager: current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT) if current_chat_count < chat_limit: logger.info( - f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..." + f"{log_prefix}LLM建议激活到CHAT状态,且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..." ) await sub_hf.change_chat_state(ChatState.CHAT) if sub_hf.chat_state.chat_status == ChatState.CHAT: transitioned_to_chat += 1 else: - logger.warning(f"{log_prefix} [{stream_name}] 尝试激活到CHAT失败。") + logger.warning(f"{log_prefix}尝试激活到CHAT失败。") else: logger.info( - f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,但已达到上限({current_chat_count}/{chat_limit})。跳过转换。" + f"{log_prefix}LLM建议激活到CHAT状态,但已达到上限({current_chat_count}/{chat_limit})。跳过转换。" ) + else: + logger.info(f"{log_prefix}LLM建议不激活到CHAT状态。") # --- 针对 CHAT 状态 --- elif current_subflow_state == ChatState.CHAT: @@ -452,20 +395,18 @@ class SubHeartflowManager: # 调用LLM评估 should_deactivate = await self._llm_evaluate_state_transition(prompt) if should_deactivate is None: # 处理解析失败或意外情况 - logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果,跳过。") + logger.warning(f"{log_prefix}LLM评估返回无效结果,跳过。") continue if should_deactivate: - logger.info(f"{log_prefix} [{stream_name}] LLM建议进入ABSENT状态。正在尝试转换...") + logger.info(f"{log_prefix}LLM建议进入ABSENT状态。正在尝试转换...") await sub_hf.change_chat_state(ChatState.ABSENT) if sub_hf.chat_state.chat_status == ChatState.ABSENT: transitioned_to_absent += 1 - - logger.info( - f"{log_prefix} LLM评估周期结束。" - f" 成功转换到CHAT: {transitioned_to_chat}." - f" 成功转换到ABSENT: {transitioned_to_absent}." - ) + else: + logger.info(f"{log_prefix}LLM建议不进入ABSENT状态。") + + async def _llm_evaluate_state_transition(self, prompt: str) -> Optional[bool]: """ @@ -482,7 +423,8 @@ class SubHeartflowManager: try: # --- 真实的 LLM 调用 --- response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt) - logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估,原始响应: ```{response_text}```") + logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估") + logger.debug(f"{log_prefix} 原始响应: {response_text}") # --- 解析 JSON 响应 --- try: @@ -573,3 +515,46 @@ class SubHeartflowManager: logger.error(f"删除 SubHeartflow {subheartflow_id} 时出错: {e}", exc_info=True) else: logger.warning(f"尝试删除不存在的 SubHeartflow: {subheartflow_id}") + + # --- 新增:处理 HFC 无回复回调的专用方法 --- # + async def _handle_hfc_no_reply(self, subheartflow_id: Any): + """处理来自 HeartFChatting 的连续无回复信号 (通过 partial 绑定 ID)""" + # 注意:这里不需要再获取锁,因为 request_absent_transition 内部会处理锁 + logger.debug(f"[管理器 HFC 处理器] 接收到来自 {subheartflow_id} 的 HFC 无回复信号") + await self.request_absent_transition(subheartflow_id) + # --- 结束新增 --- # + + # --- 新增:处理来自 HeartFChatting 的状态转换请求 --- # + async def request_absent_transition(self, subflow_id: Any): + """ + 接收来自 HeartFChatting 的请求,将特定子心流的状态转换为 ABSENT。 + 通常在连续多次 "no_reply" 后被调用。 + + Args: + subflow_id: 需要转换状态的子心流 ID。 + """ + async with self._lock: + subflow = self.subheartflows.get(subflow_id) + if not subflow: + logger.warning(f"[状态转换请求] 尝试转换不存在的子心流 {subflow_id} 到 ABSENT") + return + + stream_name = chat_manager.get_stream_name(subflow_id) or subflow_id + current_state = subflow.chat_state.chat_status + + # 仅当子心流处于 FOCUSED 状态时才进行转换 + # 因为 HeartFChatting 只在 FOCUSED 状态下运行 + if current_state == ChatState.FOCUSED: + logger.info(f"[状态转换请求] 接收到请求,将 {stream_name} (当前: {current_state.value}) 转换为 ABSENT") + try: + await subflow.change_chat_state(ChatState.ABSENT) + logger.info(f"[状态转换请求] {stream_name} 状态已成功转换为 ABSENT") + except Exception as e: + logger.error(f"[状态转换请求] 转换 {stream_name} 到 ABSENT 时出错: {e}", exc_info=True) + elif current_state == ChatState.ABSENT: + logger.debug(f"[状态转换请求] {stream_name} 已处于 ABSENT 状态,无需转换") + else: + logger.warning( + f"[状态转换请求] 收到对 {stream_name} 的请求,但其状态为 {current_state.value} (非 FOCUSED),不执行转换" + ) + # --- 结束新增 --- # diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 2a33d0671..9a2862adb 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -2,7 +2,7 @@ import asyncio import time import traceback import random # <-- 添加导入 -from typing import List, Optional, Dict, Any, Deque +from typing import List, Optional, Dict, Any, Deque, Callable, Coroutine from collections import deque from src.plugins.chat.message import MessageRecv, BaseMessageInfo, MessageThinking, MessageSending from src.plugins.chat.message import Seg # Local import needed after move @@ -25,7 +25,6 @@ import contextlib from src.plugins.utils.chat_message_builder import num_new_messages_since from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo from .heartFC_sender import HeartFCSender -# --- End import --- INITIAL_DURATION = 60.0 @@ -155,18 +154,30 @@ class HeartFChatting: 其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。 """ - def __init__(self, chat_id: str, sub_mind: SubMind, observations: Observation): + CONSECUTIVE_NO_REPLY_THRESHOLD = 4 # 连续不回复的阈值 + + def __init__( + self, + chat_id: str, + sub_mind: SubMind, + observations: Observation, + on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]] + ): """ HeartFChatting 初始化函数 参数: chat_id: 聊天流唯一标识符(如stream_id) + sub_mind: 关联的子思维 + observations: 关联的观察列表 + on_consecutive_no_reply_callback: 连续不回复达到阈值时调用的异步回调函数 """ # 基础属性 self.stream_id: str = chat_id # 聊天流ID self.chat_stream: Optional[ChatStream] = None # 关联的聊天流 self.sub_mind: SubMind = sub_mind # 关联的子思维 self.observations: List[Observation] = observations # 关联的观察列表,用于监控聊天流状态 + self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback # 日志前缀 self.log_prefix: str = f"[{chat_manager.get_stream_name(chat_id) or chat_id}]" @@ -198,6 +209,8 @@ class HeartFChatting: self._cycle_counter = 0 self._cycle_history: Deque[CycleInfo] = deque(maxlen=10) # 保留最近10个循环的信息 self._current_cycle: Optional[CycleInfo] = None + self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器 + self._shutting_down: bool = False # <--- 新增:关闭标志位 async def _initialize(self) -> bool: """ @@ -276,6 +289,12 @@ class HeartFChatting: """主循环,持续进行计划并可能回复消息,直到被外部取消。""" try: while True: # 主循环 + # --- 在循环开始处检查关闭标志 --- + if self._shutting_down: + logger.info(f"{self.log_prefix} 检测到关闭标志,退出 HFC 循环。") + break + # -------------------------------- + # 创建新的循环信息 self._cycle_counter += 1 self._current_cycle = CycleInfo(self._cycle_counter) @@ -287,6 +306,12 @@ class HeartFChatting: # 执行规划和处理阶段 async with self._get_cycle_context() as acquired_lock: if not acquired_lock: + # 如果未能获取锁(理论上不太可能,除非 shutdown 过程中释放了但又被抢了?) + # 或者也可以在这里再次检查 self._shutting_down + if self._shutting_down: + break # 再次检查,确保退出 + logger.warning(f"{self.log_prefix} 未能获取循环处理锁,跳过本次循环。") + await asyncio.sleep(0.1) # 短暂等待避免空转 continue # 记录规划开始时间点 @@ -320,7 +345,11 @@ class HeartFChatting: ) except asyncio.CancelledError: - logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)被取消了") + # 设置了关闭标志位后被取消是正常流程 + if not self._shutting_down: + logger.warning(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环意外被取消") + else: + logger.info(f"{self.log_prefix} HeartFChatting: 麦麦的认真水群(HFC)循环已取消 (正常关闭)") except Exception as e: logger.error(f"{self.log_prefix} HeartFChatting: 意外错误: {e}") logger.error(traceback.format_exc()) @@ -451,6 +480,8 @@ class HeartFChatting: return await handler(reasoning, planner_start_db_time, cycle_timers), "" except HeartFCError as e: logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") + # 出错时也重置计数器 + self._lian_xu_bu_hui_fu_ci_shu = 0 return False, "" async def _handle_text_reply(self, reasoning: str, emoji_query: str, cycle_timers: dict) -> tuple[bool, str]: @@ -471,6 +502,8 @@ class HeartFChatting: 返回: tuple[bool, str]: (是否回复成功, 思考消息ID) """ + # 重置连续不回复计数器 + self._lian_xu_bu_hui_fu_ci_shu = 0 # 获取锚点消息 anchor_message = await self._get_anchor_message() @@ -544,8 +577,9 @@ class HeartFChatting: 处理不回复的情况 工作流程: - 1. 等待新消息 - 2. 超时或收到新消息时返回 + 1. 等待新消息、超时或关闭信号 + 2. 根据等待结果更新连续不回复计数 + 3. 如果达到阈值,触发回调 参数: reasoning: 不回复的原因 @@ -561,14 +595,36 @@ class HeartFChatting: try: with Timer("等待新消息", cycle_timers): - return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix) + # 等待新消息、超时或关闭信号,并获取结果 + await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix) + + if not self._shutting_down: + self._lian_xu_bu_hui_fu_ci_shu += 1 + logger.debug(f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{self.CONSECUTIVE_NO_REPLY_THRESHOLD}") + + # 检查是否达到阈值 + if self._lian_xu_bu_hui_fu_ci_shu >= self.CONSECUTIVE_NO_REPLY_THRESHOLD: + logger.info(f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次),调用回调请求状态转换") + # 调用回调。注意:这里不重置计数器,依赖回调函数成功改变状态来隐式重置上下文。 + await self.on_consecutive_no_reply_callback() + + + return True + except asyncio.CancelledError: - logger.info(f"{self.log_prefix} 等待被中断") + # 如果在等待过程中任务被取消(可能是因为 shutdown) + logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)") + # 让异常向上传播,由 _hfc_loop 的异常处理逻辑接管 raise + except Exception as e: # 捕获调用管理器或其他地方可能发生的错误 + logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}") + logger.error(traceback.format_exc()) + # 发生意外错误时,可以选择是否重置计数器,这里选择不重置 + return False # 表示动作未成功 async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool: """ - 等待新消息 + 等待新消息 或 检测到关闭信号 参数: observation: 观察实例 @@ -576,19 +632,36 @@ class HeartFChatting: log_prefix: 日志前缀 返回: - bool: 是否检测到新消息 + bool: 是否检测到新消息 (如果因关闭信号退出则返回 False) """ wait_start_time = time.monotonic() while True: + # --- 在每次循环开始时检查关闭标志 --- + if self._shutting_down: + logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。") + return False # 表示因为关闭而退出 + # ----------------------------------- + + # 检查新消息 if await observation.has_new_messages_since(planner_start_db_time): logger.info(f"{log_prefix} 检测到新消息") return True + # 检查超时 (放在检查新消息和关闭之后) if time.monotonic() - wait_start_time > 120: - logger.warning(f"{log_prefix} 等待超时(120秒)") + logger.warning(f"{log_prefix} 等待新消息超时(20秒)") return False - await asyncio.sleep(1.5) + try: + # 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭 + await asyncio.sleep(0.5) # 缩短休眠时间 + except asyncio.CancelledError: + # 如果在休眠时被取消,再次检查关闭标志 + # 如果是正常关闭,则不需要警告 + if not self._shutting_down: + logger.warning(f"{log_prefix} _wait_for_new_message 的休眠被意外取消") + # 无论如何,重新抛出异常,让上层处理 + raise async def _log_cycle_timers(self, cycle_timers: dict, log_prefix: str): """记录循环周期的计时器结果""" @@ -599,7 +672,9 @@ class HeartFChatting: timer_strings.append(f"{name}: {formatted_time}") if timer_strings: - logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}") + # 在记录前检查关闭标志 + if not self._shutting_down: + logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}") async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str): """处理循环延迟""" @@ -835,6 +910,7 @@ class HeartFChatting: async def shutdown(self): """优雅关闭HeartFChatting实例,取消活动循环任务""" logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...") + self._shutting_down = True # <-- 在开始关闭时设置标志位 # 取消循环任务 if self._loop_task and not self._loop_task.done(): @@ -865,6 +941,25 @@ class HeartFChatting: action=action, reasoning=reasoning, ) + + # 在记录循环日志前检查关闭标志 + if not self._shutting_down: + self._current_cycle.complete_cycle() + self._cycle_history.append(self._current_cycle) + + # 记录循环信息和计时器结果 + timer_strings = [] + for name, elapsed in self._current_cycle.timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + logger.debug( + f"{self.log_prefix} 第 #{self._current_cycle.cycle_id}次思考完成," + f"耗时: {self._current_cycle.end_time - self._current_cycle.start_time:.2f}秒, " + f"动作: {self._current_cycle.action_type}" + + (f"\n计时器详情: {'; '.join(timer_strings)}" if timer_strings else "") + ) + return prompt async def _build_planner_prompt( From c012d29cbf8eb86c2f3d369cbf95df4a6c62ba71 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 26 Apr 2025 21:00:21 +0800 Subject: [PATCH 2/7] =?UTF-8?q?dev=EF=BC=9A=E8=AE=A9=E9=BA=A6=E9=BA=A6?= =?UTF-8?q?=E6=9B=B4=E6=84=BF=E6=84=8F=E8=AF=B4=E8=AF=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/sub_mind.py | 2 +- src/heart_flow/subheartflow_manager.py | 173 ++++++++++++++++--------- 2 files changed, 112 insertions(+), 63 deletions(-) diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py index be995b843..f340b717b 100644 --- a/src/heart_flow/sub_mind.py +++ b/src/heart_flow/sub_mind.py @@ -103,7 +103,7 @@ class SubMind: individuality = Individuality.get_instance() # 构建个性部分 - prompt_personality = f"你的名字是{individuality.personality.bot_nickname},你" + prompt_personality = f"你正在扮演名为{individuality.personality.bot_nickname}的人类,你" prompt_personality += individuality.personality.personality_core # 随机添加个性侧面 diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index a8403d4f8..314bf7115 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -19,9 +19,10 @@ from .observation import ChattingObservation # 导入LLM请求工具 from src.plugins.models.utils_model import LLMRequest from src.config.config import global_config - +from src.individuality.individuality import Individuality import traceback + # 初始化日志记录器 subheartflow_manager_log_config = LogConfig( @@ -110,24 +111,53 @@ class SubHeartflowManager: logger.error(f"创建子心流 {subheartflow_id} 失败: {e}", exc_info=True) return None + # --- 新增:内部方法,用于尝试将单个子心流设置为 ABSENT --- + async def _try_set_subflow_absent_internal(self, subflow: "SubHeartflow", log_prefix: str) -> bool: + """ + 尝试将给定的子心流对象状态设置为 ABSENT (内部方法,不处理锁)。 + + Args: + subflow: 子心流对象。 + log_prefix: 用于日志记录的前缀 (例如 "[子心流管理]" 或 "[停用]")。 + + Returns: + bool: 如果状态成功变为 ABSENT 或原本就是 ABSENT,返回 True;否则返回 False。 + """ + flow_id = subflow.subheartflow_id + stream_name = chat_manager.get_stream_name(flow_id) or flow_id + + if subflow.chat_state.chat_status != ChatState.ABSENT: + logger.debug(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT") + try: + await subflow.change_chat_state(ChatState.ABSENT) + # 再次检查以确认状态已更改 (change_chat_state 内部应确保) + if subflow.chat_state.chat_status == ChatState.ABSENT: + return True + else: + logger.warning(f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}") + return False + except Exception as e: + logger.error(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT 时失败: {e}", exc_info=True) + return False + else: + logger.debug(f"{log_prefix} {stream_name} 已是 ABSENT 状态") + return True # 已经是目标状态,视为成功 + # --- 结束新增 --- + async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool: - """停止指定的子心流并清理资源""" - subheartflow = self.subheartflows.get(subheartflow_id) - if not subheartflow: - return False + """停止指定的子心流并将其状态设置为 ABSENT""" + log_prefix = "[子心流管理]" + async with self._lock: # 加锁以安全访问字典 + subheartflow = self.subheartflows.get(subheartflow_id) - stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id - logger.info(f"[子心流管理] 正在停止 {stream_name}, 原因: {reason}") + stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id + logger.info(f"{log_prefix} 正在停止 {stream_name}, 原因: {reason}") - try: - # 设置状态为ABSENT释放资源 - if subheartflow.chat_state.chat_status != ChatState.ABSENT: - logger.debug(f"[子心流管理] 设置 {stream_name} 状态为ABSENT") - await subheartflow.change_chat_state(ChatState.ABSENT) - else: - logger.debug(f"[子心流管理] {stream_name} 已是ABSENT状态") - except Exception as e: - logger.error(f"[子心流管理] 设置ABSENT状态失败: {e}") + # 调用内部方法处理状态变更 + success = await self._try_set_subflow_absent_internal(subheartflow, log_prefix) + + return success + # 锁在此处自动释放 def get_inactive_subheartflows(self, max_age_seconds=INACTIVE_THRESHOLD_SECONDS): """识别并返回需要清理的不活跃(处于ABSENT状态超过一小时)子心流(id, 原因)""" @@ -195,44 +225,37 @@ class SubHeartflowManager: async def deactivate_all_subflows(self): """将所有子心流的状态更改为 ABSENT (例如主状态变为OFFLINE时调用)""" - # logger.info("[停用] 开始将所有子心流状态设置为 ABSENT") - # 使用 list() 创建一个当前值的快照,防止在迭代时修改字典 - flows_to_update = list(self.subheartflows.values()) - - if not flows_to_update: - logger.debug("[停用] 无活跃子心流,无需操作") - return - + log_prefix = "[停用]" changed_count = 0 - for subflow in flows_to_update: - flow_id = subflow.subheartflow_id - stream_name = chat_manager.get_stream_name(flow_id) or flow_id - # 再次检查子心流是否仍然存在于管理器中,以防万一在迭代过程中被移除 + processed_count = 0 - if subflow.chat_state.chat_status != ChatState.ABSENT: - logger.debug( - f"正在将子心流 {stream_name} 的状态从 {subflow.chat_state.chat_status.value} 更改为 ABSENT" - ) - try: - # 调用 change_chat_state 将状态设置为 ABSENT - await subflow.change_chat_state(ChatState.ABSENT) - # 验证状态是否真的改变了 - if ( - flow_id in self.subheartflows - and self.subheartflows[flow_id].chat_state.chat_status == ChatState.ABSENT - ): + async with self._lock: # 获取锁以安全迭代 + # 使用 list() 创建一个当前值的快照,防止在迭代时修改字典 + flows_to_update = list(self.subheartflows.values()) + processed_count = len(flows_to_update) + if not flows_to_update: + logger.debug(f"{log_prefix} 无活跃子心流,无需操作") + return + + for subflow in flows_to_update: + # 记录原始状态,以便统计实际改变的数量 + original_state_was_absent = (subflow.chat_state.chat_status == ChatState.ABSENT) + + + success = await self._try_set_subflow_absent_internal(subflow, log_prefix) + + # 如果成功设置为 ABSENT 且原始状态不是 ABSENT,则计数 + if success and not original_state_was_absent: + if subflow.chat_state.chat_status == ChatState.ABSENT: changed_count += 1 else: - logger.warning( - f"[停用] 尝试更改子心流 {stream_name} 状态后,状态仍未变为 ABSENT 或子心流已消失。" - ) - except Exception as e: - logger.error(f"[停用] 更改子心流 {stream_name} 状态为 ABSENT 时出错: {e}", exc_info=True) - else: - logger.debug(f"[停用] 子心流 {stream_name} 已处于 ABSENT 状态,无需更改。") + # 这种情况理论上不应发生,如果内部方法返回 True 的话 + stream_name = chat_manager.get_stream_name(subflow.subheartflow_id) or subflow.subheartflow_id + logger.warning(f"{log_prefix} 内部方法声称成功但 {stream_name} 状态未变为 ABSENT。") + # 锁在此处自动释放 logger.info( - f"下限完成,共处理 {len(flows_to_update)} 个子心流,成功将 {changed_count} 个子心流的状态更改为 ABSENT。" + f"{log_prefix} 完成,共处理 {processed_count} 个子心流,成功将 {changed_count} 个非 ABSENT 子心流的状态更改为 ABSENT。" ) async def evaluate_interest_and_promote(self): @@ -328,6 +351,7 @@ class SubHeartflowManager: first_observation = sub_hf.observations[0] if isinstance(first_observation, ChattingObservation): # 组合中期记忆和当前聊天内容 + first_observation.observe() current_chat = first_observation.talking_message_str or "当前无聊天内容。" combined_summary = f"当前聊天内容:\n{current_chat}" else: @@ -336,19 +360,40 @@ class SubHeartflowManager: # --- 获取麦麦状态 --- - mai_state_description = f"麦麦当前状态: {current_mai_state.value}。" + mai_state_description = f"你当前状态: {current_mai_state.value}。" + + # 获取个性化信息 + individuality = Individuality.get_instance() + + # 构建个性部分 + prompt_personality = f"你正在扮演名为{individuality.personality.bot_nickname}的人类,你" + prompt_personality += individuality.personality.personality_core + + # 随机添加个性侧面 + if individuality.personality.personality_sides: + random_side = random.choice(individuality.personality.personality_sides) + prompt_personality += f",{random_side}" + + # 随机添加身份细节 + if individuality.identity.identity_detail: + random_detail = random.choice(individuality.identity.identity_detail) + prompt_personality += f",{random_detail}" + # --- 针对 ABSENT 状态 --- if current_subflow_state == ChatState.ABSENT: # 构建Prompt prompt = ( - f"子心流 [{stream_name}] 当前处于非活跃(ABSENT)状态.\n" + f"{prompt_personality}\n" + f"你当前没有在: [{stream_name}] 群中聊天。\n" f"{mai_state_description}\n" - f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n" - f"基于以上信息,该子心流是否表现出足够的活跃迹象或重要性," - f"值得将其唤醒并进入常规聊天(CHAT)状态?\n" - f"请以 JSON 格式回答,包含一个键 'decision',其值为 true 或 false.\n" - f"例如:{{\"decision\": true}}\n" + f"这个群里最近的聊天内容是:\n---\n{combined_summary}\n---\n" + f"基于以上信息,请判断你是否愿意在这个群开始闲聊," + f"进入常规聊天(CHAT)状态?\n" + f"给出你的判断,和理由,然后以 JSON 格式回答" + f"包含键 'decision',如果要开始聊天,值为 true ,否则为 false.\n" + f"包含键 'reason',其值为你的理由。\n" + f"例如:{{\"decision\": true, \"reason\": \"因为我想聊天\"}}\n" f"请只输出有效的 JSON 对象。" ) @@ -382,13 +427,16 @@ class SubHeartflowManager: elif current_subflow_state == ChatState.CHAT: # 构建Prompt prompt = ( - f"子心流 [{stream_name}] 当前处于常规聊天(CHAT)状态.\n" + f"{prompt_personality}\n" + f"你正在在: [{stream_name}] 群中聊天。\n" f"{mai_state_description}\n" - f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n" - f"基于以上信息,该子心流是否表现出不活跃、对话结束或不再需要关注的迹象," - f"应该让其进入休眠(ABSENT)状态?\n" - f"请以 JSON 格式回答,包含一个键 'decision',其值为 true (表示应休眠) 或 false (表示不应休眠).\n" - f"例如:{{\"decision\": true}}\n" + f"这个群里最近的聊天内容是:\n---\n{combined_summary}\n---\n" + f"基于以上信息,请判断你是否愿意在这个群继续闲聊," + f"还是暂时离开聊天,进入休眠状态?\n" + f"给出你的判断,和理由,然后以 JSON 格式回答" + f"包含键 'decision',如果要离开聊天,值为 true ,否则为 false.\n" + f"包含键 'reason',其值为你的理由。\n" + f"例如:{{\"decision\": true, \"reason\": \"因为我想休息\"}}\n" f"请只输出有效的 JSON 对象。" ) @@ -423,7 +471,8 @@ class SubHeartflowManager: try: # --- 真实的 LLM 调用 --- response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt) - logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估") + # logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估") + logger.debug(f"{log_prefix} 原始输入: {prompt}") logger.debug(f"{log_prefix} 原始响应: {response_text}") # --- 解析 JSON 响应 --- From e99876a02a390e8ff8a360e9a195f450d2414c0a Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Sat, 26 Apr 2025 21:45:09 +0800 Subject: [PATCH 3/7] minor fix --- src/plugins/models/utils_model.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py index 2cab7b629..5ab82b42b 100644 --- a/src/plugins/models/utils_model.py +++ b/src/plugins/models/utils_model.py @@ -820,6 +820,7 @@ class LLMRequest: policy = request_content["policy"] payload = request_content["payload"] wait_time = policy["base_wait"] * (2**retry_count) + keep_request = False if retry_count < policy["max_retries"] - 1: keep_request = True if isinstance(exception, RequestAbortException): From dafc5ded95632bae04240dd5bfc9a40a9a928ba4 Mon Sep 17 00:00:00 2001 From: UnCLAS-Prommer Date: Sat, 26 Apr 2025 21:46:01 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=E6=8E=89=E7=9A=84=E8=AF=B7=E6=B1=82=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/models/utils_model.py | 389 ------------------------------ 1 file changed, 389 deletions(-) diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py index 5ab82b42b..7c87cf946 100644 --- a/src/plugins/models/utils_model.py +++ b/src/plugins/models/utils_model.py @@ -178,395 +178,6 @@ class LLMRequest: output_cost = (completion_tokens / 1000000) * self.pri_out return round(input_cost + output_cost, 6) - ''' - async def _execute_request( - self, - endpoint: str, - prompt: str = None, - image_base64: str = None, - image_format: str = None, - payload: dict = None, - retry_policy: dict = None, - response_handler: callable = None, - user_id: str = "system", - request_type: str = None, - ): - """统一请求执行入口 - Args: - endpoint: API端点路径 (如 "chat/completions") - prompt: prompt文本 - image_base64: 图片的base64编码 - image_format: 图片格式 - payload: 请求体数据 - retry_policy: 自定义重试策略 - response_handler: 自定义响应处理器 - user_id: 用户ID - request_type: 请求类型 - """ - - if request_type is None: - request_type = self.request_type - - # 合并重试策略 - default_retry = { - "max_retries": 3, - "base_wait": 10, - "retry_codes": [429, 413, 500, 503], - "abort_codes": [400, 401, 402, 403], - } - policy = {**default_retry, **(retry_policy or {})} - - # 常见Error Code Mapping - error_code_mapping = { - 400: "参数不正确", - 401: "API key 错误,认证失败,请检查/config/bot_config.toml和.env中的配置是否正确哦~", - 402: "账号余额不足", - 403: "需要实名,或余额不足", - 404: "Not Found", - 429: "请求过于频繁,请稍后再试", - 500: "服务器内部故障", - 503: "服务器负载过高", - } - - api_url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" - # 判断是否为流式 - stream_mode = self.stream - # logger_msg = "进入流式输出模式," if stream_mode else "" - # logger.debug(f"{logger_msg}发送请求到URL: {api_url}") - # logger.info(f"使用模型: {self.model_name}") - - # 构建请求体 - if image_base64: - payload = await self._build_payload(prompt, image_base64, image_format) - elif payload is None: - payload = await self._build_payload(prompt) - - # 流式输出标志 - # 先构建payload,再添加流式输出标志 - if stream_mode: - payload["stream"] = stream_mode - - for retry in range(policy["max_retries"]): - try: - # 使用上下文管理器处理会话 - headers = await self._build_headers() - # 似乎是openai流式必须要的东西,不过阿里云的qwq-plus加了这个没有影响 - if stream_mode: - headers["Accept"] = "text/event-stream" - - async with aiohttp.ClientSession() as session: - try: - async with session.post(api_url, headers=headers, json=payload) as response: - # 处理需要重试的状态码 - if response.status in policy["retry_codes"]: - wait_time = policy["base_wait"] * (2**retry) - logger.warning( - f"模型 {self.model_name} 错误码: {response.status}, 等待 {wait_time}秒后重试" - ) - if response.status == 413: - logger.warning("请求体过大,尝试压缩...") - image_base64 = compress_base64_image_by_scale(image_base64) - payload = await self._build_payload(prompt, image_base64, image_format) - elif response.status in [500, 503]: - logger.error( - f"模型 {self.model_name} 错误码: {response.status} - {error_code_mapping.get(response.status)}" - ) - raise RuntimeError("服务器负载过高,模型恢复失败QAQ") - else: - logger.warning(f"模型 {self.model_name} 请求限制(429),等待{wait_time}秒后重试...") - - await asyncio.sleep(wait_time) - continue - elif response.status in policy["abort_codes"]: - logger.error( - f"模型 {self.model_name} 错误码: {response.status} - {error_code_mapping.get(response.status)}" - ) - # 尝试获取并记录服务器返回的详细错误信息 - try: - error_json = await response.json() - if error_json and isinstance(error_json, list) and len(error_json) > 0: - for error_item in error_json: - if "error" in error_item and isinstance(error_item["error"], dict): - error_obj = error_item["error"] - error_code = error_obj.get("code") - error_message = error_obj.get("message") - error_status = error_obj.get("status") - logger.error( - f"服务器错误详情: 代码={error_code}, 状态={error_status}, " - f"消息={error_message}" - ) - elif isinstance(error_json, dict) and "error" in error_json: - # 处理单个错误对象的情况 - error_obj = error_json.get("error", {}) - error_code = error_obj.get("code") - error_message = error_obj.get("message") - error_status = error_obj.get("status") - logger.error( - f"服务器错误详情: 代码={error_code}, 状态={error_status}, 消息={error_message}" - ) - else: - # 记录原始错误响应内容 - logger.error(f"服务器错误响应: {error_json}") - except Exception as e: - logger.warning(f"无法解析服务器错误响应: {str(e)}") - - if response.status == 403: - # 只针对硅基流动的V3和R1进行降级处理 - if ( - self.model_name.startswith("Pro/deepseek-ai") - and self.base_url == "https://api.siliconflow.cn/v1/" - ): - old_model_name = self.model_name - self.model_name = self.model_name[4:] # 移除"Pro/"前缀 - logger.warning( - f"检测到403错误,模型从 {old_model_name} 降级为 {self.model_name}" - ) - - # 对全局配置进行更新 - if global_config.llm_normal.get("name") == old_model_name: - global_config.llm_normal["name"] = self.model_name - logger.warning(f"将全局配置中的 llm_normal 模型临时降级至{self.model_name}") - - if global_config.llm_reasoning.get("name") == old_model_name: - global_config.llm_reasoning["name"] = self.model_name - logger.warning( - f"将全局配置中的 llm_reasoning 模型临时降级至{self.model_name}" - ) - - # 更新payload中的模型名 - if payload and "model" in payload: - payload["model"] = self.model_name - - # 重新尝试请求 - retry -= 1 # 不计入重试次数 - continue - - raise RuntimeError(f"请求被拒绝: {error_code_mapping.get(response.status)}") - - response.raise_for_status() - reasoning_content = "" - - # 将流式输出转化为非流式输出 - if stream_mode: - flag_delta_content_finished = False - accumulated_content = "" - usage = None # 初始化usage变量,避免未定义错误 - - async for line_bytes in response.content: - try: - line = line_bytes.decode("utf-8").strip() - if not line: - continue - if line.startswith("data:"): - data_str = line[5:].strip() - if data_str == "[DONE]": - break - try: - chunk = json.loads(data_str) - if flag_delta_content_finished: - chunk_usage = chunk.get("usage", None) - if chunk_usage: - usage = chunk_usage # 获取token用量 - else: - delta = chunk["choices"][0]["delta"] - delta_content = delta.get("content") - if delta_content is None: - delta_content = "" - accumulated_content += delta_content - # 检测流式输出文本是否结束 - finish_reason = chunk["choices"][0].get("finish_reason") - if delta.get("reasoning_content", None): - reasoning_content += delta["reasoning_content"] - if finish_reason == "stop": - chunk_usage = chunk.get("usage", None) - if chunk_usage: - usage = chunk_usage - break - # 部分平台在文本输出结束前不会返回token用量,此时需要再获取一次chunk - flag_delta_content_finished = True - - except Exception as e: - logger.exception(f"模型 {self.model_name} 解析流式输出错误: {str(e)}") - except GeneratorExit: - logger.warning("模型 {self.model_name} 流式输出被中断,正在清理资源...") - # 确保资源被正确清理 - await response.release() - # 返回已经累积的内容 - result = { - "choices": [ - { - "message": { - "content": accumulated_content, - "reasoning_content": reasoning_content, - # 流式输出可能没有工具调用,此处不需要添加tool_calls字段 - } - } - ], - "usage": usage, - } - return ( - response_handler(result) - if response_handler - else self._default_response_handler(result, user_id, request_type, endpoint) - ) - except Exception as e: - logger.error(f"模型 {self.model_name} 处理流式输出时发生错误: {str(e)}") - # 确保在发生错误时也能正确清理资源 - try: - await response.release() - except Exception as cleanup_error: - logger.error(f"清理资源时发生错误: {cleanup_error}") - # 返回已经累积的内容 - result = { - "choices": [ - { - "message": { - "content": accumulated_content, - "reasoning_content": reasoning_content, - # 流式输出可能没有工具调用,此处不需要添加tool_calls字段 - } - } - ], - "usage": usage, - } - return ( - response_handler(result) - if response_handler - else self._default_response_handler(result, user_id, request_type, endpoint) - ) - content = accumulated_content - think_match = re.search(r"(.*?)", content, re.DOTALL) - if think_match: - reasoning_content = think_match.group(1).strip() - content = re.sub(r".*?", "", content, flags=re.DOTALL).strip() - # 构造一个伪result以便调用自定义响应处理器或默认处理器 - result = { - "choices": [ - { - "message": { - "content": content, - "reasoning_content": reasoning_content, - # 流式输出可能没有工具调用,此处不需要添加tool_calls字段 - } - } - ], - "usage": usage, - } - return ( - response_handler(result) - if response_handler - else self._default_response_handler(result, user_id, request_type, endpoint) - ) - else: - result = await response.json() - # 使用自定义处理器或默认处理 - return ( - response_handler(result) - if response_handler - else self._default_response_handler(result, user_id, request_type, endpoint) - ) - - except (aiohttp.ClientError, asyncio.TimeoutError) as e: - if retry < policy["max_retries"] - 1: - wait_time = policy["base_wait"] * (2**retry) - logger.error(f"模型 {self.model_name} 网络错误,等待{wait_time}秒后重试... 错误: {str(e)}") - await asyncio.sleep(wait_time) - continue - else: - logger.critical(f"模型 {self.model_name} 网络错误达到最大重试次数: {str(e)}") - raise RuntimeError(f"网络请求失败: {str(e)}") from e - except Exception as e: - logger.critical(f"模型 {self.model_name} 未预期的错误: {str(e)}") - raise RuntimeError(f"请求过程中发生错误: {str(e)}") from e - - except aiohttp.ClientResponseError as e: - # 处理aiohttp抛出的响应错误 - if retry < policy["max_retries"] - 1: - wait_time = policy["base_wait"] * (2**retry) - logger.error( - f"模型 {self.model_name} HTTP响应错误,等待{wait_time}秒后重试... 状态码: {e.status}, 错误: {e.message}" - ) - try: - if hasattr(e, "response") and e.response and hasattr(e.response, "text"): - error_text = await e.response.text() - try: - error_json = json.loads(error_text) - if isinstance(error_json, list) and len(error_json) > 0: - for error_item in error_json: - if "error" in error_item and isinstance(error_item["error"], dict): - error_obj = error_item["error"] - logger.error( - f"模型 {self.model_name} 服务器错误详情: 代码={error_obj.get('code')}, " - f"状态={error_obj.get('status')}, " - f"消息={error_obj.get('message')}" - ) - elif isinstance(error_json, dict) and "error" in error_json: - error_obj = error_json.get("error", {}) - logger.error( - f"模型 {self.model_name} 服务器错误详情: 代码={error_obj.get('code')}, " - f"状态={error_obj.get('status')}, " - f"消息={error_obj.get('message')}" - ) - else: - logger.error(f"模型 {self.model_name} 服务器错误响应: {error_json}") - except (json.JSONDecodeError, TypeError) as json_err: - logger.warning( - f"模型 {self.model_name} 响应不是有效的JSON: {str(json_err)}, 原始内容: {error_text[:200]}" - ) - except (AttributeError, TypeError, ValueError) as parse_err: - logger.warning(f"模型 {self.model_name} 无法解析响应错误内容: {str(parse_err)}") - - await asyncio.sleep(wait_time) - else: - logger.critical( - f"模型 {self.model_name} HTTP响应错误达到最大重试次数: 状态码: {e.status}, 错误: {e.message}" - ) - # 安全地检查和记录请求详情 - if ( - image_base64 - and payload - and isinstance(payload, dict) - and "messages" in payload - and len(payload["messages"]) > 0 - ): - if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]: - content = payload["messages"][0]["content"] - if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]: - payload["messages"][0]["content"][1]["image_url"]["url"] = ( - f"data:image/{image_format.lower() if image_format else 'jpeg'};base64," - f"{image_base64[:10]}...{image_base64[-10:]}" - ) - logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}") - raise RuntimeError(f"模型 {self.model_name} API请求失败: 状态码 {e.status}, {e.message}") from e - except Exception as e: - if retry < policy["max_retries"] - 1: - wait_time = policy["base_wait"] * (2**retry) - logger.error(f"模型 {self.model_name} 请求失败,等待{wait_time}秒后重试... 错误: {str(e)}") - await asyncio.sleep(wait_time) - else: - logger.critical(f"模型 {self.model_name} 请求失败: {str(e)}") - # 安全地检查和记录请求详情 - if ( - image_base64 - and payload - and isinstance(payload, dict) - and "messages" in payload - and len(payload["messages"]) > 0 - ): - if isinstance(payload["messages"][0], dict) and "content" in payload["messages"][0]: - content = payload["messages"][0]["content"] - if isinstance(content, list) and len(content) > 1 and "image_url" in content[1]: - payload["messages"][0]["content"][1]["image_url"]["url"] = ( - f"data:image/{image_format.lower() if image_format else 'jpeg'};base64," - f"{image_base64[:10]}...{image_base64[-10:]}" - ) - logger.critical(f"请求头: {await self._build_headers(no_key=True)} 请求体: {payload}") - raise RuntimeError(f"模型 {self.model_name} API请求失败: {str(e)}") from e - - logger.error(f"模型 {self.model_name} 达到最大重试次数,请求仍然失败") - raise RuntimeError(f"模型 {self.model_name} 达到最大重试次数,API请求仍然失败") - ''' - async def _prepare_request( self, endpoint: str, From d50c2df0f6063092efe08a8ecb9c70e39213362c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A2=A8=E6=A2=93=E6=9F=92?= <1787882683@qq.com> Date: Sat, 26 Apr 2025 21:49:11 +0800 Subject: [PATCH 5/7] fix: Ruff --- src/heart_flow/sub_heartflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 9274acf70..63d4b4a0e 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -2,7 +2,7 @@ from .observation import Observation, ChattingObservation import asyncio from src.config.config import global_config import time -from typing import Optional, List, Dict, Tuple, TYPE_CHECKING, Callable, Coroutine +from typing import Optional, List, Dict, Tuple, Callable, Coroutine import traceback from src.common.logger import get_module_logger, LogConfig, SUB_HEARTFLOW_STYLE_CONFIG # noqa: E402 import random From 3931423e8e2b4b777fcbf5a3c710f9f7bbea25c3 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 26 Apr 2025 13:49:24 +0000 Subject: [PATCH 6/7] =?UTF-8?q?=F0=9F=A4=96=20=E8=87=AA=E5=8A=A8=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81=20[skip=20ci]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/background_tasks.py | 2 -- src/heart_flow/sub_heartflow.py | 6 ++-- src/heart_flow/subheartflow_manager.py | 44 +++++++++++------------- src/plugins/heartFC_chat/heartFC_chat.py | 33 ++++++++++-------- 4 files changed, 42 insertions(+), 43 deletions(-) diff --git a/src/heart_flow/background_tasks.py b/src/heart_flow/background_tasks.py index 076f441c9..d2bd93213 100644 --- a/src/heart_flow/background_tasks.py +++ b/src/heart_flow/background_tasks.py @@ -255,7 +255,6 @@ class BackgroundTaskManager: # --- 结束新增 --- - # --- 结束新增 --- # --- Specific Task Runners --- # @@ -286,4 +285,3 @@ class BackgroundTaskManager: interval=self.interest_eval_interval, task_func=self._perform_interest_eval_work, ) - diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 63d4b4a0e..ead07f53c 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -243,7 +243,7 @@ class SubHeartflow: self, subheartflow_id, mai_states: MaiStateInfo, - hfc_no_reply_callback: Callable[[], Coroutine[None, None, None]] + hfc_no_reply_callback: Callable[[], Coroutine[None, None, None]], ): """子心流初始化函数 @@ -380,8 +380,8 @@ class SubHeartflow: self.heart_fc_instance = HeartFChatting( chat_id=self.subheartflow_id, sub_mind=self.sub_mind, - observations=self.observations, # 传递所有观察者 - on_consecutive_no_reply_callback=self.hfc_no_reply_callback # <-- Use stored callback + observations=self.observations, # 传递所有观察者 + on_consecutive_no_reply_callback=self.hfc_no_reply_callback, # <-- Use stored callback ) # 初始化并启动 HeartFChatting diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index cf6e01b6a..50cf38b03 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -2,8 +2,8 @@ import asyncio import time import random from typing import Dict, Any, Optional, List -import json # 导入 json 模块 -import functools # <-- 新增导入 +import json # 导入 json 模块 +import functools # <-- 新增导入 # 导入日志模块 from src.common.logger import get_module_logger, LogConfig, SUBHEARTFLOW_MANAGER_STYLE_CONFIG @@ -88,7 +88,7 @@ class SubHeartflowManager: new_subflow = SubHeartflow( subheartflow_id, self.mai_state_info, - hfc_callback # <-- 传递 partial 创建的回调 + hfc_callback, # <-- 传递 partial 创建的回调 ) # 异步初始化 @@ -134,20 +134,23 @@ class SubHeartflowManager: if subflow.chat_state.chat_status == ChatState.ABSENT: return True else: - logger.warning(f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}") + logger.warning( + f"{log_prefix} 调用 change_chat_state 后,{stream_name} 状态仍为 {subflow.chat_state.chat_status.value}" + ) return False except Exception as e: logger.error(f"{log_prefix} 设置 {stream_name} 状态为 ABSENT 时失败: {e}", exc_info=True) return False else: logger.debug(f"{log_prefix} {stream_name} 已是 ABSENT 状态") - return True # 已经是目标状态,视为成功 + return True # 已经是目标状态,视为成功 + # --- 结束新增 --- async def sleep_subheartflow(self, subheartflow_id: Any, reason: str) -> bool: """停止指定的子心流并将其状态设置为 ABSENT""" log_prefix = "[子心流管理]" - async with self._lock: # 加锁以安全访问字典 + async with self._lock: # 加锁以安全访问字典 subheartflow = self.subheartflows.get(subheartflow_id) stream_name = chat_manager.get_stream_name(subheartflow_id) or subheartflow_id @@ -229,7 +232,7 @@ class SubHeartflowManager: changed_count = 0 processed_count = 0 - async with self._lock: # 获取锁以安全迭代 + async with self._lock: # 获取锁以安全迭代 # 使用 list() 创建一个当前值的快照,防止在迭代时修改字典 flows_to_update = list(self.subheartflows.values()) processed_count = len(flows_to_update) @@ -239,8 +242,7 @@ class SubHeartflowManager: for subflow in flows_to_update: # 记录原始状态,以便统计实际改变的数量 - original_state_was_absent = (subflow.chat_state.chat_status == ChatState.ABSENT) - + original_state_was_absent = subflow.chat_state.chat_status == ChatState.ABSENT success = await self._try_set_subflow_absent_internal(subflow, log_prefix) @@ -345,7 +347,6 @@ class SubHeartflowManager: log_prefix = f"[{stream_name}]" current_subflow_state = sub_hf.chat_state.chat_status - _observation_summary = "没有可用的观察信息。" # 默认值 first_observation = sub_hf.observations[0] @@ -357,12 +358,10 @@ class SubHeartflowManager: else: logger.warning(f"{log_prefix} [{stream_name}] 第一个观察者不是 ChattingObservation 类型。") - - # --- 获取麦麦状态 --- mai_state_description = f"你当前状态: {current_mai_state.value}。" - - # 获取个性化信息 + + # 获取个性化信息 individuality = Individuality.get_instance() # 构建个性部分 @@ -378,7 +377,6 @@ class SubHeartflowManager: if individuality.identity.identity_detail: random_detail = random.choice(individuality.identity.identity_detail) prompt_personality += f",{random_detail}" - # --- 针对 ABSENT 状态 --- if current_subflow_state == ChatState.ABSENT: @@ -392,14 +390,14 @@ class SubHeartflowManager: f"进入常规聊天(CHAT)状态?\n" f"给出你的判断,和理由,然后以 JSON 格式回答" f"包含键 'decision',如果要开始聊天,值为 true ,否则为 false.\n" - f"包含键 'reason',其值为你的理由。\n" - f"例如:{{\"decision\": true, \"reason\": \"因为我想聊天\"}}\n" + f"包含键 'reason',其值为你的理由。\n" + f'例如:{{"decision": true, "reason": "因为我想聊天"}}\n' f"请只输出有效的 JSON 对象。" ) # 调用LLM评估 should_activate = await self._llm_evaluate_state_transition(prompt) - if should_activate is None: # 处理解析失败或意外情况 + if should_activate is None: # 处理解析失败或意外情况 logger.warning(f"{log_prefix}LLM评估返回无效结果,跳过。") continue @@ -435,14 +433,14 @@ class SubHeartflowManager: f"还是暂时离开聊天,进入休眠状态?\n" f"给出你的判断,和理由,然后以 JSON 格式回答" f"包含键 'decision',如果要离开聊天,值为 true ,否则为 false.\n" - f"包含键 'reason',其值为你的理由。\n" - f"例如:{{\"decision\": true, \"reason\": \"因为我想休息\"}}\n" + f"包含键 'reason',其值为你的理由。\n" + f'例如:{{"decision": true, "reason": "因为我想休息"}}\n' f"请只输出有效的 JSON 对象。" ) # 调用LLM评估 should_deactivate = await self._llm_evaluate_state_transition(prompt) - if should_deactivate is None: # 处理解析失败或意外情况 + if should_deactivate is None: # 处理解析失败或意外情况 logger.warning(f"{log_prefix}LLM评估返回无效结果,跳过。") continue @@ -453,8 +451,6 @@ class SubHeartflowManager: transitioned_to_absent += 1 else: logger.info(f"{log_prefix}LLM建议不进入ABSENT状态。") - - async def _llm_evaluate_state_transition(self, prompt: str) -> Optional[bool]: """ @@ -573,6 +569,7 @@ class SubHeartflowManager: # 注意:这里不需要再获取锁,因为 request_absent_transition 内部会处理锁 logger.debug(f"[管理器 HFC 处理器] 接收到来自 {subheartflow_id} 的 HFC 无回复信号") await self.request_absent_transition(subheartflow_id) + # --- 结束新增 --- # # --- 新增:处理来自 HeartFChatting 的状态转换请求 --- # @@ -608,4 +605,5 @@ class SubHeartflowManager: logger.warning( f"[状态转换请求] 收到对 {stream_name} 的请求,但其状态为 {current_state.value} (非 FOCUSED),不执行转换" ) + # --- 结束新增 --- # diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 9a2862adb..8f376b37f 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -154,14 +154,14 @@ class HeartFChatting: 其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。 """ - CONSECUTIVE_NO_REPLY_THRESHOLD = 4 # 连续不回复的阈值 + CONSECUTIVE_NO_REPLY_THRESHOLD = 4 # 连续不回复的阈值 def __init__( self, chat_id: str, sub_mind: SubMind, observations: Observation, - on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]] + on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]], ): """ HeartFChatting 初始化函数 @@ -209,8 +209,8 @@ class HeartFChatting: self._cycle_counter = 0 self._cycle_history: Deque[CycleInfo] = deque(maxlen=10) # 保留最近10个循环的信息 self._current_cycle: Optional[CycleInfo] = None - self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器 - self._shutting_down: bool = False # <--- 新增:关闭标志位 + self._lian_xu_bu_hui_fu_ci_shu: int = 0 # <--- 新增:连续不回复计数器 + self._shutting_down: bool = False # <--- 新增:关闭标志位 async def _initialize(self) -> bool: """ @@ -309,9 +309,9 @@ class HeartFChatting: # 如果未能获取锁(理论上不太可能,除非 shutdown 过程中释放了但又被抢了?) # 或者也可以在这里再次检查 self._shutting_down if self._shutting_down: - break # 再次检查,确保退出 + break # 再次检查,确保退出 logger.warning(f"{self.log_prefix} 未能获取循环处理锁,跳过本次循环。") - await asyncio.sleep(0.1) # 短暂等待避免空转 + await asyncio.sleep(0.1) # 短暂等待避免空转 continue # 记录规划开始时间点 @@ -600,27 +600,30 @@ class HeartFChatting: if not self._shutting_down: self._lian_xu_bu_hui_fu_ci_shu += 1 - logger.debug(f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{self.CONSECUTIVE_NO_REPLY_THRESHOLD}") + logger.debug( + f"{self.log_prefix} 连续不回复计数增加: {self._lian_xu_bu_hui_fu_ci_shu}/{self.CONSECUTIVE_NO_REPLY_THRESHOLD}" + ) # 检查是否达到阈值 if self._lian_xu_bu_hui_fu_ci_shu >= self.CONSECUTIVE_NO_REPLY_THRESHOLD: - logger.info(f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次),调用回调请求状态转换") + logger.info( + f"{self.log_prefix} 连续不回复达到阈值 ({self._lian_xu_bu_hui_fu_ci_shu}次),调用回调请求状态转换" + ) # 调用回调。注意:这里不重置计数器,依赖回调函数成功改变状态来隐式重置上下文。 await self.on_consecutive_no_reply_callback() - - return True + return True except asyncio.CancelledError: # 如果在等待过程中任务被取消(可能是因为 shutdown) logger.info(f"{self.log_prefix} 处理 'no_reply' 时等待被中断 (CancelledError)") # 让异常向上传播,由 _hfc_loop 的异常处理逻辑接管 raise - except Exception as e: # 捕获调用管理器或其他地方可能发生的错误 + except Exception as e: # 捕获调用管理器或其他地方可能发生的错误 logger.error(f"{self.log_prefix} 处理 'no_reply' 时发生错误: {e}") logger.error(traceback.format_exc()) # 发生意外错误时,可以选择是否重置计数器,这里选择不重置 - return False # 表示动作未成功 + return False # 表示动作未成功 async def _wait_for_new_message(self, observation, planner_start_db_time: float, log_prefix: str) -> bool: """ @@ -639,7 +642,7 @@ class HeartFChatting: # --- 在每次循环开始时检查关闭标志 --- if self._shutting_down: logger.info(f"{log_prefix} 等待新消息时检测到关闭信号,中断等待。") - return False # 表示因为关闭而退出 + return False # 表示因为关闭而退出 # ----------------------------------- # 检查新消息 @@ -654,7 +657,7 @@ class HeartFChatting: try: # 短暂休眠,让其他任务有机会运行,并能更快响应取消或关闭 - await asyncio.sleep(0.5) # 缩短休眠时间 + await asyncio.sleep(0.5) # 缩短休眠时间 except asyncio.CancelledError: # 如果在休眠时被取消,再次检查关闭标志 # 如果是正常关闭,则不需要警告 @@ -910,7 +913,7 @@ class HeartFChatting: async def shutdown(self): """优雅关闭HeartFChatting实例,取消活动循环任务""" logger.info(f"{self.log_prefix} 正在关闭HeartFChatting...") - self._shutting_down = True # <-- 在开始关闭时设置标志位 + self._shutting_down = True # <-- 在开始关闭时设置标志位 # 取消循环任务 if self._loop_task and not self._loop_task.done(): From 84e87d2886eba1fe771d8c4396b59d993a725f14 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 26 Apr 2025 21:57:26 +0800 Subject: [PATCH 7/7] Update subheartflow_manager.py --- src/heart_flow/subheartflow_manager.py | 96 ++++++++++++++------------ 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index 50cf38b03..6d22494d4 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -262,63 +262,67 @@ class SubHeartflowManager: async def evaluate_interest_and_promote(self): """评估子心流兴趣度,满足条件且未达上限则提升到FOCUSED状态(基于start_hfc_probability)""" - log_prefix = "[兴趣评估]" - # 使用 self.mai_state_info 获取当前状态和限制 - current_state = self.mai_state_info.get_current_state() - focused_limit = current_state.get_focused_chat_max_num() + try: + log_prefix = "[兴趣评估]" + # 使用 self.mai_state_info 获取当前状态和限制 + current_state = self.mai_state_info.get_current_state() + focused_limit = current_state.get_focused_chat_max_num() + logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 开始尝试提升到FOCUSED状态") - if int(time.time()) % 20 == 0: # 每20秒输出一次 - logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天") + if int(time.time()) % 20 == 0: # 每20秒输出一次 + logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 可以在{focused_limit}个群激情聊天") - if focused_limit <= 0: - # logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流") - return + if focused_limit <= 0: + # logger.debug(f"{log_prefix} 当前状态 ({current_state.value}) 不允许 FOCUSED 子心流") + return - current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED) - if current_focused_count >= focused_limit: - logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})") - return + current_focused_count = self.count_subflows_by_state(ChatState.FOCUSED) + if current_focused_count >= focused_limit: + logger.debug(f"{log_prefix} 已达专注上限 ({current_focused_count}/{focused_limit})") + return - for sub_hf in list(self.subheartflows.values()): - flow_id = sub_hf.subheartflow_id - stream_name = chat_manager.get_stream_name(flow_id) or flow_id + for sub_hf in list(self.subheartflows.values()): + flow_id = sub_hf.subheartflow_id + stream_name = chat_manager.get_stream_name(flow_id) or flow_id - # 跳过非CHAT状态或已经是FOCUSED状态的子心流 - if sub_hf.chat_state.chat_status == ChatState.FOCUSED: - continue - - from .mai_state_manager import enable_unlimited_hfc_chat - - if not enable_unlimited_hfc_chat: - if sub_hf.chat_state.chat_status != ChatState.CHAT: + # 跳过非CHAT状态或已经是FOCUSED状态的子心流 + if sub_hf.chat_state.chat_status == ChatState.FOCUSED: continue - # 检查是否满足提升概率 - if random.random() >= sub_hf.interest_chatting.start_hfc_probability: - continue + from .mai_state_manager import enable_unlimited_hfc_chat - # 再次检查是否达到上限 - if current_focused_count >= focused_limit: - logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限") - break + if not enable_unlimited_hfc_chat: + if sub_hf.chat_state.chat_status != ChatState.CHAT: + continue - # 获取最新状态并执行提升 - current_subflow = self.subheartflows.get(flow_id) - if not current_subflow: - continue + # 检查是否满足提升概率 + if random.random() >= sub_hf.interest_chatting.start_hfc_probability: + continue - logger.info( - f"{log_prefix} [{stream_name}] 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})" - ) + # 再次检查是否达到上限 + if current_focused_count >= focused_limit: + logger.debug(f"{log_prefix} [{stream_name}] 已达专注上限") + break - # 执行状态提升 - await current_subflow.change_chat_state(ChatState.FOCUSED) + # 获取最新状态并执行提升 + current_subflow = self.subheartflows.get(flow_id) + if not current_subflow: + continue - # 验证提升结果 - if ( - final_subflow := self.subheartflows.get(flow_id) - ) and final_subflow.chat_state.chat_status == ChatState.FOCUSED: - current_focused_count += 1 + logger.info( + f"{log_prefix} [{stream_name}] 触发 认真水群 (概率={current_subflow.interest_chatting.start_hfc_probability:.2f})" + ) + + # 执行状态提升 + await current_subflow.change_chat_state(ChatState.FOCUSED) + + # 验证提升结果 + if ( + final_subflow := self.subheartflows.get(flow_id) + ) and final_subflow.chat_state.chat_status == ChatState.FOCUSED: + current_focused_count += 1 + except Exception as e: + logger.error(f"启动HFC 兴趣评估失败: {e}", exc_info=True) async def evaluate_and_transition_subflows_by_llm(self): """ @@ -352,7 +356,7 @@ class SubHeartflowManager: first_observation = sub_hf.observations[0] if isinstance(first_observation, ChattingObservation): # 组合中期记忆和当前聊天内容 - first_observation.observe() + await first_observation.observe() current_chat = first_observation.talking_message_str or "当前无聊天内容。" combined_summary = f"当前聊天内容:\n{current_chat}" else: