diff --git a/src/chat/memory_system/Hippocampus.py b/src/chat/memory_system/Hippocampus.py index b1832f41a..0f4ea7a98 100644 --- a/src/chat/memory_system/Hippocampus.py +++ b/src/chat/memory_system/Hippocampus.py @@ -4,14 +4,14 @@ import math import random import time import re -import json import jieba import networkx as nx import numpy as np - -from itertools import combinations -from typing import List, Tuple, Coroutine, Any, Set +from typing import List, Tuple, Set, Coroutine, Any from collections import Counter +from itertools import combinations + + from rich.traceback import install from src.llm_models.utils_model import LLMRequest @@ -25,6 +25,15 @@ from src.chat.utils.chat_message_builder import ( get_raw_msg_by_timestamp_with_chat, ) # 导入 build_readable_messages from src.chat.utils.utils import translate_timestamp_to_human_readable +# 添加cosine_similarity函数 +def cosine_similarity(v1, v2): + """计算余弦相似度""" + dot_product = np.dot(v1, v2) + norm1 = np.linalg.norm(v1) + norm2 = np.linalg.norm(v2) + if norm1 == 0 or norm2 == 0: + return 0 + return dot_product / (norm1 * norm2) install(extra_lines=3) @@ -44,19 +53,18 @@ def calculate_information_content(text): return entropy -def cosine_similarity(v1, v2): # sourcery skip: assign-if-exp, reintroduce-else - """计算余弦相似度""" - dot_product = np.dot(v1, v2) - norm1 = np.linalg.norm(v1) - norm2 = np.linalg.norm(v2) - if norm1 == 0 or norm2 == 0: - return 0 - return dot_product / (norm1 * norm2) + logger = get_logger("memory") + + + + + + class MemoryGraph: def __init__(self): self.G = nx.Graph() # 使用 networkx 的图结构 @@ -83,26 +91,46 @@ class MemoryGraph: last_modified=current_time, ) # 添加最后修改时间 - def add_dot(self, concept, memory): + async def add_dot(self, concept, memory, hippocampus_instance=None): current_time = datetime.datetime.now().timestamp() if concept in self.G: if "memory_items" in self.G.nodes[concept]: - if not isinstance(self.G.nodes[concept]["memory_items"], list): - self.G.nodes[concept]["memory_items"] = [self.G.nodes[concept]["memory_items"]] - self.G.nodes[concept]["memory_items"].append(memory) + # 获取现有的记忆项(已经是str格式) + existing_memory = self.G.nodes[concept]["memory_items"] + + # 如果现有记忆不为空,则使用LLM整合新旧记忆 + if existing_memory and hippocampus_instance and hippocampus_instance.model_small: + try: + integrated_memory = await self._integrate_memories_with_llm( + existing_memory, str(memory), hippocampus_instance.model_small + ) + self.G.nodes[concept]["memory_items"] = integrated_memory + # 整合成功,增加权重 + current_weight = self.G.nodes[concept].get("weight", 0.0) + self.G.nodes[concept]["weight"] = current_weight + 1.0 + logger.debug(f"节点 {concept} 记忆整合成功,权重增加到 {current_weight + 1.0}") + except Exception as e: + logger.error(f"LLM整合记忆失败: {e}") + # 降级到简单连接 + new_memory_str = f"{existing_memory} | {memory}" + self.G.nodes[concept]["memory_items"] = new_memory_str + else: + new_memory_str = str(memory) + self.G.nodes[concept]["memory_items"] = new_memory_str else: - self.G.nodes[concept]["memory_items"] = [memory] + self.G.nodes[concept]["memory_items"] = str(memory) # 如果节点存在但没有memory_items,说明是第一次添加memory,设置created_time if "created_time" not in self.G.nodes[concept]: self.G.nodes[concept]["created_time"] = current_time # 更新最后修改时间 self.G.nodes[concept]["last_modified"] = current_time else: - # 如果是新节点,创建新的记忆列表 + # 如果是新节点,创建新的记忆字符串 self.G.add_node( concept, - memory_items=[memory], + memory_items=str(memory), + weight=1.0, # 新节点初始权重为1.0 created_time=current_time, # 添加创建时间 last_modified=current_time, ) # 添加最后修改时间 @@ -127,9 +155,8 @@ class MemoryGraph: concept, data = node_data if "memory_items" in data: memory_items = data["memory_items"] - if isinstance(memory_items, list): - first_layer_items.extend(memory_items) - else: + # 直接使用完整的记忆内容 + if memory_items: first_layer_items.append(memory_items) # 只在depth=2时获取第二层记忆 @@ -140,12 +167,57 @@ class MemoryGraph: concept, data = node_data if "memory_items" in data: memory_items = data["memory_items"] - if isinstance(memory_items, list): - second_layer_items.extend(memory_items) - else: + # 直接使用完整的记忆内容 + if memory_items: second_layer_items.append(memory_items) return first_layer_items, second_layer_items + + async def _integrate_memories_with_llm(self, existing_memory: str, new_memory: str, llm_model: LLMRequest) -> str: + """ + 使用LLM整合新旧记忆内容 + + Args: + existing_memory: 现有的记忆内容(字符串格式,可能包含多条记忆) + new_memory: 新的记忆内容 + llm_model: LLM模型实例 + + Returns: + str: 整合后的记忆内容 + """ + try: + # 构建整合提示 + integration_prompt = f"""你是一个记忆整合专家。请将以下的旧记忆和新记忆整合成一条更完整、更准确的记忆内容。 + +旧记忆内容: +{existing_memory} + +新记忆内容: +{new_memory} + +整合要求: +1. 保留重要信息,去除重复内容 +2. 如果新旧记忆有冲突,合理整合矛盾的地方 +3. 将相关信息合并,形成更完整的描述 +4. 保持语言简洁、准确 +5. 只返回整合后的记忆内容,不要添加任何解释 + +整合后的记忆:""" + + # 调用LLM进行整合 + content, (reasoning_content, model_name, tool_calls) = await llm_model.generate_response_async(integration_prompt) + + if content and content.strip(): + integrated_content = content.strip() + logger.debug(f"LLM记忆整合成功,模型: {model_name}") + return integrated_content + else: + logger.warning("LLM返回的整合结果为空,使用默认连接方式") + return f"{existing_memory} | {new_memory}" + + except Exception as e: + logger.error(f"LLM记忆整合过程中出错: {e}") + return f"{existing_memory} | {new_memory}" @property def dots(self): @@ -164,26 +236,19 @@ class MemoryGraph: if "memory_items" in node_data: memory_items = node_data["memory_items"] - # 确保memory_items是列表 - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 如果有记忆项可以删除 + # 既然每个节点现在是一个完整的记忆内容,直接删除整个节点 if memory_items: - # 随机选择一个记忆项删除 - removed_item = random.choice(memory_items) - memory_items.remove(removed_item) - - # 更新节点的记忆项 - if memory_items: - self.G.nodes[topic]["memory_items"] = memory_items - else: - # 如果没有记忆项了,删除整个节点 - self.G.remove_node(topic) - - return removed_item - - return None + # 删除整个节点 + self.G.remove_node(topic) + return f"删除了节点 {topic} 的完整记忆: {memory_items[:50]}..." if len(memory_items) > 50 else f"删除了节点 {topic} 的完整记忆: {memory_items}" + else: + # 如果没有记忆项,删除该节点 + self.G.remove_node(topic) + return None + else: + # 如果没有memory_items字段,删除该节点 + self.G.remove_node(topic) + return None # 海马体 @@ -205,15 +270,46 @@ class Hippocampus: def get_all_node_names(self) -> list: """获取记忆图中所有节点的名字列表""" return list(self.memory_graph.G.nodes()) + + def calculate_weighted_activation(self, current_activation: float, edge_strength: int, target_node: str) -> float: + """ + 计算考虑节点权重的激活值 + + Args: + current_activation: 当前激活值 + edge_strength: 边的强度 + target_node: 目标节点名称 + + Returns: + float: 计算后的激活值 + """ + # 基础激活值计算 + base_activation = current_activation - (1 / edge_strength) + + if base_activation <= 0: + return 0.0 + + # 获取目标节点的权重 + if target_node in self.memory_graph.G: + node_data = self.memory_graph.G.nodes[target_node] + node_weight = node_data.get("weight", 1.0) + + # 权重加成:每次整合增加10%激活值,最大加成200% + weight_multiplier = 1.0 + min((node_weight - 1.0) * 0.1, 2.0) + + return base_activation * weight_multiplier + else: + return base_activation @staticmethod def calculate_node_hash(concept, memory_items) -> int: """计算节点的特征值""" - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] + # memory_items已经是str格式,直接按分隔符分割 + if memory_items: + unique_items = {item.strip() for item in memory_items.split(" | ") if item.strip()} + else: + unique_items = set() - # 使用集合来去重,避免排序 - unique_items = {str(item) for item in memory_items} # 使用frozenset来保证顺序一致性 content = f"{concept}:{frozenset(unique_items)}" return hash(content) @@ -234,7 +330,7 @@ class Hippocampus: topic_num_str = topic_num prompt = ( - f"这是一段文字:\n{text}\n\n请你从这段话中总结出最多{topic_num_str}个关键的概念,可以是名词,动词,或者特定人物,帮我列出来," + f"这是一段文字:\n{text}\n\n请你从这段话中总结出最多{topic_num_str}个关键的概念,必须是某种概念,比如人,事,物,概念,事件,地点 等等,帮我列出来," f"将主题用逗号隔开,并加上<>,例如<主题1>,<主题2>......尽可能精简。只需要列举最多{topic_num}个话题就好,不要有序号,不要告诉我其他内容。" f"如果确定找不出主题或者没有明显主题,返回。" ) @@ -245,8 +341,8 @@ class Hippocampus: # sourcery skip: inline-immediately-returned-variable # 不再需要 time_info 参数 prompt = ( - f'这是一段文字:\n{text}\n\n我想让你基于这段文字来概括"{topic}"这个概念,帮我总结成一句自然的话,' - f"要求包含对这个概念的定义,内容,知识,但是这些信息必须来自这段文字,不能添加信息。\n,请包含时间和人物。只输出这句话就好" + f'这是一段文字:\n{text}\n\n我想让你基于这段文字来概括"{topic}"这个概念,帮我总结成几句自然的话,' + f"要求包含对这个概念的定义,内容,知识,时间和人物,这些信息必须来自这段文字,不能添加信息。\n只输出几句自然的话就好" ) return prompt @@ -271,9 +367,9 @@ class Hippocampus: max_depth (int, optional): 记忆检索深度,默认为2。1表示只获取直接相关的记忆,2表示获取间接相关的记忆。 Returns: - list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity) + list: 记忆列表,每个元素是一个元组 (topic, memory_content, similarity) - topic: str, 记忆主题 - - memory_items: list, 该主题下的记忆项列表 + - memory_content: str, 该主题下的完整记忆内容 - similarity: float, 与关键词的相似度 """ if not keyword: @@ -297,11 +393,10 @@ class Hippocampus: # 如果相似度超过阈值,获取该节点的记忆 if similarity >= 0.3: # 可以调整这个阈值 node_data = self.memory_graph.G.nodes[node] - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - memories.append((node, memory_items, similarity)) + memory_items = node_data.get("memory_items", "") + # 直接使用完整的记忆内容 + if memory_items: + memories.append((node, memory_items, similarity)) # 按相似度降序排序 memories.sort(key=lambda x: x[2], reverse=True) @@ -378,10 +473,9 @@ class Hippocampus: 如果为False,使用LLM提取关键词,速度较慢但更准确。 Returns: - list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity) + list: 记忆列表,每个元素是一个元组 (topic, memory_content) - topic: str, 记忆主题 - - memory_items: list, 该主题下的记忆项列表 - - similarity: float, 与文本的相似度 + - memory_content: str, 该主题下的完整记忆内容 """ keywords = await self.get_keywords_from_text(text) @@ -478,31 +572,22 @@ class Hippocampus: for node, activation in remember_map.items(): logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):") node_data = self.memory_graph.G.nodes[node] - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - + memory_items = node_data.get("memory_items", "") + # 直接使用完整的记忆内容 if memory_items: - logger.debug(f"节点包含 {len(memory_items)} 条记忆") - # 计算每条记忆与输入文本的相似度 - memory_similarities = [] - for memory in memory_items: - # 计算与输入文本的相似度 - memory_words = set(jieba.cut(memory)) - text_words = set(jieba.cut(text)) - all_words = memory_words | text_words + logger.debug("节点包含完整记忆") + # 计算记忆与输入文本的相似度 + memory_words = set(jieba.cut(memory_items)) + text_words = set(jieba.cut(text)) + all_words = memory_words | text_words + if all_words: + # 计算相似度(虽然这里没有使用,但保持逻辑一致性) v1 = [1 if word in memory_words else 0 for word in all_words] v2 = [1 if word in text_words else 0 for word in all_words] - similarity = cosine_similarity(v1, v2) - memory_similarities.append((memory, similarity)) - - # 按相似度排序 - memory_similarities.sort(key=lambda x: x[1], reverse=True) - # 获取最匹配的记忆 - top_memories = memory_similarities[:max_memory_length] - - # 添加到结果中 - all_memories.extend((node, [memory], similarity) for memory, similarity in top_memories) + _ = cosine_similarity(v1, v2) # 计算但不使用,用_表示 + + # 添加完整记忆到结果中 + all_memories.append((node, memory_items, activation)) else: logger.info("节点没有记忆") @@ -511,7 +596,8 @@ class Hippocampus: seen_memories = set() unique_memories = [] for topic, memory_items, activation_value in all_memories: - memory = memory_items[0] # 因为每个topic只有一条记忆 + # memory_items现在是完整的字符串格式 + memory = memory_items if memory_items else "" if memory not in seen_memories: seen_memories.add(memory) unique_memories.append((topic, memory_items, activation_value)) @@ -522,7 +608,8 @@ class Hippocampus: # 转换为(关键词, 记忆)格式 result = [] for topic, memory_items, _ in unique_memories: - memory = memory_items[0] # 因为每个topic只有一条记忆 + # memory_items现在是完整的字符串格式 + memory = memory_items if memory_items else "" result.append((topic, memory)) logger.debug(f"选中记忆: {memory} (来自节点: {topic})") @@ -544,10 +631,9 @@ class Hippocampus: max_depth (int, optional): 记忆检索深度。默认为3。值越大,检索范围越广,可以获取更多间接相关的记忆,但速度会变慢。 Returns: - list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity) + list: 记忆列表,每个元素是一个元组 (topic, memory_content) - topic: str, 记忆主题 - - memory_items: list, 该主题下的记忆项列表 - - similarity: float, 与文本的相似度 + - memory_content: str, 该主题下的完整记忆内容 """ if not keywords: return [] @@ -642,31 +728,22 @@ class Hippocampus: for node, activation in remember_map.items(): logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):") node_data = self.memory_graph.G.nodes[node] - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - + memory_items = node_data.get("memory_items", "") + # 直接使用完整的记忆内容 if memory_items: - logger.debug(f"节点包含 {len(memory_items)} 条记忆") - # 计算每条记忆与输入文本的相似度 - memory_similarities = [] - for memory in memory_items: - # 计算与输入文本的相似度 - memory_words = set(jieba.cut(memory)) - text_words = set(keywords) - all_words = memory_words | text_words + logger.debug("节点包含完整记忆") + # 计算记忆与关键词的相似度 + memory_words = set(jieba.cut(memory_items)) + text_words = set(keywords) + all_words = memory_words | text_words + if all_words: + # 计算相似度(虽然这里没有使用,但保持逻辑一致性) v1 = [1 if word in memory_words else 0 for word in all_words] v2 = [1 if word in text_words else 0 for word in all_words] - similarity = cosine_similarity(v1, v2) - memory_similarities.append((memory, similarity)) - - # 按相似度排序 - memory_similarities.sort(key=lambda x: x[1], reverse=True) - # 获取最匹配的记忆 - top_memories = memory_similarities[:max_memory_length] - - # 添加到结果中 - all_memories.extend((node, [memory], similarity) for memory, similarity in top_memories) + _ = cosine_similarity(v1, v2) # 计算但不使用,用_表示 + + # 添加完整记忆到结果中 + all_memories.append((node, memory_items, activation)) else: logger.info("节点没有记忆") @@ -675,7 +752,8 @@ class Hippocampus: seen_memories = set() unique_memories = [] for topic, memory_items, activation_value in all_memories: - memory = memory_items[0] # 因为每个topic只有一条记忆 + # memory_items现在是完整的字符串格式 + memory = memory_items if memory_items else "" if memory not in seen_memories: seen_memories.add(memory) unique_memories.append((topic, memory_items, activation_value)) @@ -686,7 +764,8 @@ class Hippocampus: # 转换为(关键词, 记忆)格式 result = [] for topic, memory_items, _ in unique_memories: - memory = memory_items[0] # 因为每个topic只有一条记忆 + # memory_items现在是完整的字符串格式 + memory = memory_items if memory_items else "" result.append((topic, memory)) logger.debug(f"选中记忆: {memory} (来自节点: {topic})") @@ -894,11 +973,10 @@ class EntorhinalCortex: self.memory_graph.G.remove_node(concept) continue - memory_items = data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - if not memory_items: + memory_items = data.get("memory_items", "") + + # 直接检查字符串是否为空,不需要分割成列表 + if not memory_items or memory_items.strip() == "": self.memory_graph.G.remove_node(concept) continue @@ -907,21 +985,19 @@ class EntorhinalCortex: created_time = data.get("created_time", current_time) last_modified = data.get("last_modified", current_time) - # 将memory_items转换为JSON字符串 - try: - memory_items = [str(item) for item in memory_items] - memory_items_json = json.dumps(memory_items, ensure_ascii=False) - if not memory_items_json: - continue - except Exception: - self.memory_graph.G.remove_node(concept) + # memory_items直接作为字符串存储,不需要JSON序列化 + if not memory_items: continue + # 获取权重属性 + weight = data.get("weight", 1.0) + if concept not in db_nodes: nodes_to_create.append( { "concept": concept, - "memory_items": memory_items_json, + "memory_items": memory_items, + "weight": weight, "hash": memory_hash, "created_time": created_time, "last_modified": last_modified, @@ -933,7 +1009,8 @@ class EntorhinalCortex: nodes_to_update.append( { "concept": concept, - "memory_items": memory_items_json, + "memory_items": memory_items, + "weight": weight, "hash": memory_hash, "last_modified": last_modified, } @@ -1031,8 +1108,8 @@ class EntorhinalCortex: GraphEdges.delete().where((GraphEdges.source == source) & (GraphEdges.target == target)).execute() end_time = time.time() - logger.info(f"[同步] 总耗时: {end_time - start_time:.2f}秒") - logger.info(f"[同步] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边") + logger.info(f"[数据库] 同步完成,总耗时: {end_time - start_time:.2f}秒") + logger.info(f"[数据库] 同步了 {len(nodes_to_create) + len(nodes_to_update)} 个节点和 {len(edges_to_create) + len(edges_to_update)} 条边") async def resync_memory_to_db(self): """清空数据库并重新同步所有记忆数据""" @@ -1054,27 +1131,43 @@ class EntorhinalCortex: # 批量准备节点数据 nodes_data = [] for concept, data in memory_nodes: - memory_items = data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - try: - memory_items = [str(item) for item in memory_items] - if memory_items_json := json.dumps(memory_items, ensure_ascii=False): - nodes_data.append( - { - "concept": concept, - "memory_items": memory_items_json, - "hash": self.hippocampus.calculate_node_hash(concept, memory_items), - "created_time": data.get("created_time", current_time), - "last_modified": data.get("last_modified", current_time), - } - ) - - except Exception as e: - logger.error(f"准备节点 {concept} 数据时发生错误: {e}") + memory_items = data.get("memory_items", "") + + # 直接检查字符串是否为空,不需要分割成列表 + if not memory_items or memory_items.strip() == "": + self.memory_graph.G.remove_node(concept) continue + # 计算内存中节点的特征值 + memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items) + created_time = data.get("created_time", current_time) + last_modified = data.get("last_modified", current_time) + + # memory_items直接作为字符串存储,不需要JSON序列化 + if not memory_items: + continue + + # 获取权重属性 + weight = data.get("weight", 1.0) + + nodes_data.append( + { + "concept": concept, + "memory_items": memory_items, + "weight": weight, + "hash": memory_hash, + "created_time": created_time, + "last_modified": last_modified, + } + ) + + # 批量插入节点 + if nodes_data: + batch_size = 100 + for i in range(0, len(nodes_data), batch_size): + batch = nodes_data[i : i + batch_size] + GraphNodes.insert_many(batch).execute() + # 批量准备边数据 edges_data = [] for source, target, data in memory_edges: @@ -1093,27 +1186,12 @@ class EntorhinalCortex: logger.error(f"准备边 {source}-{target} 数据时发生错误: {e}") continue - # 使用事务批量写入节点 - node_start = time.time() - if nodes_data: - batch_size = 500 # 增加批量大小 - with GraphNodes._meta.database.atomic(): # type: ignore - for i in range(0, len(nodes_data), batch_size): - batch = nodes_data[i : i + batch_size] - GraphNodes.insert_many(batch).execute() - node_end = time.time() - logger.info(f"[数据库] 写入 {len(nodes_data)} 个节点耗时: {node_end - node_start:.2f}秒") - - # 使用事务批量写入边 - edge_start = time.time() + # 批量插入边 if edges_data: - batch_size = 500 # 增加批量大小 - with GraphEdges._meta.database.atomic(): # type: ignore - for i in range(0, len(edges_data), batch_size): - batch = edges_data[i : i + batch_size] - GraphEdges.insert_many(batch).execute() - edge_end = time.time() - logger.info(f"[数据库] 写入 {len(edges_data)} 条边耗时: {edge_end - edge_start:.2f}秒") + batch_size = 100 + for i in range(0, len(edges_data), batch_size): + batch = edges_data[i : i + batch_size] + GraphEdges.insert_many(batch).execute() end_time = time.time() logger.info(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒") @@ -1126,19 +1204,30 @@ class EntorhinalCortex: # 清空当前图 self.memory_graph.G.clear() + + # 统计加载情况 + total_nodes = 0 + loaded_nodes = 0 + skipped_nodes = 0 # 从数据库加载所有节点 nodes = list(GraphNodes.select()) + total_nodes = len(nodes) + for node in nodes: concept = node.concept try: - memory_items = json.loads(node.memory_items) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] + # 处理空字符串或None的情况 + if not node.memory_items or node.memory_items.strip() == "": + logger.warning(f"节点 {concept} 的memory_items为空,跳过") + skipped_nodes += 1 + continue + + # 直接使用memory_items + memory_items = node.memory_items.strip() # 检查时间字段是否存在 if not node.created_time or not node.last_modified: - need_update = True # 更新数据库中的节点 update_data = {} if not node.created_time: @@ -1146,18 +1235,24 @@ class EntorhinalCortex: if not node.last_modified: update_data["last_modified"] = current_time - GraphNodes.update(**update_data).where(GraphNodes.concept == concept).execute() + if update_data: + GraphNodes.update(**update_data).where(GraphNodes.concept == concept).execute() # 获取时间信息(如果不存在则使用当前时间) created_time = node.created_time or current_time last_modified = node.last_modified or current_time + # 获取权重属性 + weight = node.weight if hasattr(node, 'weight') and node.weight is not None else 1.0 + # 添加节点到图中 self.memory_graph.G.add_node( - concept, memory_items=memory_items, created_time=created_time, last_modified=last_modified + concept, memory_items=memory_items, weight=weight, created_time=created_time, last_modified=last_modified ) + loaded_nodes += 1 except Exception as e: logger.error(f"加载节点 {concept} 时发生错误: {e}") + skipped_nodes += 1 continue # 从数据库加载所有边 @@ -1193,6 +1288,9 @@ class EntorhinalCortex: if need_update: logger.info("[数据库] 已为缺失的时间字段进行补充") + + # 输出加载统计信息 + logger.info(f"[数据库] 记忆加载完成: 总计 {total_nodes} 个节点, 成功加载 {loaded_nodes} 个, 跳过 {skipped_nodes} 个") # 负责整合,遗忘,合并记忆 @@ -1338,7 +1436,7 @@ class ParahippocampalGyrus: all_added_nodes.extend(topic for topic, _ in compressed_memory) for topic, memory in compressed_memory: - self.memory_graph.add_dot(topic, memory) + await self.memory_graph.add_dot(topic, memory, self.hippocampus) all_topics.append(topic) if topic in similar_topics_dict: @@ -1458,12 +1556,9 @@ class ParahippocampalGyrus: node_data = self.memory_graph.G.nodes[node] # 首先获取记忆项 - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 新增:检查节点是否为空 - if not memory_items: + memory_items = node_data.get("memory_items", "") + # 直接检查记忆内容是否为空 + if not memory_items or memory_items.strip() == "": try: self.memory_graph.G.remove_node(node) node_changes["removed"].append(f"{node}(空节点)") # 标记为空节点移除 @@ -1474,31 +1569,24 @@ class ParahippocampalGyrus: # --- 如果节点不为空,则执行原来的不活跃检查和随机移除逻辑 --- last_modified = node_data.get("last_modified", current_time) - # 条件1:检查是否长时间未修改 (超过24小时) - if current_time - last_modified > 3600 * 24 and memory_items: - current_count = len(memory_items) - # 如果列表非空,才进行随机选择 - if current_count > 0: - removed_item = random.choice(memory_items) - try: - memory_items.remove(removed_item) - - # 条件3:检查移除后 memory_items 是否变空 - if memory_items: # 如果移除后列表不为空 - # self.memory_graph.G.nodes[node]["memory_items"] = memory_items # 直接修改列表即可 - self.memory_graph.G.nodes[node]["last_modified"] = current_time # 更新修改时间 - node_changes["reduced"].append(f"{node} (数量: {current_count} -> {len(memory_items)})") - else: # 如果移除后列表为空 - # 尝试移除节点,处理可能的错误 - try: - self.memory_graph.G.remove_node(node) - node_changes["removed"].append(f"{node}(遗忘清空)") # 标记为遗忘清空 - logger.debug(f"[遗忘] 节点 {node} 因移除最后一项而被清空。") - except nx.NetworkXError as e: - logger.warning(f"[遗忘] 尝试移除节点 {node} 时发生错误(可能已被移除):{e}") - except ValueError: - # 这个错误理论上不应发生,因为 removed_item 来自 memory_items - logger.warning(f"[遗忘] 尝试从节点 '{node}' 移除不存在的项目 '{removed_item[:30]}...'") + node_weight = node_data.get("weight", 1.0) + + # 条件1:检查是否长时间未修改 (使用配置的遗忘时间) + time_threshold = 3600 * global_config.memory.memory_forget_time + + # 基于权重调整遗忘阈值:权重越高,需要更长时间才能被遗忘 + # 权重为1时使用默认阈值,权重越高阈值越大(越难遗忘) + adjusted_threshold = time_threshold * node_weight + + if current_time - last_modified > adjusted_threshold and memory_items: + # 既然每个节点现在是完整记忆,直接删除整个节点 + try: + self.memory_graph.G.remove_node(node) + node_changes["removed"].append(f"{node}(长时间未修改,权重{node_weight:.1f})") + logger.debug(f"[遗忘] 移除了长时间未修改的节点: {node} (权重: {node_weight:.1f})") + except nx.NetworkXError as e: + logger.warning(f"[遗忘] 移除节点 {node} 时发生错误(可能已被移除): {e}") + continue node_check_end = time.time() logger.info(f"[遗忘] 节点检查耗时: {node_check_end - node_check_start:.2f}秒") @@ -1537,118 +1625,7 @@ class ParahippocampalGyrus: end_time = time.time() logger.info(f"[遗忘] 总耗时: {end_time - start_time:.2f}秒") - async def operation_consolidate_memory(self): - """整合记忆:合并节点内相似的记忆项""" - start_time = time.time() - percentage = global_config.memory.consolidate_memory_percentage - similarity_threshold = global_config.memory.consolidation_similarity_threshold - logger.info(f"[整合] 开始检查记忆节点... 检查比例: {percentage:.2%}, 合并阈值: {similarity_threshold}") - # 获取所有至少有2条记忆项的节点 - eligible_nodes = [] - for node, data in self.memory_graph.G.nodes(data=True): - memory_items = data.get("memory_items", []) - if isinstance(memory_items, list) and len(memory_items) >= 2: - eligible_nodes.append(node) - - if not eligible_nodes: - logger.info("[整合] 没有找到包含多个记忆项的节点,无需整合。") - return - - # 计算需要检查的节点数量 - check_nodes_count = max(1, min(len(eligible_nodes), int(len(eligible_nodes) * percentage))) - - # 随机抽取节点进行检查 - try: - nodes_to_check = random.sample(eligible_nodes, check_nodes_count) - except ValueError as e: - logger.error(f"[整合] 抽样节点时出错: {e}") - return - - logger.info(f"[整合] 将检查 {len(nodes_to_check)} / {len(eligible_nodes)} 个符合条件的节点。") - - merged_count = 0 - nodes_modified = set() - current_timestamp = datetime.datetime.now().timestamp() - - for node in nodes_to_check: - node_data = self.memory_graph.G.nodes[node] - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list) or len(memory_items) < 2: - continue # 双重检查,理论上不会进入 - - items_copy = list(memory_items) # 创建副本以安全迭代和修改 - - # 遍历所有记忆项组合 - for item1, item2 in combinations(items_copy, 2): - # 确保 item1 和 item2 仍然存在于原始列表中(可能已被之前的合并移除) - if item1 not in memory_items or item2 not in memory_items: - continue - - similarity = self._calculate_item_similarity(item1, item2) - - if similarity >= similarity_threshold: - logger.debug(f"[整合] 节点 '{node}' 中发现相似项 (相似度: {similarity:.2f}):") - logger.debug(f" - '{item1}'") - logger.debug(f" - '{item2}'") - - # 比较信息量 - info1 = calculate_information_content(item1) - info2 = calculate_information_content(item2) - - if info1 >= info2: - item_to_keep = item1 - item_to_remove = item2 - else: - item_to_keep = item2 - item_to_remove = item1 - - # 从原始列表中移除信息量较低的项 - try: - memory_items.remove(item_to_remove) - logger.info( - f"[整合] 已合并节点 '{node}' 中的记忆,保留: '{item_to_keep[:60]}...', 移除: '{item_to_remove[:60]}...'" - ) - merged_count += 1 - nodes_modified.add(node) - node_data["last_modified"] = current_timestamp # 更新修改时间 - _merged_in_this_node = True - break # 每个节点每次检查只合并一对 - except ValueError: - # 如果项已经被移除(例如,在之前的迭代中作为 item_to_keep),则跳过 - logger.warning( - f"[整合] 尝试移除节点 '{node}' 中不存在的项 '{item_to_remove[:30]}...',可能已被合并。" - ) - continue - # # 如果节点内发生了合并,更新节点数据 (这种方式不安全,会丢失其他属性) - # if merged_in_this_node: - # self.memory_graph.G.nodes[node]["memory_items"] = memory_items - - if merged_count > 0: - logger.info(f"[整合] 共合并了 {merged_count} 对相似记忆项,分布在 {len(nodes_modified)} 个节点中。") - sync_start = time.time() - logger.info("[整合] 开始将变更同步到数据库...") - # 使用 resync 更安全地处理删除和添加 - await self.hippocampus.entorhinal_cortex.resync_memory_to_db() - sync_end = time.time() - logger.info(f"[整合] 数据库同步耗时: {sync_end - sync_start:.2f}秒") - else: - logger.info("[整合] 本次检查未发现需要合并的记忆项。") - - end_time = time.time() - logger.info(f"[整合] 整合检查完成,总耗时: {end_time - start_time:.2f}秒") - - @staticmethod - def _calculate_item_similarity(item1: str, item2: str) -> float: - """计算两条记忆项文本的余弦相似度""" - words1 = set(jieba.cut(item1)) - words2 = set(jieba.cut(item2)) - all_words = words1 | words2 - if not all_words: - return 0.0 - v1 = [1 if word in words1 else 0 for word in all_words] - v2 = [1 if word in words2 else 0 for word in all_words] - return cosine_similarity(v1, v2) class HippocampusManager: @@ -1698,13 +1675,7 @@ class HippocampusManager: raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法") return await self._hippocampus.parahippocampal_gyrus.operation_forget_topic(percentage) - async def consolidate_memory(self): - """整合记忆的公共接口""" - if not self._initialized: - raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法") - # 注意:目前 operation_consolidate_memory 内部直接读取配置,percentage 参数暂时无效 - # 如果需要外部控制比例,需要修改 operation_consolidate_memory - return await self._hippocampus.parahippocampal_gyrus.operation_consolidate_memory() + async def get_memory_from_text( self, @@ -1768,3 +1739,5 @@ class HippocampusManager: # 创建全局实例 hippocampus_manager = HippocampusManager() + + diff --git a/src/chat/memory_system/memory_activator.py b/src/chat/memory_system/memory_activator.py index d3cbb5d75..9067c6a20 100644 --- a/src/chat/memory_system/memory_activator.py +++ b/src/chat/memory_system/memory_activator.py @@ -1,15 +1,15 @@ -import difflib import json from json_repair import repair_json from typing import List, Dict -from datetime import datetime + from src.llm_models.utils_model import LLMRequest from src.config.config import global_config, model_config from src.common.logger import get_logger -from src.chat.utils.prompt_builder import Prompt, global_prompt_manager +from src.chat.utils.prompt_builder import Prompt from src.chat.memory_system.Hippocampus import hippocampus_manager +from src.chat.utils.utils import parse_keywords_string logger = get_logger("memory_activator") @@ -68,8 +68,6 @@ class MemoryActivator: request_type="memory.activator", ) - self.running_memory = [] - self.cached_keywords = set() # 用于缓存历史关键词 async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> List[Dict]: """ @@ -78,67 +76,31 @@ class MemoryActivator: # 如果记忆系统被禁用,直接返回空列表 if not global_config.memory.enable_memory: return [] - - # 将缓存的关键词转换为字符串,用于prompt - cached_keywords_str = ", ".join(self.cached_keywords) if self.cached_keywords else "暂无历史关键词" - - prompt = await global_prompt_manager.format_prompt( - "memory_activator_prompt", - obs_info_text=chat_history_prompt, - target_message=target_message, - cached_keywords=cached_keywords_str, - ) - - # logger.debug(f"prompt: {prompt}") - - response, (reasoning_content, model_name, _) = await self.key_words_model.generate_response_async( - prompt, temperature=0.5 - ) - - keywords = list(get_keywords_from_json(response)) - - # 更新关键词缓存 - if keywords: - # 限制缓存大小,最多保留10个关键词 - if len(self.cached_keywords) > 10: - # 转换为列表,移除最早的关键词 - cached_list = list(self.cached_keywords) - self.cached_keywords = set(cached_list[-8:]) - - # 添加新的关键词到缓存 - self.cached_keywords.update(keywords) - - # 调用记忆系统获取相关记忆 + + keywords_list = set() + + for msg in chat_history_prompt: + keywords = parse_keywords_string(msg.get("key_words", "")) + if keywords: + if len(keywords_list) < 30: + # 最多容纳30个关键词 + keywords_list.update(keywords) + print(keywords_list) + else: + break + + if not keywords_list: + return [] + related_memory = await hippocampus_manager.get_memory_from_topic( - valid_keywords=keywords, max_memory_num=3, max_memory_length=2, max_depth=3 + valid_keywords=list(keywords_list), max_memory_num=10, max_memory_length=3, max_depth=3 ) + - logger.debug(f"当前记忆关键词: {self.cached_keywords} ") - logger.debug(f"获取到的记忆: {related_memory}") + logger.info(f"当前记忆关键词: {keywords_list} ") + logger.info(f"获取到的记忆: {related_memory}") - # 激活时,所有已有记忆的duration+1,达到3则移除 - for m in self.running_memory[:]: - m["duration"] = m.get("duration", 1) + 1 - self.running_memory = [m for m in self.running_memory if m["duration"] < 3] - - if related_memory: - for topic, memory in related_memory: - # 检查是否已存在相同topic或相似内容(相似度>=0.7)的记忆 - exists = any( - m["topic"] == topic or difflib.SequenceMatcher(None, m["content"], memory).ratio() >= 0.7 - for m in self.running_memory - ) - if not exists: - self.running_memory.append( - {"topic": topic, "content": memory, "timestamp": datetime.now().isoformat(), "duration": 1} - ) - logger.debug(f"添加新记忆: {topic} - {memory}") - - # 限制同时加载的记忆条数,最多保留最后3条 - if len(self.running_memory) > 3: - self.running_memory = self.running_memory[-3:] - - return self.running_memory + return related_memory init_prompt() diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 3610cc9b1..f3be85a9a 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -181,6 +181,7 @@ class DefaultReplyer: """ prompt = None + selected_expressions = None if available_actions is None: available_actions = {} try: @@ -345,7 +346,7 @@ class DefaultReplyer: return f"{expression_habits_title}\n{expression_habits_block}", selected_ids - async def build_memory_block(self, chat_history: str, target: str) -> str: + async def build_memory_block(self, chat_history: List[Dict[str, Any]], target: str) -> str: """构建记忆块 Args: @@ -355,6 +356,16 @@ class DefaultReplyer: Returns: str: 记忆信息字符串 """ + chat_talking_prompt_short = build_readable_messages( + chat_history, + replace_bot_name=True, + merge_messages=False, + timestamp_mode="relative", + read_mark=0.0, + show_actions=True, + ) + + if not global_config.memory.enable_memory: return "" @@ -363,6 +374,7 @@ class DefaultReplyer: running_memories = await self.memory_activator.activate_memory_with_chat_history( target_message=target, chat_history_prompt=chat_history ) + if global_config.memory.enable_instant_memory: asyncio.create_task(self.instant_memory.create_and_store_memory(chat_history)) @@ -373,9 +385,11 @@ class DefaultReplyer: if not running_memories: return "" + memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" for running_memory in running_memories: - memory_str += f"- {running_memory['content']}\n" + keywords,content = running_memory + memory_str += f"- {keywords}:{content}\n" if instant_memory: memory_str += f"- {instant_memory}\n" @@ -731,7 +745,7 @@ class DefaultReplyer: self.build_expression_habits(chat_talking_prompt_short, target), "expression_habits" ), self._time_and_run_task(self.build_relation_info(sender, target), "relation_info"), - self._time_and_run_task(self.build_memory_block(chat_talking_prompt_short, target), "memory_block"), + self._time_and_run_task(self.build_memory_block(message_list_before_short, target), "memory_block"), self._time_and_run_task( self.build_tool_info(chat_talking_prompt_short, sender, target, enable_tool=enable_tool), "tool_info" ), diff --git a/src/chat/utils/utils.py b/src/chat/utils/utils.py index aefc694e5..6c97be0ba 100644 --- a/src/chat/utils/utils.py +++ b/src/chat/utils/utils.py @@ -642,7 +642,7 @@ def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]: person = Person(platform=platform, user_id=user_id) if not person.is_known: logger.warning(f"用户 {user_info.user_nickname} 尚未认识") - return False, None + return False, None person_id = person.person_id person_name = None if person_id: @@ -768,3 +768,68 @@ def assign_message_ids_flexible( # # 增强版本 - 使用时间戳 # result3 = assign_message_ids_flexible(messages, prefix="ts", use_timestamp=True) # # 结果: [{'id': 'ts123a1b', 'message': 'Hello'}, {'id': 'ts123c2d', 'message': 'World'}, {'id': 'ts123e3f', 'message': 'Test message'}] + +def parse_keywords_string(keywords_input) -> list[str]: + """ + 统一的关键词解析函数,支持多种格式的关键词字符串解析 + + 支持的格式: + 1. 字符串列表格式:'["utils.py", "修改", "代码", "动作"]' + 2. 斜杠分隔格式:'utils.py/修改/代码/动作' + 3. 逗号分隔格式:'utils.py,修改,代码,动作' + 4. 空格分隔格式:'utils.py 修改 代码 动作' + 5. 已经是列表的情况:["utils.py", "修改", "代码", "动作"] + 6. JSON格式字符串:'{"keywords": ["utils.py", "修改", "代码", "动作"]}' + + Args: + keywords_input: 关键词输入,可以是字符串或列表 + + Returns: + list[str]: 解析后的关键词列表,去除空白项 + """ + if not keywords_input: + return [] + + # 如果已经是列表,直接处理 + if isinstance(keywords_input, list): + return [str(k).strip() for k in keywords_input if str(k).strip()] + + # 转换为字符串处理 + keywords_str = str(keywords_input).strip() + if not keywords_str: + return [] + + try: + # 尝试作为JSON对象解析(支持 {"keywords": [...]} 格式) + import json + json_data = json.loads(keywords_str) + if isinstance(json_data, dict) and "keywords" in json_data: + keywords_list = json_data["keywords"] + if isinstance(keywords_list, list): + return [str(k).strip() for k in keywords_list if str(k).strip()] + elif isinstance(json_data, list): + # 直接是JSON数组格式 + return [str(k).strip() for k in json_data if str(k).strip()] + except (json.JSONDecodeError, ValueError): + pass + + try: + # 尝试使用 ast.literal_eval 解析(支持Python字面量格式) + import ast + parsed = ast.literal_eval(keywords_str) + if isinstance(parsed, list): + return [str(k).strip() for k in parsed if str(k).strip()] + except (ValueError, SyntaxError): + pass + + # 尝试不同的分隔符 + separators = ['/', ',', ' ', '|', ';'] + + for separator in separators: + if separator in keywords_str: + keywords_list = [k.strip() for k in keywords_str.split(separator) if k.strip()] + if len(keywords_list) > 1: # 确保分割有效 + return keywords_list + + # 如果没有分隔符,返回单个关键词 + return [keywords_str] if keywords_str else [] \ No newline at end of file diff --git a/src/common/database/database_model.py b/src/common/database/database_model.py index e08c82f7f..aa996cf2b 100644 --- a/src/common/database/database_model.py +++ b/src/common/database/database_model.py @@ -345,6 +345,7 @@ class GraphNodes(BaseModel): concept = TextField(unique=True, index=True) # 节点概念 memory_items = TextField() # JSON格式存储的记忆列表 + weight = FloatField(default=0.0) # 节点权重 hash = TextField() # 节点哈希值 created_time = FloatField() # 创建时间戳 last_modified = FloatField() # 最后修改时间戳 @@ -748,4 +749,8 @@ def check_field_constraints(): # 模块加载时调用初始化函数 -initialize_database(sync_constraints=True) \ No newline at end of file +initialize_database(sync_constraints=True) + + + + diff --git a/src/main.py b/src/main.py index 5fb7b471b..9a42c0d7c 100644 --- a/src/main.py +++ b/src/main.py @@ -14,6 +14,7 @@ from src.individuality.individuality import get_individuality, Individuality from src.common.server import get_global_server, Server from src.mood.mood_manager import mood_manager from rich.traceback import install +from src.migrate_helper.migrate import check_and_run_migrations # from src.api.main import start_api_server # 导入新的插件管理器 @@ -116,6 +117,9 @@ class MainSystem: # 初始化个体特征 await self.individuality.initialize() + + await check_and_run_migrations() + try: init_time = int(1000 * (time.time() - init_start_time)) @@ -139,7 +143,6 @@ class MainSystem: [ self.build_memory_task(), self.forget_memory_task(), - self.consolidate_memory_task(), ] ) @@ -160,13 +163,7 @@ class MainSystem: await self.hippocampus_manager.forget_memory(percentage=global_config.memory.memory_forget_percentage) # type: ignore logger.info("[记忆遗忘] 记忆遗忘完成") - async def consolidate_memory_task(self): - """记忆整合任务""" - while True: - await asyncio.sleep(global_config.memory.consolidate_memory_interval) - logger.info("[记忆整合] 开始整合记忆...") - await self.hippocampus_manager.consolidate_memory() # type: ignore - logger.info("[记忆整合] 记忆整合完成") + async def main(): @@ -180,3 +177,5 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) + + \ No newline at end of file diff --git a/src/migrate_helper/__init__.py b/src/migrate_helper/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/migrate_helper/migrate.py b/src/migrate_helper/migrate.py new file mode 100644 index 000000000..6d60dae0a --- /dev/null +++ b/src/migrate_helper/migrate.py @@ -0,0 +1,312 @@ +import json +import os +import asyncio +from src.common.database.database_model import GraphNodes +from src.common.logger import get_logger + +logger = get_logger("migrate") + + +async def migrate_memory_items_to_string(): + """ + 将数据库中记忆节点的memory_items从list格式迁移到string格式 + 并根据原始list的项目数量设置weight值 + """ + logger.info("开始迁移记忆节点格式...") + + migration_stats = { + "total_nodes": 0, + "converted_nodes": 0, + "already_string_nodes": 0, + "empty_nodes": 0, + "error_nodes": 0, + "weight_updated_nodes": 0, + "truncated_nodes": 0 + } + + try: + # 获取所有图节点 + all_nodes = GraphNodes.select() + migration_stats["total_nodes"] = all_nodes.count() + + logger.info(f"找到 {migration_stats['total_nodes']} 个记忆节点") + + for node in all_nodes: + try: + concept = node.concept + memory_items_raw = node.memory_items.strip() if node.memory_items else "" + original_weight = node.weight if hasattr(node, 'weight') and node.weight is not None else 1.0 + + # 如果为空,跳过 + if not memory_items_raw: + migration_stats["empty_nodes"] += 1 + logger.debug(f"跳过空节点: {concept}") + continue + + try: + # 尝试解析JSON + parsed_data = json.loads(memory_items_raw) + + if isinstance(parsed_data, list): + # 如果是list格式,需要转换 + if parsed_data: + # 转换为字符串格式 + new_memory_items = " | ".join(str(item) for item in parsed_data) + original_length = len(new_memory_items) + + # 检查长度并截断 + if len(new_memory_items) > 100: + new_memory_items = new_memory_items[:100] + migration_stats["truncated_nodes"] += 1 + logger.debug(f"节点 '{concept}' 内容过长,从 {original_length} 字符截断到 100 字符") + + new_weight = float(len(parsed_data)) # weight = list项目数量 + + # 更新数据库 + node.memory_items = new_memory_items + node.weight = new_weight + node.save() + + migration_stats["converted_nodes"] += 1 + migration_stats["weight_updated_nodes"] += 1 + + length_info = f" (截断: {original_length}→100)" if original_length > 100 else "" + logger.info(f"转换节点 '{concept}': {len(parsed_data)} 项 -> 字符串{length_info}, weight: {original_weight} -> {new_weight}") + else: + # 空list,设置为空字符串 + node.memory_items = "" + node.weight = 1.0 + node.save() + + migration_stats["converted_nodes"] += 1 + logger.debug(f"转换空list节点: {concept}") + + elif isinstance(parsed_data, str): + # 已经是字符串格式,检查长度和weight + current_content = parsed_data + original_length = len(current_content) + content_truncated = False + + # 检查长度并截断 + if len(current_content) > 100: + current_content = current_content[:100] + content_truncated = True + migration_stats["truncated_nodes"] += 1 + node.memory_items = current_content + logger.debug(f"节点 '{concept}' 字符串内容过长,从 {original_length} 字符截断到 100 字符") + + # 检查weight是否需要更新 + update_needed = False + if original_weight == 1.0: + # 如果weight还是默认值,可以根据内容复杂度估算 + content_parts = current_content.split(" | ") if " | " in current_content else [current_content] + estimated_weight = max(1.0, float(len(content_parts))) + + if estimated_weight != original_weight: + node.weight = estimated_weight + update_needed = True + logger.debug(f"更新字符串节点权重 '{concept}': {original_weight} -> {estimated_weight}") + + # 如果内容被截断或权重需要更新,保存到数据库 + if content_truncated or update_needed: + node.save() + if update_needed: + migration_stats["weight_updated_nodes"] += 1 + if content_truncated: + migration_stats["converted_nodes"] += 1 # 算作转换节点 + else: + migration_stats["already_string_nodes"] += 1 + else: + migration_stats["already_string_nodes"] += 1 + + else: + # 其他JSON类型,转换为字符串 + new_memory_items = str(parsed_data) if parsed_data else "" + original_length = len(new_memory_items) + + # 检查长度并截断 + if len(new_memory_items) > 100: + new_memory_items = new_memory_items[:100] + migration_stats["truncated_nodes"] += 1 + logger.debug(f"节点 '{concept}' 其他类型内容过长,从 {original_length} 字符截断到 100 字符") + + node.memory_items = new_memory_items + node.weight = 1.0 + node.save() + + migration_stats["converted_nodes"] += 1 + length_info = f" (截断: {original_length}→100)" if original_length > 100 else "" + logger.debug(f"转换其他类型节点: {concept}{length_info}") + + except json.JSONDecodeError: + # 不是JSON格式,假设已经是纯字符串 + # 检查是否是带引号的字符串 + if memory_items_raw.startswith('"') and memory_items_raw.endswith('"'): + # 去掉引号 + clean_content = memory_items_raw[1:-1] + original_length = len(clean_content) + + # 检查长度并截断 + if len(clean_content) > 100: + clean_content = clean_content[:100] + migration_stats["truncated_nodes"] += 1 + logger.debug(f"节点 '{concept}' 去引号内容过长,从 {original_length} 字符截断到 100 字符") + + node.memory_items = clean_content + node.save() + + migration_stats["converted_nodes"] += 1 + length_info = f" (截断: {original_length}→100)" if original_length > 100 else "" + logger.debug(f"去除引号节点: {concept}{length_info}") + else: + # 已经是纯字符串格式,检查长度 + current_content = memory_items_raw + original_length = len(current_content) + + # 检查长度并截断 + if len(current_content) > 100: + current_content = current_content[:100] + node.memory_items = current_content + node.save() + + migration_stats["converted_nodes"] += 1 # 算作转换节点 + migration_stats["truncated_nodes"] += 1 + logger.debug(f"节点 '{concept}' 纯字符串内容过长,从 {original_length} 字符截断到 100 字符") + else: + migration_stats["already_string_nodes"] += 1 + logger.debug(f"已是字符串格式节点: {concept}") + + except Exception as e: + migration_stats["error_nodes"] += 1 + logger.error(f"处理节点 {concept} 时发生错误: {e}") + continue + + except Exception as e: + logger.error(f"迁移过程中发生严重错误: {e}") + raise + + # 输出迁移统计 + logger.info("=== 记忆节点迁移完成 ===") + logger.info(f"总节点数: {migration_stats['total_nodes']}") + logger.info(f"已转换节点: {migration_stats['converted_nodes']}") + logger.info(f"已是字符串格式: {migration_stats['already_string_nodes']}") + logger.info(f"空节点: {migration_stats['empty_nodes']}") + logger.info(f"错误节点: {migration_stats['error_nodes']}") + logger.info(f"权重更新节点: {migration_stats['weight_updated_nodes']}") + logger.info(f"内容截断节点: {migration_stats['truncated_nodes']}") + + success_rate = (migration_stats['converted_nodes'] + migration_stats['already_string_nodes']) / migration_stats['total_nodes'] * 100 if migration_stats['total_nodes'] > 0 else 0 + logger.info(f"迁移成功率: {success_rate:.1f}%") + + return migration_stats + + + + +async def set_all_person_known(): + """ + 将person_info库中所有记录的is_known字段设置为True + 在设置之前,先清理掉user_id或platform为空的记录 + """ + logger.info("开始设置所有person_info记录为已认识...") + + try: + from src.common.database.database_model import PersonInfo + + # 获取所有PersonInfo记录 + all_persons = PersonInfo.select() + total_count = all_persons.count() + + logger.info(f"找到 {total_count} 个人员记录") + + if total_count == 0: + logger.info("没有找到任何人员记录") + return {"total": 0, "deleted": 0, "updated": 0, "known_count": 0} + + # 删除user_id或platform为空的记录 + deleted_count = 0 + invalid_records = PersonInfo.select().where( + (PersonInfo.user_id.is_null()) | + (PersonInfo.user_id == '') | + (PersonInfo.platform.is_null()) | + (PersonInfo.platform == '') + ) + + # 记录要删除的记录信息 + for record in invalid_records: + user_id_info = f"'{record.user_id}'" if record.user_id else "NULL" + platform_info = f"'{record.platform}'" if record.platform else "NULL" + person_name_info = f"'{record.person_name}'" if record.person_name else "无名称" + logger.debug(f"删除无效记录: person_id={record.person_id}, user_id={user_id_info}, platform={platform_info}, person_name={person_name_info}") + + # 执行删除操作 + deleted_count = PersonInfo.delete().where( + (PersonInfo.user_id.is_null()) | + (PersonInfo.user_id == '') | + (PersonInfo.platform.is_null()) | + (PersonInfo.platform == '') + ).execute() + + if deleted_count > 0: + logger.info(f"删除了 {deleted_count} 个user_id或platform为空的记录") + else: + logger.info("没有发现user_id或platform为空的记录") + + # 重新获取剩余记录数量 + remaining_count = PersonInfo.select().count() + logger.info(f"清理后剩余 {remaining_count} 个有效记录") + + if remaining_count == 0: + logger.info("清理后没有剩余记录") + return {"total": total_count, "deleted": deleted_count, "updated": 0, "known_count": 0} + + # 批量更新剩余记录的is_known字段为True + updated_count = PersonInfo.update(is_known=True).execute() + + logger.info(f"成功更新 {updated_count} 个人员记录的is_known字段为True") + + # 验证更新结果 + known_count = PersonInfo.select().where(PersonInfo.is_known).count() + + result = { + "total": total_count, + "deleted": deleted_count, + "updated": updated_count, + "known_count": known_count + } + + logger.info("=== person_info更新完成 ===") + logger.info(f"原始记录数: {result['total']}") + logger.info(f"删除记录数: {result['deleted']}") + logger.info(f"更新记录数: {result['updated']}") + logger.info(f"已认识记录数: {result['known_count']}") + + return result + + except Exception as e: + logger.error(f"更新person_info过程中发生错误: {e}") + raise + + + +async def check_and_run_migrations(): + # 获取根目录 + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + data_dir = os.path.join(project_root, "data") + temp_dir = os.path.join(data_dir, "temp") + done_file = os.path.join(temp_dir, "done.mem") + + # 检查done.mem是否存在 + if not os.path.exists(done_file): + # 如果temp目录不存在则创建 + if not os.path.exists(temp_dir): + os.makedirs(temp_dir, exist_ok=True) + # 执行迁移函数 + # 依次执行两个异步函数 + await asyncio.sleep(3) + await migrate_memory_items_to_string() + await set_all_person_known() + # 创建done.mem文件 + with open(done_file, "w", encoding="utf-8") as f: + f.write("done") + \ No newline at end of file diff --git a/src/person_info/relationship_manager.py b/src/person_info/relationship_manager.py index c7ee155ee..8469ebeee 100644 --- a/src/person_info/relationship_manager.py +++ b/src/person_info/relationship_manager.py @@ -302,8 +302,8 @@ class RelationshipManager: neuroticism, _ = await self.relationship_llm.generate_response_async(prompt=prompt) - logger.info(f"prompt: {prompt}") - logger.info(f"neuroticism: {neuroticism}") + # logger.info(f"prompt: {prompt}") + # logger.info(f"neuroticism: {neuroticism}") neuroticism = repair_json(neuroticism) diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 6ba9771d2..e66a5f605 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "6.3.3" +version = "6.4.0" #----以下是给开发人员阅读的,如果你只是部署了麦麦,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -140,10 +140,6 @@ forget_memory_interval = 3000 # 记忆遗忘间隔 单位秒 间隔越低, memory_forget_time = 48 #多长时间后的记忆会被遗忘 单位小时 memory_forget_percentage = 0.008 # 记忆遗忘比例 控制记忆遗忘程度 越大遗忘越多 建议保持默认 -consolidate_memory_interval = 1000 # 记忆整合间隔 单位秒 间隔越低,麦麦整合越频繁,记忆更精简 -consolidation_similarity_threshold = 0.7 # 相似度阈值 -consolidation_check_percentage = 0.05 # 检查节点比例 - enable_instant_memory = false # 是否启用即时记忆,测试功能,可能存在未知问题 #不希望记忆的词,已经记忆的不会受到影响,需要手动清理