feat: Enhance embedding generation and logging for memory nodes

- Added embedding generation for SUBJECT and VALUE node types in MemoryBuilder, creating embeddings only for nodes with sufficient content.
- Improved MemoryTools logging to give detailed insight during the initial vector search, including warnings for low-recall situations.
- Adjusted the scoring weights for the different memory types to emphasize similarity and importance, improving retrieval quality.
- Raised the vector search limit from 2x to 5x to improve initial recall.
- Introduced a new script that generates missing embeddings for existing nodes, with batch processing and improved indexing.

Windpicker-owo
2025-11-11 19:25:03 +08:00
parent a8fe969c70
commit 8f668f18f7
5 changed files with 1296 additions and 189 deletions

View File

@@ -7,9 +7,10 @@
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Dict, List, Optional
+from collections import defaultdict
 import orjson
-from fastapi import APIRouter, HTTPException, Request
+from fastapi import APIRouter, HTTPException, Request, Query
 from fastapi.responses import HTMLResponse, JSONResponse
 from fastapi.templating import Jinja2Templates
@@ -227,6 +228,242 @@ async def get_full_graph():
         return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
+
+
+@router.get("/api/graph/summary")
+async def get_graph_summary():
+    """Get a summary of the graph (statistics only, without nodes or edges)."""
+    try:
+        from src.memory_graph.manager_singleton import get_memory_manager
+
+        memory_manager = get_memory_manager()
+        if memory_manager and memory_manager._initialized:
+            stats = memory_manager.get_statistics()
+            return JSONResponse(content={"success": True, "data": {
+                "stats": {
+                    "total_nodes": stats.get("total_nodes", 0),
+                    "total_edges": stats.get("total_edges", 0),
+                    "total_memories": stats.get("total_memories", 0),
+                },
+                "current_file": "memory_manager (live data)",
+            }})
+        else:
+            data = load_graph_data_from_file()
+            return JSONResponse(content={"success": True, "data": {
+                "stats": data.get("stats", {}),
+                "current_file": data.get("current_file", ""),
+            }})
+    except Exception as e:
+        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
+
+
+@router.get("/api/graph/paginated")
+async def get_paginated_graph(
+    page: int = Query(1, ge=1, description="Page number"),
+    page_size: int = Query(500, ge=100, le=2000, description="Nodes per page"),
+    min_importance: float = Query(0.0, ge=0.0, le=1.0, description="Minimum importance threshold"),
+    node_types: Optional[str] = Query(None, description="Node type filter, comma-separated"),
+):
+    """Get graph data in pages, with optional importance filtering."""
+    try:
+        from src.memory_graph.manager_singleton import get_memory_manager
+
+        memory_manager = get_memory_manager()
+
+        # Fetch the full data
+        if memory_manager and memory_manager._initialized:
+            full_data = _format_graph_data_from_manager(memory_manager)
+        else:
+            full_data = load_graph_data_from_file()
+
+        nodes = full_data.get("nodes", [])
+        edges = full_data.get("edges", [])
+
+        # Filter by node type
+        if node_types:
+            allowed_types = set(node_types.split(","))
+            nodes = [n for n in nodes if n.get("group") in allowed_types]
+
+        # Rank nodes by importance
+        nodes_with_importance = []
+        for node in nodes:
+            # Node importance = number of connected edges, normalized by total edge count
+            edge_count = sum(1 for e in edges if e.get("from") == node["id"] or e.get("to") == node["id"])
+            importance = edge_count / max(len(edges), 1)
+            if importance >= min_importance:
+                node["importance"] = importance
+                nodes_with_importance.append(node)
+
+        # Sort by importance, descending
+        nodes_with_importance.sort(key=lambda x: x.get("importance", 0), reverse=True)
+
+        # Paginate
+        total_nodes = len(nodes_with_importance)
+        total_pages = (total_nodes + page_size - 1) // page_size
+        start_idx = (page - 1) * page_size
+        end_idx = min(start_idx + page_size, total_nodes)
+
+        paginated_nodes = nodes_with_importance[start_idx:end_idx]
+        node_ids = set(n["id"] for n in paginated_nodes)
+
+        # Keep only edges that connect nodes on this page
+        paginated_edges = [
+            e for e in edges
+            if e.get("from") in node_ids and e.get("to") in node_ids
+        ]
+
+        return JSONResponse(content={"success": True, "data": {
+            "nodes": paginated_nodes,
+            "edges": paginated_edges,
+            "pagination": {
+                "page": page,
+                "page_size": page_size,
+                "total_nodes": total_nodes,
+                "total_pages": total_pages,
+                "has_next": page < total_pages,
+                "has_prev": page > 1,
+            },
+            "stats": {
+                "total_nodes": total_nodes,
+                "total_edges": len(paginated_edges),
+                "total_memories": full_data.get("stats", {}).get("total_memories", 0),
+            },
+        }})
+    except Exception as e:
+        import traceback
+        traceback.print_exc()
+        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
+
+
+@router.get("/api/graph/clustered")
+async def get_clustered_graph(
+    max_nodes: int = Query(300, ge=50, le=1000, description="Maximum number of nodes"),
+    cluster_threshold: int = Query(10, ge=2, le=50, description="Clustering threshold")
+):
+    """Get graph data simplified by clustering."""
+    try:
+        from src.memory_graph.manager_singleton import get_memory_manager
+
+        memory_manager = get_memory_manager()
+
+        # Fetch the full data
+        if memory_manager and memory_manager._initialized:
+            full_data = _format_graph_data_from_manager(memory_manager)
+        else:
+            full_data = load_graph_data_from_file()
+
+        nodes = full_data.get("nodes", [])
+        edges = full_data.get("edges", [])
+
+        # If the node count is already within the limit, return it directly
+        if len(nodes) <= max_nodes:
+            return JSONResponse(content={"success": True, "data": {
+                "nodes": nodes,
+                "edges": edges,
+                "stats": full_data.get("stats", {}),
+                "clustered": False,
+            }})
+
+        # Run clustering
+        clustered_data = _cluster_graph_data(nodes, edges, max_nodes, cluster_threshold)
+
+        return JSONResponse(content={"success": True, "data": {
+            **clustered_data,
+            "stats": {
+                "original_nodes": len(nodes),
+                "original_edges": len(edges),
+                "clustered_nodes": len(clustered_data["nodes"]),
+                "clustered_edges": len(clustered_data["edges"]),
+                "total_memories": full_data.get("stats", {}).get("total_memories", 0),
+            },
+            "clustered": True,
+        }})
+    except Exception as e:
+        import traceback
+        traceback.print_exc()
+        return JSONResponse(content={"success": False, "error": str(e)}, status_code=500)
+
+
+def _cluster_graph_data(nodes: List[Dict], edges: List[Dict], max_nodes: int, cluster_threshold: int) -> Dict:
+    """Simple graph clustering: group nodes by type and connectivity."""
+    # Build adjacency sets
+    adjacency = defaultdict(set)
+    for edge in edges:
+        adjacency[edge["from"]].add(edge["to"])
+        adjacency[edge["to"]].add(edge["from"])
+
+    # Group nodes by type
+    type_groups = defaultdict(list)
+    for node in nodes:
+        type_groups[node.get("group", "UNKNOWN")].append(node)
+
+    clustered_nodes = []
+    clustered_edges = []
+    node_mapping = {}  # original node ID -> cluster node ID
+
+    for node_type, type_nodes in type_groups.items():
+        # If this type has no more nodes than the threshold, keep them all
+        if len(type_nodes) <= cluster_threshold:
+            for node in type_nodes:
+                clustered_nodes.append(node)
+                node_mapping[node["id"]] = node["id"]
+        else:
+            # Sort by connectivity and keep the most important nodes
+            node_importance = []
+            for node in type_nodes:
+                importance = len(adjacency[node["id"]])
+                node_importance.append((node, importance))
+            node_importance.sort(key=lambda x: x[1], reverse=True)
+
+            # Keep the top N important nodes
+            keep_count = min(len(type_nodes), max_nodes // len(type_groups))
+            for node, importance in node_importance[:keep_count]:
+                clustered_nodes.append(node)
+                node_mapping[node["id"]] = node["id"]
+
+            # Aggregate the remaining nodes into a single super-node
+            if len(node_importance) > keep_count:
+                clustered_node_ids = [n["id"] for n, _ in node_importance[keep_count:]]
+                cluster_id = f"cluster_{node_type}_{len(clustered_nodes)}"
+                cluster_label = f"{node_type} cluster ({len(clustered_node_ids)} nodes)"
+
+                clustered_nodes.append({
+                    "id": cluster_id,
+                    "label": cluster_label,
+                    "group": node_type,
+                    "title": f"Contains {len(clustered_node_ids)} {node_type} nodes",
+                    "is_cluster": True,
+                    "cluster_size": len(clustered_node_ids),
+                    "clustered_nodes": clustered_node_ids[:10],  # keep only the first 10 for display
+                })
+                for node_id in clustered_node_ids:
+                    node_mapping[node_id] = cluster_id
+
+    # Rebuild edges (deduplicated)
+    edge_set = set()
+    for edge in edges:
+        from_id = node_mapping.get(edge["from"])
+        to_id = node_mapping.get(edge["to"])
+        if from_id and to_id and from_id != to_id:
+            edge_key = tuple(sorted([from_id, to_id]))
+            if edge_key not in edge_set:
+                edge_set.add(edge_key)
+                clustered_edges.append({
+                    "id": f"{from_id}_{to_id}",
+                    "from": from_id,
+                    "to": to_id,
+                    "label": edge.get("label", ""),
+                    "arrows": "to",
+                })
+
+    return {
+        "nodes": clustered_nodes,
+        "edges": clustered_edges,
+    }
+
+
 @router.get("/api/files")
 async def list_files_api():
     """List all available data files."""

File diff suppressed because it is too large.
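The diff of the new backfill script is suppressed above, so only a rough sketch can be given here. The sketch below is not the script shipped in this commit; every helper name (get_all_nodes, embed_batch, upsert_node_embedding) and the batch size are assumptions chosen for illustration.

    # Hypothetical sketch of a batch backfill for missing embeddings; all helper
    # names below are assumptions, not APIs confirmed by this commit.
    BATCH_SIZE = 32  # assumed batch size


    async def backfill_missing_embeddings(manager, vector_store, embedder) -> int:
        """Generate embeddings for existing SUBJECT/VALUE nodes that have none, in batches."""
        candidates = [
            n for n in manager.get_all_nodes()           # assumed accessor
            if n.embedding is None
            and n.node_type.name in ("SUBJECT", "VALUE")
            and len(n.content.strip()) >= 2              # same content gate as the builder change below
        ]
        done = 0
        for i in range(0, len(candidates), BATCH_SIZE):
            batch = candidates[i:i + BATCH_SIZE]
            embeddings = await embedder.embed_batch([n.content for n in batch])   # assumed batch API
            for node, emb in zip(batch, embeddings):
                node.embedding = emb
                await vector_store.upsert_node_embedding(node.id, emb)            # assumed upsert API
            done += len(batch)
            print(f"backfilled {done}/{len(candidates)} nodes")
        return done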

View File

@@ -185,12 +185,19 @@ class MemoryBuilder:
             logger.debug(f"Reusing existing subject node: {existing.id}")
             return existing
 
+        # Generate an embedding for subject and value nodes (used to retrieve people/entities and key descriptions)
+        embedding = None
+        if node_type in (NodeType.SUBJECT, NodeType.VALUE):
+            # Only generate embeddings for nodes with enough content (avoids waste)
+            if len(content.strip()) >= 2:
+                embedding = await self._generate_embedding(content)
+
         # Create the new node
         node = MemoryNode(
             id=self._generate_node_id(),
             content=content,
             node_type=node_type,
-            embedding=None,  # subject/attribute nodes don't need embeddings
+            embedding=embedding,  # subject and value nodes get embeddings; attributes don't
             metadata={"memory_ids": [memory_id]},
         )
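As a quick sanity check of the gating rule introduced above, here is a standalone sketch; the stub enum only mirrors the node types referenced in this diff and is not the project's actual NodeType definition.

    from enum import Enum


    class NodeType(Enum):  # stub mirroring the node types used above
        SUBJECT = "SUBJECT"
        ATTRIBUTE = "ATTRIBUTE"
        VALUE = "VALUE"


    def needs_embedding(node_type: NodeType, content: str) -> bool:
        # Only SUBJECT and VALUE nodes with at least 2 characters of trimmed content get an embedding
        return node_type in (NodeType.SUBJECT, NodeType.VALUE) and len(content.strip()) >= 2


    print(needs_embedding(NodeType.SUBJECT, "Alice"))       # True
    print(needs_embedding(NodeType.VALUE, " x "))           # False: only 1 character after strip()
    print(needs_embedding(NodeType.ATTRIBUTE, "birthday"))  # False: attributes are never embedded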

View File

@@ -516,6 +516,22 @@ class MemoryTools:
                 # Record the highest score
                 if mem_id not in memory_scores or similarity > memory_scores[mem_id]:
                     memory_scores[mem_id] = similarity
+
+        # 🔥 Detailed logging: check how the initial recall went
+        logger.info(
+            f"Initial vector search: {len(similar_nodes)} nodes returned → "
+            f"{len(initial_memory_ids)} memories extracted"
+        )
+        if len(initial_memory_ids) == 0:
+            logger.warning(
+                "⚠️ Vector search found no memories! "
+                "Possible causes: 1) embedding-model comprehension issues, 2) memory nodes not indexed, "
+                "3) the query wording differs too much from the stored content"
+            )
+            # Dump details of the similar nodes for debugging
+            if similar_nodes:
+                logger.debug(f"Sample node metadata from vector search: {similar_nodes[0][2] if len(similar_nodes) > 0 else 'None'}")
+        elif len(initial_memory_ids) < 3:
+            logger.warning(f"⚠️ Initial recall returned only {len(initial_memory_ids)} memories, which may hurt result quality")
+
         # 3. Graph expansion (if enabled and expand_depth is set)
         expanded_memory_scores = {}
@@ -609,42 +625,37 @@ class MemoryTools:
             if dominant_node_type in ["ATTRIBUTE", "REFERENCE"] or memory_type == "FACT":
                 # Factual memories (e.g. document URLs, configuration info): semantic similarity matters most
                 weights = {
-                    "similarity": 0.65,  # semantic similarity 65% ⬆️
-                    "importance": 0.20,  # importance 20%
-                    "recency": 0.05,     # recency 5% ⬇️ (facts don't expire over time)
-                    "activation": 0.10   # activation 10% ⬇️ (avoid suppressing rarely-used info)
+                    "similarity": 0.70,  # semantic similarity 70% ⬆️
+                    "importance": 0.25,  # importance 25% ⬆️
+                    "recency": 0.05,     # recency 5% (facts don't expire over time)
                 }
             elif memory_type in ["CONVERSATION", "EPISODIC"] or dominant_node_type == "EVENT":
-                # Conversation/event memories: recency and activation matter more
+                # Conversation/event memories: recency matters more
                 weights = {
-                    "similarity": 0.45,  # semantic similarity 45%
-                    "importance": 0.15,  # importance 15%
-                    "recency": 0.20,     # recency 20% ⬆️
-                    "activation": 0.20   # activation 20%
+                    "similarity": 0.55,  # semantic similarity 55% ⬆️
+                    "importance": 0.20,  # importance 20% ⬆️
+                    "recency": 0.25,     # recency 25% ⬆️
                 }
             elif dominant_node_type == "ENTITY" or memory_type == "SEMANTIC":
                 # Entity/semantic memories: balanced weighting
                 weights = {
-                    "similarity": 0.50,  # semantic similarity 50%
-                    "importance": 0.25,  # importance 25%
+                    "similarity": 0.60,  # semantic similarity 60% ⬆️
+                    "importance": 0.30,  # importance 30% ⬆️
                     "recency": 0.10,     # recency 10%
-                    "activation": 0.15   # activation 15%
                 }
             else:
                 # Default weights (conservative strategy, biased toward semantics)
                 weights = {
-                    "similarity": 0.55,  # semantic similarity 55%
-                    "importance": 0.20,  # importance 20%
+                    "similarity": 0.65,  # semantic similarity 65% ⬆️
+                    "importance": 0.25,  # importance 25% ⬆️
                     "recency": 0.10,     # recency 10%
-                    "activation": 0.15   # activation 15%
                 }
 
-            # Composite score calculation
+            # Composite score calculation (🔥 activation no longer contributes)
             final_score = (
                 similarity_score * weights["similarity"] +
                 importance_score * weights["importance"] +
-                recency_score * weights["recency"] +
-                activation_score * weights["activation"]
+                recency_score * weights["recency"]
             )
 
             # 🆕 Node-type weighting: extra boost for REFERENCE/ATTRIBUTE nodes to promote recall of factual information
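As a quick illustration of the reweighting (numbers chosen arbitrarily): for a FACT-type memory with similarity 0.8, importance 0.6 and recency 0.3, the new composite score is 0.70*0.8 + 0.25*0.6 + 0.05*0.3 = 0.725, whereas the old formula with an activation score of 0.5 would have given 0.65*0.8 + 0.20*0.6 + 0.05*0.3 + 0.10*0.5 = 0.705; the score now depends only on similarity, importance and recency.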
@@ -943,11 +954,16 @@ class MemoryTools:
             logger.warning("Embedding generation failed, skipping node search")
             return []
 
-        # Vector search
+        # Vector search (return more candidates to improve recall)
         similar_nodes = await self.vector_store.search_similar_nodes(
             query_embedding=query_embedding,
-            limit=top_k * 2,  # fetch a few extra, filter later
+            limit=top_k * 5,  # 🔥 raised from 2x to 5x to improve initial recall
             min_similarity=0.0,  # don't filter here; leave it to downstream scoring
         )
+
+        logger.debug(f"Single-query vector search: query='{query}', nodes returned={len(similar_nodes)}")
+        if similar_nodes:
+            logger.debug(f"Top 3 similarities: {[f'{sim:.3f}' for _, sim, _ in similar_nodes[:3]]}")
+
         return similar_nodes
@@ -1003,11 +1019,13 @@ class MemoryTools:
         similar_nodes = await self.vector_store.search_with_multiple_queries(
             query_embeddings=query_embeddings,
             query_weights=query_weights,
-            limit=top_k * 2,  # fetch a few extra, filter later
+            limit=top_k * 5,  # 🔥 raised from 2x to 5x to improve initial recall
             fusion_strategy="weighted_max",
         )
 
         logger.info(f"Multi-query retrieval finished: {len(similar_nodes)} nodes (preferred types: {prefer_node_types})")
+        if similar_nodes:
+            logger.debug(f"Top 5 fused similarities: {[f'{sim:.3f}' for _, sim, _ in similar_nodes[:5]]}")
+
         return similar_nodes, prefer_node_types