From 357c72fc4ffa702b5e240f80a0a5b0e566217a85 Mon Sep 17 00:00:00 2001 From: SengokuCola <1026294844@qq.com> Date: Sat, 19 Apr 2025 20:34:35 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E5=B0=9D=E8=AF=95=E4=BC=98?= =?UTF-8?q?=E5=8C=96pfc=E5=A4=8D=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/do_tool/tool_can_use/get_memory.py | 15 +- src/heart_flow/observation.py | 2 +- src/heart_flow/sub_heartflow.py | 2 +- .../chat_module/heartFC_chat/pf_chatting.py | 28 +-- src/plugins/memory_system/Hippocampus.py | 191 +++++++++++++++++- 5 files changed, 217 insertions(+), 21 deletions(-) diff --git a/src/do_tool/tool_can_use/get_memory.py b/src/do_tool/tool_can_use/get_memory.py index ae1677006..a2921938e 100644 --- a/src/do_tool/tool_can_use/get_memory.py +++ b/src/do_tool/tool_can_use/get_memory.py @@ -14,10 +14,10 @@ class GetMemoryTool(BaseTool): parameters = { "type": "object", "properties": { - "text": {"type": "string", "description": "要查询的相关文本"}, + "topic": {"type": "string", "description": "要查询的相关主题,用逗号隔开"}, "max_memory_num": {"type": "integer", "description": "最大返回记忆数量"}, }, - "required": ["text"], + "required": ["topic"], } async def execute(self, function_args: Dict[str, Any], message_txt: str = "") -> Dict[str, Any]: @@ -31,12 +31,15 @@ class GetMemoryTool(BaseTool): Dict: 工具执行结果 """ try: - text = function_args.get("text", message_txt) + topic = function_args.get("topic", message_txt) max_memory_num = function_args.get("max_memory_num", 2) + + # 将主题字符串转换为列表 + topic_list = topic.split(",") # 调用记忆系统 - related_memory = await HippocampusManager.get_instance().get_memory_from_text( - text=text, max_memory_num=max_memory_num, max_memory_length=2, max_depth=3, fast_retrieval=False + related_memory = await HippocampusManager.get_instance().get_memory_from_topic( + valid_keywords=topic_list, max_memory_num=max_memory_num, max_memory_length=2, max_depth=3 ) memory_info = "" @@ -47,7 +50,7 @@ class GetMemoryTool(BaseTool): if memory_info: content = f"你记得这些事情: {memory_info}" else: - content = f"你不太记得有关{text}的记忆,你对此不太了解" + content = f"你不太记得有关{topic}的记忆,你对此不太了解" return {"name": "get_memory", "content": content} except Exception as e: diff --git a/src/heart_flow/observation.py b/src/heart_flow/observation.py index abb942881..8874d8914 100644 --- a/src/heart_flow/observation.py +++ b/src/heart_flow/observation.py @@ -177,7 +177,7 @@ class ChattingObservation(Observation): now_message_str += self.translate_message_list_to_str(talking_message=self.talking_message) self.now_message_info = now_message_str - logger.debug( + logger.trace( f"Chat {self.chat_id} - 压缩早期记忆:{self.mid_memory_info}\n现在聊天内容:{self.now_message_info}" ) diff --git a/src/heart_flow/sub_heartflow.py b/src/heart_flow/sub_heartflow.py index f4ff995fb..8a5a44a70 100644 --- a/src/heart_flow/sub_heartflow.py +++ b/src/heart_flow/sub_heartflow.py @@ -45,7 +45,7 @@ def init_prompt(): prompt += "现在请你根据刚刚的想法继续思考,思考时可以想想如何对群聊内容进行回复,关注新话题,可以适当转换话题,大家正在说的话才是聊天的主题。\n" prompt += "回复的要求是:平淡一些,简短一些,说中文,尽量不要说你说过的话。如果你要回复,最好只回复一个人的一个话题\n" prompt += "请注意不要输出多余内容(包括前后缀,冒号和引号,括号, 表情,等),不要带有括号和动作描写" - prompt += "现在请你继续生成你在这个聊天中的新的想法,记得结合上述的消息,不要分点输出,生成内心想法,文字不要浮夸,注意{bot_name}指的就是你。" + prompt += "现在请你继续生成你在这个聊天中的想法,不要分点输出,生成内心想法,文字不要浮夸,注意{bot_name}指的就是你。" Prompt(prompt, "sub_heartflow_prompt_before") diff --git a/src/plugins/chat_module/heartFC_chat/pf_chatting.py b/src/plugins/chat_module/heartFC_chat/pf_chatting.py index d6bfdfa29..94e51ae20 100644 --- a/src/plugins/chat_module/heartFC_chat/pf_chatting.py +++ b/src/plugins/chat_module/heartFC_chat/pf_chatting.py @@ -40,15 +40,15 @@ PLANNER_TOOL_DEFINITION = [ "action": { "type": "string", "enum": ["no_reply", "text_reply", "emoji_reply"], - "description": "决定采取的行动:'no_reply'(不回复), 'text_reply'(文本回复) 或 'emoji_reply'(表情回复)。", + "description": "决定采取的行动:'no_reply'(不回复), 'text_reply'(文本回复, 可选附带表情) 或 'emoji_reply'(仅表情回复)。", }, "reasoning": {"type": "string", "description": "做出此决定的简要理由。"}, "emoji_query": { "type": "string", - "description": '如果行动是\'emoji_reply\',则指定表情的主题或概念(例如,"开心"、"困惑")。仅在需要表情回复时提供。', + "description": '如果行动是\'emoji_reply\',指定表情的主题或概念。如果行动是\'text_reply\'且希望在文本后追加表情,也在此指定表情主题。', }, }, - "required": ["action", "reasoning"], # 强制要求提供行动和理由 + "required": ["action", "reasoning"], }, }, } @@ -713,13 +713,10 @@ class PFChatting: prompt = f"你的名字是 {global_config.BOT_NICKNAME}。是一个女大二学生,正在QQ聊天,正在决定是否以及如何回应当前的聊天。\n" # Add current mind state if available - if current_mind: - prompt += f"\n你当前的内部想法是:\n---\n{current_mind}\n---\n\n" - else: - prompt += "\n你当前没有特别的内部想法。\n" + if observed_messages: - context_text = "\n".join( + context_text = " ".join( [msg.get("detailed_plain_text", "") for msg in observed_messages if msg.get("detailed_plain_text")] ) prompt += "观察到的最新聊天内容如下:\n---\n" @@ -727,18 +724,25 @@ class PFChatting: prompt += "\n---\n" else: prompt += "当前没有观察到新的聊天内容。\n" + + prompt += "\n看了这些内容,你的想法是:" + + if current_mind: + prompt += f"\n---\n{current_mind}\n---\n\n" + prompt += ( - "\n请结合你的内部想法和观察到的聊天内容,分析情况并使用 'decide_reply_action' 工具来决定你的最终行动。\n" + "\n请结合你的内部想法和观察到的聊天内容,分析情况并使用 \'decide_reply_action\' 工具来决定你的最终行动。\n" ) prompt += "决策依据:\n" prompt += "1. 如果聊天内容无聊、与你无关、或者你的内部想法认为不适合回复,选择 'no_reply'。\n" - prompt += "2. 如果聊天内容值得回应,且适合用文字表达(参考你的内部想法),选择 'text_reply'。\n" + prompt += "2. 如果聊天内容值得回应,且适合用文字表达(参考你的内部想法),选择 'text_reply'。如果想在文字后追加一个表情,请同时提供 'emoji_query'。\n" prompt += ( "3. 如果聊天内容或你的内部想法适合用一个表情来回应,选择 'emoji_reply' 并提供表情主题 'emoji_query'。\n" ) - prompt += "4. 如果你已经回复过消息,也没有人又回复你,选择'no_reply'。" - prompt += "必须调用 'decide_reply_action' 工具并提供 'action' 和 'reasoning'。" + prompt += "4. 如果你已经回复过消息,也没有人又回复你,选择'no_reply'。\n" + prompt += "5. 除非大家都在这么做,否则不要重复聊相同的内容。\n" + prompt += "必须调用 \'decide_reply_action\' 工具并提供 \'action\' 和 \'reasoning\'。如果选择了 'emoji_reply' 或者选择了 'text_reply' 并想追加表情,则必须提供 \'emoji_query\'。" prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt) diff --git a/src/plugins/memory_system/Hippocampus.py b/src/plugins/memory_system/Hippocampus.py index 8e19f1a87..3a9c27dbd 100644 --- a/src/plugins/memory_system/Hippocampus.py +++ b/src/plugins/memory_system/Hippocampus.py @@ -63,7 +63,8 @@ def calculate_information_content(text): """计算文本的信息量(熵)""" char_count = Counter(text) total_chars = len(text) - + if total_chars == 0: + return 0 entropy = 0 for count in char_count.values(): probability = count / total_chars @@ -1256,6 +1257,174 @@ class Hippocampus: logger.info(f"选中记忆: {memory} (来自节点: {topic})") return result + + async def get_memory_from_topic( + self, + keywords: list[str], + max_memory_num: int = 3, + max_memory_length: int = 2, + max_depth: int = 3, + ) -> list: + """从文本中提取关键词并获取相关记忆。 + + Args: + topic (str): 记忆主题 + max_memory_num (int, optional): 返回的记忆条目数量上限。默认为3,表示最多返回3条与输入文本相关度最高的记忆。 + max_memory_length (int, optional): 每个主题最多返回的记忆条目数量。默认为2,表示每个主题最多返回2条相似度最高的记忆。 + max_depth (int, optional): 记忆检索深度。默认为3。值越大,检索范围越广,可以获取更多间接相关的记忆,但速度会变慢。 + + Returns: + list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity) + - topic: str, 记忆主题 + - memory_items: list, 该主题下的记忆项列表 + - similarity: float, 与文本的相似度 + """ + if not keywords: + return [] + + # logger.info(f"提取的关键词: {', '.join(keywords)}") + + # 过滤掉不存在于记忆图中的关键词 + valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G] + if not valid_keywords: + # logger.info("没有找到有效的关键词节点") + return [] + + logger.info(f"有效的关键词: {', '.join(valid_keywords)}") + + # 从每个关键词获取记忆 + all_memories = [] + activate_map = {} # 存储每个词的累计激活值 + + # 对每个关键词进行扩散式检索 + for keyword in valid_keywords: + logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):") + # 初始化激活值 + activation_values = {keyword: 1.0} + # 记录已访问的节点 + visited_nodes = {keyword} + # 待处理的节点队列,每个元素是(节点, 激活值, 当前深度) + nodes_to_process = [(keyword, 1.0, 0)] + + while nodes_to_process: + current_node, current_activation, current_depth = nodes_to_process.pop(0) + + # 如果激活值小于0或超过最大深度,停止扩散 + if current_activation <= 0 or current_depth >= max_depth: + continue + + # 获取当前节点的所有邻居 + neighbors = list(self.memory_graph.G.neighbors(current_node)) + + for neighbor in neighbors: + if neighbor in visited_nodes: + continue + + # 获取连接强度 + edge_data = self.memory_graph.G[current_node][neighbor] + strength = edge_data.get("strength", 1) + + # 计算新的激活值 + new_activation = current_activation - (1 / strength) + + if new_activation > 0: + activation_values[neighbor] = new_activation + visited_nodes.add(neighbor) + nodes_to_process.append((neighbor, new_activation, current_depth + 1)) + logger.trace( + f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})" + ) # noqa: E501 + + # 更新激活映射 + for node, activation_value in activation_values.items(): + if activation_value > 0: + if node in activate_map: + activate_map[node] += activation_value + else: + activate_map[node] = activation_value + + + # 基于激活值平方的独立概率选择 + remember_map = {} + # logger.info("基于激活值平方的归一化选择:") + + # 计算所有激活值的平方和 + total_squared_activation = sum(activation**2 for activation in activate_map.values()) + if total_squared_activation > 0: + # 计算归一化的激活值 + normalized_activations = { + node: (activation**2) / total_squared_activation for node, activation in activate_map.items() + } + + # 按归一化激活值排序并选择前max_memory_num个 + sorted_nodes = sorted(normalized_activations.items(), key=lambda x: x[1], reverse=True)[:max_memory_num] + + # 将选中的节点添加到remember_map + for node, normalized_activation in sorted_nodes: + remember_map[node] = activate_map[node] # 使用原始激活值 + logger.debug( + f"节点 '{node}' (归一化激活值: {normalized_activation:.2f}, 激活值: {activate_map[node]:.2f})" + ) + else: + logger.info("没有有效的激活值") + + # 从选中的节点中提取记忆 + all_memories = [] + # logger.info("开始从选中的节点中提取记忆:") + for node, activation in remember_map.items(): + logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):") + node_data = self.memory_graph.G.nodes[node] + memory_items = node_data.get("memory_items", []) + if not isinstance(memory_items, list): + memory_items = [memory_items] if memory_items else [] + + if memory_items: + logger.debug(f"节点包含 {len(memory_items)} 条记忆") + # 计算每条记忆与输入文本的相似度 + memory_similarities = [] + for memory in memory_items: + # 计算与输入文本的相似度 + memory_words = set(jieba.cut(memory)) + text_words = set(keywords) + all_words = memory_words | text_words + v1 = [1 if word in memory_words else 0 for word in all_words] + v2 = [1 if word in text_words else 0 for word in all_words] + similarity = cosine_similarity(v1, v2) + memory_similarities.append((memory, similarity)) + + # 按相似度排序 + memory_similarities.sort(key=lambda x: x[1], reverse=True) + # 获取最匹配的记忆 + top_memories = memory_similarities[:max_memory_length] + + # 添加到结果中 + for memory, similarity in top_memories: + all_memories.append((node, [memory], similarity)) + # logger.info(f"选中记忆: {memory} (相似度: {similarity:.2f})") + else: + logger.info("节点没有记忆") + + # 去重(基于记忆内容) + logger.debug("开始记忆去重:") + seen_memories = set() + unique_memories = [] + for topic, memory_items, activation_value in all_memories: + memory = memory_items[0] # 因为每个topic只有一条记忆 + if memory not in seen_memories: + seen_memories.add(memory) + unique_memories.append((topic, memory_items, activation_value)) + logger.debug(f"保留记忆: {memory} (来自节点: {topic}, 激活值: {activation_value:.2f})") + else: + logger.debug(f"跳过重复记忆: {memory} (来自节点: {topic})") + + # 转换为(关键词, 记忆)格式 + result = [] + for topic, memory_items, _ in unique_memories: + memory = memory_items[0] # 因为每个topic只有一条记忆 + result.append((topic, memory)) + logger.info(f"选中记忆: {memory} (来自节点: {topic})") + + return result async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float: """从文本中提取关键词并获取相关记忆。 @@ -1772,6 +1941,26 @@ class HippocampusManager: logger.error(f"文本激活记忆失败: {e}") response = [] return response + + async def get_memory_from_topic( + self, + valid_keywords: list[str], + max_memory_num: int = 3, + max_memory_length: int = 2, + max_depth: int = 3, + fast_retrieval: bool = False, + ) -> list: + """从文本中获取相关记忆的公共接口""" + if not self._initialized: + raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法") + try: + response = await self._hippocampus.get_memory_from_topic( + valid_keywords, max_memory_num, max_memory_length, max_depth, fast_retrieval + ) + except Exception as e: + logger.error(f"文本激活记忆失败: {e}") + response = [] + return response async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float: """从文本中获取激活值的公共接口"""