diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py
index 73fccdf97..2aa5562c1 100644
--- a/src/chat/message_receive/storage.py
+++ b/src/chat/message_receive/storage.py
@@ -182,12 +182,14 @@ class MessageStorageBatcher:
             is_command = message.is_command or False
             is_public_notice = message.is_public_notice or False
             notice_type = message.notice_type
-            actions = message.actions
+            # Serialize the actions list into a JSON string
+            actions = orjson.dumps(message.actions).decode("utf-8") if message.actions else None
             should_reply = message.should_reply
             should_act = message.should_act
             additional_config = message.additional_config
-            key_words = ""
-            key_words_lite = ""
+            # Ensure the keyword fields are strings (serialize them if they are not)
+            key_words = MessageStorage._serialize_keywords(message.key_words)
+            key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite)
             memorized_times = 0
 
             user_platform = message.user_info.platform if message.user_info else ""
@@ -254,7 +256,8 @@ class MessageStorageBatcher:
             is_command = message.is_command
             is_public_notice = getattr(message, "is_public_notice", False)
             notice_type = getattr(message, "notice_type", None)
-            actions = getattr(message, "actions", None)
+            # Serialize the actions list into a JSON string
+            actions = orjson.dumps(getattr(message, "actions", None)).decode("utf-8") if getattr(message, "actions", None) else None
             should_reply = getattr(message, "should_reply", None)
             should_act = getattr(message, "should_act", None)
             additional_config = getattr(message, "additional_config", None)
@@ -580,6 +583,11 @@ class MessageStorage:
             is_picid = False
             is_notify = False
             is_command = False
+            is_public_notice = False
+            notice_type = None
+            actions = None
+            should_reply = False
+            should_act = False
             key_words = ""
             key_words_lite = ""
         else:
@@ -593,6 +601,12 @@ class MessageStorage:
             is_picid = message.is_picid
             is_notify = message.is_notify
             is_command = message.is_command
+            is_public_notice = getattr(message, "is_public_notice", False)
+            notice_type = getattr(message, "notice_type", None)
+            # Serialize the actions list into a JSON string
+            actions = orjson.dumps(getattr(message, "actions", None)).decode("utf-8") if getattr(message, "actions", None) else None
+            should_reply = getattr(message, "should_reply", False)
+            should_act = getattr(message, "should_act", False)
             # Serialize the keyword lists into JSON strings
             key_words = MessageStorage._serialize_keywords(message.key_words)
             key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite)
@@ -666,6 +680,11 @@ class MessageStorage:
             is_picid=is_picid,
             is_notify=is_notify,
             is_command=is_command,
+            is_public_notice=is_public_notice,
+            notice_type=notice_type,
+            actions=actions,
+            should_reply=should_reply,
+            should_act=should_act,
             key_words=key_words,
             key_words_lite=key_words_lite,
         )
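
Note on the storage change above: `actions`, `key_words`, and `key_words_lite` are now persisted as JSON strings via orjson, with empty values stored as NULL. The round trip can be sketched as follows; `MessageStorage._serialize_keywords` is the project's own helper, while the two standalone functions here are purely illustrative and not part of the patch.

```python
from typing import Any, Optional

import orjson


def dump_json_field(value: Optional[list[Any]]) -> Optional[str]:
    # Mirrors the write path in the diff: empty/None lists become NULL, not "[]"
    return orjson.dumps(value).decode("utf-8") if value else None


def load_json_field(raw: Optional[str]) -> list[Any]:
    # Hypothetical read path: NULL or malformed data falls back to an empty list
    if not raw:
        return []
    try:
        return orjson.loads(raw)
    except orjson.JSONDecodeError:
        return []


assert dump_json_field(["reply", "poke"]) == '["reply","poke"]'
assert load_json_field(dump_json_field(None)) == []
```

Storing None instead of "[]" keeps the column NULL-able, so any reader needs the fallback shown in `load_json_field`.
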
diff --git a/src/config/official_configs.py b/src/config/official_configs.py
index a8aa295ed..8be21a3eb 100644
--- a/src/config/official_configs.py
+++ b/src/config/official_configs.py
@@ -416,6 +416,7 @@ class MemoryConfig(ValidatedConfigBase):
     search_min_importance: float = Field(default=0.3, description="最小重要性阈值")
     search_similarity_threshold: float = Field(default=0.5, description="向量相似度阈值")
     search_max_expand_depth: int = Field(default=2, description="检索时图扩展深度(0-3)")
+    search_expand_semantic_threshold: float = Field(default=0.3, description="图扩展时语义相似度阈值(建议0.3-0.5,过低可能引入无关记忆,过高无法扩展)")
     enable_query_optimization: bool = Field(default=True, description="启用查询优化")
 
     # Retrieval weight settings (memory graph system)
@@ -426,15 +427,21 @@ class MemoryConfig(ValidatedConfigBase):
 
     # Memory consolidation settings
     consolidation_enabled: bool = Field(default=False, description="是否启用记忆整合")
-    consolidation_interval_hours: float = Field(default=6.0, description="整合任务执行间隔(小时)")
-    consolidation_similarity_threshold: float = Field(default=0.92, description="相似记忆去重阈值")
-    consolidation_time_window_hours: float = Field(default=6.0, description="整合时间窗口(小时)")
-    consolidation_max_batch_size: int = Field(default=50, description="单次最多处理的记忆数量")
-
-    # Automatic linking settings
-    auto_link_enabled: bool = Field(default=True, description="是否启用自动关联")
-    auto_link_max_candidates: int = Field(default=5, description="每个记忆最多关联候选数")
-    auto_link_min_confidence: float = Field(default=0.7, description="最低置信度阈值")
+    consolidation_interval_hours: float = Field(default=2.0, description="整合任务执行间隔(小时)")
+    consolidation_deduplication_threshold: float = Field(default=0.93, description="相似记忆去重阈值")
+    consolidation_time_window_hours: float = Field(default=2.0, description="整合时间窗口(小时)- 统一用于去重和关联")
+    consolidation_max_batch_size: int = Field(default=30, description="单次最多处理的记忆数量")
+
+    # Memory linking settings (a sub-module of consolidation)
+    consolidation_linking_enabled: bool = Field(default=True, description="是否启用记忆关联建立")
+    consolidation_linking_max_candidates: int = Field(default=10, description="每个记忆最多关联的候选数")
+    consolidation_linking_max_memories: int = Field(default=20, description="单次最多处理的记忆总数")
+    consolidation_linking_min_importance: float = Field(default=0.5, description="最低重要性阈值")
+    consolidation_linking_pre_filter_threshold: float = Field(default=0.7, description="向量相似度预筛选阈值")
+    consolidation_linking_max_pairs_for_llm: int = Field(default=5, description="最多发送给LLM分析的候选对数")
+    consolidation_linking_min_confidence: float = Field(default=0.7, description="LLM分析最低置信度阈值")
+    consolidation_linking_llm_temperature: float = Field(default=0.2, description="LLM分析温度参数")
+    consolidation_linking_llm_max_tokens: int = Field(default=1500, description="LLM分析最大输出长度")
 
     # Forgetting settings (memory graph system)
     forgetting_enabled: bool = Field(default=True, description="是否启用自动遗忘")
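
The new fields above are all 0-1 thresholds or small positive limits. If `ValidatedConfigBase` builds on pydantic v2 (an assumption; only `Field` is visible in this diff), an explicit range check could look like the illustrative sketch below — it is not part of the patch.

```python
from pydantic import BaseModel, Field, field_validator


class LinkingThresholds(BaseModel):
    # Hypothetical standalone model, only to illustrate the expected value ranges
    search_expand_semantic_threshold: float = Field(default=0.3)
    consolidation_linking_pre_filter_threshold: float = Field(default=0.7)
    consolidation_linking_min_confidence: float = Field(default=0.7)

    @field_validator(
        "search_expand_semantic_threshold",
        "consolidation_linking_pre_filter_threshold",
        "consolidation_linking_min_confidence",
    )
    @classmethod
    def check_unit_interval(cls, v: float) -> float:
        # All three values are cosine-similarity/confidence thresholds in [0.0, 1.0]
        if not 0.0 <= v <= 1.0:
            raise ValueError("threshold must be between 0.0 and 1.0")
        return v
```
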
diff --git a/src/memory_graph/manager.py b/src/memory_graph/manager.py
index 5def3c2c9..142fe101a 100644
--- a/src/memory_graph/manager.py
+++ b/src/memory_graph/manager.py
@@ -25,6 +25,10 @@ from src.memory_graph.storage.vector_store import VectorStore
 from src.memory_graph.tools.memory_tools import MemoryTools
 from src.memory_graph.utils.embeddings import EmbeddingGenerator
 import uuid
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    import numpy as np
 
 logger = logging.getLogger(__name__)
 
@@ -135,14 +139,16 @@ class MemoryManager:
 
             # Check configuration values
             expand_depth = self.config.search_max_expand_depth
-            logger.info(f"📊 配置检查: search_max_expand_depth={expand_depth}")
-
+            expand_semantic_threshold = self.config.search_expand_semantic_threshold
+            logger.info(f"📊 配置检查: search_max_expand_depth={expand_depth}, search_expand_semantic_threshold={expand_semantic_threshold}")
+
             self.tools = MemoryTools(
                 vector_store=self.vector_store,
                 graph_store=self.graph_store,
                 persistence_manager=self.persistence,
                 embedding_generator=self.embedding_generator,
                 max_expand_depth=expand_depth,  # graph expansion depth read from config
+                expand_semantic_threshold=expand_semantic_threshold,  # graph expansion semantic threshold read from config
             )
 
             self._initialized = True
@@ -977,135 +983,227 @@ class MemoryManager:
     ) -> Dict[str, Any]:
         """
         Consolidate memories: merge and de-duplicate similar memories directly (no new edges are created)
-
-        Optimizations:
-        1. Add a batch limit to avoid blocking for long stretches
-        2. Merge similar memories in place without creating relation edges
-        3. Use asyncio.sleep to yield control and avoid blocking the event loop
-
+
+        Performance-optimized version:
+        1. Runs in the background via asyncio.create_task so the main flow is not blocked
+        2. Batches the vector computations to avoid redundant work
+        3. Defers saving and writes to the database in one batch
+        4. Yields to the event loop more frequently (cooperative multitasking)
+
         Args:
             similarity_threshold: similarity threshold (default 0.85; raising it to 0.9 is recommended to reduce false merges)
            time_window_hours: time window in hours
            max_batch_size: maximum number of memories processed per run
-
+
         Returns:
-            Consolidation result
+            Consolidation result (when run asynchronously, the launch status is returned)
         """
         if not self._initialized:
             await self.initialize()
 
         try:
-            logger.info(f"开始记忆整理 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...")
-
+            logger.info(f"🚀 启动记忆整理任务 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...")
+
+            # Create a background task that performs the consolidation
+            task = asyncio.create_task(
+                self._consolidate_memories_background(
+                    similarity_threshold=similarity_threshold,
+                    time_window_hours=time_window_hours,
+                    max_batch_size=max_batch_size
+                )
+            )
+
+            # Return the launch status without waiting for completion
+            return {
+                "task_started": True,
+                "task_id": id(task),
+                "message": "记忆整理任务已在后台启动"
+            }
+
+        except Exception as e:
+            logger.error(f"启动记忆整理任务失败: {e}", exc_info=True)
+            return {"error": str(e), "task_started": False}
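
`consolidate_memories` now returns immediately after `asyncio.create_task`, and only the local variable `task` references the new task. The event loop keeps only weak references to tasks, so a fire-and-forget task can in principle be garbage-collected before it finishes. A common mitigation, sketched here with a hypothetical `_background_tasks` attribute (not something this patch adds), is to hold a strong reference until the task completes:

```python
import asyncio


class BackgroundTaskMixin:
    """Illustrative sketch only; MemoryManager does not necessarily do this."""

    def __init__(self) -> None:
        self._background_tasks: set[asyncio.Task] = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Hold a strong reference so the task is not garbage-collected mid-run,
        # and drop it automatically once the task finishes.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task
```
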
+
+    async def _consolidate_memories_background(
+        self,
+        similarity_threshold: float,
+        time_window_hours: float,
+        max_batch_size: int,
+    ) -> None:
+        """
+        Concrete implementation of memory consolidation, run in the background
+
+        This method runs in a separate task and does not block the main flow
+        """
+        try:
             result = {
                 "merged_count": 0,
                 "checked_count": 0,
                 "skipped_count": 0,
             }
-
+
             # Fetch recently created memories
             cutoff_time = datetime.now() - timedelta(hours=time_window_hours)
             all_memories = self.graph_store.get_all_memories()
-
+
             recent_memories = [
                 mem for mem in all_memories
                 if mem.created_at >= cutoff_time and not mem.metadata.get("forgotten", False)
             ]
-
+
             if not recent_memories:
-                logger.info("没有需要整理的记忆")
-                return result
-
+                logger.info("✅ 记忆整理完成: 没有需要整理的记忆")
+                return
+
             # Cap the batch size
             if len(recent_memories) > max_batch_size:
-                logger.info(f"记忆数量 {len(recent_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size} 条")
+                logger.info(f"📊 记忆数量 {len(recent_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size} 条")
                 recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size]
                 result["skipped_count"] = len(all_memories) - max_batch_size
-
-            logger.info(f"找到 {len(recent_memories)} 条待整理记忆")
+
+            logger.info(f"📋 找到 {len(recent_memories)} 条待整理记忆")
             result["checked_count"] = len(recent_memories)
-
-            # Group memories by type
+
+            # Group memories by type to avoid cross-type comparisons
             memories_by_type: Dict[str, List[Memory]] = {}
             for mem in recent_memories:
                 mem_type = mem.metadata.get("memory_type", "")
                 if mem_type not in memories_by_type:
                     memories_by_type[mem_type] = []
                 memories_by_type[mem_type].append(mem)
-
-            # Track IDs of deleted memories to avoid reprocessing them
+
+            # Collect memories to delete; deletion is deferred and batched
+            to_delete: List[Tuple[Memory, str]] = []  # (memory, reason)
             deleted_ids = set()
-
+
             # Run similarity detection within each memory type
             for mem_type, memories in memories_by_type.items():
                 if len(memories) < 2:
                     continue
-
-                logger.debug(f"检查类型 '{mem_type}' 的 {len(memories)} 条记忆")
-
-                # Use vector-similarity detection
-                for i in range(len(memories)):
-                    # Yield control to avoid blocking for long stretches
-                    if i % 10 == 0:
-                        await asyncio.sleep(0)
-
-                    if memories[i].id in deleted_ids:
+
+                logger.debug(f"🔍 检查类型 '{mem_type}' 的 {len(memories)} 条记忆")
+
+                # Pre-extract the embeddings of all topic nodes
+                embeddings_map: Dict[str, "np.ndarray"] = {}
+                valid_memories = []
+
+                for mem in memories:
+                    topic_node = next((n for n in mem.nodes if n.node_type == NodeType.TOPIC), None)
+                    if topic_node and topic_node.embedding is not None:
+                        embeddings_map[mem.id] = topic_node.embedding
+                        valid_memories.append(mem)
+
+                # Batch the similarity computation (more efficient than scoring one pair at a time)
+                import numpy as np
+
+                for i in range(len(valid_memories)):
+                    # Yield to the event loop more frequently
+                    if i % 5 == 0:
+                        await asyncio.sleep(0.001)  # yield for 1 ms
+
+                    mem_i = valid_memories[i]
+                    if mem_i.id in deleted_ids:
                         continue
-
-                    for j in range(i + 1, len(memories)):
-                        if memories[j].id in deleted_ids:
+
+                    for j in range(i + 1, len(valid_memories)):
+                        if valid_memories[j].id in deleted_ids:
                             continue
-
-                        mem_i = memories[i]
-                        mem_j = memories[j]
-
-                        # Get the topic nodes' embeddings
-                        topic_i = next((n for n in mem_i.nodes if n.node_type == NodeType.TOPIC), None)
-                        topic_j = next((n for n in mem_j.nodes if n.node_type == NodeType.TOPIC), None)
-
-                        if not topic_i or not topic_j:
-                            continue
-
-                        if topic_i.embedding is None or topic_j.embedding is None:
-                            continue
-
-                        # Compute cosine similarity
-                        import numpy as np
-                        similarity = np.dot(topic_i.embedding, topic_j.embedding) / (
-                            np.linalg.norm(topic_i.embedding) * np.linalg.norm(topic_j.embedding)
-                        )
-
+
+                        mem_j = valid_memories[j]
+
+                        # Fast vector similarity computation
+                        embedding_i = embeddings_map[mem_i.id]
+                        embedding_j = embeddings_map[mem_j.id]
+
+                        # Optimized cosine similarity computation
+                        similarity = self._fast_cosine_similarity(embedding_i, embedding_j)
+
                         if similarity >= similarity_threshold:
-                            # De-duplicate directly: keep the more important memory and delete the other (no relation edge)
+                            # Decide which memory to keep
                             if mem_i.importance >= mem_j.importance:
                                 keep_mem, remove_mem = mem_i, mem_j
                             else:
                                 keep_mem, remove_mem = mem_j, mem_i
-
-                            logger.info(
-                                f"去重相似记忆 (similarity={similarity:.3f}): "
-                                f"保留 {keep_mem.id}, 删除 {remove_mem.id}"
+
+                            logger.debug(
+                                f"🔄 标记相似记忆 (similarity={similarity:.3f}): "
+                                f"保留 {keep_mem.id[:8]}, 删除 {remove_mem.id[:8]}"
                             )
-
-                            # Boost the kept memory's importance (merge the information value)
+
+                            # Boost the kept memory's importance
                             keep_mem.importance = min(1.0, keep_mem.importance + 0.05)
-                            keep_mem.activation = min(1.0, keep_mem.activation + 0.05)
-
-                            # Add the removed memory's access count to the kept memory
+
+                            # Accumulate access counts
                             if hasattr(keep_mem, 'access_count') and hasattr(remove_mem, 'access_count'):
                                 keep_mem.access_count += remove_mem.access_count
-
-                            # Delete the similar memory directly (no edge is created, keeping the graph simple)
-                            await self.delete_memory(remove_mem.id)
+
+                            # Mark for deletion (not deleted immediately)
+                            to_delete.append((remove_mem, f"与记忆 {keep_mem.id[:8]} 相似度 {similarity:.3f}"))
                             deleted_ids.add(remove_mem.id)
                             result["merged_count"] += 1
-
-            logger.info(f"记忆整理完成: {result}")
-            return result
-
+
+                # Yield control after finishing each type
+                await asyncio.sleep(0.005)  # yield for 5 ms
+
+            # Batch-delete the marked memories
+            if to_delete:
+                logger.info(f"🗑️ 开始批量删除 {len(to_delete)} 条相似记忆")
+
+                for memory, reason in to_delete:
+                    try:
+                        # Remove the nodes from the vector store
+                        for node in memory.nodes:
+                            if node.embedding is not None:
+                                await self.vector_store.delete_node(node.id)
+
+                        # Remove the memory from the graph store
+                        self.graph_store.remove_memory(memory.id)
+
+                    except Exception as e:
+                        logger.warning(f"删除记忆 {memory.id[:8]} 失败: {e}")
+
+                # Batch save (a single write to reduce I/O)
+                await self.persistence.save_graph_store(self.graph_store)
+                logger.info(f"💾 批量保存完成")
+
+            logger.info(f"✅ 记忆整理完成: {result}")
+
         except Exception as e:
-            logger.error(f"记忆整理失败: {e}", exc_info=True)
-            return {"error": str(e), "merged_count": 0, "checked_count": 0}
+            logger.error(f"❌ 记忆整理失败: {e}", exc_info=True)
+
+    def _fast_cosine_similarity(self, vec1: "np.ndarray", vec2: "np.ndarray") -> float:
+        """
+        Fast cosine similarity (optimized version)
+
+        Args:
+            vec1, vec2: input vectors
+
+        Returns:
+            Cosine similarity
+        """
+        try:
+            import numpy as np
+
+            # Avoid repeated type checks and conversions
+            # The vectors should already be numpy arrays; convert once if they are not
+            if not isinstance(vec1, np.ndarray):
+                vec1 = np.asarray(vec1, dtype=np.float32)
+            if not isinstance(vec2, np.ndarray):
+                vec2 = np.asarray(vec2, dtype=np.float32)
+
+            # Use the more efficient norm computation
+            norm1 = np.linalg.norm(vec1)
+            norm2 = np.linalg.norm(vec2)
+
+            if norm1 == 0 or norm2 == 0:
+                return 0.0
+
+            # Compute the dot product and divide directly
+            return float(np.dot(vec1, vec2) / (norm1 * norm2))
+
+        except Exception as e:
+            logger.warning(f"计算余弦相似度失败: {e}")
+            return 0.0
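
The in-code comment above mentions batching the similarity computation, but the loop still calls `_fast_cosine_similarity` once per pair. For reference, a vectorized sketch of the same upper-triangle computation with NumPy — an illustration of the idea, not code from the repository:

```python
import numpy as np


def pairwise_cosine_similarity(embeddings: list[np.ndarray]) -> np.ndarray:
    """Return an (n, n) matrix of cosine similarities for n embeddings."""
    matrix = np.stack([np.asarray(e, dtype=np.float32) for e in embeddings])
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    normalized = matrix / np.clip(norms, 1e-12, None)  # guard against zero vectors
    return normalized @ normalized.T


def similar_pairs(embeddings: list[np.ndarray], threshold: float) -> list[tuple[int, int]]:
    # Candidate index pairs above the threshold, visiting each unordered pair once
    sims = pairwise_cosine_similarity(embeddings)
    i_idx, j_idx = np.triu_indices(len(embeddings), k=1)
    mask = sims[i_idx, j_idx] >= threshold
    return list(zip(i_idx[mask].tolist(), j_idx[mask].tolist()))
```

For a batch of 30 memories this is a single matrix multiplication instead of roughly 435 Python-level calls.
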
 
     async def auto_link_memories(
         self,
@@ -1478,14 +1576,14 @@ class MemoryManager:
 
     async def maintenance(self) -> Dict[str, Any]:
         """
-        Run maintenance tasks
-
+        Run maintenance tasks (optimized version)
+
         Includes:
-        - Memory consolidation (merging similar memories)
-        - Cleaning up expired memories
+        - Memory consolidation (runs asynchronously in the background)
+        - Automatic memory linking (lightweight pass)
         - Automatically forgetting low-activation memories
         - Persisting data
-
+
         Returns:
             Maintenance result
         """
@@ -1493,52 +1591,355 @@ class MemoryManager:
         if not self._initialized:
             await self.initialize()
 
         try:
-            logger.info("开始执行记忆系统维护...")
-
+            logger.info("🔧 开始执行记忆系统维护(优化版)...")
+
             result = {
-                "consolidated": 0,
+                "consolidation_task": "none",
+                "linked": 0,
                 "forgotten": 0,
-                "deleted": 0,
                 "saved": False,
+                "total_time": 0,
             }
-
-            # 1. Memory consolidation (merge similar memories)
-            # Automatic consolidation is disabled by default because it can block the main flow
-            # Recommendation: raise the threshold to 0.92+ to reduce false merges, and cap the batch size to avoid blocking
+
+            start_time = datetime.now()
+
+            # 1. Memory consolidation (runs as a background task and does not block the main flow)
             if getattr(self.config, 'consolidation_enabled', False):
+                logger.info("🚀 启动异步记忆整理任务...")
                 consolidate_result = await self.consolidate_memories(
-                    similarity_threshold=getattr(self.config, 'consolidation_similarity_threshold', 0.92),
-                    time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 24.0),
-                    max_batch_size=getattr(self.config, 'consolidation_max_batch_size', 50)
+                    similarity_threshold=getattr(self.config, 'consolidation_deduplication_threshold', 0.93),
+                    time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 2.0),  # unified time window
+                    max_batch_size=getattr(self.config, 'consolidation_max_batch_size', 30)
                 )
-                result["consolidated"] = consolidate_result.get("merged_count", 0)
-
-            # 2. Automatically link memories (discover and create relations)
-            if getattr(self.config, 'auto_link_enabled', True):
-                link_result = await self.auto_link_memories()
+
+                if consolidate_result.get("task_started"):
+                    result["consolidation_task"] = f"background_task_{consolidate_result.get('task_id', 'unknown')}"
+                    logger.info("✅ 记忆整理任务已启动到后台执行")
+                else:
+                    result["consolidation_task"] = "failed"
+                    logger.warning("❌ 记忆整理任务启动失败")
+
+            # 2. Automatically link memories (uses the unified time window)
+            if getattr(self.config, 'consolidation_linking_enabled', True):
+                logger.info("🔗 执行轻量级自动关联...")
+                link_result = await self._lightweight_auto_link_memories()
                 result["linked"] = link_result.get("linked_count", 0)
-
-            # 3. Automatic forgetting
+
+            # 3. Automatic forgetting (fast pass)
             if getattr(self.config, 'forgetting_enabled', True):
+                logger.info("🗑️ 执行自动遗忘...")
                 forgotten_count = await self.auto_forget_memories(
                     threshold=getattr(self.config, 'forgetting_activation_threshold', 0.1)
                 )
                 result["forgotten"] = forgotten_count
-
-            # 4. Clean up very old forgotten memories (optional)
-            # TODO: implement the cleanup logic
-
-            # 5. Persist data
-            await self.persistence.save_graph_store(self.graph_store)
-            result["saved"] = True
-
+
+            # 4. Persist data (only when consolidation is not running in the background)
+            if result["consolidation_task"] == "none":
+                await self.persistence.save_graph_store(self.graph_store)
+                result["saved"] = True
+                logger.info("💾 数据保存完成")
+
             self._last_maintenance = datetime.now()
-            logger.info(f"维护完成: {result}")
+
+            # Compute the maintenance duration
+            total_time = (datetime.now() - start_time).total_seconds()
+            result["total_time"] = total_time
+
+            logger.info(f"✅ 维护完成 (耗时 {total_time:.2f}s): {result}")
             return result
-
+
         except Exception as e:
-            logger.error(f"维护失败: {e}", exc_info=True)
-            return {"error": str(e)}
+            logger.error(f"❌ 维护失败: {e}", exc_info=True)
+            return {"error": str(e), "total_time": 0}
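
`maintenance()` is designed to be called periodically; the body of `start_maintenance_scheduler` is outside this diff, so the loop below is only a generic sketch of such a scheduler (tying the interval to `consolidation_interval_hours` is an assumption, not the project's actual wiring):

```python
import asyncio


async def run_maintenance_loop(manager, interval_hours: float = 2.0) -> None:
    # Hypothetical helper: call maintenance() on a fixed cadence until cancelled.
    try:
        while True:
            await asyncio.sleep(interval_hours * 3600)
            result = await manager.maintenance()
            if "error" in result:
                # Keep the loop alive; the next cycle may succeed.
                continue
    except asyncio.CancelledError:
        # Allow graceful shutdown when the owning task is cancelled.
        raise
```
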
+
+    async def _lightweight_auto_link_memories(
+        self,
+        time_window_hours: float = None,  # read from config
+        max_candidates: int = None,  # read from config
+        max_memories: int = None,  # read from config
+    ) -> Dict[str, Any]:
+        """
+        Smart lightweight automatic memory linking (keeps the LLM judgement, optimizes performance)
+
+        Optimization strategy:
+        1. Read the processing parameters from config and respect the user's settings
+        2. Pre-filter by vector similarity and only call the LLM for highly similar memories
+        3. Batch the LLM calls to reduce network overhead
+        4. Run asynchronously to avoid blocking
+        """
+        try:
+            result = {
+                "checked_count": 0,
+                "linked_count": 0,
+                "llm_calls": 0,
+            }
+
+            # Read parameters from config; the time window is shared with deduplication
+            if time_window_hours is None:
+                time_window_hours = getattr(self.config, 'consolidation_time_window_hours', 2.0)
+            if max_candidates is None:
+                max_candidates = getattr(self.config, 'consolidation_linking_max_candidates', 10)
+            if max_memories is None:
+                max_memories = getattr(self.config, 'consolidation_linking_max_memories', 20)
+
+            # Fetch memories inside the configured time window
+            time_threshold = datetime.now() - timedelta(hours=time_window_hours)
+            all_memories = self.graph_store.get_all_memories()
+
+            recent_memories = [
+                mem for mem in all_memories
+                if mem.created_at >= time_threshold
+                and not mem.metadata.get("forgotten", False)
+                and mem.importance >= getattr(self.config, 'consolidation_linking_min_importance', 0.5)  # importance threshold from config
+            ]
+
+            if len(recent_memories) > max_memories:
+                recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_memories]
+
+            if len(recent_memories) < 2:
+                logger.debug("记忆数量不足,跳过智能关联")
+                return result
+
+            logger.debug(f"🧠 智能关联: 检查 {len(recent_memories)} 条重要记忆")
+
+            # Step 1: pre-filter by vector similarity to find potential link pairs
+            candidate_pairs = []
+            import numpy as np
+
+            for i, memory in enumerate(recent_memories):
+                # Get the topic node
+                topic_node = next(
+                    (n for n in memory.nodes if n.node_type == NodeType.TOPIC),
+                    None
+                )
+
+                if not topic_node or topic_node.embedding is None:
+                    continue
+
+                # Compute similarity against the remaining memories
+                for j, other_memory in enumerate(recent_memories[i+1:], i+1):
+                    other_topic = next(
+                        (n for n in other_memory.nodes if n.node_type == NodeType.TOPIC),
+                        None
+                    )
+
+                    if not other_topic or other_topic.embedding is None:
+                        continue
+
+                    # Fast similarity computation
+                    similarity = self._fast_cosine_similarity(
+                        topic_node.embedding,
+                        other_topic.embedding
+                    )
+
+                    # Use the configured pre-filter threshold
+                    pre_filter_threshold = getattr(self.config, 'consolidation_linking_pre_filter_threshold', 0.7)
+                    if similarity >= pre_filter_threshold:
+                        candidate_pairs.append((memory, other_memory, similarity))
+
+                # Yield control
+                if i % 3 == 0:
+                    await asyncio.sleep(0.001)
+
+            logger.debug(f"🔍 预筛选找到 {len(candidate_pairs)} 个候选关联对")
+
+            if not candidate_pairs:
+                return result
+
+            # Step 2: batched LLM analysis (uses the configured maximum number of candidate pairs)
+            link_relations = []
+            max_pairs_for_llm = getattr(self.config, 'consolidation_linking_max_pairs_for_llm', 5)
+            if len(candidate_pairs) <= max_pairs_for_llm:
+                link_relations = await self._batch_analyze_memory_relations(candidate_pairs)
+                result["llm_calls"] = 1
+
+            # Step 3: create the links confirmed by the LLM
+            for relation_info in link_relations:
+                try:
+                    memory_a, memory_b = relation_info["memory_pair"]
+                    relation_type = relation_info["relation_type"]
+                    confidence = relation_info["confidence"]
+
+                    # Create the relation edge
+                    edge = MemoryEdge(
+                        id=f"smart_edge_{uuid.uuid4().hex[:12]}",
+                        source_id=memory_a.subject_id,
+                        target_id=memory_b.subject_id,
+                        relation=relation_type,
+                        edge_type=EdgeType.RELATION,
+                        importance=confidence,
+                        metadata={
+                            "auto_linked": True,
+                            "method": "llm_analyzed",
+                            "vector_similarity": relation_info.get("vector_similarity", 0.0),
+                            "confidence": confidence,
+                            "reasoning": relation_info.get("reasoning", ""),
+                            "created_at": datetime.now().isoformat(),
+                        }
+                    )
+
+                    # Add the edge to the graph
+                    self.graph_store.graph.add_edge(
+                        edge.source_id,
+                        edge.target_id,
+                        edge_id=edge.id,
+                        relation=edge.relation,
+                        edge_type=edge.edge_type.value,
+                        importance=edge.importance,
+                        metadata=edge.metadata,
+                    )
+
+                    memory_a.edges.append(edge)
+                    result["linked_count"] += 1
+
+                    logger.debug(f"🧠 智能关联: {memory_a.id[:8]} --[{relation_type}]--> {memory_b.id[:8]} (置信度={confidence:.2f})")
+
+                except Exception as e:
+                    logger.warning(f"建立智能关联失败: {e}")
+                    continue
+
+            # Persist the linking results
+            if result["linked_count"] > 0:
+                await self.persistence.save_graph_store(self.graph_store)
+
+            logger.debug(f"✅ 智能关联完成: 建立了 {result['linked_count']} 个关联,LLM调用 {result['llm_calls']} 次")
+            return result
+
+        except Exception as e:
+            logger.error(f"智能关联失败: {e}", exc_info=True)
+            return {"error": str(e), "checked_count": 0, "linked_count": 0}
+
+    async def _batch_analyze_memory_relations(
+        self,
+        candidate_pairs: List[Tuple[Memory, Memory, float]]
+    ) -> List[Dict[str, Any]]:
+        """
+        Batch-analyze the relations between memories (optimizes LLM usage)
+
+        Args:
+            candidate_pairs: list of candidate memory pairs, each as (memory_a, memory_b, vector_similarity)
+
+        Returns:
+            List of relation analysis results
+        """
+        try:
+            from src.llm_models.utils_model import LLMRequest
+            from src.config.config import model_config
+
+            llm = LLMRequest(
+                model_set=model_config.model_task_config.utils_small,
+                request_type="memory.batch_relation_analysis"
+            )
+
+            # Format all candidate memory pairs
+            candidates_text = ""
+            for i, (mem_a, mem_b, similarity) in enumerate(candidate_pairs):
+                desc_a = self._format_memory_for_llm(mem_a)
+                desc_b = self._format_memory_for_llm(mem_b)
+                candidates_text += f"""
+候选对 {i+1}:
+记忆A: {desc_a}
+记忆B: {desc_b}
+向量相似度: {similarity:.3f}
+"""
+
+            # Build the batch analysis prompt (uses the configured confidence threshold)
+            min_confidence = getattr(self.config, 'consolidation_linking_min_confidence', 0.7)
+
+            prompt = f"""你是记忆关系分析专家。请批量分析以下候选记忆对之间的关系。
+
+**关系类型说明:**
+- 导致: A的发生导致了B的发生(因果关系)
+- 引用: A提到或涉及B(引用关系)
+- 相似: A和B描述相似的内容(相似关系)
+- 相反: A和B表达相反的观点(对立关系)
+- 关联: A和B存在某种关联但不属于以上类型(一般关联)
+
+**候选记忆对:**
+{candidates_text}
+
+**任务要求:**
+1. 对每个候选对,判断是否存在有意义的关系
+2. 如果存在关系,指定关系类型和置信度(0.0-1.0)
+3. 简要说明判断理由
+4. 只返回置信度 >= {min_confidence} 的关系
+5. 优先考虑因果、引用等强关系,谨慎建立相似关系
+
+**输出格式(JSON):**
+```json
+[
+    {{
+        "candidate_id": 1,
+        "has_relation": true,
+        "relation_type": "导致",
+        "confidence": 0.85,
+        "reasoning": "记忆A描述的原因导致记忆B的结果"
+    }},
+    {{
+        "candidate_id": 2,
+        "has_relation": false,
+        "reasoning": "两者无明显关联"
+    }}
+]
+```
+
+请分析并输出JSON结果:"""
+
+            # Call the LLM (uses the configured parameters)
+            llm_temperature = getattr(self.config, 'consolidation_linking_llm_temperature', 0.2)
+            llm_max_tokens = getattr(self.config, 'consolidation_linking_llm_max_tokens', 1500)
+
+            response, _ = await llm.generate_response_async(
+                prompt,
+                temperature=llm_temperature,
+                max_tokens=llm_max_tokens,
+            )
+
+            # Parse the response
+            import json
+            import re
+
+            # Extract the JSON
+            json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
+            if json_match:
+                json_str = json_match.group(1)
+            else:
+                json_str = response.strip()
+
+            try:
+                analysis_results = json.loads(json_str)
+            except json.JSONDecodeError:
+                logger.warning(f"LLM返回格式错误,尝试修复: {response[:200]}")
+                # Attempt a simple repair
+                json_str = re.sub(r'[\r\n\t]', '', json_str)
+                analysis_results = json.loads(json_str)
+
+            # Convert to the result format
+            relations = []
+            for result in analysis_results:
+                if not result.get("has_relation", False):
+                    continue
+
+                confidence = result.get("confidence", 0.0)
+                if confidence < min_confidence:  # use the configured confidence threshold
+                    continue
+
+                candidate_id = result.get("candidate_id", 0) - 1
+                if 0 <= candidate_id < len(candidate_pairs):
+                    mem_a, mem_b, vector_similarity = candidate_pairs[candidate_id]
+                    relations.append({
+                        "memory_pair": (mem_a, mem_b),
+                        "relation_type": result.get("relation_type", "关联"),
+                        "confidence": confidence,
+                        "reasoning": result.get("reasoning", ""),
+                        "vector_similarity": vector_similarity,
+                    })
+
+            logger.debug(f"🧠 LLM批量分析完成: 发现 {len(relations)} 个关系")
+            return relations
+
+        except Exception as e:
+            logger.error(f"LLM批量关系分析失败: {e}", exc_info=True)
+            return []
 
     async def start_maintenance_scheduler(self) -> None:
         """
diff --git a/src/memory_graph/tools/memory_tools.py b/src/memory_graph/tools/memory_tools.py
index 798a36268..9f0e431c9 100644
--- a/src/memory_graph/tools/memory_tools.py
+++ b/src/memory_graph/tools/memory_tools.py
@@ -35,24 +35,27 @@ class MemoryTools:
         persistence_manager: PersistenceManager,
         embedding_generator: Optional[EmbeddingGenerator] = None,
         max_expand_depth: int = 1,
+        expand_semantic_threshold: float = 0.3,
     ):
         """
         Initialize the tool set
-
+
         Args:
             vector_store: vector store
             graph_store: graph store
             persistence_manager: persistence manager
             embedding_generator: embedding generator (optional)
             max_expand_depth: default graph expansion depth (read from config)
+            expand_semantic_threshold: semantic similarity threshold for graph expansion (read from config)
         """
         self.vector_store = vector_store
         self.graph_store = graph_store
         self.persistence_manager = persistence_manager
         self._initialized = False
         self.max_expand_depth = max_expand_depth  # keep the configured default
-
-        logger.info(f"MemoryTools 初始化: max_expand_depth={max_expand_depth}")
+        self.expand_semantic_threshold = expand_semantic_threshold  # keep the configured semantic threshold
+
+        logger.info(f"MemoryTools 初始化: max_expand_depth={max_expand_depth}, expand_semantic_threshold={expand_semantic_threshold}")
 
         # Initialize components
         self.extractor = MemoryExtractor()
@@ -507,7 +510,7 @@ class MemoryTools:
             initial_memory_ids=list(initial_memory_ids),
             query_embedding=query_embedding,
             max_depth=expand_depth,
-            semantic_threshold=0.5,
+            semantic_threshold=self.expand_semantic_threshold,  # use the configured threshold
             max_expanded=top_k * 2
         )
 
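
`semantic_threshold` is handed to the graph-expansion call whose implementation is not shown in this diff. The intended effect — dropping expanded candidates whose embedding is not similar enough to the query — can be sketched as follows; the function name and parameters are assumptions rather than the project's actual API:

```python
import numpy as np


def filter_expanded_candidates(
    query_embedding: np.ndarray,
    candidates: dict[str, np.ndarray],
    semantic_threshold: float = 0.3,
) -> list[str]:
    """Keep only candidate node IDs whose cosine similarity to the query clears the threshold."""
    query = np.asarray(query_embedding, dtype=np.float32)
    query_norm = np.linalg.norm(query)
    kept: list[str] = []
    for node_id, emb in candidates.items():
        emb = np.asarray(emb, dtype=np.float32)
        denom = query_norm * np.linalg.norm(emb)
        if denom == 0:
            continue
        if float(query @ emb) / denom >= semantic_threshold:
            kept.append(node_id)
    return kept
```
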
diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml
index 81f34c347..60fbfbb83 100644
--- a/template/bot_config_template.toml
+++ b/template/bot_config_template.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "7.6.2"
+version = "7.6.4"
 
 #---- The following section is for developers; if you have only deployed MoFox-Bot, you do not need to read it ----
 #If you want to modify the config file, please increment the version value
@@ -251,19 +251,32 @@ vector_db_path = "data/memory_graph/chroma_db" # 向量数据库路径 (使用
 # === Memory retrieval settings ===
 search_top_k = 10 # default number of retrieval results
 search_min_importance = 0.3 # minimum importance threshold (0.0-1.0)
-search_similarity_threshold = 0.5 # vector similarity threshold
+search_similarity_threshold = 0.6 # vector similarity threshold
+search_expand_semantic_threshold = 0.3 # semantic similarity threshold for graph expansion (0.3-0.5 recommended; too low may pull in unrelated memories, too high prevents expansion)
 
 # Smart query optimization
 enable_query_optimization = true # enable query optimization (a small model analyzes the chat history and builds a comprehensive search query)
 
 # === Memory consolidation settings ===
+# Memory consolidation covers two functions: 1) deduplication (merging similar memories) 2) linking (building relations between memories)
 # Note: the consolidation task iterates over all memories to compute similarities and may use considerable resources
 # Recommendations: 1) lower the execution frequency; 2) raise the similarity threshold to reduce false merges; 3) cap the batch size
consolidation_enabled = true # whether to enable memory consolidation
consolidation_interval_hours = 1.0 # consolidation task interval
-consolidation_similarity_threshold = 0.92 # deduplication threshold for similar memories (>=0.92 recommended to reduce false merges; 0.85 is too low)
-consolidation_time_window_hours = 6.0 # consolidation time window (hours)
-consolidation_max_batch_size = 50 # maximum memories processed per run (cap the batch to avoid blocking)
+consolidation_deduplication_threshold = 0.93 # deduplication threshold for similar memories
+consolidation_time_window_hours = 2.0 # consolidation time window (hours) - shared by deduplication and linking
+consolidation_max_batch_size = 100 # maximum memories processed per run
+
+# Memory linking settings (a sub-module of consolidation)
+consolidation_linking_enabled = true # whether to enable memory link creation
+consolidation_linking_max_candidates = 10 # maximum link candidates per memory
+consolidation_linking_max_memories = 20 # maximum total memories processed per run
+consolidation_linking_min_importance = 0.5 # minimum importance threshold (memories below it are not linked)
+consolidation_linking_pre_filter_threshold = 0.7 # vector-similarity pre-filter threshold
+consolidation_linking_max_pairs_for_llm = 5 # maximum candidate pairs sent to the LLM for analysis
+consolidation_linking_min_confidence = 0.7 # minimum confidence threshold for LLM analysis
+consolidation_linking_llm_temperature = 0.2 # temperature for LLM analysis
+consolidation_linking_llm_max_tokens = 1500 # maximum output length for LLM analysis
 
 # === Memory forgetting settings ===
 forgetting_enabled = true # whether to enable automatic forgetting
@@ -273,7 +286,7 @@ forgetting_min_importance = 0.8 # 最小保护重要性(高于此值的记忆
 # === Memory activation settings ===
 activation_decay_rate = 0.9 # activation decay rate (10% decay per day)
 activation_propagation_strength = 0.5 # activation propagation strength (fraction of activation propagated to related memories)
-activation_propagation_depth = 2 # activation propagation depth (maximum number of hops, 1-2 recommended)
+activation_propagation_depth = 1 # activation propagation depth (maximum number of hops, 1-2 recommended)
 
 # === Memory retrieval settings ===
 search_max_expand_depth = 2 # graph expansion depth at retrieval time (0 = direct matches only, 1 = expand 1 hop, 2 = expand 2 hops; 1-2 recommended)