From 042e969292c50107ade9d4634d781ba0559db124 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 26 Apr 2025 17:35:23 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E5=A4=8D=E9=BA=A6?= =?UTF-8?q?=E9=BA=A6=E5=9B=9E=E5=A4=8D=E8=BF=87=E5=8E=BB=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/sub_heartflow.py | 18 +++ src/heart_flow/subheartflow_manager.py | 143 ++++++++++-------- src/plugins/heartFC_chat/heartFC_chat.py | 16 +- .../heartFC_chat/heartflow_prompt_builder.py | 1 + src/plugins/heartFC_chat/normal_chat.py | 67 +++++++- 5 files changed, 172 insertions(+), 73 deletions(-) diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index 33218f5fd..efd0ea1ed 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -86,8 +86,26 @@ class InterestChatting: logger.debug("后台兴趣更新任务已创建并启动。") def add_interest_dict(self, message: MessageRecv, interest_value: float, is_mentioned: bool): + """添加消息到兴趣字典 + + 参数: + message: 接收到的消息 + interest_value: 兴趣值 + is_mentioned: 是否被提及 + + 功能: + 1. 将消息添加到兴趣字典 + 2. 更新最后交互时间 + 3. 如果字典长度超过10,删除最旧的消息 + """ + # 添加新消息 self.interest_dict[message.message_info.message_id] = (message, interest_value, is_mentioned) self.last_interaction_time = time.time() + + # 如果字典长度超过10,删除最旧的消息 + if len(self.interest_dict) > 10: + oldest_key = next(iter(self.interest_dict)) + self.interest_dict.pop(oldest_key) async def _calculate_decay(self): """计算兴趣值的衰减 diff --git a/src/heart_flow/subheartflow_manager.py b/src/heart_flow/subheartflow_manager.py index 9357ff3b6..62d9e2f7b 100644 --- a/src/heart_flow/subheartflow_manager.py +++ b/src/heart_flow/subheartflow_manager.py @@ -2,6 +2,7 @@ import asyncio import time import random from typing import Dict, Any, Optional, List +import json # 导入 json 模块 # 导入日志模块 from src.common.logger import get_module_logger, LogConfig, SUBHEARTFLOW_MANAGER_STYLE_CONFIG @@ -400,69 +401,65 @@ class SubHeartflowManager: if current_subflow_state == ChatState.ABSENT: # 构建Prompt prompt = ( - f"子心流 [{stream_name}] 当前处于非活跃(ABSENT)状态。\n" + f"子心流 [{stream_name}] 当前处于非活跃(ABSENT)状态.\n" f"{mai_state_description}\n" f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n" f"基于以上信息,该子心流是否表现出足够的活跃迹象或重要性," - f"值得将其唤醒并进入常规聊天(CHAT)状态?" - f"请回答 '是' 或 '否'。" + f"值得将其唤醒并进入常规聊天(CHAT)状态?\n" + f"请以 JSON 格式回答,包含一个键 'decision',其值为 true 或 false.\n" + f"例如:{{\"decision\": true}}\n" + f"请只输出有效的 JSON 对象。" ) # 调用LLM评估 - try: - # 使用 self._llm_evaluate_state_transition - should_activate = await self._llm_evaluate_state_transition(prompt) - if should_activate: - # 检查CHAT限额 - if current_chat_count < chat_limit: - logger.info( - f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..." - ) - await sub_hf.change_chat_state(ChatState.CHAT) - if sub_hf.chat_state.chat_status == ChatState.CHAT: - transitioned_to_chat += 1 - current_chat_count += 1 # 更新计数器 - else: - logger.warning(f"{log_prefix} [{stream_name}] 尝试激活到CHAT失败。") + should_activate = await self._llm_evaluate_state_transition(prompt) + if should_activate is None: # 处理解析失败或意外情况 + logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果,跳过。") + continue + + if should_activate: + # 检查CHAT限额 + # 使用不上锁的版本,因为我们已经在锁内 + current_chat_count = self.count_subflows_by_state_nolock(ChatState.CHAT) + if current_chat_count < chat_limit: + logger.info( + f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,且未达上限({current_chat_count}/{chat_limit})。正在尝试转换..." + ) + await sub_hf.change_chat_state(ChatState.CHAT) + if sub_hf.chat_state.chat_status == ChatState.CHAT: + transitioned_to_chat += 1 else: - logger.info( - f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,但已达到上限({current_chat_count}/{chat_limit})。跳过转换。" - ) - except Exception as e: - logger.error( - f"{log_prefix} [{stream_name}] LLM评估或状态转换(ABSENT->CHAT)时出错: {e}", exc_info=True - ) + logger.warning(f"{log_prefix} [{stream_name}] 尝试激活到CHAT失败。") + else: + logger.info( + f"{log_prefix} [{stream_name}] LLM建议激活到CHAT状态,但已达到上限({current_chat_count}/{chat_limit})。跳过转换。" + ) # --- 针对 CHAT 状态 --- elif current_subflow_state == ChatState.CHAT: # 构建Prompt prompt = ( - f"子心流 [{stream_name}] 当前处于常规聊天(CHAT)状态。\n" + f"子心流 [{stream_name}] 当前处于常规聊天(CHAT)状态.\n" f"{mai_state_description}\n" f"最近观察到的内容摘要:\n---\n{combined_summary}\n---\n" f"基于以上信息,该子心流是否表现出不活跃、对话结束或不再需要关注的迹象," - f"应该让其进入休眠(ABSENT)状态?" - f"请回答 '是' 或 '否'。" + f"应该让其进入休眠(ABSENT)状态?\n" + f"请以 JSON 格式回答,包含一个键 'decision',其值为 true (表示应休眠) 或 false (表示不应休眠).\n" + f"例如:{{\"decision\": true}}\n" + f"请只输出有效的 JSON 对象。" ) # 调用LLM评估 - try: - # 使用 self._llm_evaluate_state_transition - should_deactivate = await self._llm_evaluate_state_transition(prompt) - if should_deactivate: - logger.info(f"{log_prefix} [{stream_name}] LLM建议进入ABSENT状态。正在尝试转换...") - await sub_hf.change_chat_state(ChatState.ABSENT) - if sub_hf.chat_state.chat_status == ChatState.ABSENT: - transitioned_to_absent += 1 - current_chat_count -= 1 # 更新计数器 - else: - logger.warning(f"{log_prefix} [{stream_name}] 尝试转换为ABSENT失败。") - except Exception as e: - logger.error( - f"{log_prefix} [{stream_name}] LLM评估或状态转换(CHAT->ABSENT)时出错: {e}", exc_info=True - ) + should_deactivate = await self._llm_evaluate_state_transition(prompt) + if should_deactivate is None: # 处理解析失败或意外情况 + logger.warning(f"{log_prefix} [{stream_name}] LLM评估返回无效结果,跳过。") + continue - # 可以选择性地为 FOCUSED 状态添加评估逻辑,例如判断是否降级回 CHAT 或 ABSENT + if should_deactivate: + logger.info(f"{log_prefix} [{stream_name}] LLM建议进入ABSENT状态。正在尝试转换...") + await sub_hf.change_chat_state(ChatState.ABSENT) + if sub_hf.chat_state.chat_status == ChatState.ABSENT: + transitioned_to_absent += 1 logger.info( f"{log_prefix} LLM评估周期结束。" @@ -470,38 +467,58 @@ class SubHeartflowManager: f" 成功转换到ABSENT: {transitioned_to_absent}." ) - async def _llm_evaluate_state_transition(self, prompt: str) -> bool: + async def _llm_evaluate_state_transition(self, prompt: str) -> Optional[bool]: """ - 使用 LLM 评估是否应进行状态转换。 + 使用 LLM 评估是否应进行状态转换,期望 LLM 返回 JSON 格式。 Args: - prompt: 提供给 LLM 的提示信息。 + prompt: 提供给 LLM 的提示信息,要求返回 {"decision": true/false}。 Returns: - bool: True 表示应该转换,False 表示不应该转换。 + Optional[bool]: 如果成功解析 LLM 的 JSON 响应并提取了 'decision' 键的值,则返回该布尔值。 + 如果 LLM 调用失败、返回无效 JSON 或 JSON 中缺少 'decision' 键或其值不是布尔型,则返回 None。 """ log_prefix = "[LLM状态评估]" try: # --- 真实的 LLM 调用 --- response_text, _ = await self.llm_state_evaluator.generate_response_async(prompt) - logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估,原始响应: {response_text}") - # 解析响应 - 这里需要根据你的LLM的确切输出来调整逻辑 - # 假设 LLM 会明确回答 "是" 或 "否" - if response_text and "是" in response_text.strip(): - logger.debug(f"{log_prefix} LLM评估结果: 建议转换 (响应包含 '是')") - return True - elif response_text and "否" in response_text.strip(): - logger.debug(f"{log_prefix} LLM评估结果: 建议不转换 (响应包含 '否')") - return False - else: - logger.warning(f"{log_prefix} LLM 未明确回答 '是' 或 '否',响应: {response_text}") - # 可以设定一个默认行为,例如默认不转换 - return False + logger.debug(f"{log_prefix} 使用模型 {self.llm_state_evaluator.model_name} 评估,原始响应: ```{response_text}```") + + # --- 解析 JSON 响应 --- + try: + # 尝试去除可能的Markdown代码块标记 + cleaned_response = response_text.strip().strip('`').strip() + if cleaned_response.startswith('json'): + cleaned_response = cleaned_response[4:].strip() + + data = json.loads(cleaned_response) + decision = data.get("decision") # 使用 .get() 避免 KeyError + + if isinstance(decision, bool): + logger.debug(f"{log_prefix} LLM评估结果 (来自JSON): {'建议转换' if decision else '建议不转换'}") + return decision + else: + logger.warning(f"{log_prefix} LLM 返回的 JSON 中 'decision' 键的值不是布尔型: {decision}。响应: {response_text}") + return None # 值类型不正确 + + except json.JSONDecodeError as json_err: + logger.warning(f"{log_prefix} LLM 返回的响应不是有效的 JSON: {json_err}。响应: {response_text}") + # 尝试在非JSON响应中查找关键词作为后备方案 (可选) + if "true" in response_text.lower(): + logger.debug(f"{log_prefix} 在非JSON响应中找到 'true',解释为建议转换") + return True + if "false" in response_text.lower(): + logger.debug(f"{log_prefix} 在非JSON响应中找到 'false',解释为建议不转换") + return False + return None # JSON 解析失败,也未找到关键词 + except Exception as parse_err: # 捕获其他可能的解析错误 + logger.warning(f"{log_prefix} 解析 LLM JSON 响应时发生意外错误: {parse_err}。响应: {response_text}") + return None except Exception as e: - logger.error(f"{log_prefix} 调用 LLM 进行状态评估时出错: {e}", exc_info=True) + logger.error(f"{log_prefix} 调用 LLM 或处理其响应时出错: {e}", exc_info=True) traceback.print_exc() - return False + return None # LLM 调用或处理失败 def count_subflows_by_state(self, state: ChatState) -> int: """统计指定状态的子心流数量""" diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index e9577e411..2a33d0671 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -404,10 +404,10 @@ class HeartFChatting: return False, "" # execute:执行 - with Timer("执行动作", cycle_timers): - return await self._handle_action( - action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time - ) + + return await self._handle_action( + action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time + ) except PlannerError as e: logger.error(f"{self.log_prefix} 规划错误: {e}") @@ -560,7 +560,7 @@ class HeartFChatting: observation = self.observations[0] if self.observations else None try: - with Timer("Wait New Msg", cycle_timers): + with Timer("等待新消息", cycle_timers): return await self._wait_for_new_message(observation, planner_start_db_time, self.log_prefix) except asyncio.CancelledError: logger.info(f"{self.log_prefix} 等待被中断") @@ -584,8 +584,8 @@ class HeartFChatting: logger.info(f"{log_prefix} 检测到新消息") return True - if time.monotonic() - wait_start_time > 300: - logger.warning(f"{log_prefix} 等待超时(300秒)") + if time.monotonic() - wait_start_time > 120: + logger.warning(f"{log_prefix} 等待超时(120秒)") return False await asyncio.sleep(1.5) @@ -604,8 +604,6 @@ class HeartFChatting: async def _handle_cycle_delay(self, action_taken_this_cycle: bool, cycle_start_time: float, log_prefix: str): """处理循环延迟""" cycle_duration = time.monotonic() - cycle_start_time - # if cycle_duration > 0.1: - # logger.debug(f"{log_prefix} HeartFChatting: 周期耗时 {cycle_duration:.2f}s.") try: sleep_duration = 0.0 diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index 584205a73..5308ce6e3 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -67,6 +67,7 @@ def init_prompt(): 2. 文字回复(text_reply)适用: - 有实质性内容需要表达 +- 有人提到你,但你还没有回应他 - 可以追加emoji_query表达情绪(格式:情绪描述,如"俏皮的调侃") - 不要追加太多表情 diff --git a/src/plugins/heartFC_chat/normal_chat.py b/src/plugins/heartFC_chat/normal_chat.py index 6687421e5..d7be9bef0 100644 --- a/src/plugins/heartFC_chat/normal_chat.py +++ b/src/plugins/heartFC_chat/normal_chat.py @@ -1,6 +1,7 @@ import time import asyncio import traceback +import statistics # 导入 statistics 模块 from random import random from typing import List, Optional # 导入 Optional @@ -46,6 +47,8 @@ class NormalChat: self.gpt = NormalChatGenerator() self.mood_manager = MoodManager.get_instance() # MoodManager 保持单例 # 存储此实例的兴趣监控任务 + self.start_time = time.time() + self._chat_task: Optional[asyncio.Task] = None logger.info(f"[{self.stream_name}] NormalChat 实例初始化完成。") @@ -317,6 +320,59 @@ class NormalChat: # 意愿管理器:注销当前message信息 (无论是否回复,只要处理过就删除) willing_manager.delete(message.message_info.message_id) + # --- 新增:处理初始高兴趣消息的私有方法 --- + async def _process_initial_interest_messages(self): + """处理启动时存在于 interest_dict 中的高兴趣消息。""" + items_to_process = list(self.interest_dict.items()) + if not items_to_process: + return # 没有初始消息,直接返回 + + logger.info(f"[{self.stream_name}] 发现 {len(items_to_process)} 条初始兴趣消息,开始处理高兴趣部分...") + interest_values = [item[1][1] for item in items_to_process] # 提取兴趣值列表 + + messages_to_reply = [] # 需要立即回复的消息 + + if len(interest_values) == 1: + # 如果只有一个消息,直接处理 + messages_to_reply.append(items_to_process[0]) + logger.info(f"[{self.stream_name}] 只有一条初始消息,直接处理。") + elif len(interest_values) > 1: + # 计算均值和标准差 + try: + mean_interest = statistics.mean(interest_values) + stdev_interest = statistics.stdev(interest_values) + threshold = mean_interest + stdev_interest + logger.info(f"[{self.stream_name}] 初始兴趣值 均值: {mean_interest:.2f}, 标准差: {stdev_interest:.2f}, 阈值: {threshold:.2f}") + + # 找出高于阈值的消息 + for item in items_to_process: + msg_id, (message, interest_value, is_mentioned) = item + if interest_value > threshold: + messages_to_reply.append(item) + logger.info(f"[{self.stream_name}] 找到 {len(messages_to_reply)} 条高于阈值的初始消息进行处理。") + except statistics.StatisticsError as e: + logger.error(f"[{self.stream_name}] 计算初始兴趣统计值时出错: {e},跳过初始处理。") + + # 处理需要回复的消息 + processed_count = 0 + for item in messages_to_reply: + msg_id, (message, interest_value, is_mentioned) = item + try: + logger.info(f"[{self.stream_name}] 处理初始高兴趣消息 {msg_id} (兴趣值: {interest_value:.2f})") + await self.normal_response( + message=message, is_mentioned=is_mentioned, interested_rate=interest_value + ) + processed_count += 1 + except Exception as e: + logger.error(f"[{self.stream_name}] 处理初始兴趣消息 {msg_id} 时出错: {e}\n{traceback.format_exc()}") + finally: + # 无论成功与否都清空兴趣字典 + self.interest_dict.clear() + + + logger.info(f"[{self.stream_name}] 初始高兴趣消息处理完毕,共处理 {processed_count} 条。剩余 {len(self.interest_dict)} 条待轮询。") + # --- 新增结束 --- + # 保持 staticmethod, 因为不依赖实例状态, 但需要 chat 对象来获取日志上下文 @staticmethod def _check_ban_words(text: str, chat: ChatStream, userinfo: UserInfo) -> bool: @@ -350,11 +406,20 @@ class NormalChat: # 改为实例方法, 移除 chat 参数 async def start_chat(self): - """为此 NormalChat 实例关联的 ChatStream 启动聊天任务(如果尚未运行)。""" + """为此 NormalChat 实例关联的 ChatStream 启动聊天任务(如果尚未运行), + 并在启动前处理一次初始的高兴趣消息。""" if self._chat_task is None or self._chat_task.done(): + # --- 修改:调用新的私有方法处理初始消息 --- + await self._process_initial_interest_messages() + # --- 修改结束 --- + + # 启动后台轮询任务 + logger.info(f"[{self.stream_name}] 启动后台兴趣消息轮询任务...") task = asyncio.create_task(self._reply_interested_message()) task.add_done_callback(lambda t: self._handle_task_completion(t)) # 回调现在是实例方法 self._chat_task = task + else: + logger.info(f"[{self.stream_name}] 聊天任务已在运行中。") def _handle_task_completion(self, task: asyncio.Task): """任务完成回调处理"""