feat(path_expansion): add performance-tuning parameters and batch scoring to speed up path scoring

Author: Windpicker-owo
Date: 2025-11-13 16:40:13 +08:00
parent fd24e7cb0f
commit e119b33a69


@@ -71,6 +71,12 @@ class PathExpansionConfig:
medium_score_threshold: float = 0.4 # medium-score path threshold
max_active_paths: int = 1000 # max active paths (guards against explosion)
top_paths_retain: int = 500 # top paths kept when the limit is exceeded
# 🚀 Performance-tuning parameters
enable_early_stop: bool = True # enable early stop (finish early if paths barely grow)
early_stop_growth_threshold: float = 0.1 # early-stop threshold (stop when growth rate falls below 10%)
max_candidate_memories: int = 200 # 🆕 max candidate memories (filtered before final scoring)
min_path_count_for_memory: int = 1 # 🆕 minimum paths a memory needs (filters weakly linked memories)
# Edge-type weight configuration
edge_type_weights: dict[str, float] = field(
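A minimal sketch of wiring the new knobs at construction time (the field names come from this hunk; the call site, the argument order of PathScoreExpansion, and the store objects are assumptions):

```python
# Hypothetical call site — field names are real, everything else is illustrative.
config = PathExpansionConfig(
    enable_early_stop=True,           # finish a query early once path growth stalls
    early_stop_growth_threshold=0.1,  # stop when per-hop growth drops below 10%
    max_candidate_memories=200,       # coarse-filter before the expensive final scoring
    min_path_count_for_memory=1,      # drop memories reached by too few paths
)
expander = PathScoreExpansion(graph_store, vector_store, config=config)
```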
@@ -115,6 +121,10 @@ class PathScoreExpansion:
self.vector_store = vector_store
self.config = config or PathExpansionConfig()
self.prefer_node_types: list[str] = [] # 🆕 preferred node types
# 🚀 Performance: cache for neighbor edges
self._neighbor_cache: dict[str, list[Any]] = {}
self._node_score_cache: dict[str, float] = {}
logger.info(
f"PathScoreExpansion 初始化: max_hops={self.config.max_hops}, "
@@ -147,6 +157,10 @@ class PathScoreExpansion:
logger.warning("初始节点为空,无法进行路径扩展")
return []
# 🚀 Clear caches (start fresh for every query)
self._neighbor_cache.clear()
self._node_score_cache.clear()
# Save the preferred types
self.prefer_node_types = prefer_node_types or []
if self.prefer_node_types:
@@ -173,6 +187,11 @@ class PathScoreExpansion:
paths_merged = 0
paths_pruned = 0
# 🚀 Performance: collect every candidate node that needs scoring, then score them in one batch
candidate_nodes_for_batch = set()
path_candidates: list[tuple[Path, Any, str, float]] = [] # (path, edge, next_node, edge_weight)
# Phase 1: collect all candidate nodes
for path in active_paths:
current_node = path.get_leaf_node()
if not current_node:
@@ -192,52 +211,62 @@ class PathScoreExpansion:
if path.contains_node(next_node):
continue
# Compute the new path's score
edge_weight = self._get_edge_weight(edge)
node_score = await self._get_node_score(next_node, query_embedding)
new_score = self._calculate_path_score(
old_score=path.score,
edge_weight=edge_weight,
node_score=node_score,
depth=hop + 1,
)
# Pruning: skip if the score reaching this node is far below the best existing path
if next_node in best_score_to_node:
if new_score < best_score_to_node[next_node] * self.config.pruning_threshold:
paths_pruned += 1
continue
# Update the best score
best_score_to_node[next_node] = max(best_score_to_node.get(next_node, 0), new_score)
# Create the new path
new_path = Path(
nodes=path.nodes + [next_node],
edges=path.edges + [edge],
score=new_score,
depth=hop + 1,
parent=path,
)
# Try to merge paths
merged_path = self._try_merge_paths(new_path, next_paths)
if merged_path:
next_paths.append(merged_path)
paths_merged += 1
else:
next_paths.append(new_path)
branches_created += 1
# Record the candidate
path_candidates.append((path, edge, next_node, edge_weight))
candidate_nodes_for_batch.add(next_node)
branch_count += 1
if branch_count >= max_branches:
break
# Yield control
if len(next_paths) % 100 == 0:
await asyncio.sleep(0)
# 🚀 Phase 2: batch-score all candidate nodes
if candidate_nodes_for_batch:
batch_node_scores = await self._batch_get_node_scores(
list(candidate_nodes_for_batch), query_embedding
)
else:
batch_node_scores = {}
# 🚀 Phase 3: build paths from the batch-computed scores
for path, edge, next_node, edge_weight in path_candidates:
node_score = batch_node_scores.get(next_node, 0.3)
new_score = self._calculate_path_score(
old_score=path.score,
edge_weight=edge_weight,
node_score=node_score,
depth=hop + 1,
)
# Pruning: skip if the score reaching this node is far below the best existing path
if next_node in best_score_to_node:
if new_score < best_score_to_node[next_node] * self.config.pruning_threshold:
paths_pruned += 1
continue
# Update the best score
best_score_to_node[next_node] = max(best_score_to_node.get(next_node, 0), new_score)
# Create the new path
new_path = Path(
nodes=path.nodes + [next_node],
edges=path.edges + [edge],
score=new_score,
depth=hop + 1,
parent=path,
)
# Try to merge paths
merged_path = self._try_merge_paths(new_path, next_paths)
if merged_path:
next_paths.append(merged_path)
paths_merged += 1
else:
next_paths.append(new_path)
branches_created += 1
# Path-count control: when growth explodes, keep only high-scoring paths
if len(next_paths) > self.config.max_active_paths:
@@ -249,7 +278,30 @@ class PathScoreExpansion:
: self.config.top_paths_retain
]
# 🚀 Early-stop check: terminate early when paths barely grow
prev_path_count = len(active_paths)
active_paths = next_paths
if self.config.enable_early_stop and prev_path_count > 0:
growth_rate = (len(active_paths) - prev_path_count) / prev_path_count
if growth_rate < self.config.early_stop_growth_threshold:
logger.info(
f"⏸️ 早停触发: 路径增长率 {growth_rate:.2%} < {self.config.early_stop_growth_threshold:.0%}, "
f"在第 {hop+1}/{self.config.max_hops} 跳停止"
)
hop_time = time.time() - hop_start
hop_stats.append(
{
"hop": hop + 1,
"paths": len(active_paths),
"branches": branches_created,
"merged": paths_merged,
"pruned": paths_pruned,
"time": hop_time,
"early_stopped": True,
}
)
break
hop_time = time.time() - hop_start
hop_stats.append(
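To make the early-stop threshold concrete (numbers invented for illustration): 500 active paths before a hop and 530 after give a 6% growth rate, under the default 10%, so the hop loop breaks:

```python
# Worked example of the early-stop arithmetic above (synthetic numbers).
prev_path_count = 500  # active paths entering the hop
next_path_count = 530  # active paths after expansion
growth_rate = (next_path_count - prev_path_count) / prev_path_count  # 0.06
assert growth_rate < 0.1  # below early_stop_growth_threshold -> stop expanding
```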
@@ -285,6 +337,31 @@ class PathScoreExpansion:
memory_paths = await self._map_paths_to_memories(leaf_paths)
logger.info(f"🔗 映射到 {len(memory_paths)} 条候选记忆")
# 🚀 4.5. Coarse filter: drop low-quality memories before detailed scoring
if len(memory_paths) > self.config.max_candidate_memories:
# Coarse-rank by path count and maximum path score
memory_scores_rough = []
for mem_id, (memory, paths) in memory_paths.items():
# Cheap score: path count × max path score × importance
max_path_score = max(p.score for p in paths) if paths else 0
rough_score = len(paths) * max_path_score * memory.importance
memory_scores_rough.append((mem_id, rough_score))
# Keep the top candidates
memory_scores_rough.sort(key=lambda x: x[1], reverse=True)
retained_mem_ids = set(mem_id for mem_id, _ in memory_scores_rough[:self.config.max_candidate_memories])
# Filter
memory_paths = {
mem_id: (memory, paths)
for mem_id, (memory, paths) in memory_paths.items()
if mem_id in retained_mem_ids
}
logger.info(
f"⚡ 粗排过滤: {len(memory_scores_rough)}{len(memory_paths)} 条候选记忆"
)
# 5. Final scoring
scored_memories = await self._final_scoring(memory_paths)
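The rough score is deliberately cheap: path count × best path score × importance, with no vector math involved. A standalone sketch of the same filter (`Path.score` and `Memory.importance` are attributes used in this diff; the function wrapper is illustrative):

```python
# Sketch of the coarse filter: keep only the top-N memories by a cheap score.
def coarse_filter(memory_paths, max_candidates=200):
    rough = []
    for mem_id, (memory, paths) in memory_paths.items():
        max_path_score = max((p.score for p in paths), default=0.0)
        rough.append((mem_id, len(paths) * max_path_score * memory.importance))
    rough.sort(key=lambda item: item[1], reverse=True)
    keep = {mem_id for mem_id, _ in rough[:max_candidates]}
    return {k: v for k, v in memory_paths.items() if k in keep}
```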
@@ -310,7 +387,7 @@ class PathScoreExpansion:
async def _get_sorted_neighbor_edges(self, node_id: str) -> list[Any]:
"""
Get the node's neighbor edges, sorted by edge weight
Get the node's neighbor edges, sorted by edge weight - with caching
Args:
node_id: the node ID
@@ -318,6 +395,10 @@ class PathScoreExpansion:
Returns:
The sorted list of edges
"""
# 🚀 Cache check
if node_id in self._neighbor_cache:
return self._neighbor_cache[node_id]
edges = []
# Fetch every edge touching this node from the graph store
@@ -335,6 +416,9 @@ class PathScoreExpansion:
# Sort by edge weight
unique_edges.sort(key=lambda e: self._get_edge_weight(e), reverse=True)
# 🚀 Store in the cache
self._neighbor_cache[node_id] = unique_edges
return unique_edges
def _get_edge_weight(self, edge: Any) -> float:
@@ -393,6 +477,78 @@ class PathScoreExpansion:
return base_score
async def _batch_get_node_scores(
self, node_ids: list[str], query_embedding: "np.ndarray | None"
) -> dict[str, float]:
"""
Batch-fetch node scores (performance-optimized version)
Args:
node_ids: list of node IDs
query_embedding: the query vector
Returns:
A {node_id: score} dict
"""
import numpy as np
scores = {}
if query_embedding is None:
# No query vector: return default scores
return {nid: 0.5 for nid in node_ids}
# Batch-fetch the node data
node_data_list = await asyncio.gather(
*[self.vector_store.get_node_by_id(nid) for nid in node_ids],
return_exceptions=True
)
# Collect the valid embedding vectors
valid_embeddings = []
valid_node_ids = []
node_metadata_map = {}
for nid, node_data in zip(node_ids, node_data_list):
if isinstance(node_data, Exception):
scores[nid] = 0.3
continue
# Type guard: ensure node_data is a dict
if not node_data or not isinstance(node_data, dict) or "embedding" not in node_data:
scores[nid] = 0.3
else:
valid_embeddings.append(node_data["embedding"])
valid_node_ids.append(nid)
node_metadata_map[nid] = node_data.get("metadata", {})
if valid_embeddings:
# Batch-compute similarities (matrix operations)
embeddings_matrix = np.array(valid_embeddings)
query_norm = np.linalg.norm(query_embedding)
embeddings_norms = np.linalg.norm(embeddings_matrix, axis=1)
# Vectorized cosine similarity
similarities = np.dot(embeddings_matrix, query_embedding) / (embeddings_norms * query_norm + 1e-8)
similarities = np.clip(similarities, 0.0, 1.0)
# Apply the preferred-type bonus
for nid, sim in zip(valid_node_ids, similarities):
base_score = float(sim)
# Preferred-type bonus
if self.prefer_node_types and nid in node_metadata_map:
node_type = node_metadata_map[nid].get("node_type")
if node_type and node_type in self.prefer_node_types:
bonus = base_score * 0.2
scores[nid] = base_score + bonus
else:
scores[nid] = base_score
else:
scores[nid] = base_score
return scores
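The payoff in `_batch_get_node_scores` is the single matrix product: one vectorized operation over an (N, d) embedding matrix replaces N per-node similarity calls. A self-contained sketch of that kernel with synthetic data:

```python
import numpy as np

# Vectorized cosine similarity, as in _batch_get_node_scores (synthetic data).
rng = np.random.default_rng(0)
query_embedding = rng.normal(size=128)           # shape (d,)
embeddings_matrix = rng.normal(size=(200, 128))  # N candidates, shape (N, d)

similarities = embeddings_matrix @ query_embedding / (
    np.linalg.norm(embeddings_matrix, axis=1) * np.linalg.norm(query_embedding) + 1e-8
)
similarities = np.clip(similarities, 0.0, 1.0)  # negatives floor at 0, matching the diff
assert similarities.shape == (200,)
```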
def _calculate_path_score(self, old_score: float, edge_weight: float, node_score: float, depth: int) -> float:
"""
Compute the path score (core formula)
@@ -522,7 +678,7 @@ class PathScoreExpansion:
async def _map_paths_to_memories(self, paths: list[Path]) -> dict[str, tuple[Any, list[Path]]]:
"""
Map paths to memories
Map paths to memories - optimized version
Args:
paths: list of paths
@@ -533,22 +689,39 @@ class PathScoreExpansion:
# Temporary dicts holding the path lists
temp_paths: dict[str, list[Path]] = {}
temp_memories: dict[str, Any] = {} # holds Memory objects
# 🚀 Performance: collect every memory ID to fetch, then fetch them in one pass
all_memory_ids = set()
path_to_memory_ids: dict[int, set[str]] = {} # id(path) -> set of memory IDs
for path in paths:
# Collect the memories touched by the path's nodes
memory_ids_in_path = set()
# Collect the memories touched by the path's nodes
for node_id in path.nodes:
memory_ids = self.graph_store.node_to_memories.get(node_id, [])
memory_ids_in_path.update(memory_ids)
all_memory_ids.update(memory_ids_in_path)
path_to_memory_ids[id(path)] = memory_ids_in_path
# Contribute the path to every memory it touches
# 🚀 Batch-fetch the memory objects (if graph_store supported batch retrieval)
# Note: assumes one-by-one fetching here; a batch API would allow further optimization
memory_cache: dict[str, Any] = {}
for mem_id in all_memory_ids:
memory = self.graph_store.get_memory_by_id(mem_id)
if memory:
memory_cache[mem_id] = memory
# Build the mapping
for path in paths:
memory_ids_in_path = path_to_memory_ids[id(path)]
for mem_id in memory_ids_in_path:
memory = self.graph_store.get_memory_by_id(mem_id)
if memory:
if mem_id in memory_cache:
if mem_id not in temp_paths:
temp_paths[mem_id] = []
temp_memories[mem_id] = memory
temp_memories[mem_id] = memory_cache[mem_id]
temp_paths[mem_id].append(path)
# Build the final dict to return
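The rewrite is a collect-then-fetch pattern: one walk over the paths gathers the union of memory IDs, each distinct memory is fetched exactly once into a cache, and a second walk attaches the cached objects. A condensed sketch (the `graph_store` method names come from this diff; the function wrapper is illustrative):

```python
# Collect-then-fetch: one get_memory_by_id call per distinct memory,
# no matter how many paths share it.
def collect_then_fetch(paths, graph_store):
    all_memory_ids: set[str] = set()
    path_to_memory_ids: dict[int, set[str]] = {}
    for path in paths:
        ids = {m for n in path.nodes for m in graph_store.node_to_memories.get(n, [])}
        path_to_memory_ids[id(path)] = ids
        all_memory_ids |= ids
    memory_cache = {
        mid: mem
        for mid in all_memory_ids
        if (mem := graph_store.get_memory_by_id(mid)) is not None
    }
    return path_to_memory_ids, memory_cache
```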
@@ -563,7 +736,7 @@ class PathScoreExpansion:
self, memory_paths: dict[str, tuple[Any, list[Path]]]
) -> list[tuple[Any, float, list[Path]]]:
"""
Final scoring (combines path score, importance, recency + preferred-type bonus)
Final scoring (combines path score, importance, recency + preferred-type bonus) - optimized version
Args:
memory_paths: mapping from memory ID to (memory, path list)
@@ -572,7 +745,36 @@ class PathScoreExpansion:
[(Memory, final_score, paths), ...]
"""
scored_memories = []
# 🚀 Performance: if the preferred-type bonus is needed, batch-preload type info for every node
node_type_cache: dict[str, str | None] = {}
if self.prefer_node_types:
# Collect every node ID that needs a lookup
all_node_ids = set()
for memory, _ in memory_paths.values():
memory_nodes = getattr(memory, "nodes", [])
for node in memory_nodes:
node_id = node.id if hasattr(node, "id") else str(node)
all_node_ids.add(node_id)
# Batch-fetch the node data
if all_node_ids:
logger.debug(f"🔍 批量预加载 {len(all_node_ids)} 个节点的类型信息")
node_data_list = await asyncio.gather(
*[self.vector_store.get_node_by_id(nid) for nid in all_node_ids],
return_exceptions=True
)
# Build the type cache
for nid, node_data in zip(all_node_ids, node_data_list):
if isinstance(node_data, Exception) or not node_data or not isinstance(node_data, dict):
node_type_cache[nid] = None
else:
metadata = node_data.get("metadata", {})
node_type_cache[nid] = metadata.get("node_type")
# Score every memory
for mem_id, (memory, paths) in memory_paths.items():
# 1. Aggregate the path scores
path_score = self._aggregate_path_scores(paths)
@@ -591,23 +793,18 @@ class PathScoreExpansion:
+ recency_score * weights["recency"]
)
# 🆕 5. Preferred-type bonus (checks node types in the memory)
# 🆕 5. Preferred-type bonus (uses the cached type info)
type_bonus = 0.0
if self.prefer_node_types:
# Get every node in the memory
if self.prefer_node_types and node_type_cache:
memory_nodes = getattr(memory, "nodes", [])
if memory_nodes:
# Compute the fraction of nodes matching a preferred type
# Use the cache to count matches quickly
matched_count = 0
for node in memory_nodes:
# Extract the ID from the Node object
node_id = node.id if hasattr(node, "id") else str(node)
node_data = await self.vector_store.get_node_by_id(node_id)
if node_data:
metadata = node_data.get("metadata", {})
node_type = metadata.get("node_type")
if node_type and node_type in self.prefer_node_types:
matched_count += 1
node_type = node_type_cache.get(node_id)
if node_type and node_type in self.prefer_node_types:
matched_count += 1
if matched_count > 0:
match_ratio = matched_count / len(memory_nodes)
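With `node_type_cache` preloaded, the match counting is pure dictionary lookups with no awaits inside the loop. An equivalent compact form (a sketch, not the committed code — the bonus applied after `match_ratio` falls outside this hunk and is not reproduced):

```python
# Compact equivalent of the cached match counting above.
matched_count = sum(
    1
    for node in memory_nodes
    if node_type_cache.get(node.id if hasattr(node, "id") else str(node))
    in prefer_node_types
)
match_ratio = matched_count / len(memory_nodes) if memory_nodes else 0.0
```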