feat(memory): 更新记忆管理和配置,优化整合逻辑,添加语义相似度阈值

This commit is contained in:
Windpicker-owo
2025-11-06 23:56:18 +08:00
parent a2ce020099
commit 023fab73a5
5 changed files with 579 additions and 136 deletions

View File

@@ -25,6 +25,10 @@ from src.memory_graph.storage.vector_store import VectorStore
from src.memory_graph.tools.memory_tools import MemoryTools
from src.memory_graph.utils.embeddings import EmbeddingGenerator
import uuid
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import numpy as np
logger = logging.getLogger(__name__)
@@ -135,14 +139,16 @@ class MemoryManager:
# 检查配置值
expand_depth = self.config.search_max_expand_depth
logger.info(f"📊 配置检查: search_max_expand_depth={expand_depth}")
expand_semantic_threshold = self.config.search_expand_semantic_threshold
logger.info(f"📊 配置检查: search_max_expand_depth={expand_depth}, search_expand_semantic_threshold={expand_semantic_threshold}")
self.tools = MemoryTools(
vector_store=self.vector_store,
graph_store=self.graph_store,
persistence_manager=self.persistence,
embedding_generator=self.embedding_generator,
max_expand_depth=expand_depth, # 从配置读取图扩展深度
expand_semantic_threshold=expand_semantic_threshold, # 从配置读取图扩展语义阈值
)
self._initialized = True
@@ -977,135 +983,227 @@ class MemoryManager:
) -> Dict[str, Any]:
"""
整理记忆:直接合并去重相似记忆(不创建新边)
优化点
1. 添加批量限制,避免长时间阻塞
2. 相似记忆直接覆盖合并,不创建关联边
3. 使用 asyncio.sleep 让出控制权,避免阻塞事件循环
性能优化版本
1. 使用 asyncio.create_task 在后台执行,避免阻塞主流程
2. 向量计算批量处理,减少重复计算
3. 延迟保存,批量写入数据库
4. 更频繁的协作式多任务让出
Args:
similarity_threshold: 相似度阈值默认0.85建议提高到0.9减少误判)
time_window_hours: 时间窗口(小时)
max_batch_size: 单次最多处理的记忆数量
Returns:
整理结果
整理结果(如果是异步执行,返回启动状态)
"""
if not self._initialized:
await self.initialize()
try:
logger.info(f"开始记忆整理 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...")
logger.info(f"🚀 启动记忆整理任务 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...")
# 创建后台任务执行整理
task = asyncio.create_task(
self._consolidate_memories_background(
similarity_threshold=similarity_threshold,
time_window_hours=time_window_hours,
max_batch_size=max_batch_size
)
)
# 返回任务启动状态,不等待完成
return {
"task_started": True,
"task_id": id(task),
"message": "记忆整理任务已在后台启动"
}
except Exception as e:
logger.error(f"启动记忆整理任务失败: {e}", exc_info=True)
return {"error": str(e), "task_started": False}
async def _consolidate_memories_background(
self,
similarity_threshold: float,
time_window_hours: float,
max_batch_size: int,
) -> None:
"""
后台执行记忆整理的具体实现
这个方法会在独立任务中运行,不阻塞主流程
"""
try:
result = {
"merged_count": 0,
"checked_count": 0,
"skipped_count": 0,
}
# 获取最近创建的记忆
cutoff_time = datetime.now() - timedelta(hours=time_window_hours)
all_memories = self.graph_store.get_all_memories()
recent_memories = [
mem for mem in all_memories
if mem.created_at >= cutoff_time and not mem.metadata.get("forgotten", False)
]
if not recent_memories:
logger.info("没有需要整理的记忆")
return result
logger.info("✅ 记忆整理完成: 没有需要整理的记忆")
return
# 限制批量处理数量
if len(recent_memories) > max_batch_size:
logger.info(f"记忆数量 {len(recent_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size}")
logger.info(f"📊 记忆数量 {len(recent_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size}")
recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size]
result["skipped_count"] = len(all_memories) - max_batch_size
logger.info(f"找到 {len(recent_memories)} 条待整理记忆")
logger.info(f"📋 找到 {len(recent_memories)} 条待整理记忆")
result["checked_count"] = len(recent_memories)
# 按记忆类型分组
# 按记忆类型分组,减少跨类型比较
memories_by_type: Dict[str, List[Memory]] = {}
for mem in recent_memories:
mem_type = mem.metadata.get("memory_type", "")
if mem_type not in memories_by_type:
memories_by_type[mem_type] = []
memories_by_type[mem_type].append(mem)
# 记录删除的记忆ID避免重复处理
# 记录需要删除的记忆,延迟批量删除
to_delete: List[Tuple[Memory, str]] = [] # (memory, reason)
deleted_ids = set()
# 对每个类型的记忆进行相似度检测
for mem_type, memories in memories_by_type.items():
if len(memories) < 2:
continue
logger.debug(f"检查类型 '{mem_type}'{len(memories)} 条记忆")
# 使用向量相似度检测
for i in range(len(memories)):
# 让出控制权,避免长时间阻塞
if i % 10 == 0:
await asyncio.sleep(0)
if memories[i].id in deleted_ids:
logger.debug(f"🔍 检查类型 '{mem_type}'{len(memories)} 条记忆")
# 预提取所有主题节点的嵌入向量
embeddings_map: Dict[str, "np.ndarray"] = {}
valid_memories = []
for mem in memories:
topic_node = next((n for n in mem.nodes if n.node_type == NodeType.TOPIC), None)
if topic_node and topic_node.embedding is not None:
embeddings_map[mem.id] = topic_node.embedding
valid_memories.append(mem)
# 批量计算相似度矩阵(比逐个计算更高效)
import numpy as np
for i in range(len(valid_memories)):
# 更频繁的协作式多任务让出
if i % 5 == 0:
await asyncio.sleep(0.001) # 1ms让出
mem_i = valid_memories[i]
if mem_i.id in deleted_ids:
continue
for j in range(i + 1, len(memories)):
if memories[j].id in deleted_ids:
for j in range(i + 1, len(valid_memories)):
if valid_memories[j].id in deleted_ids:
continue
mem_i = memories[i]
mem_j = memories[j]
# 获取主题节点的向量
topic_i = next((n for n in mem_i.nodes if n.node_type == NodeType.TOPIC), None)
topic_j = next((n for n in mem_j.nodes if n.node_type == NodeType.TOPIC), None)
if not topic_i or not topic_j:
continue
if topic_i.embedding is None or topic_j.embedding is None:
continue
# 计算余弦相似度
import numpy as np
similarity = np.dot(topic_i.embedding, topic_j.embedding) / (
np.linalg.norm(topic_i.embedding) * np.linalg.norm(topic_j.embedding)
)
mem_j = valid_memories[j]
# 快速向量相似度计算
embedding_i = embeddings_map[mem_i.id]
embedding_j = embeddings_map[mem_j.id]
# 优化的余弦相似度计算
similarity = self._fast_cosine_similarity(embedding_i, embedding_j)
if similarity >= similarity_threshold:
# 直接去重:保留重要性高的,删除另一个(不创建关联边)
# 决定保留哪个记忆
if mem_i.importance >= mem_j.importance:
keep_mem, remove_mem = mem_i, mem_j
else:
keep_mem, remove_mem = mem_j, mem_i
logger.info(
f"去重相似记忆 (similarity={similarity:.3f}): "
f"保留 {keep_mem.id}, 删除 {remove_mem.id}"
logger.debug(
f"🔄 标记相似记忆 (similarity={similarity:.3f}): "
f"保留 {keep_mem.id[:8]}, 删除 {remove_mem.id[:8]}"
)
# 增强保留记忆的重要性(合并信息价值)
# 增强保留记忆的重要性
keep_mem.importance = min(1.0, keep_mem.importance + 0.05)
keep_mem.activation = min(1.0, keep_mem.activation + 0.05)
# 将被删除记忆的访问次数累加到保留记忆
# 累加访问次数
if hasattr(keep_mem, 'access_count') and hasattr(remove_mem, 'access_count'):
keep_mem.access_count += remove_mem.access_count
# 直接删除相似记忆(不创建边,简化图结构
await self.delete_memory(remove_mem.id)
# 标记为待删除(不立即删除
to_delete.append((remove_mem, f"与记忆 {keep_mem.id[:8]} 相似度 {similarity:.3f}"))
deleted_ids.add(remove_mem.id)
result["merged_count"] += 1
logger.info(f"记忆整理完成: {result}")
return result
# 每处理完一个类型就让出控制权
await asyncio.sleep(0.005) # 5ms让出
# 批量删除标记的记忆
if to_delete:
logger.info(f"🗑️ 开始批量删除 {len(to_delete)} 条相似记忆")
for memory, reason in to_delete:
try:
# 从向量存储删除节点
for node in memory.nodes:
if node.embedding is not None:
await self.vector_store.delete_node(node.id)
# 从图存储删除记忆
self.graph_store.remove_memory(memory.id)
except Exception as e:
logger.warning(f"删除记忆 {memory.id[:8]} 失败: {e}")
# 批量保存一次性写入减少I/O
await self.persistence.save_graph_store(self.graph_store)
logger.info(f"💾 批量保存完成")
logger.info(f"✅ 记忆整理完成: {result}")
except Exception as e:
logger.error(f"记忆整理失败: {e}", exc_info=True)
return {"error": str(e), "merged_count": 0, "checked_count": 0}
logger.error(f"记忆整理失败: {e}", exc_info=True)
def _fast_cosine_similarity(self, vec1: "np.ndarray", vec2: "np.ndarray") -> float:
"""
快速余弦相似度计算(优化版本)
Args:
vec1, vec2: 向量
Returns:
余弦相似度
"""
try:
import numpy as np
# 避免重复的类型检查和转换
# 向量应该是numpy数组如果不是转换一次
if not isinstance(vec1, np.ndarray):
vec1 = np.asarray(vec1, dtype=np.float32)
if not isinstance(vec2, np.ndarray):
vec2 = np.asarray(vec2, dtype=np.float32)
# 使用更高效的范数计算
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
# 直接计算点积和除法
return float(np.dot(vec1, vec2) / (norm1 * norm2))
except Exception as e:
logger.warning(f"计算余弦相似度失败: {e}")
return 0.0
async def auto_link_memories(
self,
@@ -1478,14 +1576,14 @@ class MemoryManager:
async def maintenance(self) -> Dict[str, Any]:
"""
执行维护任务
执行维护任务(优化版本)
包括:
- 记忆整理(合并相似记忆
- 清理过期记忆
- 记忆整理(异步后台执行
- 自动关联记忆(轻量级执行)
- 自动遗忘低激活度记忆
- 保存数据
Returns:
维护结果
"""
@@ -1493,52 +1591,355 @@ class MemoryManager:
await self.initialize()
try:
logger.info("开始执行记忆系统维护...")
logger.info("🔧 开始执行记忆系统维护(优化版)...")
result = {
"consolidated": 0,
"consolidation_task": "none",
"linked": 0,
"forgotten": 0,
"deleted": 0,
"saved": False,
"total_time": 0,
}
# 1. 记忆整理(合并相似记忆)
# 默认禁用自动整理,因为可能阻塞主流程
# 建议提高阈值到0.92以上,减少误判;限制批量大小避免阻塞
start_time = datetime.now()
# 1. 记忆整理(异步后台执行,不阻塞主流程)
if getattr(self.config, 'consolidation_enabled', False):
logger.info("🚀 启动异步记忆整理任务...")
consolidate_result = await self.consolidate_memories(
similarity_threshold=getattr(self.config, 'consolidation_similarity_threshold', 0.92),
time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 24.0),
max_batch_size=getattr(self.config, 'consolidation_max_batch_size', 50)
similarity_threshold=getattr(self.config, 'consolidation_deduplication_threshold', 0.93),
time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 2.0), # 统一时间窗口
max_batch_size=getattr(self.config, 'consolidation_max_batch_size', 30)
)
result["consolidated"] = consolidate_result.get("merged_count", 0)
# 2. 自动关联记忆(发现和建立关系)
if getattr(self.config, 'auto_link_enabled', True):
link_result = await self.auto_link_memories()
if consolidate_result.get("task_started"):
result["consolidation_task"] = f"background_task_{consolidate_result.get('task_id', 'unknown')}"
logger.info("✅ 记忆整理任务已启动到后台执行")
else:
result["consolidation_task"] = "failed"
logger.warning("❌ 记忆整理任务启动失败")
# 2. 自动关联记忆(使用统一的时间窗口)
if getattr(self.config, 'consolidation_linking_enabled', True):
logger.info("🔗 执行轻量级自动关联...")
link_result = await self._lightweight_auto_link_memories()
result["linked"] = link_result.get("linked_count", 0)
# 3. 自动遗忘
# 3. 自动遗忘(快速执行)
if getattr(self.config, 'forgetting_enabled', True):
logger.info("🗑️ 执行自动遗忘...")
forgotten_count = await self.auto_forget_memories(
threshold=getattr(self.config, 'forgetting_activation_threshold', 0.1)
)
result["forgotten"] = forgotten_count
# 4. 清理非常旧的已遗忘记忆(可选
# TODO: 实现清理逻辑
# 5. 保存数据
await self.persistence.save_graph_store(self.graph_store)
result["saved"] = True
# 4. 保存数据(如果记忆整理不在后台执行
if result["consolidation_task"] == "none":
await self.persistence.save_graph_store(self.graph_store)
result["saved"] = True
logger.info("💾 数据保存完成")
self._last_maintenance = datetime.now()
logger.info(f"维护完成: {result}")
# 计算维护耗时
total_time = (datetime.now() - start_time).total_seconds()
result["total_time"] = total_time
logger.info(f"✅ 维护完成 (耗时 {total_time:.2f}s): {result}")
return result
except Exception as e:
logger.error(f"维护失败: {e}", exc_info=True)
return {"error": str(e)}
logger.error(f"维护失败: {e}", exc_info=True)
return {"error": str(e), "total_time": 0}
async def _lightweight_auto_link_memories(
self,
time_window_hours: float = None, # 从配置读取
max_candidates: int = None, # 从配置读取
max_memories: int = None, # 从配置读取
) -> Dict[str, Any]:
"""
智能轻量级自动关联记忆保留LLM判断优化性能
优化策略:
1. 从配置读取处理参数,尊重用户设置
2. 使用向量相似度预筛选仅对高相似度记忆调用LLM
3. 批量LLM调用减少网络开销
4. 异步执行,避免阻塞
"""
try:
result = {
"checked_count": 0,
"linked_count": 0,
"llm_calls": 0,
}
# 从配置读取参数,使用统一的时间窗口
if time_window_hours is None:
time_window_hours = getattr(self.config, 'consolidation_time_window_hours', 2.0)
if max_candidates is None:
max_candidates = getattr(self.config, 'consolidation_linking_max_candidates', 10)
if max_memories is None:
max_memories = getattr(self.config, 'consolidation_linking_max_memories', 20)
# 获取用户配置时间窗口内的记忆
time_threshold = datetime.now() - timedelta(hours=time_window_hours)
all_memories = self.graph_store.get_all_memories()
recent_memories = [
mem for mem in all_memories
if mem.created_at >= time_threshold
and not mem.metadata.get("forgotten", False)
and mem.importance >= getattr(self.config, 'consolidation_linking_min_importance', 0.5) # 从配置读取重要性阈值
]
if len(recent_memories) > max_memories:
recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_memories]
if len(recent_memories) < 2:
logger.debug("记忆数量不足,跳过智能关联")
return result
logger.debug(f"🧠 智能关联: 检查 {len(recent_memories)} 条重要记忆")
# 第一步:向量相似度预筛选,找到潜在关联对
candidate_pairs = []
import numpy as np
for i, memory in enumerate(recent_memories):
# 获取主题节点
topic_node = next(
(n for n in memory.nodes if n.node_type == NodeType.TOPIC),
None
)
if not topic_node or topic_node.embedding is None:
continue
# 与其他记忆计算相似度
for j, other_memory in enumerate(recent_memories[i+1:], i+1):
other_topic = next(
(n for n in other_memory.nodes if n.node_type == NodeType.TOPIC),
None
)
if not other_topic or other_topic.embedding is None:
continue
# 快速相似度计算
similarity = self._fast_cosine_similarity(
topic_node.embedding,
other_topic.embedding
)
# 使用配置的预筛选阈值
pre_filter_threshold = getattr(self.config, 'consolidation_linking_pre_filter_threshold', 0.7)
if similarity >= pre_filter_threshold:
candidate_pairs.append((memory, other_memory, similarity))
# 让出控制权
if i % 3 == 0:
await asyncio.sleep(0.001)
logger.debug(f"🔍 预筛选找到 {len(candidate_pairs)} 个候选关联对")
if not candidate_pairs:
return result
# 第二步批量LLM分析使用配置的最大候选对数
max_pairs_for_llm = getattr(self.config, 'consolidation_linking_max_pairs_for_llm', 5)
if len(candidate_pairs) <= max_pairs_for_llm:
link_relations = await self._batch_analyze_memory_relations(candidate_pairs)
result["llm_calls"] = 1
# 第三步建立LLM确认的关联
for relation_info in link_relations:
try:
memory_a, memory_b = relation_info["memory_pair"]
relation_type = relation_info["relation_type"]
confidence = relation_info["confidence"]
# 创建关联边
edge = MemoryEdge(
id=f"smart_edge_{uuid.uuid4().hex[:12]}",
source_id=memory_a.subject_id,
target_id=memory_b.subject_id,
relation=relation_type,
edge_type=EdgeType.RELATION,
importance=confidence,
metadata={
"auto_linked": True,
"method": "llm_analyzed",
"vector_similarity": relation_info.get("vector_similarity", 0.0),
"confidence": confidence,
"reasoning": relation_info.get("reasoning", ""),
"created_at": datetime.now().isoformat(),
}
)
# 添加到图
self.graph_store.graph.add_edge(
edge.source_id,
edge.target_id,
edge_id=edge.id,
relation=edge.relation,
edge_type=edge.edge_type.value,
importance=edge.importance,
metadata=edge.metadata,
)
memory_a.edges.append(edge)
result["linked_count"] += 1
logger.debug(f"🧠 智能关联: {memory_a.id[:8]} --[{relation_type}]--> {memory_b.id[:8]} (置信度={confidence:.2f})")
except Exception as e:
logger.warning(f"建立智能关联失败: {e}")
continue
# 保存关联结果
if result["linked_count"] > 0:
await self.persistence.save_graph_store(self.graph_store)
logger.debug(f"✅ 智能关联完成: 建立了 {result['linked_count']} 个关联LLM调用 {result['llm_calls']}")
return result
except Exception as e:
logger.error(f"智能关联失败: {e}", exc_info=True)
return {"error": str(e), "checked_count": 0, "linked_count": 0}
async def _batch_analyze_memory_relations(
self,
candidate_pairs: List[Tuple[Memory, Memory, float]]
) -> List[Dict[str, Any]]:
"""
批量分析记忆关系优化LLM调用
Args:
candidate_pairs: 候选记忆对列表,每项包含 (memory_a, memory_b, vector_similarity)
Returns:
关系分析结果列表
"""
try:
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="memory.batch_relation_analysis"
)
# 格式化所有候选记忆对
candidates_text = ""
for i, (mem_a, mem_b, similarity) in enumerate(candidate_pairs):
desc_a = self._format_memory_for_llm(mem_a)
desc_b = self._format_memory_for_llm(mem_b)
candidates_text += f"""
候选对 {i+1}:
记忆A: {desc_a}
记忆B: {desc_b}
向量相似度: {similarity:.3f}
"""
# 构建批量分析提示词(使用配置的置信度阈值)
min_confidence = getattr(self.config, 'consolidation_linking_min_confidence', 0.7)
prompt = f"""你是记忆关系分析专家。请批量分析以下候选记忆对之间的关系。
**关系类型说明:**
- 导致: A的发生导致了B的发生因果关系
- 引用: A提到或涉及B引用关系
- 相似: A和B描述相似的内容相似关系
- 相反: A和B表达相反的观点对立关系
- 关联: A和B存在某种关联但不属于以上类型一般关联
**候选记忆对:**
{candidates_text}
**任务要求:**
1. 对每个候选对,判断是否存在有意义的关系
2. 如果存在关系,指定关系类型和置信度(0.0-1.0)
3. 简要说明判断理由
4. 只返回置信度 >= {min_confidence} 的关系
5. 优先考虑因果、引用等强关系,谨慎建立相似关系
**输出格式JSON**
```json
[
{{
"candidate_id": 1,
"has_relation": true,
"relation_type": "导致",
"confidence": 0.85,
"reasoning": "记忆A描述的原因导致记忆B的结果"
}},
{{
"candidate_id": 2,
"has_relation": false,
"reasoning": "两者无明显关联"
}}
]
```
请分析并输出JSON结果"""
# 调用LLM使用配置的参数
llm_temperature = getattr(self.config, 'consolidation_linking_llm_temperature', 0.2)
llm_max_tokens = getattr(self.config, 'consolidation_linking_llm_max_tokens', 1500)
response, _ = await llm.generate_response_async(
prompt,
temperature=llm_temperature,
max_tokens=llm_max_tokens,
)
# 解析响应
import json
import re
# 提取JSON
json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
json_str = response.strip()
try:
analysis_results = json.loads(json_str)
except json.JSONDecodeError:
logger.warning(f"LLM返回格式错误尝试修复: {response[:200]}")
# 尝试简单修复
json_str = re.sub(r'[\r\n\t]', '', json_str)
analysis_results = json.loads(json_str)
# 转换为结果格式
relations = []
for result in analysis_results:
if not result.get("has_relation", False):
continue
confidence = result.get("confidence", 0.0)
if confidence < min_confidence: # 使用配置的置信度阈值
continue
candidate_id = result.get("candidate_id", 0) - 1
if 0 <= candidate_id < len(candidate_pairs):
mem_a, mem_b, vector_similarity = candidate_pairs[candidate_id]
relations.append({
"memory_pair": (mem_a, mem_b),
"relation_type": result.get("relation_type", "关联"),
"confidence": confidence,
"reasoning": result.get("reasoning", ""),
"vector_similarity": vector_similarity,
})
logger.debug(f"🧠 LLM批量分析完成: 发现 {len(relations)} 个关系")
return relations
except Exception as e:
logger.error(f"LLM批量关系分析失败: {e}", exc_info=True)
return []
async def start_maintenance_scheduler(self) -> None:
"""