refactor(tool_use): 添加工具调用历史记录功能,优化工具执行器的提示词和历史管理

refactor(chat_stream_impression_tool): 简化聊天流印象更新逻辑,直接使用传入参数更新
refactor(user_profile_tool): 优化用户画像更新逻辑,直接覆盖更新并移除二步调用机制
This commit is contained in:
Windpicker-owo
2025-11-02 12:43:44 +08:00
parent 28a70f85d5
commit dfb0626521
3 changed files with 172 additions and 333 deletions

View File

@@ -26,6 +26,10 @@ def init_tool_executor_prompt():
请仔细分析聊天内容,考虑以下几点: 请仔细分析聊天内容,考虑以下几点:
1. 内容中是否包含需要查询信息的问题 1. 内容中是否包含需要查询信息的问题
2. 是否有明确的工具使用指令 2. 是否有明确的工具使用指令
3. 之前的工具调用是否提供了有用的信息
4. 是否需要基于之前的工具结果进行进一步的查询
{tool_history}
If you need to use a tool, please directly call the corresponding tool function. If you do not need to use any tool, simply output "No tool needed". If you need to use a tool, please directly call the corresponding tool function. If you do not need to use any tool, simply output "No tool needed".
""" """
@@ -61,6 +65,10 @@ class ToolExecutor:
"""待处理的第二步工具调用,格式为 {tool_name: step_two_definition}""" """待处理的第二步工具调用,格式为 {tool_name: step_two_definition}"""
self._log_prefix_initialized = False self._log_prefix_initialized = False
# 工具调用历史
self.tool_call_history: list[dict[str, Any]] = []
"""工具调用历史,包含工具名称、参数和结果"""
# logger.info(f"{self.log_prefix}工具执行器初始化完成") # 移到异步初始化中 # logger.info(f"{self.log_prefix}工具执行器初始化完成") # 移到异步初始化中
async def _initialize_log_prefix(self): async def _initialize_log_prefix(self):
@@ -100,6 +108,9 @@ class ToolExecutor:
bot_name = global_config.bot.nickname bot_name = global_config.bot.nickname
# 构建工具调用历史文本
tool_history = self._format_tool_history()
# 构建工具调用提示词 # 构建工具调用提示词
prompt = await global_prompt_manager.format_prompt( prompt = await global_prompt_manager.format_prompt(
"tool_executor_prompt", "tool_executor_prompt",
@@ -108,6 +119,7 @@ class ToolExecutor:
sender=sender, sender=sender,
bot_name=bot_name, bot_name=bot_name,
time_now=time_now, time_now=time_now,
tool_history=tool_history,
) )
logger.debug(f"{self.log_prefix}开始LLM工具调用分析") logger.debug(f"{self.log_prefix}开始LLM工具调用分析")
@@ -149,6 +161,83 @@ class ToolExecutor:
return tool_definitions return tool_definitions
def _format_tool_history(self, max_history: int = 5) -> str:
"""格式化工具调用历史为文本
Args:
max_history: 最多显示的历史记录数量
Returns:
格式化的工具历史文本
"""
if not self.tool_call_history:
return ""
# 只取最近的几条历史
recent_history = self.tool_call_history[-max_history:]
history_lines = ["历史工具调用记录:"]
for i, record in enumerate(recent_history, 1):
tool_name = record.get("tool_name", "unknown")
args = record.get("args", {})
result_preview = record.get("result_preview", "")
status = record.get("status", "success")
# 格式化参数
args_str = ", ".join([f"{k}={v}" for k, v in args.items()])
# 格式化记录
status_emoji = "" if status == "success" else ""
history_lines.append(f"{i}. {status_emoji} {tool_name}({args_str})")
if result_preview:
# 限制结果预览长度
if len(result_preview) > 200:
result_preview = result_preview[:200] + "..."
history_lines.append(f" 结果: {result_preview}")
return "\n".join(history_lines)
def _add_tool_to_history(self, tool_name: str, args: dict, result: dict | None, status: str = "success"):
"""添加工具调用到历史记录
Args:
tool_name: 工具名称
args: 工具参数
result: 工具结果
status: 执行状态 (success/error)
"""
# 生成结果预览
result_preview = ""
if result:
content = result.get("content", "")
if isinstance(content, str):
result_preview = content
elif isinstance(content, list | dict):
import json
try:
result_preview = json.dumps(content, ensure_ascii=False)
except Exception:
result_preview = str(content)
else:
result_preview = str(content)
record = {
"tool_name": tool_name,
"args": args,
"result_preview": result_preview,
"status": status,
"timestamp": time.time(),
}
self.tool_call_history.append(record)
# 限制历史记录数量,避免内存溢出
max_history_size = 5
if len(self.tool_call_history) > max_history_size:
self.tool_call_history = self.tool_call_history[-max_history_size:]
async def execute_tool_calls(self, tool_calls: list[ToolCall] | None) -> tuple[list[dict[str, Any]], list[str]]: async def execute_tool_calls(self, tool_calls: list[ToolCall] | None) -> tuple[list[dict[str, Any]], list[str]]:
"""执行工具调用 """执行工具调用
@@ -183,6 +272,7 @@ class ToolExecutor:
# 执行每个工具调用 # 执行每个工具调用
for tool_call in tool_calls: for tool_call in tool_calls:
tool_name = getattr(tool_call, "func_name", "unknown_tool") tool_name = getattr(tool_call, "func_name", "unknown_tool")
tool_args = getattr(tool_call, "args", {})
try: try:
logger.debug(f"{self.log_prefix}执行工具: {tool_name}") logger.debug(f"{self.log_prefix}执行工具: {tool_name}")
@@ -204,8 +294,15 @@ class ToolExecutor:
tool_results.append(tool_info) tool_results.append(tool_info)
used_tools.append(tool_name) used_tools.append(tool_name)
logger.info(f"{self.log_prefix}工具{tool_name}执行成功,类型: {tool_info['type']}") logger.info(f"{self.log_prefix}工具{tool_name}执行成功,类型: {tool_info['type']}")
preview = content[:200] preview = content[:200] if isinstance(content, str) else str(content)[:200]
logger.debug(f"{self.log_prefix}工具{tool_name}结果内容: {preview}...") logger.debug(f"{self.log_prefix}工具{tool_name}结果内容: {preview}...")
# 记录到历史
self._add_tool_to_history(tool_name, tool_args, result, status="success")
else:
# 工具返回空结果也记录到历史
self._add_tool_to_history(tool_name, tool_args, None, status="success")
except Exception as e: except Exception as e:
logger.error(f"{self.log_prefix}工具{tool_name}执行失败: {e}") logger.error(f"{self.log_prefix}工具{tool_name}执行失败: {e}")
# 添加错误信息到结果中 # 添加错误信息到结果中
@@ -218,6 +315,9 @@ class ToolExecutor:
} }
tool_results.append(error_info) tool_results.append(error_info)
# 记录失败到历史
self._add_tool_to_history(tool_name, tool_args, None, status="error")
return tool_results, used_tools return tool_results, used_tools
async def execute_tool_call( async def execute_tool_call(
@@ -404,13 +504,32 @@ class ToolExecutor:
"timestamp": time.time(), "timestamp": time.time(),
} }
logger.info(f"{self.log_prefix}直接工具执行成功: {tool_name}") logger.info(f"{self.log_prefix}直接工具执行成功: {tool_name}")
# 记录到历史
self._add_tool_to_history(tool_name, tool_args, result, status="success")
return tool_info return tool_info
except Exception as e: except Exception as e:
logger.error(f"{self.log_prefix}直接工具执行失败 {tool_name}: {e}") logger.error(f"{self.log_prefix}直接工具执行失败 {tool_name}: {e}")
# 记录失败到历史
self._add_tool_to_history(tool_name, tool_args, None, status="error")
return None return None
def clear_tool_history(self):
"""清除工具调用历史"""
self.tool_call_history.clear()
logger.debug(f"{self.log_prefix}已清除工具调用历史")
def get_tool_history(self) -> list[dict[str, Any]]:
"""获取工具调用历史
Returns:
工具调用历史列表
"""
return self.tool_call_history.copy()
""" """
ToolExecutor使用示例 ToolExecutor使用示例
@@ -436,4 +555,22 @@ result = await executor.execute_specific_tool_simple(
tool_name="get_knowledge", tool_name="get_knowledge",
tool_args={"query": "机器学习"} tool_args={"query": "机器学习"}
) )
# 4. 使用工具历史 - 连续对话中的工具调用
# 第一次调用
await executor.execute_from_chat_message(
target_message="查询今天的天气",
chat_history="",
sender="用户"
)
# 第二次调用时会自动包含上次的工具调用历史
await executor.execute_from_chat_message(
target_message="那明天呢?",
chat_history="",
sender="用户"
)
# 5. 获取和清除历史
history = executor.get_tool_history() # 获取历史记录
executor.clear_tool_history() # 清除历史记录
""" """

View File

@@ -1,19 +1,16 @@
""" """
聊天流印象更新工具 聊天流印象更新工具
通过LLM二步调用机制更新对聊天流如QQ群的整体印象包括主观描述、聊天风格、话题关键词和兴趣分数 直接更新对聊天流如QQ群的整体印象包括主观描述、聊天风格、话题关键词和兴趣分数
现在依赖工具调用历史记录LLM可以看到之前的调用结果因此直接覆盖更新即可
""" """
import json
from typing import Any, ClassVar from typing import Any, ClassVar
from src.common.database.api.crud import CRUDBase from src.common.database.api.crud import CRUDBase
from src.common.database.core.models import ChatStreams from src.common.database.core.models import ChatStreams
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import model_config
from src.llm_models.utils_model import LLMRequest
from src.plugin_system import BaseTool, ToolParamType from src.plugin_system import BaseTool, ToolParamType
from src.utils.json_parser import extract_and_parse_json
logger = get_logger("chat_stream_impression_tool") logger = get_logger("chat_stream_impression_tool")
@@ -21,9 +18,8 @@ logger = get_logger("chat_stream_impression_tool")
class ChatStreamImpressionTool(BaseTool): class ChatStreamImpressionTool(BaseTool):
"""聊天流印象更新工具 """聊天流印象更新工具
使用二步调用机制: 直接使用LLM传入的参数更新聊天流印象。
1. LLM决定是否调用工具并传入初步参数stream_id会自动传入 由于工具执行器现在支持历史记录LLM可以看到之前的调用结果因此无需再次调用LLM进行合并。
2. 工具内部调用LLM结合现有数据和传入参数决定最终更新内容
""" """
name = "update_chat_stream_impression" name = "update_chat_stream_impression"
@@ -61,33 +57,6 @@ class ChatStreamImpressionTool(BaseTool):
available_for_llm = True available_for_llm = True
history_ttl = 5 history_ttl = 5
def __init__(self, plugin_config: dict | None = None, chat_stream: Any = None):
super().__init__(plugin_config, chat_stream)
# 初始化用于二步调用的LLM
try:
self.impression_llm = LLMRequest(
model_set=model_config.model_task_config.relationship_tracker,
request_type="chat_stream_impression_update",
)
except AttributeError:
# 降级处理
available_models = [
attr
for attr in dir(model_config.model_task_config)
if not attr.startswith("_") and attr != "model_dump"
]
if available_models:
fallback_model = available_models[0]
logger.warning(f"relationship_tracker配置不存在使用降级模型: {fallback_model}")
self.impression_llm = LLMRequest(
model_set=getattr(model_config.model_task_config, fallback_model),
request_type="chat_stream_impression_update",
)
else:
logger.error("无可用的模型配置")
self.impression_llm = None
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]: async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行聊天流印象更新 """执行聊天流印象更新
@@ -120,7 +89,7 @@ class ChatStreamImpressionTool(BaseTool):
new_topics = function_args.get("topic_keywords", "") new_topics = function_args.get("topic_keywords", "")
new_score = function_args.get("interest_score") new_score = function_args.get("interest_score")
# 从数据库获取现有聊天流印象 # 从数据库获取现有聊天流印象(用于返回信息)
existing_impression = await self._get_stream_impression(stream_id) existing_impression = await self._get_stream_impression(stream_id)
# 如果LLM没有传入任何有效参数返回提示 # 如果LLM没有传入任何有效参数返回提示
@@ -131,22 +100,16 @@ class ChatStreamImpressionTool(BaseTool):
"content": "提示:需要提供至少一项更新内容(印象描述、聊天风格、话题关键词或兴趣分数)", "content": "提示:需要提供至少一项更新内容(印象描述、聊天风格、话题关键词或兴趣分数)",
} }
# 用LLM进行二步决策 # 直接使用LLM传入的值进行覆盖更新(保留未更新的字段)
if self.impression_llm is None: final_impression = {
logger.error("LLM未正确初始化无法执行二步调用") "stream_impression_text": new_impression if new_impression else existing_impression.get("stream_impression_text", ""),
return {"type": "error", "id": stream_id, "content": "系统错误LLM未正确初始化"} "stream_chat_style": new_style if new_style else existing_impression.get("stream_chat_style", ""),
"stream_topic_keywords": new_topics if new_topics else existing_impression.get("stream_topic_keywords", ""),
"stream_interest_score": new_score if new_score is not None else existing_impression.get("stream_interest_score", 0.5),
}
final_impression = await self._llm_decide_final_impression( # 确保分数在有效范围内
stream_id=stream_id, final_impression["stream_interest_score"] = max(0.0, min(1.0, float(final_impression["stream_interest_score"])))
existing_impression=existing_impression,
new_impression=new_impression,
new_style=new_style,
new_topics=new_topics,
new_score=new_score,
)
if not final_impression:
return {"type": "error", "id": stream_id, "content": "LLM决策失败无法更新聊天流印象"}
# 更新数据库 # 更新数据库
await self._update_stream_impression_in_db(stream_id, final_impression) await self._update_stream_impression_in_db(stream_id, final_impression)
@@ -218,121 +181,7 @@ class ChatStreamImpressionTool(BaseTool):
"group_name": "未知", "group_name": "未知",
} }
async def _llm_decide_final_impression(
self,
stream_id: str,
existing_impression: dict[str, Any],
new_impression: str,
new_style: str,
new_topics: str,
new_score: float | None,
) -> dict[str, Any] | None:
"""使用LLM决策最终的聊天流印象内容
Args:
stream_id: 聊天流ID
existing_impression: 现有印象数据
new_impression: LLM传入的新印象
new_style: LLM传入的新风格
new_topics: LLM传入的新话题
new_score: LLM传入的新分数
Returns:
dict: 最终决定的印象数据如果失败返回None
"""
try:
# 获取bot人设
from src.individuality.individuality import Individuality
individuality = Individuality()
bot_personality = await individuality.get_personality_block()
prompt = f"""
你现在是一个有着特定性格和身份的AI助手。你的人设是{bot_personality}
你正在更新对聊天流 {stream_id} 的整体印象。
【当前聊天流信息】
- 聊天环境: {existing_impression.get("group_name", "未知")}
- 当前印象: {existing_impression.get("stream_impression_text", "暂无印象")}
- 聊天风格: {existing_impression.get("stream_chat_style", "未知")}
- 常见话题: {existing_impression.get("stream_topic_keywords", "未知")}
- 当前兴趣分: {existing_impression.get("stream_interest_score", 0.5):.2f}
【本次想要更新的内容】
- 新的印象描述: {new_impression if new_impression else "不更新"}
- 新的聊天风格: {new_style if new_style else "不更新"}
- 新的话题关键词: {new_topics if new_topics else "不更新"}
- 新的兴趣分数: {new_score if new_score is not None else "不更新"}
请综合考虑现有信息和新信息,决定最终的聊天流印象内容。注意:
1. 印象描述如果提供了新印象应该综合现有印象和新印象形成对这个聊天环境的整体认知100-200字
2. 聊天风格:如果提供了新风格,应该用简洁的词语概括,如"活跃轻松""严肃专业""幽默随性"
3. 话题关键词:如果提供了新话题,应该与现有话题合并(去重),保留最核心和频繁的话题
4. 兴趣分数如果提供了新分数需要结合现有分数合理调整0.0表示完全不感兴趣1.0表示非常感兴趣)
请以JSON格式返回最终决定
{{
"stream_impression_text": "最终的印象描述100-200字整体性的对这个聊天环境的认知",
"stream_chat_style": "最终的聊天风格,简洁概括",
"stream_topic_keywords": "最终的话题关键词,逗号分隔",
"stream_interest_score": 最终的兴趣分数0.0-1.0,
"reasoning": "你的决策理由"
}}
"""
# 调用LLM
if not self.impression_llm:
logger.info("未初始化impression_llm")
return None
llm_response, _ = await self.impression_llm.generate_response_async(prompt=prompt)
if not llm_response:
logger.warning("LLM未返回有效响应")
return None
# 使用统一的 JSON 解析工具
response_data = extract_and_parse_json(llm_response, strict=False)
if not response_data or not isinstance(response_data, dict):
logger.warning("解析LLM响应失败")
return None
# 提取最终决定的数据
final_impression = {
"stream_impression_text": response_data.get(
"stream_impression_text", existing_impression.get("stream_impression_text", "")
),
"stream_chat_style": response_data.get(
"stream_chat_style", existing_impression.get("stream_chat_style", "")
),
"stream_topic_keywords": response_data.get(
"stream_topic_keywords", existing_impression.get("stream_topic_keywords", "")
),
"stream_interest_score": max(
0.0,
min(
1.0,
float(
response_data.get(
"stream_interest_score", existing_impression.get("stream_interest_score", 0.5)
)
),
),
),
}
logger.info(f"LLM决策完成: {stream_id}")
logger.debug(f"决策理由: {response_data.get('reasoning', '')}")
return final_impression
except json.JSONDecodeError as e:
logger.error(f"LLM响应JSON解析失败: {e}")
logger.debug(f"LLM原始响应: {llm_response if 'llm_response' in locals() else 'N/A'}")
return None
except Exception as e:
logger.error(f"LLM决策失败: {e}", exc_info=True)
return None
async def _update_stream_impression_in_db(self, stream_id: str, impression: dict[str, Any]): async def _update_stream_impression_in_db(self, stream_id: str, impression: dict[str, Any]):
"""更新数据库中的聊天流印象 """更新数据库中的聊天流印象
@@ -376,18 +225,4 @@ class ChatStreamImpressionTool(BaseTool):
logger.error(f"更新聊天流印象到数据库失败: {e}", exc_info=True) logger.error(f"更新聊天流印象到数据库失败: {e}", exc_info=True)
raise raise
# 已移除自定义的 _clean_llm_json_response 方法,统一使用 src.utils.json_parser.extract_and_parse_json
def _clean_llm_json_response_deprecated(self, response: str) -> str:
"""已废弃,保留仅用于兼容性
请使用 src.utils.json_parser.extract_and_parse_json 替代
"""
from src.utils.json_parser import extract_and_parse_json
try:
import json
result = extract_and_parse_json(response, strict=False)
return json.dumps(result) if result else response
except Exception as e:
logger.warning(f"清理LLM响应失败: {e}")
return response

View File

@@ -1,22 +1,20 @@
""" """
用户画像更新工具 用户画像更新工具
通过LLM二步调用机制更新用户画像信息,包括别名、主观印象、偏好关键词和好感分数 直接更新用户画像信息,包括别名、主观印象、偏好关键词和好感分数
现在依赖工具调用历史记录LLM可以看到之前的调用结果因此直接覆盖更新即可
""" """
import time import time
from typing import Any, ClassVar from typing import Any
import orjson
from sqlalchemy import select from sqlalchemy import select
from src.common.database.compatibility import get_db_session from src.common.database.compatibility import get_db_session
from src.common.database.core.models import UserRelationships from src.common.database.core.models import UserRelationships
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import global_config, model_config from src.config.config import global_config
from src.llm_models.utils_model import LLMRequest
from src.plugin_system import BaseTool, ToolParamType from src.plugin_system import BaseTool, ToolParamType
from src.utils.json_parser import extract_and_parse_json
logger = get_logger("user_profile_tool") logger = get_logger("user_profile_tool")
@@ -24,14 +22,13 @@ logger = get_logger("user_profile_tool")
class UserProfileTool(BaseTool): class UserProfileTool(BaseTool):
"""用户画像更新工具 """用户画像更新工具
使用二步调用机制: 直接使用LLM传入的参数更新用户画像。
1. LLM决定是否调用工具并传入初步参数 由于工具执行器现在支持历史记录LLM可以看到之前的调用结果因此无需再次调用LLM进行合并。
2. 工具内部调用LLM结合现有数据和传入参数决定最终更新内容
""" """
name = "update_user_profile" name = "update_user_profile"
description = "当你通过聊天记录对某个用户产生了新的认识或印象时使用此工具更新该用户的画像信息。包括用户别名、你对TA的主观印象、TA的偏好兴趣、你对TA的好感程度。调用时机当你发现用户透露了新的个人信息、展现了性格特点、表达了兴趣偏好或者你们的互动让你对TA的看法发生变化时。" description = "当你通过聊天记录对某个用户产生了新的认识或印象时使用此工具更新该用户的画像信息。包括用户别名、你对TA的主观印象、TA的偏好兴趣、你对TA的好感程度。调用时机当你发现用户透露了新的个人信息、展现了性格特点、表达了兴趣偏好或者你们的互动让你对TA的看法发生变化时。"
parameters: ClassVar = [ parameters = [
("target_user_id", ToolParamType.STRING, "目标用户的ID必须", True, None), ("target_user_id", ToolParamType.STRING, "目标用户的ID必须", True, None),
("user_aliases", ToolParamType.STRING, "该用户的昵称或别名,如果发现用户自称或被他人称呼的其他名字时填写,多个别名用逗号分隔(可选)", False, None), ("user_aliases", ToolParamType.STRING, "该用户的昵称或别名,如果发现用户自称或被他人称呼的其他名字时填写,多个别名用逗号分隔(可选)", False, None),
("impression_description", ToolParamType.STRING, "你对该用户的整体印象和性格感受,例如'这个用户很幽默开朗''TA对技术很有热情'等。当你通过对话了解到用户的性格、态度、行为特点时填写(可选)", False, None), ("impression_description", ToolParamType.STRING, "你对该用户的整体印象和性格感受,例如'这个用户很幽默开朗''TA对技术很有热情'等。当你通过对话了解到用户的性格、态度、行为特点时填写(可选)", False, None),
@@ -41,32 +38,6 @@ class UserProfileTool(BaseTool):
available_for_llm = True available_for_llm = True
history_ttl = 5 history_ttl = 5
def __init__(self, plugin_config: dict | None = None, chat_stream: Any = None):
super().__init__(plugin_config, chat_stream)
# 初始化用于二步调用的LLM
try:
self.profile_llm = LLMRequest(
model_set=model_config.model_task_config.relationship_tracker,
request_type="user_profile_update"
)
except AttributeError:
# 降级处理
available_models: ClassVar = [
attr for attr in dir(model_config.model_task_config)
if not attr.startswith("_") and attr != "model_dump"
]
if available_models:
fallback_model = available_models[0]
logger.warning(f"relationship_tracker配置不存在使用降级模型: {fallback_model}")
self.profile_llm = LLMRequest(
model_set=getattr(model_config.model_task_config, fallback_model),
request_type="user_profile_update"
)
else:
logger.error("无可用的模型配置")
self.profile_llm = None
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]: async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行用户画像更新 """执行用户画像更新
@@ -92,7 +63,7 @@ class UserProfileTool(BaseTool):
new_keywords = function_args.get("preference_keywords", "") new_keywords = function_args.get("preference_keywords", "")
new_score = function_args.get("affection_score") new_score = function_args.get("affection_score")
# 从数据库获取现有用户画像 # 从数据库获取现有用户画像(用于返回信息)
existing_profile = await self._get_user_profile(target_user_id) existing_profile = await self._get_user_profile(target_user_id)
# 如果LLM没有传入任何有效参数返回提示 # 如果LLM没有传入任何有效参数返回提示
@@ -103,36 +74,22 @@ class UserProfileTool(BaseTool):
"content": "提示:需要提供至少一项更新内容(别名、印象描述、偏好关键词或好感分数)" "content": "提示:需要提供至少一项更新内容(别名、印象描述、偏好关键词或好感分数)"
} }
# 用LLM进行二步决策 # 直接使用LLM传入的值进行覆盖更新(保留未更新的字段)
if self.profile_llm is None: final_profile = {
logger.error("LLM未正确初始化无法执行二步调用") "user_aliases": new_aliases if new_aliases else existing_profile.get("user_aliases", ""),
return { "relationship_text": new_impression if new_impression else existing_profile.get("relationship_text", ""),
"type": "error", "preference_keywords": new_keywords if new_keywords else existing_profile.get("preference_keywords", ""),
"id": target_user_id, "relationship_score": new_score if new_score is not None else existing_profile.get("relationship_score", global_config.affinity_flow.base_relationship_score),
"content": "系统错误LLM未正确初始化"
} }
final_profile = await self._llm_decide_final_profile( # 确保分数在有效范围内
target_user_id=target_user_id, final_profile["relationship_score"] = max(0.0, min(1.0, float(final_profile["relationship_score"])))
existing_profile=existing_profile,
new_aliases=new_aliases,
new_impression=new_impression,
new_keywords=new_keywords,
new_score=new_score
)
if not final_profile:
return {
"type": "error",
"id": target_user_id,
"content": "LLM决策失败无法更新用户画像"
}
# 更新数据库 # 更新数据库
await self._update_user_profile_in_db(target_user_id, final_profile) await self._update_user_profile_in_db(target_user_id, final_profile)
# 构建返回信息 # 构建返回信息
updates: ClassVar = [] updates = []
if final_profile.get("user_aliases"): if final_profile.get("user_aliases"):
updates.append(f"别名: {final_profile['user_aliases']}") updates.append(f"别名: {final_profile['user_aliases']}")
if final_profile.get("relationship_text"): if final_profile.get("relationship_text"):
@@ -201,97 +158,7 @@ class UserProfileTool(BaseTool):
"relationship_score": global_config.affinity_flow.base_relationship_score, "relationship_score": global_config.affinity_flow.base_relationship_score,
} }
async def _llm_decide_final_profile(
self,
target_user_id: str,
existing_profile: dict[str, Any],
new_aliases: str,
new_impression: str,
new_keywords: str,
new_score: float | None
) -> dict[str, Any] | None:
"""使用LLM决策最终的用户画像内容
Args:
target_user_id: 目标用户ID
existing_profile: 现有画像数据
new_aliases: LLM传入的新别名
new_impression: LLM传入的新印象
new_keywords: LLM传入的新关键词
new_score: LLM传入的新分数
Returns:
dict: 最终决定的画像数据如果失败返回None
"""
try:
# 获取bot人设
from src.individuality.individuality import Individuality
individuality = Individuality()
bot_personality = await individuality.get_personality_block()
prompt = f"""
你现在是一个有着特定性格和身份的AI助手。你的人设是{bot_personality}
你正在更新对用户 {target_user_id} 的画像认识。
【当前画像信息】
- 用户名: {existing_profile.get('user_name', target_user_id)}
- 已知别名: {existing_profile.get('user_aliases', '')}
- 当前印象: {existing_profile.get('relationship_text', '暂无印象')}
- 偏好关键词: {existing_profile.get('preference_keywords', '未知')}
- 当前好感分: {existing_profile.get('relationship_score', 0.3):.2f}
【本次想要更新的内容】
- 新增/更新别名: {new_aliases if new_aliases else '不更新'}
- 新的印象描述: {new_impression if new_impression else '不更新'}
- 新的偏好关键词: {new_keywords if new_keywords else '不更新'}
- 新的好感分数: {new_score if new_score is not None else '不更新'}
请综合考虑现有信息和新信息,决定最终的用户画像内容。注意:
1. 别名:如果提供了新别名,应该与现有别名合并(去重),而不是替换
2. 印象描述如果提供了新印象应该综合现有印象和新印象形成更完整的认识100-200字
3. 偏好关键词:如果提供了新关键词,应该与现有关键词合并(去重),每个关键词简短
4. 好感分数:如果提供了新分数,需要结合现有分数合理调整(变化不宜过大,遵循现实逻辑)
请以JSON格式返回最终决定
{{
"user_aliases": "最终的别名列表,逗号分隔",
"relationship_text": "最终的印象描述100-200字整体性、泛化的理解",
"preference_keywords": "最终的偏好关键词,逗号分隔",
"relationship_score": 最终的好感分数0.0-1.0,
"reasoning": "你的决策理由"
}}
"""
# 调用LLM
llm_response, _ = await self.profile_llm.generate_response_async(prompt=prompt)
if not llm_response:
logger.warning("LLM未返回有效响应")
return None
# 使用统一的 JSON 解析工具
response_data = extract_and_parse_json(llm_response, strict=False)
if not response_data or not isinstance(response_data, dict):
logger.error("LLM响应JSON解析失败")
logger.debug(f"LLM原始响应: {llm_response[:500] if llm_response else 'N/A'}")
return None
# 提取最终决定的数据
final_profile = {
"user_aliases": response_data.get("user_aliases", existing_profile.get("user_aliases", "")),
"relationship_text": response_data.get("relationship_text", existing_profile.get("relationship_text", "")),
"preference_keywords": response_data.get("preference_keywords", existing_profile.get("preference_keywords", "")),
"relationship_score": max(0.0, min(1.0, float(response_data.get("relationship_score", existing_profile.get("relationship_score", 0.3))))),
}
logger.info(f"LLM决策完成: {target_user_id}")
logger.debug(f"决策理由: {response_data.get('reasoning', '')}")
return final_profile
except Exception as e:
logger.error(f"LLM决策失败: {e}", exc_info=True)
return None
async def _update_user_profile_in_db(self, user_id: str, profile: dict[str, Any]): async def _update_user_profile_in_db(self, user_id: str, profile: dict[str, Any]):
"""更新数据库中的用户画像 """更新数据库中的用户画像
@@ -335,4 +202,4 @@ class UserProfileTool(BaseTool):
logger.error(f"更新用户画像到数据库失败: {e}", exc_info=True) logger.error(f"更新用户画像到数据库失败: {e}", exc_info=True)
raise raise
# 已移除自定义的 _clean_llm_json_response 方法,统一使用 src.utils.json_parser.extract_and_parse_json