fix:尝试优化pfc复读

This commit is contained in:
SengokuCola
2025-04-19 20:34:35 +08:00
parent 560fb738fc
commit 357c72fc4f
5 changed files with 217 additions and 21 deletions

View File

@@ -14,10 +14,10 @@ class GetMemoryTool(BaseTool):
parameters = {
"type": "object",
"properties": {
"text": {"type": "string", "description": "要查询的相关文本"},
"topic": {"type": "string", "description": "要查询的相关主题,用逗号隔开"},
"max_memory_num": {"type": "integer", "description": "最大返回记忆数量"},
},
"required": ["text"],
"required": ["topic"],
}
async def execute(self, function_args: Dict[str, Any], message_txt: str = "") -> Dict[str, Any]:
@@ -31,12 +31,15 @@ class GetMemoryTool(BaseTool):
Dict: 工具执行结果
"""
try:
text = function_args.get("text", message_txt)
topic = function_args.get("topic", message_txt)
max_memory_num = function_args.get("max_memory_num", 2)
# 将主题字符串转换为列表
topic_list = topic.split(",")
# 调用记忆系统
related_memory = await HippocampusManager.get_instance().get_memory_from_text(
text=text, max_memory_num=max_memory_num, max_memory_length=2, max_depth=3, fast_retrieval=False
related_memory = await HippocampusManager.get_instance().get_memory_from_topic(
valid_keywords=topic_list, max_memory_num=max_memory_num, max_memory_length=2, max_depth=3
)
memory_info = ""
@@ -47,7 +50,7 @@ class GetMemoryTool(BaseTool):
if memory_info:
content = f"你记得这些事情: {memory_info}"
else:
content = f"你不太记得有关{text}的记忆,你对此不太了解"
content = f"你不太记得有关{topic}的记忆,你对此不太了解"
return {"name": "get_memory", "content": content}
except Exception as e:

View File

@@ -177,7 +177,7 @@ class ChattingObservation(Observation):
now_message_str += self.translate_message_list_to_str(talking_message=self.talking_message)
self.now_message_info = now_message_str
logger.debug(
logger.trace(
f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}"
)

View File

@@ -45,7 +45,7 @@ def init_prompt():
prompt += "现在请你根据刚刚的想法继续思考,思考时可以想想如何对群聊内容进行回复,关注新话题,可以适当转换话题,大家正在说的话才是聊天的主题。\n"
prompt += "回复的要求是:平淡一些,简短一些,说中文,尽量不要说你说过的话。如果你要回复,最好只回复一个人的一个话题\n"
prompt += "请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要带有括号和动作描写"
prompt += "现在请你继续生成你在这个聊天中的新的想法,记得结合上述的消息,不要分点输出,生成内心想法,文字不要浮夸,注意{bot_name}指的就是你。"
prompt += "现在请你继续生成你在这个聊天中的想法,不要分点输出,生成内心想法,文字不要浮夸,注意{bot_name}指的就是你。"
Prompt(prompt, "sub_heartflow_prompt_before")

View File

@@ -40,15 +40,15 @@ PLANNER_TOOL_DEFINITION = [
"action": {
"type": "string",
"enum": ["no_reply", "text_reply", "emoji_reply"],
"description": "决定采取的行动:'no_reply'(不回复), 'text_reply'(文本回复) 或 'emoji_reply'(表情回复)。",
"description": "决定采取的行动:'no_reply'(不回复), 'text_reply'(文本回复, 可选附带表情) 或 'emoji_reply'(表情回复)。",
},
"reasoning": {"type": "string", "description": "做出此决定的简要理由。"},
"emoji_query": {
"type": "string",
"description": '如果行动是\'emoji_reply\',则指定表情的主题或概念(例如,"开心""困惑")。仅在需要表情回复时提供',
"description": '如果行动是\'emoji_reply\'指定表情的主题或概念。如果行动是\'text_reply\'且希望在文本后追加表情,也在此指定表情主题',
},
},
"required": ["action", "reasoning"], # 强制要求提供行动和理由
"required": ["action", "reasoning"],
},
},
}
@@ -713,13 +713,10 @@ class PFChatting:
prompt = f"你的名字是 {global_config.BOT_NICKNAME}。是一个女大二学生正在QQ聊天正在决定是否以及如何回应当前的聊天。\n"
# Add current mind state if available
if current_mind:
prompt += f"\n你当前的内部想法是:\n---\n{current_mind}\n---\n\n"
else:
prompt += "\n你当前没有特别的内部想法。\n"
if observed_messages:
context_text = "\n".join(
context_text = " ".join(
[msg.get("detailed_plain_text", "") for msg in observed_messages if msg.get("detailed_plain_text")]
)
prompt += "观察到的最新聊天内容如下:\n---\n"
@@ -728,17 +725,24 @@ class PFChatting:
else:
prompt += "当前没有观察到新的聊天内容。\n"
prompt += "\n看了这些内容,你的想法是:"
if current_mind:
prompt += f"\n---\n{current_mind}\n---\n\n"
prompt += (
"\n请结合你的内部想法和观察到的聊天内容,分析情况并使用 'decide_reply_action' 工具来决定你的最终行动。\n"
"\n请结合你的内部想法和观察到的聊天内容,分析情况并使用 \'decide_reply_action\' 工具来决定你的最终行动。\n"
)
prompt += "决策依据:\n"
prompt += "1. 如果聊天内容无聊、与你无关、或者你的内部想法认为不适合回复,选择 'no_reply'\n"
prompt += "2. 如果聊天内容值得回应,且适合用文字表达(参考你的内部想法),选择 'text_reply'\n"
prompt += "2. 如果聊天内容值得回应,且适合用文字表达(参考你的内部想法),选择 'text_reply'如果想在文字后追加一个表情,请同时提供 'emoji_query'\n"
prompt += (
"3. 如果聊天内容或你的内部想法适合用一个表情来回应,选择 'emoji_reply' 并提供表情主题 'emoji_query'\n"
)
prompt += "4. 如果你已经回复过消息,也没有人又回复你,选择'no_reply'"
prompt += "必须调用 'decide_reply_action' 工具并提供 'action''reasoning'"
prompt += "4. 如果你已经回复过消息,也没有人又回复你,选择'no_reply'\n"
prompt += "5. 除非大家都在这么做,否则不要重复聊相同的内容。\n"
prompt += "必须调用 \'decide_reply_action\' 工具并提供 \'action\'\'reasoning\'。如果选择了 'emoji_reply' 或者选择了 'text_reply' 并想追加表情,则必须提供 \'emoji_query\'"
prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt)

View File

@@ -63,7 +63,8 @@ def calculate_information_content(text):
"""计算文本的信息量(熵)"""
char_count = Counter(text)
total_chars = len(text)
if total_chars == 0:
return 0
entropy = 0
for count in char_count.values():
probability = count / total_chars
@@ -1257,6 +1258,174 @@ class Hippocampus:
return result
async def get_memory_from_topic(
self,
keywords: list[str],
max_memory_num: int = 3,
max_memory_length: int = 2,
max_depth: int = 3,
) -> list:
"""从文本中提取关键词并获取相关记忆。
Args:
topic (str): 记忆主题
max_memory_num (int, optional): 返回的记忆条目数量上限。默认为3表示最多返回3条与输入文本相关度最高的记忆。
max_memory_length (int, optional): 每个主题最多返回的记忆条目数量。默认为2表示每个主题最多返回2条相似度最高的记忆。
max_depth (int, optional): 记忆检索深度。默认为3。值越大检索范围越广可以获取更多间接相关的记忆但速度会变慢。
Returns:
list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity)
- topic: str, 记忆主题
- memory_items: list, 该主题下的记忆项列表
- similarity: float, 与文本的相似度
"""
if not keywords:
return []
# logger.info(f"提取的关键词: {', '.join(keywords)}")
# 过滤掉不存在于记忆图中的关键词
valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]
if not valid_keywords:
# logger.info("没有找到有效的关键词节点")
return []
logger.info(f"有效的关键词: {', '.join(valid_keywords)}")
# 从每个关键词获取记忆
all_memories = []
activate_map = {} # 存储每个词的累计激活值
# 对每个关键词进行扩散式检索
for keyword in valid_keywords:
logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):")
# 初始化激活值
activation_values = {keyword: 1.0}
# 记录已访问的节点
visited_nodes = {keyword}
# 待处理的节点队列,每个元素是(节点, 激活值, 当前深度)
nodes_to_process = [(keyword, 1.0, 0)]
while nodes_to_process:
current_node, current_activation, current_depth = nodes_to_process.pop(0)
# 如果激活值小于0或超过最大深度停止扩散
if current_activation <= 0 or current_depth >= max_depth:
continue
# 获取当前节点的所有邻居
neighbors = list(self.memory_graph.G.neighbors(current_node))
for neighbor in neighbors:
if neighbor in visited_nodes:
continue
# 获取连接强度
edge_data = self.memory_graph.G[current_node][neighbor]
strength = edge_data.get("strength", 1)
# 计算新的激活值
new_activation = current_activation - (1 / strength)
if new_activation > 0:
activation_values[neighbor] = new_activation
visited_nodes.add(neighbor)
nodes_to_process.append((neighbor, new_activation, current_depth + 1))
logger.trace(
f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})"
) # noqa: E501
# 更新激活映射
for node, activation_value in activation_values.items():
if activation_value > 0:
if node in activate_map:
activate_map[node] += activation_value
else:
activate_map[node] = activation_value
# 基于激活值平方的独立概率选择
remember_map = {}
# logger.info("基于激活值平方的归一化选择:")
# 计算所有激活值的平方和
total_squared_activation = sum(activation**2 for activation in activate_map.values())
if total_squared_activation > 0:
# 计算归一化的激活值
normalized_activations = {
node: (activation**2) / total_squared_activation for node, activation in activate_map.items()
}
# 按归一化激活值排序并选择前max_memory_num个
sorted_nodes = sorted(normalized_activations.items(), key=lambda x: x[1], reverse=True)[:max_memory_num]
# 将选中的节点添加到remember_map
for node, normalized_activation in sorted_nodes:
remember_map[node] = activate_map[node] # 使用原始激活值
logger.debug(
f"节点 '{node}' (归一化激活值: {normalized_activation:.2f}, 激活值: {activate_map[node]:.2f})"
)
else:
logger.info("没有有效的激活值")
# 从选中的节点中提取记忆
all_memories = []
# logger.info("开始从选中的节点中提取记忆:")
for node, activation in remember_map.items():
logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):")
node_data = self.memory_graph.G.nodes[node]
memory_items = node_data.get("memory_items", [])
if not isinstance(memory_items, list):
memory_items = [memory_items] if memory_items else []
if memory_items:
logger.debug(f"节点包含 {len(memory_items)} 条记忆")
# 计算每条记忆与输入文本的相似度
memory_similarities = []
for memory in memory_items:
# 计算与输入文本的相似度
memory_words = set(jieba.cut(memory))
text_words = set(keywords)
all_words = memory_words | text_words
v1 = [1 if word in memory_words else 0 for word in all_words]
v2 = [1 if word in text_words else 0 for word in all_words]
similarity = cosine_similarity(v1, v2)
memory_similarities.append((memory, similarity))
# 按相似度排序
memory_similarities.sort(key=lambda x: x[1], reverse=True)
# 获取最匹配的记忆
top_memories = memory_similarities[:max_memory_length]
# 添加到结果中
for memory, similarity in top_memories:
all_memories.append((node, [memory], similarity))
# logger.info(f"选中记忆: {memory} (相似度: {similarity:.2f})")
else:
logger.info("节点没有记忆")
# 去重(基于记忆内容)
logger.debug("开始记忆去重:")
seen_memories = set()
unique_memories = []
for topic, memory_items, activation_value in all_memories:
memory = memory_items[0] # 因为每个topic只有一条记忆
if memory not in seen_memories:
seen_memories.add(memory)
unique_memories.append((topic, memory_items, activation_value))
logger.debug(f"保留记忆: {memory} (来自节点: {topic}, 激活值: {activation_value:.2f})")
else:
logger.debug(f"跳过重复记忆: {memory} (来自节点: {topic})")
# 转换为(关键词, 记忆)格式
result = []
for topic, memory_items, _ in unique_memories:
memory = memory_items[0] # 因为每个topic只有一条记忆
result.append((topic, memory))
logger.info(f"选中记忆: {memory} (来自节点: {topic})")
return result
async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float:
"""从文本中提取关键词并获取相关记忆。
@@ -1773,6 +1942,26 @@ class HippocampusManager:
response = []
return response
async def get_memory_from_topic(
self,
valid_keywords: list[str],
max_memory_num: int = 3,
max_memory_length: int = 2,
max_depth: int = 3,
fast_retrieval: bool = False,
) -> list:
"""从文本中获取相关记忆的公共接口"""
if not self._initialized:
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
try:
response = await self._hippocampus.get_memory_from_topic(
valid_keywords, max_memory_num, max_memory_length, max_depth, fast_retrieval
)
except Exception as e:
logger.error(f"文本激活记忆失败: {e}")
response = []
return response
async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float:
"""从文本中获取激活值的公共接口"""
if not self._initialized: