chore: perform widespread code cleanup and formatting

Perform a comprehensive code cleanup across multiple modules to improve code quality, consistency, and maintainability.

Key changes include:
- Removing numerous unused imports.
- Standardizing import order.
- Eliminating trailing whitespace and inconsistent newlines.
- Updating legacy type hints to modern syntax (e.g., `List` -> `list`; see the illustrative sketch after this list).
- Making minor improvements for code robustness and style.
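
As a rough illustration of the kind of before/after these cleanups produce (a hypothetical module, not taken from the changed files), covering unused-import removal and type-hint modernization:

# Before: unused import, legacy typing generics, Optional-style union
# from typing import Dict, List, Optional   # Dict was never used
# def score_map(ids: List[str]) -> Optional[Dict[str, float]]:
#     return {node_id: 0.0 for node_id in ids} if ids else None

# After: unused import dropped, builtin generics and the `X | None` union used
def score_map(ids: list[str]) -> dict[str, float] | None:
    """Return a placeholder score per node id, or None for an empty input."""
    return {node_id: 0.0 for node_id in ids} if ids else None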
Author: minecraft1024a
Date: 2025-11-15 17:12:46 +08:00
Parent: bd45899dce
Commit: 6f62073630
26 changed files with 109 additions and 117 deletions

View File

@@ -507,7 +507,7 @@ class PersistenceManager:
GraphStore object
"""
try:
async with aiofiles.open(input_file, "r", encoding="utf-8") as f:
async with aiofiles.open(input_file, encoding="utf-8") as f:
content = await f.read()
data = json.loads(content)

View File

@@ -98,7 +98,7 @@ class MemoryTools:
graph_store=graph_store,
embedding_generator=embedding_generator,
)
# Initialize the path expander (lazy initialization; created only when enabled)
self.path_expander: PathScoreExpansion | None = None
@@ -573,7 +573,7 @@ class MemoryTools:
# Check whether the path expansion algorithm is enabled
use_path_expansion = getattr(global_config.memory, "enable_path_expansion", False) and expand_depth > 0
expanded_memory_scores = {}
if expand_depth > 0 and initial_memory_ids:
# Get the query embedding
query_embedding = None
@@ -582,12 +582,12 @@ class MemoryTools:
query_embedding = await self.builder.embedding_generator.generate(query)
except Exception as e:
logger.warning(f"生成查询embedding失败: {e}")
if query_embedding is not None:
if use_path_expansion:
# 🆕 Use the path-scoring expansion algorithm
logger.info(f"🔬 Using path-scoring expansion: {len(similar_nodes)} initial nodes, depth={expand_depth}")
# Lazily initialize the path expander
if self.path_expander is None:
path_config = PathExpansionConfig(
@@ -607,7 +607,7 @@ class MemoryTools:
vector_store=self.vector_store,
config=path_config
)
try:
# Run path expansion (passing preferred node types)
path_results = await self.path_expander.expand_with_path_scoring(
@@ -616,11 +616,11 @@ class MemoryTools:
top_k=top_k,
prefer_node_types=all_prefer_types  # 🆕 pass preferred node types
)
# Path expansion returns [(Memory, final_score, paths), ...]
# Return these memories directly and skip the traditional scoring below
logger.info(f"✅ Path expansion returned {len(path_results)} memories")
# Build the result payload directly
path_memories = []
for memory, score, paths in path_results:
@@ -635,25 +635,25 @@ class MemoryTools:
"max_path_depth": max(p.depth for p in paths) if paths else 0
}
})
logger.info(f"🎯 路径扩展最终返回: {len(path_memories)} 条记忆")
return {
"success": True,
"results": path_memories,
"total": len(path_memories),
"expansion_method": "path_scoring"
}
except Exception as e:
logger.error(f"路径扩展失败: {e}", exc_info=True)
logger.info("回退到传统图扩展算法")
# 继续执行下面的传统图扩展
# 传统图扩展(仅在未启用路径扩展或路径扩展失败时执行)
if not use_path_expansion or expanded_memory_scores == {}:
logger.info(f"开始传统图扩展: 初始记忆{len(initial_memory_ids)}个, 深度={expand_depth}")
try:
# Use the shared graph expansion utility function
expanded_results = await expand_memories_with_semantic_filter(

View File

@@ -9,10 +9,10 @@ from src.memory_graph.utils.time_parser import TimeParser
__all__ = [
"EmbeddingGenerator",
+ "Path",
+ "PathExpansionConfig",
+ "PathScoreExpansion",
"TimeParser",
"cosine_similarity",
"get_embedding_generator",
- "PathScoreExpansion",
- "PathExpansionConfig",
- "Path",
]

View File

@@ -12,7 +12,7 @@ from src.common.logger import get_logger
from src.memory_graph.utils.similarity import cosine_similarity
if TYPE_CHECKING:
from src.memory_graph.models import Memory
pass
logger = get_logger(__name__)
@@ -41,52 +41,52 @@ async def deduplicate_memories_by_similarity(
"""
if len(memories) <= 1:
return memories
logger.info(f"开始记忆去重: {len(memories)} 条记忆 (阈值={similarity_threshold})")
# 准备数据结构
memory_embeddings = []
for memory, score, extra in memories:
# Get the memory's vector representation
embedding = await _get_memory_embedding(memory)
memory_embeddings.append((memory, score, extra, embedding))
# Build the similarity matrix and find duplicate groups
duplicate_groups = _find_duplicate_groups(memory_embeddings, similarity_threshold)
# Merge each duplicate group
deduplicated = []
processed_indices = set()
for group_indices in duplicate_groups:
if any(i in processed_indices for i in group_indices):
continue  # already processed
# Mark as processed
processed_indices.update(group_indices)
# Merge the memories within the group
group_memories = [memory_embeddings[i] for i in group_indices]
merged_memory = _merge_memory_group(group_memories)
deduplicated.append(merged_memory)
# Add memories that were not merged
for i, (memory, score, extra, _) in enumerate(memory_embeddings):
if i not in processed_indices:
deduplicated.append((memory, score, extra))
# Sort by score
deduplicated.sort(key=lambda x: x[1], reverse=True)
# Limit the number of results
if keep_top_n is not None:
deduplicated = deduplicated[:keep_top_n]
logger.info(
f"去重完成: {len(memories)}{len(deduplicated)} 条记忆 "
f"(合并了 {len(memories) - len(deduplicated)} 条重复)"
)
return deduplicated
@@ -104,7 +104,7 @@ async def _get_memory_embedding(memory: Any) -> list[float] | None:
# nodes is a list of MemoryNode objects
first_node = memory.nodes[0]
node_id = getattr(first_node, "id", None)
if node_id:
# Read directly from the embedding attribute (if present)
if hasattr(first_node, "embedding") and first_node.embedding is not None:
@@ -114,7 +114,7 @@ async def _get_memory_embedding(memory: Any) -> list[float] | None:
return embedding.tolist()
elif isinstance(embedding, list):
return embedding
# Embedding could not be obtained
return None
@@ -132,13 +132,13 @@ def _find_duplicate_groups(
"""
n = len(memory_embeddings)
similarity_matrix = [[0.0] * n for _ in range(n)]
# Compute the similarity matrix
for i in range(n):
for j in range(i + 1, n):
embedding_i = memory_embeddings[i][3]
embedding_j = memory_embeddings[j][3]
# Skip None or zero vectors
if (embedding_i is None or embedding_j is None or
all(x == 0.0 for x in embedding_i) or all(x == 0.0 for x in embedding_j)):
@@ -146,29 +146,29 @@ def _find_duplicate_groups(
else:
# cosine_similarity converts inputs to numpy arrays automatically
similarity = float(cosine_similarity(embedding_i, embedding_j)) # type: ignore
similarity_matrix[i][j] = similarity
similarity_matrix[j][i] = similarity
# Use union-find to extract connected components
parent = list(range(n))
def find(x):
if parent[x] != x:
parent[x] = find(parent[x])
return parent[x]
def union(x, y):
px, py = find(x), find(y)
if px != py:
parent[px] = py
# Union similar memories
for i in range(n):
for j in range(i + 1, n):
if similarity_matrix[i][j] >= threshold:
union(i, j)
# Build the groups
groups_dict: dict[int, list[int]] = {}
for i in range(n):
@@ -176,10 +176,10 @@ def _find_duplicate_groups(
if root not in groups_dict:
groups_dict[root] = []
groups_dict[root].append(i)
# Only return groups with size > 1 (actual duplicate groups)
duplicate_groups = [group for group in groups_dict.values() if len(group) > 1]
return duplicate_groups
@@ -196,10 +196,10 @@ def _merge_memory_group(
"""
# Sort by score
sorted_group = sorted(group, key=lambda x: x[1], reverse=True)
# Keep the highest-scoring memory
best_memory, best_score, best_extra, _ = sorted_group[0]
# Compute the merged score (weighted average with decreasing weights)
total_weight = 0.0
weighted_sum = 0.0
@@ -207,17 +207,17 @@ def _merge_memory_group(
weight = 1.0 / (i + 1)  # rank 1 weight 1.0, rank 2 weight 0.5, rank 3 weight 0.33, ...
weighted_sum += score * weight
total_weight += weight
merged_score = weighted_sum / total_weight if total_weight > 0 else best_score
# Augment extra_data
merged_extra = best_extra if isinstance(best_extra, dict) else {}
merged_extra["merged_count"] = len(sorted_group)
merged_extra["original_scores"] = [score for _, score, _, _ in sorted_group]
logger.debug(
f"合并 {len(sorted_group)} 条相似记忆: "
f"分数 {best_score:.3f}{merged_score:.3f}"
)
return (best_memory, merged_score, merged_extra)

View File

@@ -26,7 +26,6 @@ from src.memory_graph.utils.similarity import cosine_similarity
if TYPE_CHECKING:
import numpy as np
from src.memory_graph.models import Memory
from src.memory_graph.storage.graph_store import GraphStore
from src.memory_graph.storage.vector_store import VectorStore
@@ -71,7 +70,7 @@ class PathExpansionConfig:
medium_score_threshold: float = 0.4  # medium-score path threshold
max_active_paths: int = 1000  # maximum number of active paths (prevents explosion)
top_paths_retain: int = 500  # number of top paths kept when the limit is exceeded
# 🚀 Performance tuning parameters
enable_early_stop: bool = True  # enable early stopping (finish early when paths grow very little)
early_stop_growth_threshold: float = 0.1  # early-stop threshold (stop when the path growth rate falls below 10%)
@@ -121,7 +120,7 @@ class PathScoreExpansion:
self.vector_store = vector_store
self.config = config or PathExpansionConfig()
self.prefer_node_types: list[str] = []  # 🆕 preferred node types
# 🚀 Performance optimization: neighbor-edge cache
self._neighbor_cache: dict[str, list[Any]] = {}
self._node_score_cache: dict[str, float] = {}
@@ -212,11 +211,11 @@ class PathScoreExpansion:
continue
edge_weight = self._get_edge_weight(edge)
# Record the candidate
path_candidates.append((path, edge, next_node, edge_weight))
candidate_nodes_for_batch.add(next_node)
branch_count += 1
if branch_count >= max_branches:
break
@@ -281,7 +280,7 @@ class PathScoreExpansion:
# 🚀 Early-stop check: terminate early if path growth is small
prev_path_count = len(active_paths)
active_paths = next_paths
if self.config.enable_early_stop and prev_path_count > 0:
growth_rate = (len(active_paths) - prev_path_count) / prev_path_count
if growth_rate < self.config.early_stop_growth_threshold:
@@ -346,18 +345,18 @@ class PathScoreExpansion:
max_path_score = max(p.score for p in paths) if paths else 0
rough_score = len(paths) * max_path_score * memory.importance
memory_scores_rough.append((mem_id, rough_score))
# Keep the top candidates
memory_scores_rough.sort(key=lambda x: x[1], reverse=True)
retained_mem_ids = set(mem_id for mem_id, _ in memory_scores_rough[:self.config.max_candidate_memories])
# Filter
memory_paths = {
mem_id: (memory, paths)
for mem_id, (memory, paths) in memory_paths.items()
if mem_id in retained_mem_ids
}
logger.info(
f"⚡ 粗排过滤: {len(memory_scores_rough)}{len(memory_paths)} 条候选记忆"
)
@@ -398,7 +397,7 @@ class PathScoreExpansion:
# 🚀 Cache check
if node_id in self._neighbor_cache:
return self._neighbor_cache[node_id]
edges = []
# Get all edges connected to this node from the graph store
@@ -454,7 +453,7 @@ class PathScoreExpansion:
"""
# Fetch node data from the vector store
node_data = await self.vector_store.get_node_by_id(node_id)
if query_embedding is None:
base_score = 0.5  # default medium score
else:
@@ -493,27 +492,27 @@ class PathScoreExpansion:
import numpy as np
scores = {}
if query_embedding is None:
# No query embedding available: return the default score for every node
- return {nid: 0.5 for nid in node_ids}
+ return dict.fromkeys(node_ids, 0.5)
# Fetch node data in batch
node_data_list = await asyncio.gather(
*[self.vector_store.get_node_by_id(nid) for nid in node_ids],
return_exceptions=True
)
# Collect valid embedding vectors
valid_embeddings = []
valid_node_ids = []
node_metadata_map = {}
for nid, node_data in zip(node_ids, node_data_list):
if isinstance(node_data, Exception):
scores[nid] = 0.3
continue
# Type guard: make sure node_data is a dict
if not node_data or not isinstance(node_data, dict) or "embedding" not in node_data:
scores[nid] = 0.3
@@ -521,21 +520,21 @@ class PathScoreExpansion:
valid_embeddings.append(node_data["embedding"])
valid_node_ids.append(nid)
node_metadata_map[nid] = node_data.get("metadata", {})
if valid_embeddings:
# Compute similarities in batch (matrix operations)
embeddings_matrix = np.array(valid_embeddings)
query_norm = np.linalg.norm(query_embedding)
embeddings_norms = np.linalg.norm(embeddings_matrix, axis=1)
# Vectorized cosine similarity
similarities = np.dot(embeddings_matrix, query_embedding) / (embeddings_norms * query_norm + 1e-8)
similarities = np.clip(similarities, 0.0, 1.0)
# Apply the preferred-type bonus
for nid, sim in zip(valid_node_ids, similarities):
base_score = float(sim)
# Preferred-type bonus
if self.prefer_node_types and nid in node_metadata_map:
node_type = node_metadata_map[nid].get("node_type")
@@ -546,7 +545,7 @@ class PathScoreExpansion:
scores[nid] = base_score
else:
scores[nid] = base_score
return scores
def _calculate_path_score(self, old_score: float, edge_weight: float, node_score: float, depth: int) -> float:
@@ -689,19 +688,19 @@ class PathScoreExpansion:
# Use temporary dicts to hold the path lists
temp_paths: dict[str, list[Path]] = {}
temp_memories: dict[str, Any] = {}  # holds Memory objects
# 🚀 Performance optimization: collect all memory IDs to fetch, then fetch them in one batch
all_memory_ids = set()
path_to_memory_ids: dict[int, set[str]] = {}  # path object id -> set of memory IDs
for path in paths:
memory_ids_in_path = set()
# Collect the memories associated with every node in the path
for node_id in path.nodes:
memory_ids = self.graph_store.node_to_memories.get(node_id, [])
memory_ids_in_path.update(memory_ids)
all_memory_ids.update(memory_ids_in_path)
path_to_memory_ids[id(path)] = memory_ids_in_path
@@ -712,11 +711,11 @@ class PathScoreExpansion:
memory = self.graph_store.get_memory_by_id(mem_id)
if memory:
memory_cache[mem_id] = memory
# Build the mapping
for path in paths:
memory_ids_in_path = path_to_memory_ids[id(path)]
for mem_id in memory_ids_in_path:
if mem_id in memory_cache:
if mem_id not in temp_paths:
@@ -745,10 +744,10 @@ class PathScoreExpansion:
[(Memory, final_score, paths), ...]
"""
scored_memories = []
# 🚀 Performance optimization: if the preferred-type bonus is needed, batch-preload type info for all nodes
node_type_cache: dict[str, str | None] = {}
if self.prefer_node_types:
# Collect all node IDs that need to be looked up
all_node_ids = set()
@@ -757,7 +756,7 @@ class PathScoreExpansion:
for node in memory_nodes:
node_id = node.id if hasattr(node, "id") else str(node)
all_node_ids.add(node_id)
# Fetch node data in batch
if all_node_ids:
logger.debug(f"🔍 批量预加载 {len(all_node_ids)} 个节点的类型信息")
@@ -765,7 +764,7 @@ class PathScoreExpansion:
*[self.vector_store.get_node_by_id(nid) for nid in all_node_ids],
return_exceptions=True
)
# Build the type cache
for nid, node_data in zip(all_node_ids, node_data_list):
if isinstance(node_data, Exception) or not node_data or not isinstance(node_data, dict):
@@ -805,7 +804,7 @@ class PathScoreExpansion:
node_type = node_type_cache.get(node_id)
if node_type and node_type in self.prefer_node_types:
matched_count += 1
if matched_count > 0:
match_ratio = matched_count / len(memory_nodes)
# Apply a bonus proportional to the match ratio (up to 10%)
@@ -870,4 +869,4 @@ class PathScoreExpansion:
return recency_score
__all__ = ["PathScoreExpansion", "PathExpansionConfig", "Path"]
__all__ = ["Path", "PathExpansionConfig", "PathScoreExpansion"]