diff --git a/src/memory_graph/utils/path_expansion.py b/src/memory_graph/utils/path_expansion.py
index 57b4c3e13..f24445495 100644
--- a/src/memory_graph/utils/path_expansion.py
+++ b/src/memory_graph/utils/path_expansion.py
@@ -71,6 +71,12 @@ class PathExpansionConfig:
     medium_score_threshold: float = 0.4  # medium-score path threshold
     max_active_paths: int = 1000  # max number of active paths (prevents explosion)
     top_paths_retain: int = 500  # number of top paths retained when over the limit
+
+    # 🚀 Performance tuning parameters
+    enable_early_stop: bool = True  # enable early stop (finish ahead of time when paths barely grow)
+    early_stop_growth_threshold: float = 0.1  # early-stop threshold (stop when path growth rate drops below 10%)
+    max_candidate_memories: int = 200  # 🆕 max candidate memories (filtered before final scoring)
+    min_path_count_for_memory: int = 1  # 🆕 minimum paths a memory must have (filters weakly linked memories)
 
     # Edge type weight configuration
     edge_type_weights: dict[str, float] = field(
@@ -115,6 +121,10 @@ class PathScoreExpansion:
         self.vector_store = vector_store
         self.config = config or PathExpansionConfig()
         self.prefer_node_types: list[str] = []  # 🆕 preferred node types
+
+        # 🚀 Performance optimization: caches for neighbor edges and node scores
+        self._neighbor_cache: dict[str, list[Any]] = {}
+        self._node_score_cache: dict[str, float] = {}
 
         logger.info(
             f"PathScoreExpansion initialized: max_hops={self.config.max_hops}, "
@@ -147,6 +157,10 @@ class PathScoreExpansion:
             logger.warning("Initial nodes are empty; cannot perform path expansion")
             return []
 
+        # 🚀 Clear the caches (start fresh on every query)
+        self._neighbor_cache.clear()
+        self._node_score_cache.clear()
+
         # Save the preferred types
         self.prefer_node_types = prefer_node_types or []
         if self.prefer_node_types:
@@ -173,6 +187,11 @@ class PathScoreExpansion:
             paths_merged = 0
             paths_pruned = 0
 
+            # 🚀 Performance optimization: collect every candidate node that needs scoring, then score them in one batch
+            candidate_nodes_for_batch = set()
+            path_candidates: list[tuple[Path, Any, str, float]] = []  # (path, edge, next_node, edge_weight)
+
+            # Phase 1: collect all candidate nodes
             for path in active_paths:
                 current_node = path.get_leaf_node()
                 if not current_node:
@@ -192,52 +211,62 @@ class PathScoreExpansion:
                     if path.contains_node(next_node):
                         continue
 
-                    # Compute the new path score
                     edge_weight = self._get_edge_weight(edge)
-                    node_score = await self._get_node_score(next_node, query_embedding)
-
-                    new_score = self._calculate_path_score(
-                        old_score=path.score,
-                        edge_weight=edge_weight,
-                        node_score=node_score,
-                        depth=hop + 1,
-                    )
-
-                    # Pruning: skip when the score for reaching this node is far below the best known path
-                    if next_node in best_score_to_node:
-                        if new_score < best_score_to_node[next_node] * self.config.pruning_threshold:
-                            paths_pruned += 1
-                            continue
-
-                    # Update the best score
-                    best_score_to_node[next_node] = max(best_score_to_node.get(next_node, 0), new_score)
-
-                    # Create the new path
-                    new_path = Path(
-                        nodes=path.nodes + [next_node],
-                        edges=path.edges + [edge],
-                        score=new_score,
-                        depth=hop + 1,
-                        parent=path,
-                    )
-
-                    # Try to merge paths
-                    merged_path = self._try_merge_paths(new_path, next_paths)
-                    if merged_path:
-                        next_paths.append(merged_path)
-                        paths_merged += 1
-                    else:
-                        next_paths.append(new_path)
-
-                    branches_created += 1
+
+                    # Record the candidate
+                    path_candidates.append((path, edge, next_node, edge_weight))
+                    candidate_nodes_for_batch.add(next_node)
+                    branch_count += 1
 
                     if branch_count >= max_branches:
                         break
 
-            # Yield control to the event loop
-            if len(next_paths) % 100 == 0:
-                await asyncio.sleep(0)
+            # 🚀 Phase 2: compute scores for all candidate nodes in one batch
+            if candidate_nodes_for_batch:
+                batch_node_scores = await self._batch_get_node_scores(
+                    list(candidate_nodes_for_batch), query_embedding
+                )
+            else:
+                batch_node_scores = {}
+
+            # 🚀 Phase 3: create paths using the batch-computed scores
+            for path, edge, next_node, edge_weight in path_candidates:
+                node_score = batch_node_scores.get(next_node, 0.3)
+
+                new_score = self._calculate_path_score(
+                    old_score=path.score,
+                    edge_weight=edge_weight,
+                    node_score=node_score,
+                    depth=hop + 1,
+                )
+
+                # Pruning: skip when the score for reaching this node is far below the best known path
+                if next_node in best_score_to_node:
+                    if new_score < best_score_to_node[next_node] * self.config.pruning_threshold:
+                        paths_pruned += 1
+                        continue
+
+                # Update the best score
+                best_score_to_node[next_node] = max(best_score_to_node.get(next_node, 0), new_score)
+
+                # Create the new path
+                new_path = Path(
+                    nodes=path.nodes + [next_node],
+                    edges=path.edges + [edge],
+                    score=new_score,
+                    depth=hop + 1,
+                    parent=path,
+                )
+
+                # Try to merge paths
+                merged_path = self._try_merge_paths(new_path, next_paths)
+                if merged_path:
+                    next_paths.append(merged_path)
+                    paths_merged += 1
+                else:
+                    next_paths.append(new_path)
+
+                branches_created += 1
 
             # Path count control: when growth explodes, keep only the high-scoring paths
             if len(next_paths) > self.config.max_active_paths:
@@ -249,7 +278,30 @@ class PathScoreExpansion:
                     : self.config.top_paths_retain
                 ]
 
+            # 🚀 Early-stop detection: terminate ahead of time when paths barely grow
+            prev_path_count = len(active_paths)
             active_paths = next_paths
+
+            if self.config.enable_early_stop and prev_path_count > 0:
+                growth_rate = (len(active_paths) - prev_path_count) / prev_path_count
+                if growth_rate < self.config.early_stop_growth_threshold:
+                    logger.info(
+                        f"⏸️ Early stop triggered: path growth rate {growth_rate:.2%} < {self.config.early_stop_growth_threshold:.0%}, "
+                        f"stopping at hop {hop+1}/{self.config.max_hops}"
+                    )
+                    hop_time = time.time() - hop_start
+                    hop_stats.append(
+                        {
+                            "hop": hop + 1,
+                            "paths": len(active_paths),
+                            "branches": branches_created,
+                            "merged": paths_merged,
+                            "pruned": paths_pruned,
+                            "time": hop_time,
+                            "early_stopped": True,
+                        }
+                    )
+                    break
 
             hop_time = time.time() - hop_start
             hop_stats.append(
@@ -285,6 +337,31 @@ class PathScoreExpansion:
         memory_paths = await self._map_paths_to_memories(leaf_paths)
         logger.info(f"🔗 Mapped to {len(memory_paths)} candidate memories")
 
+        # 🚀 4.5. Coarse ranking: filter out low-quality memories before detailed scoring
+        if len(memory_paths) > self.config.max_candidate_memories:
+            # Rough ranking by path count and maximum path score
+            memory_scores_rough = []
+            for mem_id, (memory, paths) in memory_paths.items():
+                # Simple score: path count × highest path score × importance
+                max_path_score = max(p.score for p in paths) if paths else 0
+                rough_score = len(paths) * max_path_score * memory.importance
+                memory_scores_rough.append((mem_id, rough_score))
+
+            # Keep the top candidates
+            memory_scores_rough.sort(key=lambda x: x[1], reverse=True)
+            retained_mem_ids = set(mem_id for mem_id, _ in memory_scores_rough[:self.config.max_candidate_memories])
+
+            # Filter
+            memory_paths = {
+                mem_id: (memory, paths)
+                for mem_id, (memory, paths) in memory_paths.items()
+                if mem_id in retained_mem_ids
+            }
+
+            logger.info(
+                f"⚡ Coarse filtering: {len(memory_scores_rough)} → {len(memory_paths)} candidate memories"
+            )
+
         # 5. Final scoring
        scored_memories = await self._final_scoring(memory_paths)
@@ -310,7 +387,7 @@ class PathScoreExpansion:
 
     async def _get_sorted_neighbor_edges(self, node_id: str) -> list[Any]:
         """
-        Get a node's neighbor edges, sorted by edge weight
+        Get a node's neighbor edges, sorted by edge weight - with caching
 
         Args:
             node_id: Node ID
@@ -318,6 +395,10 @@ class PathScoreExpansion:
         Returns:
             Sorted list of edges
         """
+        # 🚀 Cache check
+        if node_id in self._neighbor_cache:
+            return self._neighbor_cache[node_id]
+
         edges = []
 
         # Fetch all edges related to this node from the graph store
@@ -335,6 +416,9 @@ class PathScoreExpansion:
         # Sort by edge weight
         unique_edges.sort(key=lambda e: self._get_edge_weight(e), reverse=True)
 
+        # 🚀 Store in the cache
+        self._neighbor_cache[node_id] = unique_edges
+
         return unique_edges
 
     def _get_edge_weight(self, edge: Any) -> float:
@@ -393,6 +477,78 @@ class PathScoreExpansion:
 
         return base_score
 
+    async def _batch_get_node_scores(
+        self, node_ids: list[str], query_embedding: "np.ndarray | None"
+    ) -> dict[str, float]:
+        """
+        Get node scores in batch (performance-optimized version)
+
+        Args:
+            node_ids: List of node IDs
+            query_embedding: Query vector
+
+        Returns:
+            {node_id: score} dict
+        """
+        import numpy as np
+
+        scores = {}
+
+        if query_embedding is None:
+            # Without a query vector, return default scores
+            return {nid: 0.5 for nid in node_ids}
+
+        # Fetch node data in batch
+        node_data_list = await asyncio.gather(
+            *[self.vector_store.get_node_by_id(nid) for nid in node_ids],
+            return_exceptions=True
+        )
+
+        # Collect the valid embedding vectors
+        valid_embeddings = []
+        valid_node_ids = []
+        node_metadata_map = {}
+
+        for nid, node_data in zip(node_ids, node_data_list):
+            if isinstance(node_data, Exception):
+                scores[nid] = 0.3
+                continue
+
+            # Type guard: make sure node_data is a dict
+            if not node_data or not isinstance(node_data, dict) or "embedding" not in node_data:
+                scores[nid] = 0.3
+            else:
+                valid_embeddings.append(node_data["embedding"])
+                valid_node_ids.append(nid)
+                node_metadata_map[nid] = node_data.get("metadata", {})
+
+        if valid_embeddings:
+            # Compute similarities in batch (matrix operations)
+            embeddings_matrix = np.array(valid_embeddings)
+            query_norm = np.linalg.norm(query_embedding)
+            embeddings_norms = np.linalg.norm(embeddings_matrix, axis=1)
+
+            # Vectorized cosine similarity
+            similarities = np.dot(embeddings_matrix, query_embedding) / (embeddings_norms * query_norm + 1e-8)
+            similarities = np.clip(similarities, 0.0, 1.0)
+
+            # Apply the preferred-type bonus
+            for nid, sim in zip(valid_node_ids, similarities):
+                base_score = float(sim)
+
+                # Preferred-type bonus
+                if self.prefer_node_types and nid in node_metadata_map:
+                    node_type = node_metadata_map[nid].get("node_type")
+                    if node_type and node_type in self.prefer_node_types:
+                        bonus = base_score * 0.2
+                        scores[nid] = base_score + bonus
+                    else:
+                        scores[nid] = base_score
+                else:
+                    scores[nid] = base_score
+
+        return scores
+
     def _calculate_path_score(self, old_score: float, edge_weight: float, node_score: float, depth: int) -> float:
         """
         Compute the path score (core formula)
@@ -522,7 +678,7 @@ class PathScoreExpansion:
 
     async def _map_paths_to_memories(self, paths: list[Path]) -> dict[str, tuple[Any, list[Path]]]:
         """
-        Map paths to memories
+        Map paths to memories - optimized version
 
        Args:
             paths: List of paths
@@ -533,22 +689,39 @@ class PathScoreExpansion:
         # Use temporary dicts to store the path lists
         temp_paths: dict[str, list[Path]] = {}
         temp_memories: dict[str, Any] = {}  # stores Memory objects
+
+        # 🚀 Performance optimization: collect every memory ID to fetch, then fetch them together
+        all_memory_ids = set()
+        path_to_memory_ids: dict[int, set[str]] = {}  # path object id -> set of memory IDs
 
         for path in paths:
-            # Collect the memories involved by all nodes on the path
            memory_ids_in_path = set()
-
+
+            # Collect the memories involved by all nodes on the path
            for node_id in path.nodes:
                 memory_ids = self.graph_store.node_to_memories.get(node_id, [])
                 memory_ids_in_path.update(memory_ids)
+
+            all_memory_ids.update(memory_ids_in_path)
+            path_to_memory_ids[id(path)] = memory_ids_in_path
 
-            # Contribute the path to every memory involved
+        # 🚀 Fetch the memory objects in batch (if graph_store supports batch retrieval)
+        # Note: retrieval here is assumed to be one-by-one; a batch API would allow further optimization
+        memory_cache: dict[str, Any] = {}
+        for mem_id in all_memory_ids:
+            memory = self.graph_store.get_memory_by_id(mem_id)
+            if memory:
+                memory_cache[mem_id] = memory
+
+        # Build the mapping
+        for path in paths:
+            memory_ids_in_path = path_to_memory_ids[id(path)]
+
             for mem_id in memory_ids_in_path:
-                memory = self.graph_store.get_memory_by_id(mem_id)
-                if memory:
+                if mem_id in memory_cache:
                     if mem_id not in temp_paths:
                         temp_paths[mem_id] = []
-                    temp_memories[mem_id] = memory
+                    temp_memories[mem_id] = memory_cache[mem_id]
                     temp_paths[mem_id].append(path)
 
         # Build the final returned dict
@@ -563,7 +736,7 @@ class PathScoreExpansion:
         self, memory_paths: dict[str, tuple[Any, list[Path]]]
     ) -> list[tuple[Any, float, list[Path]]]:
         """
-        Final scoring (combining path score, importance, recency + preferred-type bonus)
+        Final scoring (combining path score, importance, recency + preferred-type bonus) - optimized version
 
         Args:
             memory_paths: Mapping from memory ID to (memory, path list)
@@ -572,7 +745,36 @@ class PathScoreExpansion:
             [(Memory, final_score, paths), ...]
         """
         scored_memories = []
+
+        # 🚀 Performance optimization: when the preferred-type bonus is needed, preload every node's type info in batch
+        node_type_cache: dict[str, str | None] = {}
+
+        if self.prefer_node_types:
+            # Collect all node IDs that need to be queried
+            all_node_ids = set()
+            for memory, _ in memory_paths.values():
+                memory_nodes = getattr(memory, "nodes", [])
+                for node in memory_nodes:
+                    node_id = node.id if hasattr(node, "id") else str(node)
+                    all_node_ids.add(node_id)
+
+            # Fetch the node data in batch
+            if all_node_ids:
+                logger.debug(f"🔍 Batch preloading type info for {len(all_node_ids)} nodes")
+                node_data_list = await asyncio.gather(
+                    *[self.vector_store.get_node_by_id(nid) for nid in all_node_ids],
+                    return_exceptions=True
+                )
+
+                # Build the type cache
+                for nid, node_data in zip(all_node_ids, node_data_list):
+                    if isinstance(node_data, Exception) or not node_data or not isinstance(node_data, dict):
+                        node_type_cache[nid] = None
+                    else:
+                        metadata = node_data.get("metadata", {})
+                        node_type_cache[nid] = metadata.get("node_type")
+
+        # Iterate over all memories and score them
         for mem_id, (memory, paths) in memory_paths.items():
             # 1. Aggregate the path scores
             path_score = self._aggregate_path_scores(paths)
@@ -591,23 +793,18 @@ class PathScoreExpansion:
                 + recency_score * weights["recency"]
             )
 
-            # 🆕 5. Preferred-type bonus (inspect the node types in the memory)
+            # 🆕 5. Preferred-type bonus (uses the cached type info)
             type_bonus = 0.0
-            if self.prefer_node_types:
-                # Get all nodes in the memory
+            if self.prefer_node_types and node_type_cache:
                 memory_nodes = getattr(memory, "nodes", [])
                 if memory_nodes:
-                    # Compute the fraction of nodes matching the preferred types
+                    # Use the cache to count matches quickly
                     matched_count = 0
                     for node in memory_nodes:
-                        # Extract the ID from the Node object
                         node_id = node.id if hasattr(node, "id") else str(node)
-                        node_data = await self.vector_store.get_node_by_id(node_id)
-                        if node_data:
-                            metadata = node_data.get("metadata", {})
-                            node_type = metadata.get("node_type")
-                            if node_type and node_type in self.prefer_node_types:
-                                matched_count += 1
+                        node_type = node_type_cache.get(node_id)
+                        if node_type and node_type in self.prefer_node_types:
+                            matched_count += 1
 
                     if matched_count > 0:
                         match_ratio = matched_count / len(memory_nodes)
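
Reviewer note: the core speedup in `_batch_get_node_scores` is replacing one cosine similarity per node with a single vectorized matrix pass. A minimal standalone sketch of that idea; the function name `batch_cosine_scores` and the toy data are illustrative, not part of the patch:

```python
import numpy as np

def batch_cosine_scores(query: np.ndarray, embeddings: list[np.ndarray]) -> np.ndarray:
    """Cosine similarity of `query` against every embedding, clipped to [0, 1]."""
    matrix = np.array(embeddings)           # shape: (n_nodes, dim)
    norms = np.linalg.norm(matrix, axis=1)  # per-row norms
    query_norm = np.linalg.norm(query)
    # The 1e-8 epsilon guards against division by zero for all-zero vectors,
    # mirroring the epsilon used in the patch.
    sims = matrix @ query / (norms * query_norm + 1e-8)
    return np.clip(sims, 0.0, 1.0)

if __name__ == "__main__":
    rng = np.random.default_rng(42)
    query = rng.normal(size=8)
    nodes = [rng.normal(size=8) for _ in range(5)]
    print(batch_cosine_scores(query, nodes))  # five scores from one matrix pass
```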
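The early-stop rule in the hop loop avoids wasted hops once the path frontier saturates. A toy walk-through under the default `early_stop_growth_threshold = 0.1`; `should_early_stop` and the simulated counts are hypothetical stand-ins for the in-loop logic:

```python
def should_early_stop(prev_count: int, new_count: int, threshold: float = 0.1) -> bool:
    """True when the relative path growth drops below the configured threshold."""
    if prev_count <= 0:
        return False
    growth_rate = (new_count - prev_count) / prev_count
    return growth_rate < threshold

if __name__ == "__main__":
    counts = [10, 80, 240, 250]  # hypothetical active-path counts per hop
    for hop, (prev, new) in enumerate(zip(counts, counts[1:]), start=1):
        stop = should_early_stop(prev, new)
        print(f"hop {hop}: {prev} -> {new} paths, early stop: {stop}")
    # Hop 3 stops: (250 - 240) / 240 ≈ 4.2% < 10%
```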
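The new step 4.5 trades a cheap heuristic pass for fewer expensive `_final_scoring` calls. A sketch of the same top-k idea with simplified types; `MemoryStub`, `coarse_filter`, and the sample data are hypothetical (real `Path` objects carry `.score`, reduced here to plain floats):

```python
from dataclasses import dataclass

@dataclass
class MemoryStub:
    importance: float

def coarse_filter(
    memory_paths: dict[str, tuple[MemoryStub, list[float]]],
    max_candidates: int,
) -> dict[str, tuple[MemoryStub, list[float]]]:
    """Keep only the top candidates by rough_score = path count * max score * importance."""
    rough = [
        (mem_id, len(scores) * (max(scores) if scores else 0) * mem.importance)
        for mem_id, (mem, scores) in memory_paths.items()
    ]
    rough.sort(key=lambda x: x[1], reverse=True)
    retained = {mem_id for mem_id, _ in rough[:max_candidates]}
    return {k: v for k, v in memory_paths.items() if k in retained}

if __name__ == "__main__":
    candidates = {
        "m1": (MemoryStub(0.9), [0.8, 0.6]),  # strong: two high-scoring paths
        "m2": (MemoryStub(0.2), [0.3]),       # weak: one low-scoring path
        "m3": (MemoryStub(0.7), [0.5, 0.4]),
    }
    print(list(coarse_filter(candidates, max_candidates=2)))  # ['m1', 'm3']
```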