feat(persistence): 实现异步保存图存储以提高性能和响应性

This commit is contained in:
Windpicker-owo
2025-11-13 12:02:38 +08:00
parent b16b57b232
commit 10f5b7d037
4 changed files with 221 additions and 104 deletions

View File

@@ -218,7 +218,7 @@ async def get_full_graph():
data = _format_graph_data_from_manager(memory_manager)
else:
# 如果内存管理器不可用,则从文件加载
data = load_graph_data_from_file()
data = await load_graph_data_from_file()
return JSONResponse(content={"success": True, "data": data})
except Exception as e:
@@ -247,7 +247,7 @@ async def get_graph_summary():
"current_file": "memory_manager (实时数据)",
}})
else:
data = load_graph_data_from_file()
data = await load_graph_data_from_file()
return JSONResponse(content={"success": True, "data": {
"stats": data.get("stats", {}),
"current_file": data.get("current_file", ""),
@@ -273,7 +273,7 @@ async def get_paginated_graph(
if memory_manager and memory_manager._initialized:
full_data = _format_graph_data_from_manager(memory_manager)
else:
full_data = load_graph_data_from_file()
full_data = await load_graph_data_from_file()
nodes = full_data.get("nodes", [])
edges = full_data.get("edges", [])
@@ -349,7 +349,7 @@ async def get_clustered_graph(
if memory_manager and memory_manager._initialized:
full_data = _format_graph_data_from_manager(memory_manager)
else:
full_data = load_graph_data_from_file()
full_data = await load_graph_data_from_file()
nodes = full_data.get("nodes", [])
edges = full_data.get("edges", [])
@@ -514,7 +514,7 @@ async def select_file(request: Request):
graph_data_cache = None
current_data_file = file_to_load
graph_data = load_graph_data_from_file(file_to_load)
graph_data = await load_graph_data_from_file(file_to_load)
return JSONResponse(
content={
@@ -532,7 +532,7 @@ async def reload_data():
"""重新加载数据"""
global graph_data_cache
graph_data_cache = None
data = load_graph_data_from_file()
data = await load_graph_data_from_file()
return JSONResponse(content={"success": True, "message": "数据已重新加载", "stats": data.get("stats", {})})
@@ -560,7 +560,7 @@ async def search_memories(q: str, limit: int = 50):
)
else:
# 从文件加载的数据中搜索 (降级方案)
data = load_graph_data_from_file()
data = await load_graph_data_from_file()
for memory in data.get("memories", []):
if q.lower() in memory.get("text", "").lower():
results.append(memory)
@@ -582,7 +582,7 @@ async def search_memories(q: str, limit: int = 50):
async def get_statistics():
"""获取统计信息"""
try:
data = load_graph_data_from_file()
data = await load_graph_data_from_file()
node_types = {}
memory_types = {}

View File

@@ -327,8 +327,8 @@ class MemoryManager:
memory.updated_at = datetime.now()
# 保存更新
await self.persistence.save_graph_store(self.graph_store)
# 异步保存更新(不阻塞当前操作)
asyncio.create_task(self._async_save_graph_store("更新记忆"))
logger.info(f"记忆更新成功: {memory_id}")
return True
@@ -363,8 +363,8 @@ class MemoryManager:
# 从图存储删除记忆
self.graph_store.remove_memory(memory_id)
# 保存更新
await self.persistence.save_graph_store(self.graph_store)
# 异步保存更新(不阻塞当前操作)
asyncio.create_task(self._async_save_graph_store("删除记忆"))
logger.info(f"记忆删除成功: {memory_id}")
return True
@@ -595,8 +595,8 @@ class MemoryManager:
for related_id in related_memories[:max_related]:
await self.activate_memory(related_id, propagation_strength)
# 保存更新
await self.persistence.save_graph_store(self.graph_store)
# 异步保存更新(不阻塞当前操作)
asyncio.create_task(self._async_save_graph_store("激活记忆"))
logger.debug(f"记忆已激活: {memory_id} (level={new_activation:.3f})")
return True
@@ -663,9 +663,9 @@ class MemoryManager:
"strength": strength
})
# 批量保存到数据库
# 批量保存到数据库(异步执行)
if activation_updates:
await self.persistence.save_graph_store(self.graph_store)
asyncio.create_task(self._async_save_graph_store("批量激活更新"))
# 激活传播(异步执行,不阻塞主流程)
asyncio.create_task(self._batch_propagate_activation(memories_to_activate, base_strength))
@@ -752,8 +752,8 @@ class MemoryManager:
base_strength: 基础激活强度
"""
try:
# 批量保存到数据库
await self.persistence.save_graph_store(self.graph_store)
# 批量保存到数据库(异步执行)
asyncio.create_task(self._async_save_graph_store("后台激活更新"))
# 简化的激活传播(仅在强度足够时执行)
if base_strength > 0.08: # 提高传播阈值,减少传播频率
@@ -952,8 +952,8 @@ class MemoryManager:
else:
logger.debug(f"记忆已删除: {memory_id} (删除了 {deleted_vectors} 个向量)")
# 4. 保存更新
await self.persistence.save_graph_store(self.graph_store)
# 4. 异步保存更新(不阻塞当前操作)
asyncio.create_task(self._async_save_graph_store("删除相关记忆"))
return True
else:
logger.error(f"从图存储删除记忆失败: {memory_id}")
@@ -1375,9 +1375,9 @@ class MemoryManager:
except Exception as e:
logger.warning(f"删除记忆 {memory.id[:8]} 失败: {e}")
# 批量保存一次性写入减少I/O
await self.persistence.save_graph_store(self.graph_store)
logger.info("💾 去重保存完成")
# 批量保存一次性写入减少I/O,异步执行
asyncio.create_task(self._async_save_graph_store("记忆去重"))
logger.info("💾 去重保存任务已启动")
# ===== 步骤4: 向量检索关联记忆 + LLM分析关系 =====
# 过滤掉已删除的记忆
@@ -1486,9 +1486,9 @@ class MemoryManager:
except Exception as e:
logger.warning(f"添加边到图失败: {e}")
# 批量保存更新
await self.persistence.save_graph_store(self.graph_store)
logger.info("💾 关联边保存完成")
# 批量保存更新(异步执行)
asyncio.create_task(self._async_save_graph_store("记忆关联边"))
logger.info("💾 关联边保存任务已启动")
logger.info(f"✅ 记忆整理完成: {result}")
@@ -1636,10 +1636,10 @@ class MemoryManager:
logger.warning(f"建立关联失败: {e}")
continue
# 保存更新后的图数据
# 异步保存更新后的图数据
if result["linked_count"] > 0:
await self.persistence.save_graph_store(self.graph_store)
logger.info(f"保存 {result['linked_count']} 条自动关联边")
asyncio.create_task(self._async_save_graph_store("自动关联"))
logger.info(f"启动保存任务: {result['linked_count']} 条自动关联边")
logger.info(f"自动关联完成: {result}")
return result
@@ -2335,3 +2335,28 @@ class MemoryManager:
finally:
self._maintenance_running = False
logger.debug("维护循环已清理完毕")
async def _async_save_graph_store(self, operation_name: str = "未知操作") -> None:
    """Persist the graph store to disk from a background task.

    Intended to be scheduled via ``asyncio.create_task`` so callers are
    never blocked on disk I/O. A fire-and-forget task has no caller to
    report to, so every outcome — including missing collaborators and
    unexpected errors — is logged instead of raised.

    Args:
        operation_name: Human-readable label of the operation that
            triggered the save; used only in log messages.
    """
    try:
        # Guard clauses expressed as one if/elif/else ladder: skip the
        # save (with a warning) when either collaborator is not ready.
        if self.graph_store is None:
            logger.warning(f"图存储未初始化,跳过异步保存: {operation_name}")
        elif self.persistence is None:
            logger.warning(f"持久化管理器未初始化,跳过异步保存: {operation_name}")
        else:
            await self.persistence.save_graph_store(self.graph_store)
            logger.debug(f"异步保存图数据成功: {operation_name}")
    except Exception as e:
        # Swallow and log: letting this escape would surface as an
        # unhandled task exception in the event loop.
        logger.error(f"异步保存图数据失败 ({operation_name}): {e}", exc_info=True)
        # 可以考虑添加重试机制或者通知机制

View File

@@ -56,6 +56,7 @@ class PersistenceManager:
self.auto_save_interval = auto_save_interval
self._auto_save_task: asyncio.Task | None = None
self._running = False
self._file_lock = asyncio.Lock() # 文件操作锁
logger.info(f"初始化持久化管理器: data_dir={data_dir}")
@@ -66,6 +67,7 @@ class PersistenceManager:
Args:
graph_store: 图存储对象
"""
async with self._file_lock: # 使用文件锁防止并发访问
try:
# 转换为字典
data = graph_store.to_dict()
@@ -87,6 +89,24 @@ class PersistenceManager:
temp_file = self.graph_file.with_suffix(".tmp")
async with aiofiles.open(temp_file, "wb") as f:
await f.write(json_data)
# 在Windows上确保目标文件没有被占用
if self.graph_file.exists():
import os
try:
os.unlink(self.graph_file)
except OSError:
# 如果无法删除,等待一小段时间再重试
await asyncio.sleep(0.1)
try:
os.unlink(self.graph_file)
except OSError:
# 如果还是失败,使用备用策略
backup_file = self.graph_file.with_suffix(".bak")
if backup_file.exists():
os.unlink(backup_file)
self.graph_file.rename(backup_file)
temp_file.replace(self.graph_file)
logger.info(f"图数据已保存: {self.graph_file}, 大小: {len(json_data) / 1024:.2f} KB")
@@ -106,11 +126,21 @@ class PersistenceManager:
logger.info("图数据文件不存在,返回空图")
return None
async with self._file_lock: # 使用文件锁防止并发访问
try:
# 读取文件,添加重试机制处理可能的文件锁定
max_retries = 3
for attempt in range(max_retries):
try:
# 读取文件
async with aiofiles.open(self.graph_file, "rb") as f:
json_data = await f.read()
data = orjson.loads(json_data)
break
except OSError as e:
if attempt == max_retries - 1:
raise
logger.warning(f"读取图数据文件失败 (尝试 {attempt + 1}/{max_retries}): {e}")
await asyncio.sleep(0.1 * (attempt + 1))
# 检查版本(未来可能需要数据迁移)
version = data.get("metadata", {}).get("version", "unknown")
@@ -134,6 +164,7 @@ class PersistenceManager:
Args:
staged_memories: 临时记忆列表
"""
async with self._file_lock: # 使用文件锁防止并发访问
try:
data = {
"metadata": {
@@ -149,6 +180,24 @@ class PersistenceManager:
temp_file = self.staged_file.with_suffix(".tmp")
async with aiofiles.open(temp_file, "wb") as f:
await f.write(json_data)
# 在Windows上确保目标文件没有被占用
if self.staged_file.exists():
import os
try:
os.unlink(self.staged_file)
except OSError:
# 如果无法删除,等待一小段时间再重试
await asyncio.sleep(0.1)
try:
os.unlink(self.staged_file)
except OSError:
# 如果还是失败,使用备用策略
backup_file = self.staged_file.with_suffix(".bak")
if backup_file.exists():
os.unlink(backup_file)
self.staged_file.rename(backup_file)
temp_file.replace(self.staged_file)
logger.info(f"临时记忆已保存: {len(staged_memories)}")
@@ -168,10 +217,21 @@ class PersistenceManager:
logger.info("临时记忆文件不存在,返回空列表")
return []
async with self._file_lock: # 使用文件锁防止并发访问
try:
# 读取文件,添加重试机制处理可能的文件锁定
max_retries = 3
for attempt in range(max_retries):
try:
async with aiofiles.open(self.staged_file, "rb") as f:
json_data = await f.read()
data = orjson.loads(json_data)
break
except OSError as e:
if attempt == max_retries - 1:
raise
logger.warning(f"读取临时记忆文件失败 (尝试 {attempt + 1}/{max_retries}): {e}")
await asyncio.sleep(0.1 * (attempt + 1))
staged_memories = [StagedMemory.from_dict(sm) for sm in data.get("staged_memories", [])]
@@ -225,9 +285,19 @@ class PersistenceManager:
latest_backup = backup_files[0]
logger.warning(f"尝试从备份恢复: {latest_backup}")
# 读取备份文件,添加重试机制
max_retries = 3
for attempt in range(max_retries):
try:
async with aiofiles.open(latest_backup, "rb") as f:
json_data = await f.read()
data = orjson.loads(json_data)
break
except OSError as e:
if attempt == max_retries - 1:
raise
logger.warning(f"读取备份文件失败 (尝试 {attempt + 1}/{max_retries}): {e}")
await asyncio.sleep(0.1 * (attempt + 1))
graph_store = GraphStore.from_dict(data)
logger.info(f"从备份恢复成功: {graph_store.get_statistics()}")

View File

@@ -4,6 +4,7 @@ LLM 工具接口:定义记忆系统的工具 schema 和执行逻辑
from __future__ import annotations
import asyncio
from typing import Any
from src.common.logger import get_logger
@@ -376,8 +377,8 @@ class MemoryTools:
# 3. 添加到存储(暂存状态)
await self._add_memory_to_stores(memory)
# 4. 保存到磁盘
await self.persistence_manager.save_graph_store(self.graph_store)
# 4. 异步保存到磁盘(不阻塞当前操作)
asyncio.create_task(self._async_save_graph_store())
logger.info(f"记忆创建成功: {memory.id}")
@@ -456,8 +457,8 @@ class MemoryTools:
**edge.metadata
)
# 5. 保存
await self.persistence_manager.save_graph_store(self.graph_store)
# 5. 异步保存(不阻塞当前操作)
asyncio.create_task(self._async_save_graph_store())
logger.info(f"记忆关联成功: {source_memory.id} -> {target_memory.id}")
@@ -918,7 +919,6 @@ class MemoryTools:
# 获取上下文信息
chat_history = context.get("chat_history", "") if context else ""
sender = context.get("sender", "") if context else ""
# 处理聊天历史提取最近5条左右的对话
recent_chat = ""
@@ -937,6 +937,7 @@ class MemoryTools:
**最近聊天记录最近10条**
{recent_chat if recent_chat else '无聊天历史'}
**目标消息:** {query}
---
## 第一步:分析查询意图与记忆类型
@@ -1313,3 +1314,24 @@ class MemoryTools:
MemoryTools.get_link_memories_schema(),
MemoryTools.get_search_memories_schema(),
]
async def _async_save_graph_store(self, operation_name: str = "未知操作") -> None:
    """Asynchronously persist the graph store to disk.

    Designed to run inside ``asyncio.create_task`` so the calling
    operation is not blocked by disk I/O; all failures are caught and
    logged rather than propagated into the event loop.

    Args:
        operation_name: Label of the operation that triggered the save,
            included in log messages for traceability. Added (with a
            backward-compatible default) for consistency with
            ``MemoryManager._async_save_graph_store``, so concurrent
            background saves from different call sites (e.g. memory
            creation vs. memory linking) are distinguishable in logs.
    """
    try:
        # A background save has no caller to report to: when components
        # are not yet initialized, log a warning and skip instead of raising.
        if self.graph_store is None:
            logger.warning(f"图存储未初始化,跳过异步保存: {operation_name}")
            return
        if self.persistence_manager is None:
            logger.warning(f"持久化管理器未初始化,跳过异步保存: {operation_name}")
            return
        await self.persistence_manager.save_graph_store(self.graph_store)
        logger.debug(f"异步保存图数据成功: {operation_name}")
    except Exception as e:
        # Never let an exception escape a fire-and-forget task.
        logger.error(f"异步保存图数据失败 ({operation_name}): {e}", exc_info=True)