From f83e151d400e3ea71a05901b0b5ebc0fe6439d50 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Mon, 28 Apr 2025 19:31:00 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E4=BC=98=E5=8C=96=E5=B7=A5?= =?UTF-8?q?=E5=85=B7=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heart_flow/sub_mind.py | 70 ++++----- src/plugins/heartFC_chat/heartFC_chat.py | 152 +++++++++--------- src/plugins/models/utils_model.py | 10 +- src/plugins/utils/json_utils.py | 189 +++++------------------ 4 files changed, 159 insertions(+), 262 deletions(-) diff --git a/src/heart_flow/sub_mind.py b/src/heart_flow/sub_mind.py index bccead7dd..50bb889e0 100644 --- a/src/heart_flow/sub_mind.py +++ b/src/heart_flow/sub_mind.py @@ -66,6 +66,9 @@ class SubMind: self.current_mind = "" self.past_mind = [] self.structured_info = {} + + name = chat_manager.get_stream_name(self.subheartflow_id) + self.log_prefix = f"[{name}] " async def do_thinking_before_reply(self, history_cycle: list[CycleInfo] = None): """ @@ -77,6 +80,8 @@ class SubMind: # 更新活跃时间 self.last_active_time = time.time() + + # ---------- 1. 准备基础数据 ---------- # 获取现有想法和情绪状态 current_thinking_info = self.current_mind @@ -85,7 +90,7 @@ class SubMind: # 获取观察对象 observation = self.observations[0] if not observation: - logger.error(f"[{self.subheartflow_id}] 无法获取观察对象") + logger.error(f"{self.log_prefix} 无法获取观察对象") self.update_current_mind("(我没看到任何聊天内容...)") return self.current_mind, self.past_mind @@ -223,57 +228,48 @@ class SubMind: try: # 调用LLM生成响应 - response = await self.llm_model.generate_response_tool_async(prompt=prompt, tools=tools) + response, _reasoning_content, tool_calls = await self.llm_model.generate_response_tool_async(prompt=prompt, tools=tools) - # 标准化响应格式 - success, normalized_response, error_msg = normalize_llm_response( - response, log_prefix=f"[{self.subheartflow_id}] " - ) + logger.debug(f"{self.log_prefix} 子心流输出的原始LLM响应: {response}") + + # 直接使用LLM返回的文本响应作为 content + content = response if response else "" - if not success: - # 处理标准化失败情况 - logger.warning(f"[{self.subheartflow_id}] {error_msg}") - content = "LLM响应格式无法处理" - else: - # 从标准化响应中提取内容 - if len(normalized_response) >= 2: - content = normalized_response[0] - _reasoning_content = normalized_response[1] if len(normalized_response) > 1 else "" - - # 处理可能的工具调用 - if len(normalized_response) == 3: - # 提取并验证工具调用 - success, valid_tool_calls, error_msg = process_llm_tool_calls( - normalized_response, log_prefix=f"[{self.subheartflow_id}] " + if tool_calls: + # 直接将 tool_calls 传递给处理函数 + success, valid_tool_calls, error_msg = process_llm_tool_calls( + tool_calls, log_prefix=f"{self.log_prefix} " + ) + + if success and valid_tool_calls: + # 记录工具调用信息 + tool_calls_str = ", ".join( + [call.get("function", {}).get("name", "未知工具") for call in valid_tool_calls] + ) + logger.info( + f"{self.log_prefix} 模型请求调用{len(valid_tool_calls)}个工具: {tool_calls_str}" ) - if success and valid_tool_calls: - # 记录工具调用信息 - tool_calls_str = ", ".join( - [call.get("function", {}).get("name", "未知工具") for call in valid_tool_calls] - ) - logger.info( - f"[{self.subheartflow_id}] 模型请求调用{len(valid_tool_calls)}个工具: {tool_calls_str}" - ) + # 收集工具执行结果 + await self._execute_tool_calls(valid_tool_calls, tool_instance) + elif not success: + logger.warning(f"{self.log_prefix} 处理工具调用时出错: {error_msg}") + else: + logger.info(f"{self.log_prefix} 心流未使用工具") # 修改日志信息,明确是未使用工具而不是未处理 - # 收集工具执行结果 - await self._execute_tool_calls(valid_tool_calls, tool_instance) - elif not success: - logger.warning(f"[{self.subheartflow_id}] {error_msg}") except Exception as e: # 处理总体异常 - logger.error(f"[{self.subheartflow_id}] 执行LLM请求或处理响应时出错: {e}") + logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}") logger.error(traceback.format_exc()) content = "思考过程中出现错误" # 记录最终思考结果 - name = chat_manager.get_stream_name(self.subheartflow_id) - logger.debug(f"[{name}] \nPrompt:\n{prompt}\n\n心流思考结果:\n{content}\n") + logger.debug(f"{self.log_prefix} \nPrompt:\n{prompt}\n\n心流思考结果:\n{content}\n") # 处理空响应情况 if not content: content = "(不知道该想些什么...)" - logger.warning(f"[{self.subheartflow_id}] LLM返回空结果,思考失败。") + logger.warning(f"{self.log_prefix} LLM返回空结果,思考失败。") # ---------- 6. 更新思考状态并返回结果 ---------- # 更新当前思考内容 diff --git a/src/plugins/heartFC_chat/heartFC_chat.py b/src/plugins/heartFC_chat/heartFC_chat.py index 1237378a4..83c435341 100644 --- a/src/plugins/heartFC_chat/heartFC_chat.py +++ b/src/plugins/heartFC_chat/heartFC_chat.py @@ -17,7 +17,7 @@ from src.plugins.utils.timer_calculator import Timer # <--- Import Timer from src.plugins.heartFC_chat.heartFC_generator import HeartFCGenerator from src.do_tool.tool_use import ToolUser from src.plugins.emoji_system.emoji_manager import emoji_manager -from src.plugins.utils.json_utils import process_llm_tool_response # 导入新的JSON工具 +from src.plugins.utils.json_utils import process_llm_tool_calls, extract_tool_call_arguments from src.heart_flow.sub_mind import SubMind from src.heart_flow.observation import Observation from src.plugins.heartFC_chat.heartflow_prompt_builder import global_prompt_manager @@ -401,20 +401,24 @@ class HeartFChatting: with Timer("决策", cycle_timers): planner_result = await self._planner(current_mind, cycle_timers) - action = planner_result.get("action", "error") - reasoning = planner_result.get("reasoning", "未提供理由") + + # 效果不太好,还没处理replan导致观察时间点改变的问题 + + # action = planner_result.get("action", "error") + # reasoning = planner_result.get("reasoning", "未提供理由") - self._current_cycle.set_action_info(action, reasoning, False) + # self._current_cycle.set_action_info(action, reasoning, False) # 在获取规划结果后检查新消息 - if await self._check_new_messages(planner_start_db_time): - if random.random() < 0.2: - logger.info(f"{self.log_prefix} 看到了新消息,麦麦决定重新观察和规划...") - # 重新规划 - with Timer("重新决策", cycle_timers): - self._current_cycle.replanned = True - planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True) - logger.info(f"{self.log_prefix} 重新规划完成.") + + # if await self._check_new_messages(planner_start_db_time): + # if random.random() < 0.2: + # logger.info(f"{self.log_prefix} 看到了新消息,麦麦决定重新观察和规划...") + # # 重新规划 + # with Timer("重新决策", cycle_timers): + # self._current_cycle.replanned = True + # planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True) + # logger.info(f"{self.log_prefix} 重新规划完成.") # 解析规划结果 action = planner_result.get("action", "error") @@ -736,94 +740,104 @@ class HeartFChatting: observed_messages_str = observation.talking_message_str # --- 使用 LLM 进行决策 --- # - action = "no_reply" # 默认动作 - emoji_query = "" # 默认表情查询 reasoning = "默认决策或获取决策失败" llm_error = False # LLM错误标志 + arguments = None # 初始化参数变量 + emoji_query = "" # <--- 在这里初始化 emoji_query try: - # 构建提示词 - + # --- 构建提示词 --- + replan_prompt_str = "" if is_re_planned: - replan_prompt = await self._build_replan_prompt( + replan_prompt_str = await self._build_replan_prompt( self._current_cycle.action_type, self._current_cycle.reasoning ) - prompt = replan_prompt - else: - replan_prompt = "" prompt = await self._build_planner_prompt( - observed_messages_str, current_mind, self.sub_mind.structured_info, replan_prompt + observed_messages_str, current_mind, self.sub_mind.structured_info, replan_prompt_str ) - payload = { - "model": global_config.llm_plan["name"], - "messages": [{"role": "user", "content": prompt}], - "tools": self.action_manager.get_planner_tool_definition(), - "tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}}, - } - - # 执行LLM请求 + # --- 调用 LLM --- try: - print("prompt") - print("prompt") - print("prompt") - print(payload) - print(prompt) - response = await self.planner_llm._execute_request( - endpoint="/chat/completions", payload=payload, prompt=prompt + planner_tools = self.action_manager.get_planner_tool_definition() + _response_text, _reasoning_content, tool_calls = await self.planner_llm.generate_response_tool_async( + prompt=prompt, + tools=planner_tools, ) - print(response) + logger.debug(f"{self.log_prefix}[Planner] 原始人 LLM响应: {_response_text}") except Exception as req_e: logger.error(f"{self.log_prefix}[Planner] LLM请求执行失败: {req_e}") + action = "error" + reasoning = f"LLM请求失败: {req_e}" + llm_error = True + # 直接返回错误结果 return { - "action": "error", - "reasoning": f"LLM请求执行失败: {req_e}", + "action": action, + "reasoning": reasoning, "emoji_query": "", "current_mind": current_mind, "observed_messages": observed_messages, - "llm_error": True, + "llm_error": llm_error, } - # 处理LLM响应 - with Timer("使用工具", cycle_timers): - # 使用辅助函数处理工具调用响应 - print(1111122222222222) - print(response) + # 默认错误状态 + action = "error" + reasoning = "处理工具调用时出错" + llm_error = True - success, arguments, error_msg = process_llm_tool_response( - response, expected_tool_name="decide_reply_action", log_prefix=f"{self.log_prefix}[Planner] " - ) + # 1. 验证工具调用 + success, valid_tool_calls, error_msg = process_llm_tool_calls( + tool_calls, log_prefix=f"{self.log_prefix}[Planner] " + ) - if success: - # 提取决策参数 - action = arguments.get("action", "no_reply") - # 验证动作是否在可用动作集中 - if action not in self.action_manager.get_available_actions(): + if success and valid_tool_calls: + # 2. 提取第一个调用并获取参数 + first_tool_call = valid_tool_calls[0] + tool_name = first_tool_call.get("function", {}).get("name") + arguments = extract_tool_call_arguments(first_tool_call, None) + + # 3. 检查名称和参数 + expected_tool_name = "decide_reply_action" + if tool_name == expected_tool_name and arguments is not None: + # 4. 成功,提取决策 + extracted_action = arguments.get("action", "no_reply") + # 验证动作 + if extracted_action not in self.action_manager.get_available_actions(): logger.warning( - f"{self.log_prefix}[Planner] LLM返回了未授权的动作: {action},使用默认动作no_reply" + f"{self.log_prefix}[Planner] LLM返回了未授权的动作: {extracted_action},使用默认动作no_reply" ) action = "no_reply" - reasoning = f"LLM返回了未授权的动作: {action}" + reasoning = f"LLM返回了未授权的动作: {extracted_action}" + emoji_query = "" + llm_error = False # 视为非LLM错误,只是逻辑修正 else: + # 动作有效,使用提取的值 + action = extracted_action reasoning = arguments.get("reasoning", "未提供理由") emoji_query = arguments.get("emoji_query", "") - - # 记录决策结果 - logger.debug( - f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果: {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'" - ) - else: - # 处理工具调用失败 - logger.warning(f"{self.log_prefix}[Planner] {error_msg}") - action = "error" - reasoning = error_msg - llm_error = True + llm_error = False # 成功处理 + # 记录决策结果 + logger.debug( + f"{self.log_prefix}[要做什么]\nPrompt:\n{prompt}\n\n决策结果: {action}, 理由: {reasoning}, 表情查询: '{emoji_query}'" + ) + elif tool_name != expected_tool_name: + reasoning = f"LLM返回了非预期的工具: {tool_name}" + logger.warning(f"{self.log_prefix}[Planner] {reasoning}") + else: # arguments is None + reasoning = f"无法提取工具 {tool_name} 的参数" + logger.warning(f"{self.log_prefix}[Planner] {reasoning}") + elif not success: + reasoning = f"验证工具调用失败: {error_msg}" + logger.warning(f"{self.log_prefix}[Planner] {reasoning}") + else: # not valid_tool_calls + reasoning = "LLM未返回有效的工具调用" + logger.warning(f"{self.log_prefix}[Planner] {reasoning}") + # 如果 llm_error 仍然是 True,说明在处理过程中有错误发生 except Exception as llm_e: - logger.error(f"{self.log_prefix}[Planner] Planner LLM处理过程中出错: {llm_e}") - logger.error(traceback.format_exc()) # 记录完整堆栈以便调试 + logger.error(f"{self.log_prefix}[Planner] Planner LLM处理过程中发生意外错误: {llm_e}") + logger.error(traceback.format_exc()) action = "error" - reasoning = f"LLM处理失败: {llm_e}" + reasoning = f"Planner内部处理错误: {llm_e}" llm_error = True # --- 结束 LLM 决策 --- # diff --git a/src/plugins/models/utils_model.py b/src/plugins/models/utils_model.py index 7c87cf946..2ff4c3949 100644 --- a/src/plugins/models/utils_model.py +++ b/src/plugins/models/utils_model.py @@ -739,7 +739,7 @@ class LLMRequest: return response - async def generate_response_tool_async(self, prompt: str, tools: list, **kwargs) -> Union[str, Tuple]: + async def generate_response_tool_async(self, prompt: str, tools: list, **kwargs) -> tuple[str, str, list]: """异步方式根据输入的提示生成模型的响应""" # 构建请求体,不硬编码max_tokens data = { @@ -750,16 +750,18 @@ class LLMRequest: "tools": tools, } - logger.debug(f"向模型 {self.model_name} 发送工具调用请求,包含 {len(tools)} 个工具") + response = await self._execute_request(endpoint="/chat/completions", payload=data, prompt=prompt) + logger.debug(f"向模型 {self.model_name} 发送工具调用请求,包含 {len(tools)} 个工具,返回结果: {response}") # 检查响应是否包含工具调用 - if isinstance(response, tuple) and len(response) == 3: + if len(response) == 3: content, reasoning_content, tool_calls = response logger.debug(f"收到工具调用响应,包含 {len(tool_calls) if tool_calls else 0} 个工具调用") return content, reasoning_content, tool_calls else: + content, reasoning_content = response logger.debug("收到普通响应,无工具调用") - return response + return content, reasoning_content, None async def get_embedding(self, text: str) -> Union[list, None]: """异步方法:获取文本的embedding向量 diff --git a/src/plugins/utils/json_utils.py b/src/plugins/utils/json_utils.py index a76c83e39..63e39198a 100644 --- a/src/plugins/utils/json_utils.py +++ b/src/plugins/utils/json_utils.py @@ -70,55 +70,6 @@ def extract_tool_call_arguments(tool_call: Dict[str, Any], default_value: Dict[s return default_result -def get_json_value( - json_obj: Dict[str, Any], key_path: str, default_value: T = None, transform_func: Callable[[Any], T] = None -) -> Union[Any, T]: - """ - 从JSON对象中按照路径提取值,支持点表示法路径,如"data.items.0.name" - - 参数: - json_obj: JSON对象(已解析的字典) - key_path: 键路径,使用点表示法,如"data.items.0.name" - default_value: 获取失败时返回的默认值 - transform_func: 可选的转换函数,用于对获取的值进行转换 - - 返回: - 路径指向的值,或在获取失败时返回default_value - """ - if not json_obj or not key_path: - return default_value - - try: - # 分割路径 - keys = key_path.split(".") - current = json_obj - - # 遍历路径 - for key in keys: - # 处理数组索引 - if key.isdigit() and isinstance(current, list): - index = int(key) - if 0 <= index < len(current): - current = current[index] - else: - return default_value - # 处理字典键 - elif isinstance(current, dict): - if key in current: - current = current[key] - else: - return default_value - else: - return default_value - - # 应用转换函数(如果提供) - if transform_func and current is not None: - return transform_func(current) - return current - except Exception as e: - logger.error(f"从JSON获取值时出错: {e}, 路径: {key_path}") - return default_value - def safe_json_dumps(obj: Any, default_value: str = "{}", ensure_ascii: bool = False, pretty: bool = False) -> str: """ @@ -144,21 +95,6 @@ def safe_json_dumps(obj: Any, default_value: str = "{}", ensure_ascii: bool = Fa return default_value -def merge_json_objects(*objects: Dict[str, Any]) -> Dict[str, Any]: - """ - 合并多个JSON对象(字典) - - 参数: - *objects: 要合并的JSON对象(字典) - - 返回: - 合并后的字典,后面的对象会覆盖前面对象的相同键 - """ - result = {} - for obj in objects: - if obj and isinstance(obj, dict): - result.update(obj) - return result def normalize_llm_response(response: Any, log_prefix: str = "") -> Tuple[bool, List[Any], str]: @@ -172,6 +108,9 @@ def normalize_llm_response(response: Any, log_prefix: str = "") -> Tuple[bool, L 返回: 元组 (成功标志, 标准化后的响应列表, 错误消息) """ + + logger.debug(f"{log_prefix}原始人 LLM响应: {response}") + # 检查是否为None if response is None: return False, [], "LLM响应为None" @@ -201,114 +140,60 @@ def normalize_llm_response(response: Any, log_prefix: str = "") -> Tuple[bool, L return True, response, "" -def process_llm_tool_calls(response: List[Any], log_prefix: str = "") -> Tuple[bool, List[Dict[str, Any]], str]: +def process_llm_tool_calls(tool_calls: List[Dict[str, Any]], log_prefix: str = "") -> Tuple[bool, List[Dict[str, Any]], str]: """ - 处理并提取LLM响应中的工具调用列表 + 处理并验证LLM响应中的工具调用列表 参数: - response: 标准化后的LLM响应列表 + tool_calls: 从LLM响应中直接获取的工具调用列表 log_prefix: 日志前缀 返回: - 元组 (成功标志, 工具调用列表, 错误消息) + 元组 (成功标志, 验证后的工具调用列表, 错误消息) """ - # 确保响应格式正确 - print(response) - print(11111111111111111) - if len(response) != 3: - return False, [], f"LLM响应元素数量不正确: 预期3个元素,实际{len(response)}个" + # 如果列表为空,表示没有工具调用,这不是错误 + if not tool_calls: + return True, [], "工具调用列表为空" - # 提取工具调用部分 - tool_calls = response[2] - - # 检查工具调用是否有效 - if tool_calls is None: - return False, [], "工具调用部分为None" - - if not isinstance(tool_calls, list): - return False, [], f"工具调用部分不是列表: {type(tool_calls).__name__}" - - if len(tool_calls) == 0: - return False, [], "工具调用列表为空" - - # 检查工具调用是否格式正确 + # 验证每个工具调用的格式 valid_tool_calls = [] for i, tool_call in enumerate(tool_calls): if not isinstance(tool_call, dict): - logger.warning(f"{log_prefix}工具调用[{i}]不是字典: {type(tool_call).__name__}") + logger.warning(f"{log_prefix}工具调用[{i}]不是字典: {type(tool_call).__name__}, 内容: {tool_call}") continue + # 检查基本结构 if tool_call.get("type") != "function": - logger.warning(f"{log_prefix}工具调用[{i}]不是函数类型: {tool_call.get('type', '未知')}") + logger.warning(f"{log_prefix}工具调用[{i}]不是function类型: type={tool_call.get('type', '未定义')}, 内容: {tool_call}") continue - if "function" not in tool_call or not isinstance(tool_call["function"], dict): - logger.warning(f"{log_prefix}工具调用[{i}]缺少function字段或格式不正确") + if "function" not in tool_call or not isinstance(tool_call.get("function"), dict): + logger.warning(f"{log_prefix}工具调用[{i}]缺少'function'字段或其类型不正确: {tool_call}") + continue + + func_details = tool_call["function"] + if "name" not in func_details or not isinstance(func_details.get("name"), str): + logger.warning(f"{log_prefix}工具调用[{i}]的'function'字段缺少'name'或类型不正确: {func_details}") + continue + if "arguments" not in func_details or not isinstance(func_details.get("arguments"), str): # 参数是字符串形式的JSON + logger.warning(f"{log_prefix}工具调用[{i}]的'function'字段缺少'arguments'或类型不正确: {func_details}") + continue + + # 可选:尝试解析参数JSON,确保其有效 + args_str = func_details["arguments"] + try: + json.loads(args_str) # 尝试解析,但不存储结果 + except json.JSONDecodeError as e: + logger.warning(f"{log_prefix}工具调用[{i}]的'arguments'不是有效的JSON字符串: {e}, 内容: {args_str[:100]}...") + continue + except Exception as e: + logger.warning(f"{log_prefix}解析工具调用[{i}]的'arguments'时发生意外错误: {e}, 内容: {args_str[:100]}...") continue valid_tool_calls.append(tool_call) - # 检查是否有有效的工具调用 - if not valid_tool_calls: - return False, [], "没有找到有效的工具调用" + if not valid_tool_calls and tool_calls: # 如果原始列表不为空,但验证后为空 + return False, [], "所有工具调用格式均无效" return True, valid_tool_calls, "" - - -def process_llm_tool_response( - response: Any, expected_tool_name: str = None, log_prefix: str = "" -) -> Tuple[bool, Dict[str, Any], str]: - """ - 处理LLM返回的工具调用响应,进行常见错误检查并提取参数 - - 参数: - response: LLM的响应,预期是[content, reasoning, tool_calls]格式的列表或元组 - expected_tool_name: 预期的工具名称,如不指定则不检查 - log_prefix: 日志前缀,用于标识日志来源 - - 返回: - 三元组(成功标志, 参数字典, 错误描述) - - 如果成功解析,返回(True, 参数字典, "") - - 如果解析失败,返回(False, {}, 错误描述) - """ - # 使用新的标准化函数 - success, normalized_response, error_msg = normalize_llm_response(response, log_prefix) - if not success: - return False, {}, error_msg - - # 新增检查:确保响应包含预期的工具调用部分 - if len(normalized_response) != 3: - # 如果长度不为3,说明LLM响应不包含工具调用部分,这在期望工具调用的上下文中是错误的 - error_msg = ( - f"LLM响应未包含预期的工具调用部分: 元素数量{len(normalized_response)},响应内容:{normalized_response}" - ) - logger.warning(f"{log_prefix}{error_msg}") - return False, {}, error_msg - - # 使用新的工具调用处理函数 - # 此时已知 normalized_response 长度必定为 3 - success, valid_tool_calls, error_msg = process_llm_tool_calls(normalized_response, log_prefix) - if not success: - return False, {}, error_msg - - # 检查是否有工具调用 - if not valid_tool_calls: - return False, {}, "没有有效的工具调用" - - # 获取第一个工具调用 - tool_call = valid_tool_calls[0] - - # 检查工具名称(如果提供了预期名称) - if expected_tool_name: - actual_name = tool_call.get("function", {}).get("name") - if actual_name != expected_tool_name: - return False, {}, f"工具名称不匹配: 预期'{expected_tool_name}',实际'{actual_name}'" - - # 提取并解析参数 - try: - arguments = extract_tool_call_arguments(tool_call, {}) - return True, arguments, "" - except Exception as e: - logger.error(f"{log_prefix}解析工具参数时出错: {e}") - return False, {}, f"解析参数失败: {str(e)}"