diff --git a/src/config/config.py b/src/config/config.py index bf184a002..ba9416d51 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -28,7 +28,7 @@ logger = get_module_logger("config", config=config_config) # 考虑到,实际上配置文件中的mai_version是不会自动更新的,所以采用硬编码 is_test = True mai_version_main = "0.6.3" -mai_version_fix = "snapshot-3" +mai_version_fix = "snapshot-4" if mai_version_fix: if is_test: diff --git a/src/heart_flow/background_tasks.py b/src/heart_flow/background_tasks.py index 7fb63bf66..2bfd74204 100644 --- a/src/heart_flow/background_tasks.py +++ b/src/heart_flow/background_tasks.py @@ -16,6 +16,8 @@ background_tasks_log_config = LogConfig( logger = get_module_logger("background_tasks", config=background_tasks_log_config) +# 新增随机停用间隔 (5 分钟) +RANDOM_DEACTIVATION_INTERVAL_SECONDS = 300 # 新增兴趣评估间隔 INTEREST_EVAL_INTERVAL_SECONDS = 5 @@ -35,6 +37,8 @@ class BackgroundTaskManager: inactive_threshold: int, # 新增兴趣评估间隔参数 interest_eval_interval: int = INTEREST_EVAL_INTERVAL_SECONDS, + # 新增随机停用间隔参数 + random_deactivation_interval: int = RANDOM_DEACTIVATION_INTERVAL_SECONDS, ): self.mai_state_info = mai_state_info self.mai_state_manager = mai_state_manager @@ -47,19 +51,21 @@ class BackgroundTaskManager: self.log_interval = log_interval self.inactive_threshold = inactive_threshold # For cleanup task self.interest_eval_interval = interest_eval_interval # 存储兴趣评估间隔 + self.random_deactivation_interval = random_deactivation_interval # 存储随机停用间隔 # Task references self._state_update_task: Optional[asyncio.Task] = None self._cleanup_task: Optional[asyncio.Task] = None self._logging_task: Optional[asyncio.Task] = None self._interest_eval_task: Optional[asyncio.Task] = None # 新增兴趣评估任务引用 + self._random_deactivation_task: Optional[asyncio.Task] = None # 新增随机停用任务引用 self._tasks: List[Optional[asyncio.Task]] = [] # Keep track of all tasks async def start_tasks(self): """启动所有后台任务 功能说明: - - 启动核心后台任务: 状态更新、清理、日志记录和兴趣评估 + - 启动核心后台任务: 状态更新、清理、日志记录、兴趣评估和随机停用 - 每个任务启动前检查是否已在运行 - 将任务引用保存到任务列表 """ @@ -99,6 +105,15 @@ class BackgroundTaskManager: f"兴趣评估任务已启动 间隔:{self.interest_eval_interval}s", "_interest_eval_task", ), + # 新增随机停用任务配置 + ( + self._random_deactivation_task, + self._run_random_deactivation_cycle, + "hf_random_deactivation", + "debug", # 设为debug,避免过多日志 + f"随机停用任务已启动 间隔:{self.random_deactivation_interval}s", + "_random_deactivation_task", + ), ] # 统一启动所有任务 @@ -230,6 +245,12 @@ class BackgroundTaskManager: # --- 结束新增 --- + # --- 新增随机停用工作函数 --- + async def _perform_random_deactivation_work(self): + """执行一轮子心流随机停用检查。""" + await self.subheartflow_manager.randomly_deactivate_subflows() + # --- 结束新增 --- + # --- Specific Task Runners --- # async def _run_state_update_cycle(self, interval: int): await self._run_periodic_loop( @@ -255,3 +276,14 @@ class BackgroundTaskManager: ) # --- 结束新增 --- + + # --- 新增随机停用任务运行器 --- + async def _run_random_deactivation_cycle(self): + """运行随机停用循环。""" + await self._run_periodic_loop( + task_name="Random Deactivation", + interval=self.random_deactivation_interval, + task_func=self._perform_random_deactivation_work, + ) + + # --- 结束新增 --- diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 68400e837..85ceaf76f 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -358,30 +358,40 @@ class SubHeartflow: self.clear_interest_dict() # 清理兴趣字典,准备专注聊天 log_prefix = self.log_prefix + # 如果实例已存在,检查其循环任务状态 if self.heart_fc_instance: - if not self.heart_fc_instance._loop_active: - logger.warning(f"{log_prefix} HeartFChatting 实例存在但未激活,尝试重新激活...") - await self.heart_fc_instance.add_time() # 尝试添加时间以激活循环 - return True # 假设 add_time 会处理激活逻辑 + # 如果任务已完成或不存在,则尝试重新启动 + if self.heart_fc_instance._loop_task is None or self.heart_fc_instance._loop_task.done(): + logger.info(f"{log_prefix} HeartFChatting 实例存在但循环未运行,尝试启动...") + try: + await self.heart_fc_instance.start() # 启动循环 + logger.info(f"{log_prefix} HeartFChatting 循环已启动。") + return True + except Exception as e: + logger.error(f"{log_prefix} 尝试启动现有 HeartFChatting 循环时出错: {e}") + logger.error(traceback.format_exc()) + return False # 启动失败 else: + # 任务正在运行 logger.debug(f"{log_prefix} HeartFChatting 已在运行中。") - return True # 已经在运行 + return True # 已经在运行 - logger.info(f"{log_prefix} 麦麦准备开始专注聊天...") + # 如果实例不存在,则创建并启动 + logger.info(f"{log_prefix} 麦麦准备开始专注聊天 (创建新实例)...") try: self.heart_fc_instance = HeartFChatting( chat_id=self.chat_id, ) if await self.heart_fc_instance._initialize(): - await self.heart_fc_instance.add_time() # 初始化成功后添加初始时间 - logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式。") + await self.heart_fc_instance.start() # 初始化成功后启动循环 + logger.info(f"{log_prefix} 麦麦已成功进入专注聊天模式 (新实例已启动)。") return True else: logger.error(f"{log_prefix} HeartFChatting 初始化失败,无法进入专注模式。") self.heart_fc_instance = None # 初始化失败,清理实例 return False except Exception as e: - logger.error(f"{log_prefix} 创建或初始化 HeartFChatting 实例时出错: {e}") + logger.error(f"{log_prefix} 创建或启动 HeartFChatting 实例时出错: {e}") logger.error(traceback.format_exc()) self.heart_fc_instance = None # 创建或初始化异常,清理实例 return False diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index 344eaf772..d5f7ed86b 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -358,6 +358,74 @@ class SubHeartflowManager: else: logger.debug(f"{log_prefix_manager} 评估周期结束, 未提升任何子心流。") + async def randomly_deactivate_subflows(self, deactivation_probability: float = 0.3): + """以一定概率将 FOCUSED 或 CHAT 状态的子心流回退到 ABSENT 状态。""" + log_prefix_manager = "[子心流管理器-随机停用]" + logger.debug(f"{log_prefix_manager} 开始随机停用检查... (概率: {deactivation_probability:.0%})") + + # 使用快照安全遍历 + subflows_snapshot = list(self.subheartflows.values()) + deactivated_count = 0 + + # 预先计算状态数量,因为 set_chat_state 需要 + states_num_before = ( + self.count_subflows_by_state(ChatState.ABSENT), + self.count_subflows_by_state(ChatState.CHAT), + self.count_subflows_by_state(ChatState.FOCUSED), + ) + + try: + for sub_hf in subflows_snapshot: + flow_id = sub_hf.subheartflow_id + stream_name = chat_manager.get_stream_name(flow_id) or flow_id + log_prefix_flow = f"[{stream_name}]" + current_state = sub_hf.chat_state.chat_status + + # 只处理 FOCUSED 或 CHAT 状态 + if current_state not in [ChatState.FOCUSED, ChatState.CHAT]: + continue + + # 检查随机概率 + if random.random() < deactivation_probability: + logger.info( + f"{log_prefix_manager} {log_prefix_flow} 随机触发停用 (从 {current_state.value}) -> ABSENT" + ) + + # 获取当前实例以检查最新状态 + current_subflow = self.subheartflows.get(flow_id) + if not current_subflow or current_subflow.chat_state.chat_status != current_state: + logger.warning(f"{log_prefix_manager} {log_prefix_flow} 尝试停用时状态已改变或实例消失,跳过。") + continue + + # --- 状态设置 --- # + # 注意:这里传递的状态数量是 *停用前* 的状态数量 + await current_subflow.set_chat_state(ChatState.ABSENT, states_num_before) + + # --- 状态验证 (可选) --- + final_subflow = self.subheartflows.get(flow_id) + if final_subflow: + final_state = final_subflow.chat_state.chat_status + if final_state == ChatState.ABSENT: + logger.debug( + f"{log_prefix_manager} {log_prefix_flow} 成功从 {current_state.value} 停用到 ABSENT 状态" + ) + deactivated_count += 1 + # 注意:停用后不需要更新 states_num_before,因为它只用于 set_chat_state 的限制检查 + else: + logger.warning( + f"{log_prefix_manager} {log_prefix_flow} 尝试停用到 ABSENT 后状态仍为 {final_state.value}" + ) + else: + logger.warning(f"{log_prefix_manager} {log_prefix_flow} 停用后验证时子心流 {flow_id} 消失") + + except Exception as e: + logger.error(f"{log_prefix_manager} 随机停用周期出错: {e}", exc_info=True) + + if deactivated_count > 0: + logger.info(f"{log_prefix_manager} 随机停用周期结束, 成功停用 {deactivated_count} 个子心流。") + else: + logger.debug(f"{log_prefix_manager} 随机停用周期结束, 未停用任何子心流。") + def count_subflows_by_state(self, state: ChatState) -> int: """统计指定状态的子心流数量 diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index b34788664..371c817e4 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -68,9 +68,8 @@ PLANNER_TOOL_DEFINITION = [ class HeartFChatting: """ 管理一个连续的Plan-Replier-Sender循环 - 用于在特定聊天流中生成回复,由计时器控制。 - 只要计时器>0,循环就会继续。 - 现在由其关联的 SubHeartflow 管理生命周期。 + 用于在特定聊天流中生成回复。 + 其生命周期现在由其关联的 SubHeartflow 的 FOCUSED 状态控制。 """ def __init__(self, chat_id: str): @@ -79,9 +78,6 @@ class HeartFChatting: 参数: chat_id: 聊天流唯一标识符(如stream_id) - gpt_instance: 文本回复生成器实例 - tool_user_instance: 工具使用实例 - emoji_manager_instance: 表情管理实例 """ # 基础属性 self.stream_id: str = chat_id # 聊天流ID @@ -91,7 +87,6 @@ class HeartFChatting: # 初始化状态控制 self._initialized = False # 是否已初始化标志 self._processing_lock = asyncio.Lock() # 处理锁(确保单次Plan-Replier-Sender周期) - self._timer_lock = asyncio.Lock() # 计时器锁(安全更新计时器) # 依赖注入存储 self.gpt_instance = HeartFCGenerator() # 文本回复生成器 @@ -106,11 +101,8 @@ class HeartFChatting: ) # 循环控制内部状态 - self._loop_timer: float = 0.0 # 循环剩余时间(秒) self._loop_active: bool = False # 循环是否正在运行 self._loop_task: Optional[asyncio.Task] = None # 主循环任务 - self._initial_duration: float = INITIAL_DURATION # 首次触发增加的时间 - self._last_added_duration: float = self._initial_duration # 上次增加的时间 def _get_log_prefix(self) -> str: """获取日志前缀,包含可读的流名称""" @@ -147,34 +139,10 @@ class HeartFChatting: logger.error(traceback.format_exc()) return False - async def add_time(self): - """ - 为麦麦添加时间,麦麦有兴趣时,固定增加15秒 - """ - log_prefix = self._get_log_prefix() - if not self._initialized: - if not await self._initialize(): - logger.error(f"{log_prefix} 无法添加时间: 未初始化。") - return - - async with self._timer_lock: - duration_to_add: float = 15.0 # 固定增加15秒 - if not self._loop_active: # 首次触发 - logger.info(f"{log_prefix} 麦麦有兴趣! 打算聊:15s.") - else: # 循环已激活 - logger.info(f"{log_prefix} 麦麦想继续聊:15s, 还能聊: {self._loop_timer:.1f}s.") - - # 添加固定时间 - new_timer_value = self._loop_timer + duration_to_add - self._loop_timer = max(0, new_timer_value) - - # 添加时间后,检查是否需要启动循环 - await self._start_loop_if_needed() - async def start(self): """ 显式尝试启动 HeartFChatting 的主循环。 - 如果循环未激活且计时器 > 0,则启动循环。 + 如果循环未激活,则启动循环。 """ log_prefix = self._get_log_prefix() if not self._initialized: @@ -185,14 +153,13 @@ class HeartFChatting: await self._start_loop_if_needed() async def _start_loop_if_needed(self): - """检查是否需要启动主循环,如果未激活且计时器大于0,则启动。""" + """检查是否需要启动主循环,如果未激活则启动。""" log_prefix = self._get_log_prefix() should_start_loop = False - async with self._timer_lock: - # 检查是否满足启动条件:未激活且计时器有时间 - if not self._loop_active and self._loop_timer > 0: - should_start_loop = True - self._loop_active = True # 在锁内标记为活动,防止重复启动 + # 直接检查是否激活,无需检查计时器 + if not self._loop_active: + should_start_loop = True + self._loop_active = True # 标记为活动,防止重复启动 if should_start_loop: # 检查是否已有任务在运行(理论上不应该,因为 _loop_active=False) @@ -206,13 +173,13 @@ class HeartFChatting: pass # 忽略取消或超时错误 self._loop_task = None # 清理旧任务引用 - logger.info(f"{log_prefix} 计时器 > 0 且循环未激活,启动主循环...") + logger.info(f"{log_prefix} 循环未激活,启动主循环...") # 创建新的循环任务 self._loop_task = asyncio.create_task(self._run_pf_loop()) # 添加完成回调 self._loop_task.add_done_callback(self._handle_loop_completion) # else: - # logger.trace(f"{log_prefix} 不需要启动循环(已激活或计时器为0)") # 可以取消注释以进行调试 + # logger.trace(f"{log_prefix} 不需要启动循环(已激活)") # 可以取消注释以进行调试 def _handle_loop_completion(self, task: asyncio.Task): """当 _run_pf_loop 任务完成时执行的回调。""" @@ -223,47 +190,38 @@ class HeartFChatting: logger.error(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天(异常): {exception}") logger.error(traceback.format_exc()) # Log full traceback for exceptions else: - logger.debug(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天 (正常完成)") + # Loop completing normally now means it was cancelled/shutdown externally + logger.info(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天 (外部停止)") except asyncio.CancelledError: logger.info(f"{log_prefix} HeartFChatting: 麦麦脱离了聊天(任务取消)") finally: self._loop_active = False self._loop_task = None - self._last_added_duration = self._initial_duration - self._trigger_count_this_activation = 0 if self._processing_lock.locked(): logger.warning(f"{log_prefix} HeartFChatting: 处理锁在循环结束时仍被锁定,强制释放。") self._processing_lock.release() async def _run_pf_loop(self): """ - 主循环,当计时器>0时持续进行计划并可能回复消息 - 管理每个循环周期的处理锁 + 主循环,持续进行计划并可能回复消息,直到被外部取消。 + 管理每个循环周期的处理锁。 """ log_prefix = self._get_log_prefix() - logger.info(f"{log_prefix} HeartFChatting: 麦麦打算好好聊聊 (定时器: {self._loop_timer:.1f}s)") + logger.info(f"{log_prefix} HeartFChatting: 麦麦打算好好聊聊 (进入专注模式)") try: thinking_id = "" - while True: + while True: # Loop indefinitely until cancelled cycle_timers = {} # <--- Initialize timers dict for this cycle # Access MessageManager directly if message_manager.check_if_sending_message_exist(self.stream_id, thinking_id): - # logger.info(f"{log_prefix} HeartFChatting: 11111111111111111111111111111111麦麦还在发消息,等会再规划") + # logger.info(f"{log_prefix} HeartFChatting: 麦麦还在发消息,等会再规划") await asyncio.sleep(1) continue else: - # logger.info(f"{log_prefix} HeartFChatting: 11111111111111111111111111111111麦麦不发消息了,开始规划") + # logger.info(f"{log_prefix} HeartFChatting: 麦麦不发消息了,开始规划") pass - async with self._timer_lock: - current_timer = self._loop_timer - if current_timer <= 0: - logger.info( - f"{log_prefix} HeartFChatting: 聊太久了,麦麦打算休息一下 (计时器为 {current_timer:.1f}s)。退出HeartFChatting。" - ) - break - # 记录循环周期开始时间,用于计时和休眠计算 loop_cycle_start_time = time.monotonic() action_taken_this_cycle = False @@ -296,7 +254,7 @@ class HeartFChatting: logger.error(f"{log_prefix} Planner LLM 失败,跳过本周期回复尝试。理由: {reasoning}") # Optionally add a longer sleep? action_taken_this_cycle = False # Ensure no action is counted - # Continue to timer decrement and sleep + # Continue to sleep logic elif action == "text_reply": logger.debug(f"{log_prefix} HeartFChatting: 麦麦决定回复文本. 理由: {reasoning}") @@ -371,11 +329,11 @@ class HeartFChatting: with Timer("Wait New Msg", cycle_timers): # <--- Start Wait timer wait_start_time = time.monotonic() while True: - # 检查计时器是否耗尽 - async with self._timer_lock: - if self._loop_timer <= 0: - logger.info(f"{log_prefix} HeartFChatting: 等待新消息时计时器耗尽。") - break # 计时器耗尽,退出等待 + # Removed timer check within wait loop + # async with self._timer_lock: + # if self._loop_timer <= 0: + # logger.info(f"{log_prefix} HeartFChatting: 等待新消息时计时器耗尽。") + # break # 计时器耗尽,退出等待 # 检查是否有新消息 has_new = await observation.has_new_messages_since(planner_start_db_time) @@ -421,7 +379,7 @@ class HeartFChatting: if timer_strings: # 如果有有效计时器数据才打印 logger.debug(f"{log_prefix} 该次决策耗时: {'; '.join(timer_strings)}") - # --- Timer Decrement --- # + # --- Timer Decrement Removed --- # cycle_duration = time.monotonic() - loop_cycle_start_time except Exception as e_cycle: @@ -437,20 +395,24 @@ class HeartFChatting: self._processing_lock.release() # logger.trace(f"{log_prefix} 循环释放了处理锁.") # Reduce noise - async with self._timer_lock: - self._loop_timer -= cycle_duration - # Log timer decrement less aggressively - if cycle_duration > 0.1 or not action_taken_this_cycle: - logger.debug( - f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s. 剩余时间: {self._loop_timer:.1f}s." - ) + # --- Timer Decrement Logging Removed --- + # async with self._timer_lock: + # self._loop_timer -= cycle_duration + # # Log timer decrement less aggressively + # if cycle_duration > 0.1 or not action_taken_this_cycle: + # logger.debug( + # f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s. 剩余时间: {self._loop_timer:.1f}s." + # ) + if cycle_duration > 0.1: + logger.debug(f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s.") + # --- Delay --- # try: sleep_duration = 0.0 if not action_taken_this_cycle and cycle_duration < 1.5: sleep_duration = 1.5 - cycle_duration - elif cycle_duration < 0.2: + elif cycle_duration < 0.2: # Keep minimal sleep even after action sleep_duration = 0.2 if sleep_duration > 0: @@ -459,7 +421,7 @@ class HeartFChatting: except asyncio.CancelledError: logger.info(f"{log_prefix} Sleep interrupted, loop likely cancelling.") - break + break # Exit loop immediately on cancellation except asyncio.CancelledError: logger.info(f"{log_prefix} HeartFChatting: 麦麦的聊天主循环被取消了")