fix(manager): optimize memory consolidation logic, add a batch-size limit, improve performance and stability

fix(config): bump config file version, adjust memory consolidation threshold and time-window settings
Windpicker-owo
2025-11-06 20:56:41 +08:00
parent 306749731e
commit 1396e94a86
2 changed files with 55 additions and 17 deletions


@@ -967,14 +967,21 @@ class MemoryManager:
     async def consolidate_memories(
         self,
         similarity_threshold: float = 0.85,
-        time_window_hours: int = 24,
+        time_window_hours: float = 24.0,
+        max_batch_size: int = 50,
     ) -> Dict[str, Any]:
         """
-        Consolidate memories: merge similar memories
+        Consolidate memories: deduplicate similar memories by merging in place (no new edges)
+
+        Optimizations:
+        1. A batch limit avoids blocking for long stretches
+        2. Similar memories are merged directly instead of creating association edges
+        3. asyncio.sleep yields control so the event loop is not blocked
 
         Args:
-            similarity_threshold: similarity threshold
+            similarity_threshold: similarity threshold (default 0.85; raising it to 0.9 is recommended to reduce false merges)
             time_window_hours: time window (hours)
+            max_batch_size: maximum number of memories to process in one run
 
         Returns:
             Consolidation result
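For orientation, here is how a caller might invoke the updated signature. This is a minimal sketch, not part of the commit: run_consolidation and manager are hypothetical names, and an initialized MemoryManager plus a running event loop are assumed.

import asyncio

async def run_consolidation(manager):
    # manager: an initialized MemoryManager instance (construction elided; hypothetical)
    result = await manager.consolidate_memories(
        similarity_threshold=0.92,  # stricter than the 0.85 default, as the docstring advises
        time_window_hours=24.0,     # now a float, so fractional windows such as 0.5 are valid
        max_batch_size=50,          # new parameter: caps the work done per run
    )
    print(result["merged_count"], result["checked_count"], result["skipped_count"])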
@@ -983,11 +990,12 @@ class MemoryManager:
             await self.initialize()
 
         try:
-            logger.info(f"Starting memory consolidation (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h)...")
+            logger.info(f"Starting memory consolidation (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...")
 
             result = {
                 "merged_count": 0,
                 "checked_count": 0,
+                "skipped_count": 0,
             }
 
             # Fetch recently created memories
@@ -1003,6 +1011,12 @@ class MemoryManager:
                 logger.info("No memories need consolidation")
                 return result
 
+            # Cap the number of memories processed per batch
+            if len(recent_memories) > max_batch_size:
+                logger.info(f"Memory count {len(recent_memories)} exceeds the batch limit {max_batch_size}; only the newest {max_batch_size} will be processed")
+                result["skipped_count"] = len(recent_memories) - max_batch_size
+                recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size]
+
             logger.info(f"Found {len(recent_memories)} memories to consolidate")
             result["checked_count"] = len(recent_memories)
 
@@ -1014,6 +1028,9 @@ class MemoryManager:
                     memories_by_type[mem_type] = []
                 memories_by_type[mem_type].append(mem)
 
+            # Track IDs of deleted memories to avoid processing them again
+            deleted_ids = set()
+
             # Run similarity detection within each memory type
             for mem_type, memories in memories_by_type.items():
                 if len(memories) < 2:
@@ -1023,7 +1040,17 @@ class MemoryManager:
 
                 # Vector-similarity based detection
                 for i in range(len(memories)):
+                    # Yield control periodically to avoid blocking for long stretches
+                    if i % 10 == 0:
+                        await asyncio.sleep(0)
+
+                    if memories[i].id in deleted_ids:
+                        continue
+
                     for j in range(i + 1, len(memories)):
+                        if memories[j].id in deleted_ids:
+                            continue
+
                         mem_i = memories[i]
                         mem_j = memories[j]
 
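The yield works because await asyncio.sleep(0) suspends the coroutine just long enough for the event loop to run other ready tasks; doing so only every 10 iterations keeps the overhead negligible. A runnable sketch of the pattern on its own (long_scan and the per-item work are placeholders):

import asyncio

async def long_scan(items):
    for i, item in enumerate(items):
        if i % 10 == 0:
            await asyncio.sleep(0)  # yield to the event loop; other tasks can run here
        _ = hash(item)              # placeholder for the per-item similarity work

asyncio.run(long_scan(range(1000)))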
@@ -1044,23 +1071,28 @@ class MemoryManager:
                         )
 
                         if similarity >= similarity_threshold:
-                            # Merge memories: keep the more important one, delete the other
+                            # Deduplicate directly: keep the more important one, delete the other (no association edge)
                             if mem_i.importance >= mem_j.importance:
                                 keep_mem, remove_mem = mem_i, mem_j
                             else:
                                 keep_mem, remove_mem = mem_j, mem_i
 
                             logger.info(
-                                f"Merging similar memories (similarity={similarity:.3f}): "
+                                f"Deduplicating similar memories (similarity={similarity:.3f}): "
                                 f"keeping {keep_mem.id}, deleting {remove_mem.id}"
                             )
 
-                            # Boost the kept memory's importance
-                            keep_mem.importance = min(1.0, keep_mem.importance + 0.1)
-                            keep_mem.activation = min(1.0, keep_mem.activation + 0.1)
+                            # Boost the kept memory's importance (folding in the merged information's value)
+                            keep_mem.importance = min(1.0, keep_mem.importance + 0.05)
+                            keep_mem.activation = min(1.0, keep_mem.activation + 0.05)
 
-                            # Delete the similar memory
+                            # Accumulate the deleted memory's access count onto the kept one
+                            if hasattr(keep_mem, 'access_count') and hasattr(remove_mem, 'access_count'):
+                                keep_mem.access_count += remove_mem.access_count
+
+                            # Delete the similar memory directly (no edge created, keeping the graph simple)
                             await self.delete_memory(remove_mem.id)
+                            deleted_ids.add(remove_mem.id)
                             result["merged_count"] += 1
 
             logger.info(f"Memory consolidation finished: {result}")
@@ -1466,10 +1498,13 @@ class MemoryManager:
             }
 
             # 1. Memory consolidation (merge similar memories)
+            # Automatic consolidation is disabled by default because it can block the main flow
+            # Raising the threshold above 0.92 is recommended to reduce false merges; limit the batch size to avoid blocking
             if getattr(self.config, 'consolidation_enabled', False):
                 consolidate_result = await self.consolidate_memories(
-                    similarity_threshold=getattr(self.config, 'consolidation_similarity_threshold', 0.9),
-                    time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 24.0)
+                    similarity_threshold=getattr(self.config, 'consolidation_similarity_threshold', 0.92),
+                    time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 24.0),
+                    max_batch_size=getattr(self.config, 'consolidation_max_batch_size', 50)
                 )
                 result["consolidated"] = consolidate_result.get("merged_count", 0)