fix: optimize tool-call parsing

This commit is contained in:
SengokuCola
2025-04-28 19:31:00 +08:00
parent 629cdb007b
commit f83e151d40
4 changed files with 159 additions and 262 deletions
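At a glance, the commit replaces the normalize-then-probe path (normalize_llm_response + process_llm_tool_response) with a fixed three-part contract: generate_response_tool_async always returns (content, reasoning, tool_calls-or-None), and callers hand tool_calls straight to process_llm_tool_calls. Below is a minimal sketch of that contract only; the helper name as_triple is invented here, and the 2-vs-3-tuple provider behavior is taken as given from the diff.

    # Sketch only: the normalized return shape this commit converges on.
    from typing import Optional, Tuple

    def as_triple(response: tuple) -> Tuple[str, str, Optional[list]]:
        """Coerce a 2- or 3-element provider response into (content, reasoning, tool_calls)."""
        if len(response) == 3:  # the provider surfaced tool calls
            content, reasoning, tool_calls = response
            return content, reasoning, tool_calls
        content, reasoning = response  # plain completion, no tool calls
        return content, reasoning, None

    assert as_triple(("hi", "thinking...")) == ("hi", "thinking...", None)
    assert as_triple(("hi", "thinking...", []))[2] == []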

View File

@@ -66,6 +66,9 @@ class SubMind:
self.current_mind = ""
self.past_mind = []
self.structured_info = {}
name = chat_manager.get_stream_name(self.subheartflow_id)
self.log_prefix = f"[{name}] "
async def do_thinking_before_reply(self, history_cycle: list[CycleInfo] = None):
"""
@@ -77,6 +80,8 @@ class SubMind:
        # Update the last-active time
        self.last_active_time = time.time()

        # ---------- 1. Prepare the base data ----------
        # Get the existing thought and emotional state
        current_thinking_info = self.current_mind
@@ -85,7 +90,7 @@ class SubMind:
        # Get the observation object
        observation = self.observations[0]
        if not observation:
-           logger.error(f"[{self.subheartflow_id}] failed to get the observation object")
+           logger.error(f"{self.log_prefix} failed to get the observation object")
            self.update_current_mind("(I did not see any chat content...)")
            return self.current_mind, self.past_mind
@@ -223,57 +228,48 @@ class SubMind:
        try:
            # Call the LLM to generate a response
-           response = await self.llm_model.generate_response_tool_async(prompt=prompt, tools=tools)
+           response, _reasoning_content, tool_calls = await self.llm_model.generate_response_tool_async(prompt=prompt, tools=tools)

-           # Normalize the response format
-           success, normalized_response, error_msg = normalize_llm_response(
-               response, log_prefix=f"[{self.subheartflow_id}] "
-           )
+           logger.debug(f"{self.log_prefix} raw LLM response from the sub-heartflow: {response}")
+           # Use the text returned by the LLM directly as the content
+           content = response if response else ""

-           if not success:
-               # Handle the normalization failure
-               logger.warning(f"[{self.subheartflow_id}] {error_msg}")
-               content = "the LLM response format could not be processed"
-           else:
-               # Extract the content from the normalized response
-               if len(normalized_response) >= 2:
-                   content = normalized_response[0]
-                   _reasoning_content = normalized_response[1] if len(normalized_response) > 1 else ""
-               # Handle possible tool calls
-               if len(normalized_response) == 3:
-                   # Extract and validate the tool calls
-                   success, valid_tool_calls, error_msg = process_llm_tool_calls(
-                       normalized_response, log_prefix=f"[{self.subheartflow_id}] "
+           if tool_calls:
+               # Hand tool_calls straight to the processing function
+               success, valid_tool_calls, error_msg = process_llm_tool_calls(
+                   tool_calls, log_prefix=f"{self.log_prefix} "
                )
+               if success and valid_tool_calls:
+                   # Log the tool-call details
+                   tool_calls_str = ", ".join(
+                       [call.get("function", {}).get("name", "unknown tool") for call in valid_tool_calls]
+                   )
+                   logger.info(
+                       f"{self.log_prefix} the model requested {len(valid_tool_calls)} tool call(s): {tool_calls_str}"
+                   )
-                   if success and valid_tool_calls:
-                       # Log the tool-call details
-                       tool_calls_str = ", ".join(
-                           [call.get("function", {}).get("name", "unknown tool") for call in valid_tool_calls]
-                       )
-                       logger.info(
-                           f"[{self.subheartflow_id}] the model requested {len(valid_tool_calls)} tool call(s): {tool_calls_str}"
-                       )
+                   # Collect the tool execution results
+                   await self._execute_tool_calls(valid_tool_calls, tool_instance)
+               elif not success:
+                   logger.warning(f"{self.log_prefix} error while processing tool calls: {error_msg}")
+           else:
+               logger.info(f"{self.log_prefix} the heartflow used no tools")  # reworded log: tools were unused, not unprocessed
-                       # Collect the tool execution results
-                       await self._execute_tool_calls(valid_tool_calls, tool_instance)
-                   elif not success:
-                       logger.warning(f"[{self.subheartflow_id}] {error_msg}")

        except Exception as e:
            # Handle any top-level exception
-           logger.error(f"[{self.subheartflow_id}] error while executing the LLM request or processing the response: {e}")
+           logger.error(f"{self.log_prefix} error while executing the LLM request or processing the response: {e}")
            logger.error(traceback.format_exc())
            content = "an error occurred while thinking"

        # Log the final thinking result
-       name = chat_manager.get_stream_name(self.subheartflow_id)
-       logger.debug(f"[{name}] \nPrompt:\n{prompt}\n\nheartflow thinking result:\n{content}\n")
+       logger.debug(f"{self.log_prefix} \nPrompt:\n{prompt}\n\nheartflow thinking result:\n{content}\n")

        # Handle an empty response
        if not content:
            content = "(not sure what to think about...)"
-           logger.warning(f"[{self.subheartflow_id}] the LLM returned an empty result; thinking failed.")
+           logger.warning(f"{self.log_prefix} the LLM returned an empty result; thinking failed.")

        # ---------- 6. Update the thinking state and return the result ----------
        # Update the current thought content
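Put together, the SubMind side reduces to a straight-line consumption pattern. The sketch below follows the new code path in this file; llm and execute_tool_calls are illustrative stand-ins rather than the project's classes, and process_llm_tool_calls is assumed to be the json_utils helper this commit rewrites.

    # Hedged sketch of the new SubMind flow (llm / execute_tool_calls are stand-ins).
    import logging
    logger = logging.getLogger(__name__)

    async def think_once(llm, tools, execute_tool_calls, log_prefix: str) -> str:
        content, _reasoning, tool_calls = await llm.generate_response_tool_async(prompt="...", tools=tools)
        content = content or ""
        if tool_calls:
            ok, valid_calls, err = process_llm_tool_calls(tool_calls, log_prefix=log_prefix)
            if ok and valid_calls:
                names = ", ".join(c.get("function", {}).get("name", "unknown tool") for c in valid_calls)
                logger.info(f"{log_prefix} the model requested {len(valid_calls)} tool call(s): {names}")
                await execute_tool_calls(valid_calls)  # collect the tool results
            elif not ok:
                logger.warning(f"{log_prefix} error while processing tool calls: {err}")
        else:
            logger.info(f"{log_prefix} no tools were used")
        return content or "(not sure what to think about...)"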

View File

@@ -17,7 +17,7 @@ from src.plugins.utils.timer_calculator import Timer # <--- Import Timer
from src.plugins.heartFC_chat.heartFC_generator import HeartFCGenerator
from src.do_tool.tool_use import ToolUser
from src.plugins.emoji_system.emoji_manager import emoji_manager
-from src.plugins.utils.json_utils import process_llm_tool_response  # import the new JSON utilities
+from src.plugins.utils.json_utils import process_llm_tool_calls, extract_tool_call_arguments
from src.heart_flow.sub_mind import SubMind
from src.heart_flow.observation import Observation
from src.plugins.heartFC_chat.heartflow_prompt_builder import global_prompt_manager
@@ -401,20 +401,24 @@ class HeartFChatting:
            with Timer("planning", cycle_timers):
                planner_result = await self._planner(current_mind, cycle_timers)

-           action = planner_result.get("action", "error")
-           reasoning = planner_result.get("reasoning", "no reasoning provided")
+           # This did not work well; the replan-induced shift of the observation time point is still unhandled
+           # action = planner_result.get("action", "error")
+           # reasoning = planner_result.get("reasoning", "no reasoning provided")

-           self._current_cycle.set_action_info(action, reasoning, False)
+           # self._current_cycle.set_action_info(action, reasoning, False)

            # Check for new messages after obtaining the planning result
-           if await self._check_new_messages(planner_start_db_time):
-               if random.random() < 0.2:
-                   logger.info(f"{self.log_prefix} saw new messages; 麦麦 decided to observe and plan again...")
-                   # Replan
-                   with Timer("replanning", cycle_timers):
-                       self._current_cycle.replanned = True
-                       planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True)
-                   logger.info(f"{self.log_prefix} replanning finished.")
+           # if await self._check_new_messages(planner_start_db_time):
+           #     if random.random() < 0.2:
+           #         logger.info(f"{self.log_prefix} saw new messages; 麦麦 decided to observe and plan again...")
+           #         # Replan
+           #         with Timer("replanning", cycle_timers):
+           #             self._current_cycle.replanned = True
+           #             planner_result = await self._planner(current_mind, cycle_timers, is_re_planned=True)
+           #         logger.info(f"{self.log_prefix} replanning finished.")

            # Parse the planning result
            action = planner_result.get("action", "error")
@@ -736,94 +740,104 @@ class HeartFChatting:
        observed_messages_str = observation.talking_message_str

        # --- Use the LLM to make the decision --- #
        action = "no_reply"  # default action
-       emoji_query = ""  # default emoji query
+       reasoning = "default decision, or failed to obtain a decision"
+       llm_error = False  # LLM error flag
+       arguments = None  # initialize the arguments variable
+       emoji_query = ""  # <--- initialize emoji_query here

        try:
-           # Build the prompt
+           # --- Build the prompt ---
+           replan_prompt_str = ""
            if is_re_planned:
-               replan_prompt = await self._build_replan_prompt(
+               replan_prompt_str = await self._build_replan_prompt(
                    self._current_cycle.action_type, self._current_cycle.reasoning
                )
-               prompt = replan_prompt
-           else:
-               replan_prompt = ""
            prompt = await self._build_planner_prompt(
-               observed_messages_str, current_mind, self.sub_mind.structured_info, replan_prompt
+               observed_messages_str, current_mind, self.sub_mind.structured_info, replan_prompt_str
            )

-           payload = {
-               "model": global_config.llm_plan["name"],
-               "messages": [{"role": "user", "content": prompt}],
-               "tools": self.action_manager.get_planner_tool_definition(),
-               "tool_choice": {"type": "function", "function": {"name": "decide_reply_action"}},
-           }

-           # Execute the LLM request
+           # --- Call the LLM ---
            try:
-               print("prompt")
-               print("prompt")
-               print("prompt")
-               print(payload)
-               print(prompt)
-               response = await self.planner_llm._execute_request(
-                   endpoint="/chat/completions", payload=payload, prompt=prompt
+               planner_tools = self.action_manager.get_planner_tool_definition()
+               _response_text, _reasoning_content, tool_calls = await self.planner_llm.generate_response_tool_async(
+                   prompt=prompt,
+                   tools=planner_tools,
                )
-               print(response)
+               logger.debug(f"{self.log_prefix}[Planner] raw LLM response: {_response_text}")
            except Exception as req_e:
                logger.error(f"{self.log_prefix}[Planner] LLM request execution failed: {req_e}")
+               action = "error"
+               reasoning = f"LLM request failed: {req_e}"
+               llm_error = True
                # Return the error result immediately
                return {
-                   "action": "error",
-                   "reasoning": f"LLM request execution failed: {req_e}",
+                   "action": action,
+                   "reasoning": reasoning,
                    "emoji_query": "",
                    "current_mind": current_mind,
                    "observed_messages": observed_messages,
-                   "llm_error": True,
+                   "llm_error": llm_error,
                }

-           # Process the LLM response
            with Timer("tool use", cycle_timers):
-               # Process the tool-call response with the helper function
-               print(1111122222222222)
-               print(response)
+               # Default error state
+               action = "error"
+               reasoning = "error while processing the tool calls"
+               llm_error = True

-               success, arguments, error_msg = process_llm_tool_response(
-                   response, expected_tool_name="decide_reply_action", log_prefix=f"{self.log_prefix}[Planner] "
-               )
+               # 1. Validate the tool calls
+               success, valid_tool_calls, error_msg = process_llm_tool_calls(
+                   tool_calls, log_prefix=f"{self.log_prefix}[Planner] "
+               )

-               if success:
-                   # Extract the decision parameters
-                   action = arguments.get("action", "no_reply")
-                   # Verify the action is in the set of available actions
-                   if action not in self.action_manager.get_available_actions():
+               if success and valid_tool_calls:
+                   # 2. Take the first call and get its arguments
+                   first_tool_call = valid_tool_calls[0]
+                   tool_name = first_tool_call.get("function", {}).get("name")
+                   arguments = extract_tool_call_arguments(first_tool_call, None)
+                   # 3. Check the name and the arguments
+                   expected_tool_name = "decide_reply_action"
+                   if tool_name == expected_tool_name and arguments is not None:
+                       # 4. Success: extract the decision
+                       extracted_action = arguments.get("action", "no_reply")
+                       # Validate the action
+                       if extracted_action not in self.action_manager.get_available_actions():
                            logger.warning(
-                               f"{self.log_prefix}[Planner] the LLM returned an unauthorized action: {action}; falling back to the default no_reply"
+                               f"{self.log_prefix}[Planner] the LLM returned an unauthorized action: {extracted_action}; falling back to the default no_reply"
                            )
                            action = "no_reply"
-                           reasoning = f"the LLM returned an unauthorized action: {action}"
+                           reasoning = f"the LLM returned an unauthorized action: {extracted_action}"
                            emoji_query = ""
+                           llm_error = False  # treated as a non-LLM error, only a logical correction
                        else:
+                           # The action is valid; use the extracted values
+                           action = extracted_action
                            reasoning = arguments.get("reasoning", "no reasoning provided")
                            emoji_query = arguments.get("emoji_query", "")
-                   # Log the decision result
-                   logger.debug(
-                       f"{self.log_prefix}[what to do]\nPrompt:\n{prompt}\n\ndecision: {action}, reasoning: {reasoning}, emoji query: '{emoji_query}'"
-                   )
-               else:
-                   # Handle the tool-call failure
-                   logger.warning(f"{self.log_prefix}[Planner] {error_msg}")
-                   action = "error"
-                   reasoning = error_msg
-                   llm_error = True
+                       llm_error = False  # processed successfully
+                       # Log the decision result
+                       logger.debug(
+                           f"{self.log_prefix}[what to do]\nPrompt:\n{prompt}\n\ndecision: {action}, reasoning: {reasoning}, emoji query: '{emoji_query}'"
+                       )
+                   elif tool_name != expected_tool_name:
+                       reasoning = f"the LLM returned an unexpected tool: {tool_name}"
+                       logger.warning(f"{self.log_prefix}[Planner] {reasoning}")
+                   else:  # arguments is None
+                       reasoning = f"could not extract the arguments of tool {tool_name}"
+                       logger.warning(f"{self.log_prefix}[Planner] {reasoning}")
+               elif not success:
+                   reasoning = f"tool-call validation failed: {error_msg}"
+                   logger.warning(f"{self.log_prefix}[Planner] {reasoning}")
+               else:  # not valid_tool_calls
+                   reasoning = "the LLM did not return a valid tool call"
+                   logger.warning(f"{self.log_prefix}[Planner] {reasoning}")
+               # If llm_error is still True at this point, an error occurred during processing

        except Exception as llm_e:
-           logger.error(f"{self.log_prefix}[Planner] error during planner LLM processing: {llm_e}")
-           logger.error(traceback.format_exc())  # log the full stack trace for debugging
+           logger.error(f"{self.log_prefix}[Planner] unexpected error during planner LLM processing: {llm_e}")
+           logger.error(traceback.format_exc())
            action = "error"
-           reasoning = f"LLM processing failed: {llm_e}"
+           reasoning = f"internal planner processing error: {llm_e}"
            llm_error = True

        # --- End of LLM decision --- #
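The planner's post-validation steps generalize to any forced single-tool setup: take the first validated call, check its name, parse its arguments, then fall back on mismatch. Here is a hedged, self-contained sketch of that sequence; pick_decision and extract_args are invented stand-ins (the latter mimics json_utils' extract_tool_call_arguments under the assumption that arguments arrive as a JSON string).

    # Sketch: pick and check the first validated tool call.
    import json

    def extract_args(call: dict, default):  # stand-in for extract_tool_call_arguments
        try:
            return json.loads(call["function"]["arguments"])
        except Exception:
            return default

    def pick_decision(valid_tool_calls: list, expected: str = "decide_reply_action") -> tuple:
        if not valid_tool_calls:
            return "error", "the LLM returned no valid tool call"
        first = valid_tool_calls[0]
        name = first.get("function", {}).get("name")
        if name != expected:
            return "error", f"the LLM returned an unexpected tool: {name}"
        args = extract_args(first, None)
        if args is None:
            return "error", f"could not extract arguments for tool {name}"
        return args.get("action", "no_reply"), args.get("reasoning", "no reasoning provided")

    call = {"type": "function", "function": {"name": "decide_reply_action", "arguments": '{"action": "no_reply"}'}}
    assert pick_decision([call])[0] == "no_reply"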

View File

@@ -739,7 +739,7 @@ class LLMRequest:
return response
-   async def generate_response_tool_async(self, prompt: str, tools: list, **kwargs) -> Union[str, Tuple]:
+   async def generate_response_tool_async(self, prompt: str, tools: list, **kwargs) -> tuple[str, str, list]:
"""异步方式根据输入的提示生成模型的响应"""
# 构建请求体不硬编码max_tokens
data = {
@@ -750,16 +750,18 @@ class LLMRequest:
"tools": tools,
}
logger.debug(f"向模型 {self.model_name} 发送工具调用请求,包含 {len(tools)} 个工具")
response = await self._execute_request(endpoint="/chat/completions", payload=data, prompt=prompt)
logger.debug(f"向模型 {self.model_name} 发送工具调用请求,包含 {len(tools)} 个工具,返回结果: {response}")
# 检查响应是否包含工具调用
if isinstance(response, tuple) and len(response) == 3:
if len(response) == 3:
content, reasoning_content, tool_calls = response
logger.debug(f"收到工具调用响应,包含 {len(tool_calls) if tool_calls else 0} 个工具调用")
return content, reasoning_content, tool_calls
else:
content, reasoning_content = response
logger.debug("收到普通响应,无工具调用")
return response
return content, reasoning_content, None
async def get_embedding(self, text: str) -> Union[list, None]:
"""异步方法获取文本的embedding向量

View File

@@ -70,55 +70,6 @@ def extract_tool_call_arguments(tool_call: Dict[str, Any], default_value: Dict[s
return default_result
-def get_json_value(
-    json_obj: Dict[str, Any], key_path: str, default_value: T = None, transform_func: Callable[[Any], T] = None
-) -> Union[Any, T]:
-    """
-    Extract a value from a JSON object by path, with dot notation such as "data.items.0.name"
-
-    Args:
-        json_obj: the JSON object (an already-parsed dict)
-        key_path: the key path in dot notation, e.g. "data.items.0.name"
-        default_value: the value returned when the lookup fails
-        transform_func: optional transform applied to the retrieved value
-
-    Returns:
-        the value at the path, or default_value when the lookup fails
-    """
-    if not json_obj or not key_path:
-        return default_value
-
-    try:
-        # Split the path
-        keys = key_path.split(".")
-        current = json_obj
-        # Walk the path
-        for key in keys:
-            # Handle list indices
-            if key.isdigit() and isinstance(current, list):
-                index = int(key)
-                if 0 <= index < len(current):
-                    current = current[index]
-                else:
-                    return default_value
-            # Handle dict keys
-            elif isinstance(current, dict):
-                if key in current:
-                    current = current[key]
-                else:
-                    return default_value
-            else:
-                return default_value
-        # Apply the transform function, if provided
-        if transform_func and current is not None:
-            return transform_func(current)
-        return current
-    except Exception as e:
-        logger.error(f"error while getting a value from JSON: {e}, path: {key_path}")
-        return default_value
def safe_json_dumps(obj: Any, default_value: str = "{}", ensure_ascii: bool = False, pretty: bool = False) -> str:
"""
@@ -144,21 +95,6 @@ def safe_json_dumps(obj: Any, default_value: str = "{}", ensure_ascii: bool = Fa
return default_value
-def merge_json_objects(*objects: Dict[str, Any]) -> Dict[str, Any]:
-    """
-    Merge multiple JSON objects (dicts)
-
-    Args:
-        *objects: the JSON objects (dicts) to merge
-
-    Returns:
-        the merged dict; later objects overwrite identical keys from earlier ones
-    """
-    result = {}
-    for obj in objects:
-        if obj and isinstance(obj, dict):
-            result.update(obj)
-    return result
def normalize_llm_response(response: Any, log_prefix: str = "") -> Tuple[bool, List[Any], str]:
@@ -172,6 +108,9 @@ def normalize_llm_response(response: Any, log_prefix: str = "") -> Tuple[bool, L
    Returns:
        a tuple (success flag, normalized response list, error message)
    """
logger.debug(f"{log_prefix}原始人 LLM响应: {response}")
    # Check for None
    if response is None:
        return False, [], "the LLM response is None"
@@ -201,114 +140,60 @@ def normalize_llm_response(response: Any, log_prefix: str = "") -> Tuple[bool, L
return True, response, ""
-def process_llm_tool_calls(response: List[Any], log_prefix: str = "") -> Tuple[bool, List[Dict[str, Any]], str]:
+def process_llm_tool_calls(tool_calls: List[Dict[str, Any]], log_prefix: str = "") -> Tuple[bool, List[Dict[str, Any]], str]:
    """
-   Process and extract the list of tool calls from an LLM response
+   Process and validate the list of tool calls from an LLM response

    Args:
-       response: the normalized LLM response list
+       tool_calls: the tool-call list taken directly from the LLM response
        log_prefix: log prefix

    Returns:
-       a tuple (success flag, tool-call list, error message)
+       a tuple (success flag, validated tool-call list, error message)
    """
-   # Make sure the response format is correct
-   print(response)
-   print(11111111111111111)
-   if len(response) != 3:
-       return False, [], f"wrong number of elements in the LLM response: expected 3, got {len(response)}"
+   # An empty list means no tool calls were made, which is not an error
+   if not tool_calls:
+       return True, [], "the tool-call list is empty"

-   # Extract the tool-call part
-   tool_calls = response[2]
-   # Check whether the tool calls are valid
-   if tool_calls is None:
-       return False, [], "the tool-call part is None"
-   if not isinstance(tool_calls, list):
-       return False, [], f"the tool-call part is not a list: {type(tool_calls).__name__}"
-   if len(tool_calls) == 0:
-       return False, [], "the tool-call list is empty"

-   # Check whether the tool calls are well-formed
+   # Validate the format of each tool call
    valid_tool_calls = []
    for i, tool_call in enumerate(tool_calls):
        if not isinstance(tool_call, dict):
-           logger.warning(f"{log_prefix}tool call [{i}] is not a dict: {type(tool_call).__name__}")
+           logger.warning(f"{log_prefix}tool call [{i}] is not a dict: {type(tool_call).__name__}, content: {tool_call}")
            continue

        # Check the basic structure
        if tool_call.get("type") != "function":
-           logger.warning(f"{log_prefix}tool call [{i}] is not of function type: {tool_call.get('type', '')}")
+           logger.warning(f"{log_prefix}tool call [{i}] is not of type function: type={tool_call.get('type', 'undefined')}, content: {tool_call}")
            continue

-       if "function" not in tool_call or not isinstance(tool_call["function"], dict):
-           logger.warning(f"{log_prefix}tool call [{i}] is missing the function field or it is malformed")
+       if "function" not in tool_call or not isinstance(tool_call.get("function"), dict):
+           logger.warning(f"{log_prefix}tool call [{i}] is missing the 'function' field or its type is incorrect: {tool_call}")
            continue

+       func_details = tool_call["function"]
+       if "name" not in func_details or not isinstance(func_details.get("name"), str):
+           logger.warning(f"{log_prefix}tool call [{i}]'s 'function' is missing 'name' or its type is incorrect: {func_details}")
+           continue
+       if "arguments" not in func_details or not isinstance(func_details.get("arguments"), str):  # arguments is JSON in string form
+           logger.warning(f"{log_prefix}tool call [{i}]'s 'function' is missing 'arguments' or its type is incorrect: {func_details}")
+           continue

+       # Optional: try parsing the arguments JSON to make sure it is valid
+       args_str = func_details["arguments"]
+       try:
+           json.loads(args_str)  # parse for validation only; the result is discarded
+       except json.JSONDecodeError as e:
+           logger.warning(f"{log_prefix}tool call [{i}]'s 'arguments' is not a valid JSON string: {e}, content: {args_str[:100]}...")
+           continue
+       except Exception as e:
+           logger.warning(f"{log_prefix}unexpected error while parsing tool call [{i}]'s 'arguments': {e}, content: {args_str[:100]}...")
+           continue

        valid_tool_calls.append(tool_call)

-   # Check whether any valid tool calls were found
-   if not valid_tool_calls:
-       return False, [], "no valid tool calls found"
+   if not valid_tool_calls and tool_calls:  # the original list was non-empty, but nothing survived validation
+       return False, [], "all tool calls were malformed"

    return True, valid_tool_calls, ""
-def process_llm_tool_response(
-    response: Any, expected_tool_name: str = None, log_prefix: str = ""
-) -> Tuple[bool, Dict[str, Any], str]:
-    """
-    Process the tool-call response returned by the LLM, run common error checks, and extract the arguments
-
-    Args:
-        response: the LLM response, expected as a list or tuple in [content, reasoning, tool_calls] form
-        expected_tool_name: the expected tool name; not checked when unspecified
-        log_prefix: log prefix identifying the log source
-
-    Returns:
-        a triple (success flag, argument dict, error description)
-        - on success: (True, argument dict, "")
-        - on failure: (False, {}, error description)
-    """
-    # Use the new normalization function
-    success, normalized_response, error_msg = normalize_llm_response(response, log_prefix)
-    if not success:
-        return False, {}, error_msg
-
-    # Extra check: make sure the response contains the expected tool-call part
-    if len(normalized_response) != 3:
-        # A length other than 3 means the LLM response has no tool-call part,
-        # which is an error in a context that expects tool calls
-        error_msg = (
-            f"the LLM response did not contain the expected tool-call part: {len(normalized_response)} element(s), response: {normalized_response}"
-        )
-        logger.warning(f"{log_prefix}{error_msg}")
-        return False, {}, error_msg
-
-    # Use the new tool-call processing function
-    # At this point normalized_response is guaranteed to have length 3
-    success, valid_tool_calls, error_msg = process_llm_tool_calls(normalized_response, log_prefix)
-    if not success:
-        return False, {}, error_msg
-
-    # Check whether there are any tool calls
-    if not valid_tool_calls:
-        return False, {}, "no valid tool calls"
-
-    # Take the first tool call
-    tool_call = valid_tool_calls[0]
-
-    # Check the tool name (when an expected name was given)
-    if expected_tool_name:
-        actual_name = tool_call.get("function", {}).get("name")
-        if actual_name != expected_tool_name:
-            return False, {}, f"tool name mismatch: expected '{expected_tool_name}', got '{actual_name}'"
-
-    # Extract and parse the arguments
-    try:
-        arguments = extract_tool_call_arguments(tool_call, {})
-        return True, arguments, ""
-    except Exception as e:
-        logger.error(f"{log_prefix}error while parsing tool arguments: {e}")
-        return False, {}, f"failed to parse arguments: {str(e)}"