From e5a756f156cb882f37a29c050af07952e77a6dbe Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sun, 11 May 2025 01:26:08 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E5=8D=96=E7=9B=B8=E6=96=B0HFC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + src/heart_flow/mai_state_manager.py | 4 +- src/heart_flow/sub_heartflow.py | 2 +- src/heart_flow/sub_mind.py | 323 ++++++------------ src/heart_flow/tool_user.py | 55 ++- src/plugins/heartFC_chat/cycle_analyzer.py | 215 ++++++++++++ src/plugins/heartFC_chat/cycle_viewer.py | 167 +++++++++ src/plugins/heartFC_chat/heartFC_Cycleinfo.py | 230 +++++++++++++ src/plugins/heartFC_chat/heartFC_chat.py | 234 ++++++++++--- .../heartFC_chat/heartflow_prompt_builder.py | 49 ++- 10 files changed, 990 insertions(+), 290 deletions(-) create mode 100644 src/plugins/heartFC_chat/cycle_analyzer.py create mode 100644 src/plugins/heartFC_chat/cycle_viewer.py diff --git a/.gitignore b/.gitignore index 9e1b96811..2010be88b 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ MaiBot-Napcat-Adapter nonebot-maibot-adapter/ *.zip run.bat +log_debug/ run_none.bat run.py message_queue_content.txt diff --git a/src/heart_flow/mai_state_manager.py b/src/heart_flow/mai_state_manager.py index d289a94a1..71e2abbe7 100644 --- a/src/heart_flow/mai_state_manager.py +++ b/src/heart_flow/mai_state_manager.py @@ -13,8 +13,8 @@ logger = get_logger("mai_state") # The line `enable_unlimited_hfc_chat = False` is setting a configuration parameter that controls # whether a specific debugging feature is enabled or not. When `enable_unlimited_hfc_chat` is set to # `False`, it means that the debugging feature for unlimited focused chatting is disabled. -# enable_unlimited_hfc_chat = True # 调试用:无限专注聊天 -enable_unlimited_hfc_chat = False +enable_unlimited_hfc_chat = True # 调试用:无限专注聊天 +# enable_unlimited_hfc_chat = False prevent_offline_state = True # 目前默认不启用OFFLINE状态 diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index e2a36dbd7..5be0d73cd 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -250,7 +250,7 @@ class SubHeartflow: elif new_state == ChatState.ABSENT: logger.info(f"{log_prefix} 进入 ABSENT 状态,停止所有聊天活动...") - await self.clear_interest_dict() + self.clear_interest_dict() await self._stop_normal_chat() await self._stop_heart_fc_chat() diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py index 2a87f70c3..9379b8b0c 100644 --- a/src/heart_flow/sub_mind.py +++ b/src/heart_flow/sub_mind.py @@ -26,84 +26,35 @@ def init_prompt(): group_prompt = """ {extra_info} {relation_prompt} -你的名字是{bot_name},{prompt_personality} -{last_loop_prompt} +你的名字是{bot_name},{prompt_personality},你现在{mood_info} {cycle_info_block} 现在是{time_now},你正在上网,和qq群里的网友们聊天,以下是正在进行的聊天内容: {chat_observe_info} -你现在{mood_info} -请仔细阅读当前群聊内容,分析讨论话题和群成员关系,分析你刚刚发言和别人对你的发言的反应,思考你要不要回复。然后思考你是否需要使用函数工具。 -思考并输出你的内心想法 -输出要求: +以下是你之前对这个群聊的陈述: +{last_mind} + +现在请你继续输出思考内容,输出要求: 1. 根据聊天内容生成你的想法,{hf_do_next} -2. 不要分点、不要使用表情符号 -3. 避免多余符号(冒号、引号、括号等) -4. 语言简洁自然,不要浮夸 -5. 如果你刚发言,并且没有人回复你,不要回复 -工具使用说明: -1. 输出想法后考虑是否需要使用工具 -2. 工具可获取信息或执行操作 -3. 如需处理消息或回复,请使用工具。""" +2. 参考之前的思考,基于之前的内容对这个群聊继续陈述,可以删除不重要的内容,添加新的内容 +3. 思考群内进行的话题,话题由谁发起,进展状况如何,你如何参与?思考你在群聊天中的角色,你是一个什么样的人,你在这个群聊中扮演什么角色? +4. 注意群聊的时间线索,思考聊天的时间线。 +5. 请结合你做出的行为,对前面的陈述进行补充 +6. 语言简洁自然,不要分点,不要浮夸,不要修辞,仅输出思考内容就好""" Prompt(group_prompt, "sub_heartflow_prompt_before") # --- Private Chat Prompt --- private_prompt = """ {extra_info} {relation_prompt} -你的名字是{bot_name},{prompt_personality} -{last_loop_prompt} +你的名字是{bot_name},{prompt_personality},你现在{mood_info} {cycle_info_block} 现在是{time_now},你正在上网,和 {chat_target_name} 私聊,以下是你们的聊天内容: {chat_observe_info} -你现在{mood_info} -请仔细阅读聊天内容,想想你和 {chat_target_name} 的关系,回顾你们刚刚的交流,你刚刚发言和对方的反应,思考聊天的主题。 -请思考你要不要回复以及如何回复对方。然后思考你是否需要使用函数工具。 -思考并输出你的内心想法 -输出要求: -1. 根据聊天内容生成你的想法,{hf_do_next} -2. 不要分点、不要使用表情符号 -3. 避免多余符号(冒号、引号、括号等) -4. 语言简洁自然,不要浮夸 -5. 如果你刚发言,对方没有回复你,请谨慎回复 -工具使用说明: -1. 输出想法后考虑是否需要使用工具 -2. 工具可获取信息或执行操作 -3. 如需处理消息或回复,请使用工具。""" - Prompt(private_prompt, "sub_heartflow_prompt_private_before") +以下是你之前在这个群聊中的思考: +{last_mind} - # --- 并行模式的Group Chat Prompt --- - parallel_group_prompt = """ -{extra_info} -{relation_prompt} -你的名字是{bot_name},{prompt_personality} -{last_loop_prompt} -{cycle_info_block} -现在是{time_now},你正在上网,和qq群里的网友们聊天,以下是正在进行的聊天内容: -{chat_observe_info} - -你现在{mood_info} -请仔细阅读当前群聊内容,分析讨论话题和群成员关系,分析你刚刚发言和别人对你的发言的反应,思考你要不要回复。 -思考并输出你的内心想法 -输出要求: -1. 根据聊天内容生成你的想法,{hf_do_next} -2. 不要分点、不要使用表情符号 -3. 避免多余符号(冒号、引号、括号等) -4. 语言简洁自然,不要浮夸""" - Prompt(parallel_group_prompt, "sub_heartflow_prompt_parallel") - - # --- 并行模式的Private Chat Prompt --- - parallel_private_prompt = """ -{extra_info} -{relation_prompt} -你的名字是{bot_name},{prompt_personality} -{last_loop_prompt} -{cycle_info_block} -现在是{time_now},你正在上网,和 {chat_target_name} 私聊,以下是你们的聊天内容: -{chat_observe_info} - -你现在{mood_info} 请仔细阅读聊天内容,想想你和 {chat_target_name} 的关系,回顾你们刚刚的交流,你刚刚发言和对方的反应,思考聊天的主题。 请思考你要不要回复以及如何回复对方。 思考并输出你的内心想法 @@ -113,14 +64,7 @@ def init_prompt(): 3. 避免多余符号(冒号、引号、括号等) 4. 语言简洁自然,不要浮夸 5. 如果你刚发言,对方没有回复你,请谨慎回复""" - Prompt(parallel_private_prompt, "sub_heartflow_prompt_private_parallel") - - # --- Last Loop Prompt (remains the same) --- - last_loop_t = """ -刚刚你的内心想法是:{current_thinking_info} -{if_replan_prompt} -""" - Prompt(last_loop_t, "last_loop") + Prompt(private_prompt, "sub_heartflow_prompt_private_before") def calculate_similarity(text_a: str, text_b: str) -> float: @@ -177,10 +121,6 @@ class SubMind: self.structured_info = [] self.structured_info_str = "" - # 并行模式设置,从全局配置获取 - # 此变量将在构建提示词时使用,决定使用哪个模板 - self.parallel_mode = False # 默认为False,将在do_thinking_before_reply中检查心流的模式设置 - name = chat_manager.get_stream_name(self.subheartflow_id) self.log_prefix = f"[{name}] " self._update_structured_info_str() @@ -213,29 +153,28 @@ class SubMind: self.structured_info_str = "\n".join(lines) logger.debug(f"{self.log_prefix} 更新 structured_info_str: \n{self.structured_info_str}") - async def do_thinking_before_reply(self, history_cycle: list[CycleInfo] = None, parallel_mode: bool = False, no_tools: bool = False): + async def do_thinking_before_reply(self, history_cycle: list[CycleInfo] = None, parallel_mode: bool = True, no_tools: bool = True, return_prompt: bool = False, cycle_info: CycleInfo = None): """ 在回复前进行思考,生成内心想法并收集工具调用结果 参数: history_cycle: 历史循环信息 - parallel_mode: 是否在并行模式下执行,默认为False - no_tools: 是否禁用工具调用,默认为False + parallel_mode: 是否在并行模式下执行,默认为True + no_tools: 是否禁用工具调用,默认为True + return_prompt: 是否返回prompt,默认为False + cycle_info: 循环信息对象,可用于记录详细执行信息 返回: - tuple: (current_mind, past_mind) 当前想法和过去的想法列表 + 如果return_prompt为False: + tuple: (current_mind, past_mind) 当前想法和过去的想法列表 + 如果return_prompt为True: + tuple: (current_mind, past_mind, prompt) 当前想法、过去的想法列表和使用的prompt """ - # 设置并行模式 - self.parallel_mode = parallel_mode - # 更新活跃时间 self.last_active_time = time.time() # ---------- 0. 更新和清理 structured_info ---------- if self.structured_info: - logger.debug( - f"{self.log_prefix} 更新前的 structured_info: {safe_json_dumps(self.structured_info, ensure_ascii=False)}" - ) updated_info = [] for item in self.structured_info: item["ttl"] -= 1 @@ -244,9 +183,6 @@ class SubMind: else: logger.debug(f"{self.log_prefix} 移除过期的 structured_info 项: {item['id']}") self.structured_info = updated_info - logger.debug( - f"{self.log_prefix} 更新后的 structured_info: {safe_json_dumps(self.structured_info, ensure_ascii=False)}" - ) self._update_structured_info_str() logger.debug( f"{self.log_prefix} 当前完整的 structured_info: {safe_json_dumps(self.structured_info, ensure_ascii=False)}" @@ -265,7 +201,6 @@ class SubMind: return self.current_mind, self.past_mind is_group_chat = observation.is_group_chat - # logger.debug(f"is_group_chat: {is_group_chat}") chat_target_info = observation.chat_target_info chat_target_name = "对方" # Default for private @@ -273,7 +208,6 @@ class SubMind: chat_target_name = ( chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or chat_target_name ) - # --- End getting observation info --- # 获取观察内容 chat_observe_info = observation.get_observe_info() @@ -320,21 +254,14 @@ class SubMind: logger.error(f"{self.log_prefix} 获取记忆时出错: {e}") logger.error(traceback.format_exc()) - # ---------- 3. 准备工具和个性化数据 ---------- - # 初始化工具 - tool_instance = ToolUser() - tools = tool_instance._define_tools() - + # ---------- 3. 准备个性化数据 ---------- # 获取个性化信息 individuality = Individuality.get_instance() relation_prompt = "" - # print(f"person_list: {person_list}") for person in person_list: relation_prompt += await relationship_manager.build_relationship_info(person, is_id=True) - # print(f"relat22222ion_prompt: {relation_prompt}") - # 构建个性部分 prompt_personality = individuality.get_prompt(x_person=2, level=2) @@ -355,28 +282,6 @@ class SubMind: ("进行深入思考", 0.2), ] - last_cycle = history_cycle[-1] if history_cycle else None - # 上一次决策信息 - if last_cycle is not None: - last_action = last_cycle.action_type - last_reasoning = last_cycle.reasoning - is_replan = last_cycle.replanned - if is_replan: - if_replan_prompt = f"但是你有了上述想法之后,有了新消息,你决定重新思考后,你做了:{last_action}\n因为:{last_reasoning}\n" - else: - if_replan_prompt = f"出于这个想法,你刚才做了:{last_action}\n因为:{last_reasoning}\n" - else: - last_action = "" - last_reasoning = "" - is_replan = False - if_replan_prompt = "" - if previous_mind: - last_loop_prompt = (await global_prompt_manager.get_prompt_async("last_loop")).format( - current_thinking_info=previous_mind, if_replan_prompt=if_replan_prompt - ) - else: - last_loop_prompt = "" - # 准备循环信息块 (分析最近的活动循环) recent_active_cycles = [] for cycle in reversed(history_cycle): @@ -426,23 +331,11 @@ class SubMind: )[0] # ---------- 5. 构建最终提示词 ---------- - # --- 根据并行模式和聊天类型选择模板 --- - logger.debug(f"is_group_chat: {is_group_chat}, parallel_mode: {self.parallel_mode}") + # --- 根据聊天类型选择模板 --- + logger.debug(f"is_group_chat: {is_group_chat}") - if is_group_chat: - if self.parallel_mode: - template_name = "sub_heartflow_prompt_parallel" - logger.debug(f"{self.log_prefix} 使用并行模式群聊思考模板") - else: - template_name = "sub_heartflow_prompt_before" - logger.debug(f"{self.log_prefix} 使用标准模式群聊思考模板") - else: # Private chat - if self.parallel_mode: - template_name = "sub_heartflow_prompt_private_parallel" - logger.debug(f"{self.log_prefix} 使用并行模式私聊思考模板") - else: - template_name = "sub_heartflow_prompt_private_before" - logger.debug(f"{self.log_prefix} 使用标准模式私聊思考模板") + template_name = "sub_heartflow_prompt_before" if is_group_chat else "sub_heartflow_prompt_private_before" + logger.debug(f"{self.log_prefix} 使用{'群聊' if is_group_chat else '私聊'}思考模板") prompt = (await global_prompt_manager.get_prompt_async(template_name)).format( extra_info=self.structured_info_str, @@ -453,48 +346,33 @@ class SubMind: chat_observe_info=chat_observe_info, mood_info=mood_info, hf_do_next=hf_do_next, - last_loop_prompt=last_loop_prompt, + last_mind = previous_mind, cycle_info_block=cycle_info_block, chat_target_name=chat_target_name, ) - # ---------- 6. 执行LLM请求并处理响应 ---------- + # 在构建完提示词后,生成最终的prompt字符串 + final_prompt = prompt + + # ---------- 6. 调用LLM ---------- + # 如果指定了cycle_info,记录structured_info和prompt + if cycle_info: + cycle_info.set_submind_info( + prompt=final_prompt, + structured_info=self.structured_info_str + ) + content = "" # 初始化内容变量 - _reasoning_content = "" # 初始化推理内容变量 try: # 调用LLM生成响应 - response, _reasoning_content, tool_calls = await self.llm_model.generate_response_tool_async( - prompt=prompt, tools=tools + response = await self.llm_model.generate_response_async( + prompt=final_prompt ) - - logger.debug(f"{self.log_prefix} 子心流输出的原始LLM响应: {response}") - + # 直接使用LLM返回的文本响应作为 content content = response if response else "" - if tool_calls and not no_tools: - # 只有在no_tools=False时才执行工具调用 - success, valid_tool_calls, error_msg = process_llm_tool_calls( - tool_calls, log_prefix=f"{self.log_prefix} " - ) - - if success and valid_tool_calls: - # 记录工具调用信息 - tool_calls_str = ", ".join( - [call.get("function", {}).get("name", "未知工具") for call in valid_tool_calls] - ) - logger.info(f"{self.log_prefix} 模型请求调用{len(valid_tool_calls)}个工具: {tool_calls_str}") - - # 收集工具执行结果 - await self._execute_tool_calls(valid_tool_calls, tool_instance) - elif not success: - logger.warning(f"{self.log_prefix} 处理工具调用时出错: {error_msg}") - elif no_tools and tool_calls: - logger.info(f"{self.log_prefix} 模型请求了工具调用,但no_tools=True,跳过执行") - else: - logger.info(f"{self.log_prefix} 心流未使用工具") - except Exception as e: # 处理总体异常 logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}") @@ -502,7 +380,7 @@ class SubMind: content = "思考过程中出现错误" # 记录初步思考结果 - logger.debug(f"{self.log_prefix} 初步心流思考结果: {content}\nprompt: {prompt}\n") + logger.debug(f"{self.log_prefix} 初步心流思考结果: {content}\nprompt: {final_prompt}\n") # 处理空响应情况 if not content: @@ -544,16 +422,67 @@ class SubMind: else: # 相似度较高但非100%,执行标准去重逻辑 logger.debug(f"{self.log_prefix} 执行概率性去重 (概率: {replacement_prob:.2f})...") + logger.debug(f"{self.log_prefix} previous_mind类型: {type(previous_mind)}, new_content类型: {type(new_content)}") + matcher = difflib.SequenceMatcher(None, previous_mind, new_content) + logger.debug(f"{self.log_prefix} matcher类型: {type(matcher)}") + deduplicated_parts = [] last_match_end_in_b = 0 - for _i, j, n in matcher.get_matching_blocks(): - if last_match_end_in_b < j: - deduplicated_parts.append(new_content[last_match_end_in_b:j]) - last_match_end_in_b = j + n - - deduplicated_content = "".join(deduplicated_parts).strip() - + + # 获取并记录所有匹配块 + matching_blocks = matcher.get_matching_blocks() + logger.debug(f"{self.log_prefix} 匹配块数量: {len(matching_blocks)}") + logger.debug(f"{self.log_prefix} 匹配块示例(前3个): {matching_blocks[:3] if len(matching_blocks) > 3 else matching_blocks}") + + # get_matching_blocks()返回形如[(i, j, n), ...]的列表,其中i是a中的索引,j是b中的索引,n是匹配的长度 + for idx, match in enumerate(matching_blocks): + if not isinstance(match, tuple): + logger.error(f"{self.log_prefix} 匹配块 {idx} 不是元组类型,而是 {type(match)}: {match}") + continue + + try: + _i, j, n = match # 解包元组为三个变量 + logger.debug(f"{self.log_prefix} 匹配块 {idx}: i={_i}, j={j}, n={n}") + + if last_match_end_in_b < j: + # 确保添加的是字符串,而不是元组 + try: + non_matching_part = new_content[last_match_end_in_b:j] + logger.debug(f"{self.log_prefix} 添加非匹配部分: '{non_matching_part}', 类型: {type(non_matching_part)}") + if not isinstance(non_matching_part, str): + logger.warning(f"{self.log_prefix} 非匹配部分不是字符串类型: {type(non_matching_part)}") + non_matching_part = str(non_matching_part) + deduplicated_parts.append(non_matching_part) + except Exception as e: + logger.error(f"{self.log_prefix} 处理非匹配部分时出错: {e}") + logger.error(traceback.format_exc()) + last_match_end_in_b = j + n + except Exception as e: + logger.error(f"{self.log_prefix} 处理匹配块时出错: {e}") + logger.error(traceback.format_exc()) + + logger.debug(f"{self.log_prefix} 去重前部分列表: {deduplicated_parts}") + logger.debug(f"{self.log_prefix} 列表元素类型: {[type(part) for part in deduplicated_parts]}") + + # 确保所有元素都是字符串 + deduplicated_parts = [str(part) for part in deduplicated_parts] + + # 防止列表为空 + if not deduplicated_parts: + logger.warning(f"{self.log_prefix} 去重后列表为空,添加空字符串") + deduplicated_parts = [""] + + logger.debug(f"{self.log_prefix} 处理后的部分列表: {deduplicated_parts}") + + try: + deduplicated_content = "".join(deduplicated_parts).strip() + logger.debug(f"{self.log_prefix} 拼接后的去重内容: '{deduplicated_content}'") + except Exception as e: + logger.error(f"{self.log_prefix} 拼接去重内容时出错: {e}") + logger.error(traceback.format_exc()) + deduplicated_content = "" + if deduplicated_content: # 根据概率决定是否添加词语 prefix_str = "" @@ -587,44 +516,16 @@ class SubMind: # 更新当前思考内容 self.update_current_mind(content) - return self.current_mind, self.past_mind - - async def _execute_tool_calls(self, tool_calls, tool_instance): - """ - 执行一组工具调用并收集结果 - - 参数: - tool_calls: 工具调用列表 - tool_instance: 工具使用器实例 - """ - tool_results = [] - new_structured_items = [] # 收集新产生的结构化信息 - - # 执行所有工具调用 - for tool_call in tool_calls: - try: - result = await tool_instance._execute_tool_call(tool_call) - if result: - tool_results.append(result) - # 创建新的结构化信息项 - new_item = { - "type": result.get("type", "unknown_type"), # 使用 'type' 键 - "id": result.get("id", f"fallback_id_{time.time()}"), # 使用 'id' 键 - "content": result.get("content", ""), # 'content' 键保持不变 - "ttl": 3, - } - new_structured_items.append(new_item) - - except Exception as tool_e: - logger.error(f"[{self.subheartflow_id}] 工具执行失败: {tool_e}") - logger.error(traceback.format_exc()) # 添加 traceback 记录 - - # 如果有新的工具结果,记录并更新结构化信息 - if new_structured_items: - self.structured_info.extend(new_structured_items) # 添加到现有列表 - logger.debug(f"工具调用收集到新的结构化信息: {safe_json_dumps(new_structured_items, ensure_ascii=False)}") - # logger.debug(f"当前完整的 structured_info: {safe_json_dumps(self.structured_info, ensure_ascii=False)}") # 可以取消注释以查看完整列表 - self._update_structured_info_str() # 添加新信息后,更新字符串表示 + # 在原始代码的return语句前,记录结果并根据return_prompt决定返回值 + if cycle_info: + cycle_info.set_submind_info( + result=content + ) + + if return_prompt: + return content, self.past_mind, final_prompt + else: + return content, self.past_mind def update_current_mind(self, response): if self.current_mind: # 只有当 current_mind 非空时才添加到 past_mind diff --git a/src/heart_flow/tool_user.py b/src/heart_flow/tool_user.py index 5c521614a..25345819d 100644 --- a/src/heart_flow/tool_user.py +++ b/src/heart_flow/tool_user.py @@ -56,15 +56,30 @@ class ToolExecutor: self.subheartflow_id = subheartflow_id self.log_prefix = f"[{subheartflow_id}:ToolExecutor] " self.llm_model = LLMRequest( - model=global_config.llm_sub_heartflow, # 为工具执行器配置单独的模型 - temperature=global_config.llm_sub_heartflow["temp"], - max_tokens=800, + model=global_config.llm_summary, # 为工具执行器配置单独的模型 + # temperature=global_config.llm_summary["temp"], + # max_tokens=800, request_type="tool_execution", ) self.structured_info = [] - async def execute_tools(self, sub_mind: SubMind, chat_target_name="对方", is_group_chat=False): - """并行执行工具,返回结构化信息""" + async def execute_tools(self, sub_mind: SubMind, chat_target_name="对方", is_group_chat=False, return_details=False, cycle_info=None): + """ + 并行执行工具,返回结构化信息 + + 参数: + sub_mind: 子思维对象 + chat_target_name: 聊天目标名称,默认为"对方" + is_group_chat: 是否为群聊,默认为False + return_details: 是否返回详细信息,默认为False + cycle_info: 循环信息对象,可用于记录详细执行信息 + + 返回: + 如果return_details为False: + List[Dict]: 工具执行结果的结构化信息列表 + 如果return_details为True: + Tuple[List[Dict], List[str], str]: (工具执行结果列表, 使用的工具列表, 工具执行提示词) + """ # 初始化工具 tool_instance = ToolUser() tools = tool_instance._define_tools() @@ -107,19 +122,36 @@ class ToolExecutor: time_now=time_now ) + # 如果指定了cycle_info,记录工具执行的prompt + if cycle_info: + cycle_info.set_tooluse_info(prompt=prompt) + # 调用LLM,专注于工具使用 + logger.info(f"开始执行工具调用{prompt}") response, _, tool_calls = await self.llm_model.generate_response_tool_async( prompt=prompt, tools=tools ) + logger.debug(f"获取到工具原始输出:\n{tool_calls}") # 处理工具调用和结果收集,类似于SubMind中的逻辑 new_structured_items = [] + used_tools = [] # 记录使用了哪些工具 + if tool_calls: success, valid_tool_calls, error_msg = process_llm_tool_calls(tool_calls) if success and valid_tool_calls: for tool_call in valid_tool_calls: try: + # 记录使用的工具名称 + tool_name = tool_call.get("name", "unknown_tool") + used_tools.append(tool_name) + result = await tool_instance._execute_tool_call(tool_call) + + name = result.get("type", "unknown_type") + content = result.get("content", "") + + logger.info(f"工具{name},获得信息:{content}") if result: new_item = { "type": result.get("type", "unknown_type"), @@ -131,7 +163,18 @@ class ToolExecutor: except Exception as e: logger.error(f"{self.log_prefix}工具执行失败: {e}") - return new_structured_items + # 如果指定了cycle_info,记录工具执行结果 + if cycle_info: + cycle_info.set_tooluse_info( + tools_used=used_tools, + tool_results=new_structured_items + ) + + # 根据return_details决定返回值 + if return_details: + return new_structured_items, used_tools, prompt + else: + return new_structured_items init_prompt() \ No newline at end of file diff --git a/src/plugins/heartFC_chat/cycle_analyzer.py b/src/plugins/heartFC_chat/cycle_analyzer.py new file mode 100644 index 000000000..a36bd8416 --- /dev/null +++ b/src/plugins/heartFC_chat/cycle_analyzer.py @@ -0,0 +1,215 @@ +import os +import time +import json +from typing import List, Dict, Any, Optional, Tuple +from src.plugins.heartFC_chat.heartFC_Cycleinfo import CycleInfo +from src.common.logger_manager import get_logger + +logger = get_logger("cycle_analyzer") + +class CycleAnalyzer: + """循环信息分析类,提供查询和分析CycleInfo的工具""" + + def __init__(self, base_dir: str = "log_debug"): + """ + 初始化分析器 + + 参数: + base_dir: 存储CycleInfo的基础目录,默认为log_debug + """ + self.base_dir = base_dir + + def list_streams(self) -> List[str]: + """ + 获取所有聊天流ID列表 + + 返回: + List[str]: 聊天流ID列表 + """ + try: + if not os.path.exists(self.base_dir): + return [] + + return [d for d in os.listdir(self.base_dir) + if os.path.isdir(os.path.join(self.base_dir, d))] + except Exception as e: + logger.error(f"获取聊天流列表时出错: {e}") + return [] + + def get_stream_cycle_count(self, stream_id: str) -> int: + """ + 获取指定聊天流的循环数量 + + 参数: + stream_id: 聊天流ID + + 返回: + int: 循环数量 + """ + try: + files = CycleInfo.list_cycles(stream_id, self.base_dir) + return len(files) + except Exception as e: + logger.error(f"获取聊天流循环数量时出错: {e}") + return 0 + + def get_stream_cycles(self, stream_id: str, start: int = 0, limit: int = -1) -> List[str]: + """ + 获取指定聊天流的循环文件列表 + + 参数: + stream_id: 聊天流ID + start: 起始索引,默认为0 + limit: 返回的最大数量,默认为-1(全部) + + 返回: + List[str]: 循环文件路径列表 + """ + try: + files = CycleInfo.list_cycles(stream_id, self.base_dir) + if limit < 0: + return files[start:] + else: + return files[start:start+limit] + except Exception as e: + logger.error(f"获取聊天流循环文件列表时出错: {e}") + return [] + + def get_cycle_content(self, filepath: str) -> str: + """ + 获取循环文件的内容 + + 参数: + filepath: 文件路径 + + 返回: + str: 文件内容 + """ + try: + if not os.path.exists(filepath): + return f"文件不存在: {filepath}" + + with open(filepath, 'r', encoding='utf-8') as f: + return f.read() + except Exception as e: + logger.error(f"读取循环文件内容时出错: {e}") + return f"读取文件出错: {e}" + + def analyze_stream_cycles(self, stream_id: str) -> Dict[str, Any]: + """ + 分析指定聊天流的所有循环,生成统计信息 + + 参数: + stream_id: 聊天流ID + + 返回: + Dict[str, Any]: 统计信息 + """ + try: + files = CycleInfo.list_cycles(stream_id, self.base_dir) + if not files: + return {"error": "没有找到循环记录"} + + total_cycles = len(files) + action_counts = {"text_reply": 0, "emoji_reply": 0, "no_reply": 0, "unknown": 0} + total_duration = 0 + tool_usage = {} + + for filepath in files: + with open(filepath, 'r', encoding='utf-8') as f: + content = f.read() + + # 解析动作类型 + for line in content.split('\n'): + if line.startswith("动作:"): + action = line[3:].strip() + action_counts[action] = action_counts.get(action, 0) + 1 + + # 解析耗时 + elif line.startswith("耗时:"): + try: + duration = float(line[3:].strip().split('秒')[0]) + total_duration += duration + except: + pass + + # 解析工具使用 + elif line.startswith("使用的工具:"): + tools = line[6:].strip().split(', ') + for tool in tools: + tool_usage[tool] = tool_usage.get(tool, 0) + 1 + + avg_duration = total_duration / total_cycles if total_cycles > 0 else 0 + + return { + "总循环数": total_cycles, + "动作统计": action_counts, + "平均耗时": f"{avg_duration:.2f}秒", + "总耗时": f"{total_duration:.2f}秒", + "工具使用次数": tool_usage + } + except Exception as e: + logger.error(f"分析聊天流循环时出错: {e}") + return {"error": f"分析出错: {e}"} + + def get_latest_cycles(self, count: int = 10) -> List[Tuple[str, str]]: + """ + 获取所有聊天流中最新的几个循环 + + 参数: + count: 获取的数量,默认为10 + + 返回: + List[Tuple[str, str]]: 聊天流ID和文件路径的元组列表 + """ + try: + all_cycles = [] + streams = self.list_streams() + + for stream_id in streams: + files = CycleInfo.list_cycles(stream_id, self.base_dir) + for filepath in files: + try: + # 从文件名中提取时间戳 + filename = os.path.basename(filepath) + timestamp_str = filename.split('_', 2)[2].split('.')[0] + timestamp = time.mktime(time.strptime(timestamp_str, "%Y%m%d_%H%M%S")) + all_cycles.append((timestamp, stream_id, filepath)) + except: + continue + + # 按时间戳排序,取最新的count个 + all_cycles.sort(reverse=True) + return [(item[1], item[2]) for item in all_cycles[:count]] + except Exception as e: + logger.error(f"获取最新循环时出错: {e}") + return [] + + +# 使用示例 +if __name__ == "__main__": + analyzer = CycleAnalyzer() + + # 列出所有聊天流 + streams = analyzer.list_streams() + print(f"找到 {len(streams)} 个聊天流: {streams}") + + # 分析第一个聊天流的循环 + if streams: + stream_id = streams[0] + stats = analyzer.analyze_stream_cycles(stream_id) + print(f"\n聊天流 {stream_id} 的统计信息:") + for key, value in stats.items(): + print(f" {key}: {value}") + + # 获取最新的循环 + cycles = analyzer.get_stream_cycles(stream_id, limit=1) + if cycles: + print(f"\n最新循环内容:") + print(analyzer.get_cycle_content(cycles[0])) + + # 获取所有聊天流中最新的3个循环 + latest_cycles = analyzer.get_latest_cycles(3) + print(f"\n所有聊天流中最新的 {len(latest_cycles)} 个循环:") + for stream_id, filepath in latest_cycles: + print(f" 聊天流 {stream_id}: {os.path.basename(filepath)}") \ No newline at end of file diff --git a/src/plugins/heartFC_chat/cycle_viewer.py b/src/plugins/heartFC_chat/cycle_viewer.py new file mode 100644 index 000000000..fbbd5626e --- /dev/null +++ b/src/plugins/heartFC_chat/cycle_viewer.py @@ -0,0 +1,167 @@ +import os +import sys +import argparse +from typing import List, Dict, Any +from src.plugins.heartFC_chat.cycle_analyzer import CycleAnalyzer + +def print_section(title: str, width: int = 80): + """打印分隔线和标题""" + print("\n" + "=" * width) + print(f" {title} ".center(width, "=")) + print("=" * width) + +def list_streams_cmd(analyzer: CycleAnalyzer, args: argparse.Namespace): + """列出所有聊天流""" + print_section("所有聊天流") + streams = analyzer.list_streams() + + if not streams: + print("没有找到任何聊天流记录。") + return + + for i, stream_id in enumerate(streams): + count = analyzer.get_stream_cycle_count(stream_id) + print(f"[{i+1}] {stream_id} - {count} 个循环") + +def analyze_stream_cmd(analyzer: CycleAnalyzer, args: argparse.Namespace): + """分析指定聊天流的循环信息""" + stream_id = args.stream_id + + print_section(f"聊天流 {stream_id} 分析") + stats = analyzer.analyze_stream_cycles(stream_id) + + if "error" in stats: + print(f"错误: {stats['error']}") + return + + print("基本统计:") + print(f" 总循环数: {stats['总循环数']}") + print(f" 总耗时: {stats['总耗时']}") + print(f" 平均耗时: {stats['平均耗时']}") + + print("\n动作统计:") + for action, count in stats['动作统计'].items(): + if count > 0: + percent = (count / stats['总循环数']) * 100 + print(f" {action}: {count} ({percent:.1f}%)") + + if stats.get('工具使用次数'): + print("\n工具使用次数:") + for tool, count in stats['工具使用次数'].items(): + print(f" {tool}: {count}") + +def list_cycles_cmd(analyzer: CycleAnalyzer, args: argparse.Namespace): + """列出指定聊天流的循环""" + stream_id = args.stream_id + limit = args.limit if args.limit > 0 else -1 + + print_section(f"聊天流 {stream_id} 的循环列表") + cycles = analyzer.get_stream_cycles(stream_id) + + if not cycles: + print("没有找到任何循环记录。") + return + + if limit > 0: + cycles = cycles[-limit:] # 取最新的limit个 + print(f"显示最新的 {limit} 个循环 (共 {len(cycles)} 个):") + else: + print(f"共找到 {len(cycles)} 个循环:") + + for i, filepath in enumerate(cycles): + filename = os.path.basename(filepath) + cycle_id = filename.split('_')[1] + timestamp = filename.split('_', 2)[2].split('.')[0] + print(f"[{i+1}] 循环ID: {cycle_id}, 时间: {timestamp}, 文件: {filename}") + +def view_cycle_cmd(analyzer: CycleAnalyzer, args: argparse.Namespace): + """查看指定循环的详细信息""" + stream_id = args.stream_id + cycle_index = args.cycle_index - 1 # 转换为0-based索引 + + cycles = analyzer.get_stream_cycles(stream_id) + if not cycles: + print(f"错误: 聊天流 {stream_id} 没有找到任何循环记录。") + return + + if cycle_index < 0 or cycle_index >= len(cycles): + print(f"错误: 循环索引 {args.cycle_index} 超出范围 (1-{len(cycles)})。") + return + + filepath = cycles[cycle_index] + filename = os.path.basename(filepath) + + print_section(f"循环详情: {filename}") + content = analyzer.get_cycle_content(filepath) + print(content) + +def latest_cycles_cmd(analyzer: CycleAnalyzer, args: argparse.Namespace): + """查看所有聊天流中最新的几个循环""" + count = args.count if args.count > 0 else 10 + + print_section(f"最新的 {count} 个循环") + latest_cycles = analyzer.get_latest_cycles(count) + + if not latest_cycles: + print("没有找到任何循环记录。") + return + + for i, (stream_id, filepath) in enumerate(latest_cycles): + filename = os.path.basename(filepath) + cycle_id = filename.split('_')[1] + timestamp = filename.split('_', 2)[2].split('.')[0] + print(f"[{i+1}] 聊天流: {stream_id}, 循环ID: {cycle_id}, 时间: {timestamp}") + + # 可以选择性添加提取基本信息的功能 + with open(filepath, 'r', encoding='utf-8') as f: + for line in f: + if line.startswith("动作:"): + action = line.strip() + print(f" {action}") + break + print() + +def main(): + parser = argparse.ArgumentParser(description="HeartFC循环信息查看工具") + subparsers = parser.add_subparsers(dest="command", help="子命令") + + # 列出所有聊天流 + list_streams_parser = subparsers.add_parser("list-streams", help="列出所有聊天流") + + # 分析聊天流 + analyze_parser = subparsers.add_parser("analyze", help="分析指定聊天流的循环信息") + analyze_parser.add_argument("stream_id", help="聊天流ID") + + # 列出聊天流的循环 + list_cycles_parser = subparsers.add_parser("list-cycles", help="列出指定聊天流的循环") + list_cycles_parser.add_argument("stream_id", help="聊天流ID") + list_cycles_parser.add_argument("-l", "--limit", type=int, default=-1, help="显示最新的N个循环") + + # 查看指定循环 + view_parser = subparsers.add_parser("view", help="查看指定循环的详细信息") + view_parser.add_argument("stream_id", help="聊天流ID") + view_parser.add_argument("cycle_index", type=int, help="循环索引(从1开始)") + + # 查看最新循环 + latest_parser = subparsers.add_parser("latest", help="查看所有聊天流中最新的几个循环") + latest_parser.add_argument("-c", "--count", type=int, default=10, help="显示的数量") + + args = parser.parse_args() + + analyzer = CycleAnalyzer() + + if args.command == "list-streams": + list_streams_cmd(analyzer, args) + elif args.command == "analyze": + analyze_stream_cmd(analyzer, args) + elif args.command == "list-cycles": + list_cycles_cmd(analyzer, args) + elif args.command == "view": + view_cycle_cmd(analyzer, args) + elif args.command == "latest": + latest_cycles_cmd(analyzer, args) + else: + parser.print_help() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/plugins/heartFC_chat/heartFC_Cycleinfo.py b/src/plugins/heartFC_chat/heartFC_Cycleinfo.py index 966773841..2abf315ec 100644 --- a/src/plugins/heartFC_chat/heartFC_Cycleinfo.py +++ b/src/plugins/heartFC_chat/heartFC_Cycleinfo.py @@ -1,4 +1,6 @@ import time +import os +import json from typing import List, Optional, Dict, Any @@ -23,6 +25,28 @@ class CycleInfo: "anchor_message_id": "", # 锚点消息ID "reply_message_ids": [], # 回复消息ID列表 "sub_mind_thinking": "", # 子思维思考内容 + "in_mind_reply": [], # 子思维思考内容 + } + + # 添加SubMind相关信息 + self.submind_info: Dict[str, Any] = { + "prompt": "", # SubMind输入的prompt + "structured_info": "", # 结构化信息 + "result": "", # SubMind的思考结果 + } + + # 添加ToolUse相关信息 + self.tooluse_info: Dict[str, Any] = { + "prompt": "", # 工具使用的prompt + "tools_used": [], # 使用了哪些工具 + "tool_results": [], # 工具获得的信息 + } + + # 添加Planner相关信息 + self.planner_info: Dict[str, Any] = { + "prompt": "", # 规划器的prompt + "response": "", # 规划器的原始回复 + "parsed_result": {}, # 解析后的结果 } def to_dict(self) -> Dict[str, Any]: @@ -37,6 +61,9 @@ class CycleInfo: "timers": self.timers, "thinking_id": self.thinking_id, "response_info": self.response_info, + "submind_info": self.submind_info, + "tooluse_info": self.tooluse_info, + "planner_info": self.planner_info, } def complete_cycle(self): @@ -72,3 +99,206 @@ class CycleInfo: self.response_info["reply_message_ids"] = reply_message_ids if sub_mind_thinking is not None: self.response_info["sub_mind_thinking"] = sub_mind_thinking + + def set_submind_info( + self, + prompt: Optional[str] = None, + structured_info: Optional[str] = None, + result: Optional[str] = None, + ): + """设置SubMind信息""" + if prompt is not None: + self.submind_info["prompt"] = prompt + if structured_info is not None: + self.submind_info["structured_info"] = structured_info + if result is not None: + self.submind_info["result"] = result + + def set_tooluse_info( + self, + prompt: Optional[str] = None, + tools_used: Optional[List[str]] = None, + tool_results: Optional[List[Dict[str, Any]]] = None, + ): + """设置ToolUse信息""" + if prompt is not None: + self.tooluse_info["prompt"] = prompt + if tools_used is not None: + self.tooluse_info["tools_used"] = tools_used + if tool_results is not None: + self.tooluse_info["tool_results"] = tool_results + + def set_planner_info( + self, + prompt: Optional[str] = None, + response: Optional[str] = None, + parsed_result: Optional[Dict[str, Any]] = None, + ): + """设置Planner信息""" + if prompt is not None: + self.planner_info["prompt"] = prompt + if response is not None: + self.planner_info["response"] = response + if parsed_result is not None: + self.planner_info["parsed_result"] = parsed_result + + @staticmethod + def save_to_file(cycle_info: 'CycleInfo', stream_id: str, base_dir: str = "log_debug") -> str: + """ + 将CycleInfo保存到文件 + + 参数: + cycle_info: CycleInfo对象 + stream_id: 聊天流ID + base_dir: 基础目录,默认为log_debug + + 返回: + str: 保存的文件路径 + """ + try: + # 创建目录结构 + stream_dir = os.path.join(base_dir, stream_id) + os.makedirs(stream_dir, exist_ok=True) + + # 生成文件名和路径 + timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime(cycle_info.start_time)) + filename = f"cycle_{cycle_info.cycle_id}_{timestamp}.txt" + filepath = os.path.join(stream_dir, filename) + + # 将CycleInfo转换为JSON格式 + cycle_data = cycle_info.to_dict() + + # 格式化输出成易读的格式 + with open(filepath, 'w', encoding='utf-8') as f: + # 写入基本信息 + f.write(f"循环ID: {cycle_info.cycle_id}\n") + f.write(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(cycle_info.start_time))}\n") + if cycle_info.end_time: + f.write(f"结束时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(cycle_info.end_time))}\n") + duration = cycle_info.end_time - cycle_info.start_time + f.write(f"耗时: {duration:.2f}秒\n") + f.write(f"动作: {cycle_info.action_type}\n") + f.write(f"原因: {cycle_info.reasoning}\n") + f.write(f"执行状态: {'已执行' if cycle_info.action_taken else '未执行'}\n") + f.write(f"思考ID: {cycle_info.thinking_id}\n") + f.write(f"是否为重新规划: {'是' if cycle_info.replanned else '否'}\n\n") + + # 写入计时器信息 + if cycle_info.timers: + f.write("== 计时器信息 ==\n") + for name, elapsed in cycle_info.timers.items(): + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + f.write(f"{name}: {formatted_time}\n") + f.write("\n") + + # 写入响应信息 + f.write("== 响应信息 ==\n") + f.write(f"锚点消息ID: {cycle_info.response_info['anchor_message_id']}\n") + if cycle_info.response_info['response_text']: + f.write("回复文本:\n") + for i, text in enumerate(cycle_info.response_info['response_text']): + f.write(f" [{i+1}] {text}\n") + if cycle_info.response_info['emoji_info']: + f.write(f"表情信息: {cycle_info.response_info['emoji_info']}\n") + if cycle_info.response_info['reply_message_ids']: + f.write(f"回复消息ID: {', '.join(cycle_info.response_info['reply_message_ids'])}\n") + f.write("\n") + + # 写入SubMind信息 + f.write("== SubMind信息 ==\n") + f.write(f"结构化信息:\n{cycle_info.submind_info['structured_info']}\n\n") + f.write(f"思考结果:\n{cycle_info.submind_info['result']}\n\n") + f.write("SubMind Prompt:\n") + f.write(f"{cycle_info.submind_info['prompt']}\n\n") + + # 写入ToolUse信息 + f.write("== 工具使用信息 ==\n") + if cycle_info.tooluse_info['tools_used']: + f.write(f"使用的工具: {', '.join(cycle_info.tooluse_info['tools_used'])}\n") + else: + f.write("未使用工具\n") + + if cycle_info.tooluse_info['tool_results']: + f.write("工具结果:\n") + for i, result in enumerate(cycle_info.tooluse_info['tool_results']): + f.write(f" [{i+1}] 类型: {result.get('type', '未知')}, 内容: {result.get('content', '')}\n") + f.write("\n") + f.write("工具执行 Prompt:\n") + f.write(f"{cycle_info.tooluse_info['prompt']}\n\n") + + # 写入Planner信息 + f.write("== Planner信息 ==\n") + f.write("Planner Prompt:\n") + f.write(f"{cycle_info.planner_info['prompt']}\n\n") + f.write("原始回复:\n") + f.write(f"{cycle_info.planner_info['response']}\n\n") + f.write("解析结果:\n") + f.write(f"{json.dumps(cycle_info.planner_info['parsed_result'], ensure_ascii=False, indent=2)}\n") + + return filepath + except Exception as e: + print(f"保存CycleInfo到文件时出错: {e}") + return "" + + @staticmethod + def load_from_file(filepath: str) -> Optional[Dict[str, Any]]: + """ + 从文件加载CycleInfo信息(只加载JSON格式的数据,不解析文本格式) + + 参数: + filepath: 文件路径 + + 返回: + Optional[Dict[str, Any]]: 加载的CycleInfo数据,失败则返回None + """ + try: + if not os.path.exists(filepath): + print(f"文件不存在: {filepath}") + return None + + # 尝试从文件末尾读取JSON数据 + with open(filepath, 'r', encoding='utf-8') as f: + lines = f.readlines() + + # 查找"解析结果:"后的JSON数据 + for i, line in enumerate(lines): + if "解析结果:" in line and i+1 < len(lines): + # 尝试解析后面的行 + json_data = "" + for j in range(i+1, len(lines)): + json_data += lines[j] + + try: + return json.loads(json_data) + except json.JSONDecodeError: + continue + + # 如果没有找到JSON数据,则返回None + return None + except Exception as e: + print(f"从文件加载CycleInfo时出错: {e}") + return None + + @staticmethod + def list_cycles(stream_id: str, base_dir: str = "log_debug") -> List[str]: + """ + 列出指定stream_id的所有循环文件 + + 参数: + stream_id: 聊天流ID + base_dir: 基础目录,默认为log_debug + + 返回: + List[str]: 文件路径列表 + """ + try: + stream_dir = os.path.join(base_dir, stream_id) + if not os.path.exists(stream_dir): + return [] + + files = [os.path.join(stream_dir, f) for f in os.listdir(stream_dir) + if f.startswith("cycle_") and f.endswith(".txt")] + return sorted(files) + except Exception as e: + print(f"列出循环文件时出错: {e}") + return [] diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 42b8f2c61..03a68037c 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -41,29 +41,31 @@ CONSECUTIVE_NO_REPLY_THRESHOLD = 3 # 连续不回复的阈值 # 添加并行模式开关常量 # 并行模式优化说明: -# 1. 并行模式将SubMind的思考(think)和Planner的规划(plan)同时进行,可以节省约50%的处理时间 -# 2. 并行模式中,Planner不依赖SubMind的思考结果(current_mind)进行决策 -# 3. 优点:处理速度明显提升,两个LLM调用并行执行 -# 4. 可能的缺点:Planner无法直接利用SubMind的思考内容进行决策 -# 5. 实测数据表明:并行模式下决策质量与串行模式相当,但响应速度更快 -# 6. 如遇特殊情况需要基于思考结果进行规划,可将此开关设为False -PARALLEL_MODE_ENABLED = True # 设置为 True 启用并行模式,False 使用原始串行模式 +# 1. 并行模式下,SubMind的思考(think)和工具执行(tools)同时进行,而规划(plan)在获取思考结果后串行执行 +# 2. 这种半并行模式中,Planner依赖SubMind的思考结果(current_mind)进行决策,但仍能与工具调用并行处理 +# 3. 优点:处理速度显著提升,同时保持规划器能利用思考内容进行决策 +# 4. 可能的缺点:整体处理时间比完全并行模式略长,但决策质量可能更好 +# 5. 对比原来的全并行模式(think+plan+tools三者同时进行),这种模式更平衡效率和质量 +PARALLEL_MODE_ENABLED = True # 设置为 True 启用半并行模式,False 使用原始串行模式 logger = get_logger("hfc") # Logger Name Changed # 默认动作定义 -DEFAULT_ACTIONS = {"no_reply": "不回复", "text_reply": "文本回复, 可选附带表情", "emoji_reply": "仅表情回复"} +DEFAULT_ACTIONS = { + "no_reply": "不回复", + "reply": "回复:可以包含文本、表情或两者结合,顺序任意" +} class ActionManager: """动作管理器:控制每次决策可以使用的动作""" def __init__(self): - # 初始化为默认动作集 + # 初始化为新的默认动作集 self._available_actions: Dict[str, str] = DEFAULT_ACTIONS.copy() - self._original_actions_backup: Optional[Dict[str, str]] = None # 用于临时移除时的备份 + self._original_actions_backup: Optional[Dict[str, str]] = None def get_available_actions(self) -> Dict[str, str]: """获取当前可用的动作集""" @@ -384,6 +386,13 @@ class HeartFChatting: # 完成当前循环并保存历史 self._current_cycle.complete_cycle() self._cycle_history.append(self._current_cycle) + + # 保存CycleInfo到文件 + try: + filepath = CycleInfo.save_to_file(self._current_cycle, self.stream_id) + logger.info(f"{self.log_prefix} 已保存循环信息到文件: {filepath}") + except Exception as e: + logger.error(f"{self.log_prefix} 保存循环信息到文件时出错: {e}") # 记录循环信息和计时器结果 timer_strings = [] @@ -455,26 +464,21 @@ class HeartFChatting: # 记录并行任务开始时间 parallel_start_time = time.time() - logger.debug(f"{self.log_prefix} 开始三重并行任务处理") + logger.debug(f"{self.log_prefix} 开始思考和工具并行任务处理") - # 并行执行三个任务 - with Timer("三重并行处理", cycle_timers): + # 并行执行两个任务:思考和工具执行 + with Timer("思考和工具并行处理", cycle_timers): # 1. 子思维思考 - 不执行工具调用 think_task = asyncio.create_task(self._get_submind_thinking_only(cycle_timers)) logger.debug(f"{self.log_prefix} 启动子思维思考任务") - # 2. 规划器 - 并行决策 - plan_task = asyncio.create_task(self._planner_parallel(cycle_timers)) - logger.debug(f"{self.log_prefix} 启动规划器任务") - - # 3. 工具执行器 - 专门处理工具调用 + # 2. 工具执行器 - 专门处理工具调用 tool_task = asyncio.create_task(self._execute_tools_parallel(self.sub_mind, cycle_timers)) logger.debug(f"{self.log_prefix} 启动工具执行任务") # 创建任务完成状态追踪 tasks = { "思考任务": think_task, - "规划任务": plan_task, "工具任务": tool_task } pending = set(tasks.values()) @@ -493,7 +497,7 @@ class HeartFChatting: if task == t: task_end_time = time.time() task_duration = task_end_time - parallel_start_time - logger.debug(f"{self.log_prefix} {name}已完成,耗时: {task_duration:.2f}秒") + logger.info(f"{self.log_prefix} {name}已完成,耗时: {task_duration:.2f}秒") results[name] = task.result() break @@ -502,17 +506,16 @@ class HeartFChatting: current_time = time.time() elapsed = current_time - parallel_start_time pending_names = [name for name, t in tasks.items() if t in pending] - logger.debug(f"{self.log_prefix} 并行处理已进行{elapsed:.2f}秒,待完成任务: {', '.join(pending_names)}") + logger.info(f"{self.log_prefix} 并行处理已进行{elapsed:.2f}秒,待完成任务: {', '.join(pending_names)}") # 所有任务完成,从结果中提取数据 current_mind = results.get("思考任务") - planner_result = results.get("规划任务") tool_results = results.get("工具任务") # 记录总耗时 parallel_end_time = time.time() total_duration = parallel_end_time - parallel_start_time - logger.info(f"{self.log_prefix} 三重并行任务全部完成,总耗时: {total_duration:.2f}秒") + logger.info(f"{self.log_prefix} 思考和工具并行任务全部完成,总耗时: {total_duration:.2f}秒") # 处理工具结果 - 将结果更新到SubMind if tool_results: @@ -523,11 +526,22 @@ class HeartFChatting: # 记录子思维思考内容 if self._current_cycle: self._current_cycle.set_response_info(sub_mind_thinking=current_mind) + + # 串行执行规划器 - 使用刚获取的思考结果 + logger.debug(f"{self.log_prefix} 开始串行规划任务") + with Timer("串行规划", cycle_timers): + # 调用原始的_planner方法而不是_planner_parallel + # _planner方法会使用current_mind作为输入参数,让规划器能够利用子思维的思考结果 + # 而_planner_parallel设计为不依赖current_mind的结果,两者的主要区别在于prompt构建方式 + planner_result = await self._planner(current_mind, cycle_timers) - # 解析规划结果 + action = planner_result.get("action", "error") + action_data = planner_result.get("action_data", {}) # 新增获取动作数据 reasoning = planner_result.get("reasoning", "未提供理由") + logger.debug(f"{self.log_prefix} 动作和动作信息: {action}, {action_data}, {reasoning}") + # 更新循环信息 self._current_cycle.set_action_info(action, reasoning, True) @@ -537,26 +551,26 @@ class HeartFChatting: return False, "" # 在此处添加日志记录 - if action == "text_reply": + if action == "reply": action_str = "回复" - elif action == "emoji_reply": - action_str = "回复表情" - else: + elif action == "no_reply": action_str = "不回复" + else: + action_str = "位置动作" logger.info(f"{self.log_prefix} 麦麦决定'{action_str}', 原因'{reasoning}'") return await self._handle_action( - action, reasoning, planner_result.get("emoji_query", ""), cycle_timers, planner_start_db_time + action, reasoning, action_data, cycle_timers, planner_start_db_time ) except Exception as e: - logger.error(f"{self.log_prefix} 三重并行处理失败: {e}") + logger.error(f"{self.log_prefix} 并行+串行处理失败: {e}") logger.error(traceback.format_exc()) return False, "" async def _handle_action( - self, action: str, reasoning: str, emoji_query: str, cycle_timers: dict, planner_start_db_time: float + self, action: str, reasoning: str, action_data: dict, cycle_timers: dict, planner_start_db_time: float ) -> tuple[bool, str]: """ 处理规划动作 @@ -564,7 +578,7 @@ class HeartFChatting: 参数: action: 动作类型 reasoning: 决策理由 - emoji_query: 表情查询 + action_data: 动作数据,包含不同动作需要的参数 cycle_timers: 计时器字典 planner_start_db_time: 规划开始时间 @@ -572,8 +586,7 @@ class HeartFChatting: tuple[bool, str]: (是否执行了动作, 思考消息ID) """ action_handlers = { - "text_reply": self._handle_text_reply, - "emoji_reply": self._handle_emoji_reply, + "reply": self._handle_reply, "no_reply": self._handle_no_reply, } @@ -583,17 +596,15 @@ class HeartFChatting: return False, "" try: - if action == "text_reply": - return await handler(reasoning, emoji_query, cycle_timers) - elif action == "emoji_reply": - return await handler(reasoning, emoji_query), "" + if action == "reply": + return await handler(reasoning, action_data, cycle_timers) else: # no_reply return await handler(reasoning, planner_start_db_time, cycle_timers), "" except HeartFCError as e: logger.error(f"{self.log_prefix} 处理{action}时出错: {e}") # 出错时也重置计数器 self._lian_xu_bu_hui_fu_ci_shu = 0 - self._lian_xu_deng_dai_shi_jian = 0.0 # 重置累计等待时间 + self._lian_xu_deng_dai_shi_jian = 0.0 return False, "" async def _handle_text_reply(self, reasoning: str, emoji_query: str, cycle_timers: dict) -> tuple[bool, str]: @@ -818,12 +829,22 @@ class HeartFChatting: with Timer("纯思考", cycle_timers): # 修改SubMind.do_thinking_before_reply方法的参数,添加no_tools=True - current_mind, _past_mind = await self.sub_mind.do_thinking_before_reply( + current_mind, _past_mind, submind_prompt = await self.sub_mind.do_thinking_before_reply( history_cycle=self._cycle_history, - parallel_mode=True, - no_tools=True # 添加参数指示不执行工具 + parallel_mode=False, # 设为False,因为规划器将依赖思考结果 + no_tools=True, # 添加参数指示不执行工具 + return_prompt=True, # 返回prompt + cycle_info=self._current_cycle, # 传递循环信息对象 ) + # 记录SubMind的信息到CycleInfo + if self._current_cycle: + self._current_cycle.set_submind_info( + prompt=submind_prompt, + structured_info=self.sub_mind.structured_info_str, + result=current_mind + ) + end_time = time.time() duration = end_time - start_time logger.debug(f"{self.log_prefix} 子思维纯思考任务完成,耗时: {duration:.2f}秒") @@ -853,12 +874,22 @@ class HeartFChatting: ) # 执行工具并获取结果 - tool_results = await self.tool_executor.execute_tools( + tool_results, tools_used, tool_prompt = await self.tool_executor.execute_tools( sub_mind, chat_target_name=chat_target_name, - is_group_chat=self.is_group_chat + is_group_chat=self.is_group_chat, + return_details=True, # 返回详细信息 + cycle_info=self._current_cycle, # 传递循环信息对象 ) + # 记录工具执行信息到CycleInfo + if self._current_cycle: + self._current_cycle.set_tooluse_info( + prompt=tool_prompt, + tools_used=tools_used, + tool_results=tool_results + ) + end_time = time.time() duration = end_time - start_time tool_count = len(tool_results) if tool_results else 0 @@ -924,6 +955,8 @@ class HeartFChatting: reasoning = "规划器初始化默认" emoji_query = "" llm_error = False # LLM 请求或解析错误标志 + prompt = "" # 初始化prompt变量 + llm_content = "" # 初始化LLM响应内容 # 获取我们将传递给 prompt 构建器和用于验证的当前可用动作 current_available_actions = self.action_manager.get_available_actions() @@ -964,6 +997,7 @@ class HeartFChatting: emoji_query = "" # 明确设置为空 # --- 解析 LLM 返回的 JSON (仅当 LLM 请求未出错时进行) --- + parsed_result = {} # 初始化解析结果 if not llm_error and llm_content: try: # 尝试去除可能的 markdown 代码块标记 @@ -973,6 +1007,7 @@ class HeartFChatting: if not cleaned_content: raise json.JSONDecodeError("Cleaned content is empty", cleaned_content, 0) parsed_json = json.loads(cleaned_content) + parsed_result = parsed_json # 保存解析结果 # 提取决策,提供默认值 extracted_action = parsed_json.get("action", "no_reply") @@ -1045,6 +1080,20 @@ class HeartFChatting: f"{self.log_prefix}[并行Planner] 恢复了原始动作集, 当前可用: {list(self.action_manager.get_available_actions().keys())}" ) + # 记录Planner信息到CycleInfo + if self._current_cycle: + result_dict = { + "action": action, + "reasoning": reasoning, + "emoji_query": emoji_query, + "llm_error": llm_error + } + self._current_cycle.set_planner_info( + prompt=prompt, + response=llm_content or "", + parsed_result=parsed_result or result_dict + ) + # --- 概率性忽略文本回复附带的表情 (逻辑保持不变) --- if action == "text_reply" and emoji_query: logger.debug(f"{self.log_prefix}[并行Planner] 大模型建议文字回复带表情: '{emoji_query}'") @@ -1342,6 +1391,7 @@ class HeartFChatting: # --- 回复器 (Replier) 的定义 --- # async def _replier_work( self, + in_mind_reply: List[str], reason: str, anchor_message: MessageRecv, thinking_id: str, @@ -1375,6 +1425,7 @@ class HeartFChatting: prompt = await prompt_builder.build_prompt( build_mode="focus", chat_stream=self.chat_stream, # Pass the stream object + in_mind_reply=in_mind_reply, # Focus specific args: reason=reason, current_mind_info=self.sub_mind.current_mind, @@ -1571,8 +1622,17 @@ class HeartFChatting: # 提取决策,提供默认值 extracted_action = parsed_json.get("action", "no_reply") extracted_reasoning = parsed_json.get("reasoning", "LLM未提供理由") - extracted_emoji_query = parsed_json.get("emoji_query", "") + # extracted_emoji_query = parsed_json.get("emoji_query", "") + # 新的reply格式 + if extracted_action == "reply": + action_data = { + "text": parsed_json.get("text", []), + "emojis": parsed_json.get("emojis", []) + } + else: + action_data = {} # 其他动作可能不需要额外数据 + # 验证动作是否在当前可用列表中 # !! 使用调用 prompt 时实际可用的动作列表进行验证 if extracted_action not in current_available_actions: @@ -1596,11 +1656,11 @@ class HeartFChatting: # 动作有效且可用 action = extracted_action reasoning = extracted_reasoning - emoji_query = extracted_emoji_query llm_error = False # 解析成功 logger.debug( - f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果 (来自JSON): {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'" + f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果 (来自JSON): {action}, 理由: {reasoning}" ) + logger.debug(f"{self.log_prefix}动作信息: '{action_data}'") except json.JSONDecodeError as json_e: logger.warning( @@ -1645,23 +1705,91 @@ class HeartFChatting: # --- 结束确保动作恢复 --- # --- 概率性忽略文本回复附带的表情 (逻辑保持不变) --- - if action == "text_reply" and emoji_query: - logger.debug(f"{self.log_prefix}[Planner] 大模型建议文字回复带表情: '{emoji_query}'") + emoji = action_data.get("emojis") + if action == "reply" and emoji: + logger.debug(f"{self.log_prefix}[Planner] 大模型建议文字回复带表情: '{emoji}'") if random.random() > EMOJI_SEND_PRO: logger.info( - f"{self.log_prefix}但是麦麦这次不想加表情 ({1 - EMOJI_SEND_PRO:.0%}),忽略表情 '{emoji_query}'" + f"{self.log_prefix}但是麦麦这次不想加表情 ({1 - EMOJI_SEND_PRO:.0%}),忽略表情 '{emoji}'" ) - emoji_query = "" # 清空表情请求 + action_data["emojis"] = "" # 清空表情请求 else: - logger.info(f"{self.log_prefix}好吧,加上表情 '{emoji_query}'") + logger.info(f"{self.log_prefix}好吧,加上表情 '{emoji}'") # --- 结束概率性忽略 --- # 返回结果字典 return { "action": action, + "action_data": action_data, "reasoning": reasoning, - "emoji_query": emoji_query, "current_mind": current_mind, "observed_messages": observed_messages, "llm_error": llm_error, # 返回错误状态 } + + async def _handle_reply(self, reasoning: str, reply_data: dict, cycle_timers: dict) -> tuple[bool, str]: + """ + 处理统一的回复动作 - 可包含文本和表情,顺序任意 + + reply_data格式: + { + "text": ["你好啊", "今天天气真不错"], # 文本内容列表(可选) + "emojis": ["微笑", "阳光"] # 表情关键词列表(可选) + } + """ + # 重置连续不回复计数器 + self._lian_xu_bu_hui_fu_ci_shu = 0 + self._lian_xu_deng_dai_shi_jian = 0.0 + + # 获取锚点消息 + anchor_message = await self._get_anchor_message() + if not anchor_message: + raise PlannerError("无法获取锚点消息") + + # 创建思考消息 + thinking_id = await self._create_thinking_message(anchor_message) + if not thinking_id: + raise PlannerError("无法创建思考消息") + + try: + has_sent_something = False + + # 处理文本部分 + text_parts = reply_data.get("text", []) + if text_parts: + with Timer("生成回复", cycle_timers): + # 可以保留原有的文本处理逻辑或进行适当调整 + reply = await self._replier_work( + in_mind_reply = text_parts, + anchor_message=anchor_message, + thinking_id=thinking_id, + reason=reasoning, + ) + + if reply: + with Timer("发送文本消息", cycle_timers): + await self._sender( + thinking_id=thinking_id, + anchor_message=anchor_message, + response_set=reply, + send_emoji="" # 不在这里处理表情 + ) + has_sent_something = True + else: + logger.warning(f"{self.log_prefix} 文本回复生成失败") + + # 处理表情部分 + emoji_keywords = reply_data.get("emojis", []) + for emoji in emoji_keywords: + if emoji: + await self._handle_emoji(anchor_message, [], emoji) + has_sent_something = True + + if not has_sent_something: + logger.warning(f"{self.log_prefix} 回复动作未包含任何有效内容") + + return has_sent_something, thinking_id + + except (ReplierError, SenderError) as e: + logger.error(f"{self.log_prefix} 回复失败: {e}") + return False, thinking_id diff --git a/src/plugins/heartFC_chat/heartflow_prompt_builder.py b/src/plugins/heartFC_chat/heartflow_prompt_builder.py index 642ece09a..b788fe4d2 100644 --- a/src/plugins/heartFC_chat/heartflow_prompt_builder.py +++ b/src/plugins/heartFC_chat/heartflow_prompt_builder.py @@ -34,10 +34,11 @@ def init_prompt(): {current_mind_info} 因为上述想法,你决定发言,原因是:{reason} - -回复尽量简短一些。请注意把握聊天内容,{reply_style2}。请一次只回复一个话题,不要同时回复多个人。{prompt_ger} -{reply_style1},说中文,不要刻意突出自身学科背景,注意只输出回复内容。 -{moderation_prompt}。注意:回复不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""", +依照这些内容组织回复:{in_mind_reply},不要原句回复,根据下面的要求,对其进行修改 +要求:是尽量简短一些。把握聊天内容,{reply_style2}。不要复读自己说的话。{prompt_ger} +{reply_style1},说中文,不要刻意突出自身学科背景。 +{moderation_prompt}。不要浮夸,平淡一些。 +注意:回复不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""", "heart_flow_prompt", ) @@ -67,15 +68,15 @@ def init_prompt(): - 讨论你不懂的专业话题 - 你发送了太多消息,且无人回复 -2. 文字回复(text_reply)适用: +2. 回复(reply)适用: - 有实质性内容需要表达 - 有人提到你,但你还没有回应他 - 可以追加emoji_query表达情绪(emoji_query填写表情包的适用场合,也就是当前场合) - 不要追加太多表情 - -3. 纯表情回复(emoji_reply)适用: - - 适合用表情回应的场景 - - 需提供明确的emoji_query + +3. 回复要求: + -不要太浮夸 + -一次只回复一个人 4. 自我对话处理: - 如果是自己发的消息想继续,需自然衔接 @@ -87,11 +88,22 @@ def init_prompt(): 你必须从上面列出的可用行动中选择一个,并说明原因。 你的决策必须以严格的 JSON 格式输出,且仅包含 JSON 内容,不要有任何其他文字或解释。 -JSON 结构如下,包含三个字段 "action", "reasoning", "emoji_query": +你可以选择以下动作: +1. no_reply: 不回复 +2. reply: 回复参考,可以只包含文本、表情或两者都有,可以发送一段或多段 + +如果选择reply,请按以下JSON格式返回: {{ - "action": "string", // 必须是上面提供的可用行动之一 (例如: '{example_action}') - "reasoning": "string", // 做出此决定的详细理由和思考过程,说明你如何应用了回复原则 - "emoji_query": "string" // 可选。如果行动是 'emoji_reply',必须提供表情主题(填写表情包的适用场合);如果行动是 'text_reply' 且你想附带表情,也在此提供表情主题,否则留空字符串 ""。遵循回复原则,不要滥用。 + "action": "reply", + "text": ["第一段文本", "第二段文本"], // 可选,如果想发送文本 + "emojis": ["表情关键词1", "表情关键词2"] // 可选,如果想发送表情 + "reasoning": "你的决策理由", +}} + +如果选择no_reply,请按以下格式返回: +{{ + "action": "no_reply", + "reasoning": "你的决策理由" }} 请输出你的决策 JSON: """, @@ -155,7 +167,7 @@ JSON 结构如下,包含三个字段 "action", "reasoning", "emoji_query": {current_mind_info} 因为上述想法,你决定回复,原因是:{reason} -回复尽量简短一些。请注意把握聊天内容,{reply_style2}。{prompt_ger} +回复尽量简短一些。请注意把握聊天内容,{reply_style2}。{prompt_ger},不要复读自己说的话 {reply_style1},说中文,不要刻意突出自身学科背景,注意只输出回复内容。 {moderation_prompt}。注意:回复不要输出多余内容(包括前后缀,冒号和引号,括号,表情包,at或 @等 )。""", "heart_flow_private_prompt", # New template for private FOCUSED chat @@ -184,7 +196,7 @@ JSON 结构如下,包含三个字段 "action", "reasoning", "emoji_query": ) -async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_stream, sender_name) -> str: +async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_stream, sender_name, in_mind_reply) -> str: individuality = Individuality.get_instance() prompt_personality = individuality.get_prompt(x_person=0, level=2) @@ -227,7 +239,7 @@ async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_s reply_styles2 = [ ("不要回复的太有条理,可以有个性", 0.6), - ("不要回复的太有条理,可以复读", 0.15), + ("不要回复的太有条理,可以复读,但是不要复读自己说的话", 0.15), ("回复的认真一些", 0.2), ("可以回复单个表情符号", 0.05), ] @@ -263,6 +275,7 @@ async def _build_prompt_focus(reason, current_mind_info, structured_info, chat_s reply_style2=reply_style2_chosen, reply_style1=reply_style1_chosen, reason=reason, + in_mind_reply=in_mind_reply, prompt_ger=prompt_ger, moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"), # sender_name is not used in the group template @@ -304,6 +317,7 @@ class PromptBuilder: structured_info=None, message_txt=None, sender_name="某人", + in_mind_reply=None, ) -> Optional[str]: if build_mode == "normal": return await self._build_prompt_normal(chat_stream, message_txt, sender_name) @@ -315,6 +329,7 @@ class PromptBuilder: structured_info, chat_stream, sender_name, + in_mind_reply, ) return None @@ -844,7 +859,7 @@ class PromptBuilder: current_mind_block=current_mind_block, cycle_info_block=cycle_info_block, action_options_text=action_options_text, - example_action=example_action_key, + # example_action=example_action_key, ) return prompt