diff --git a/src/api/memory_visualizer_router.py b/src/api/memory_visualizer_router.py
index e1137d684..2e4e1d482 100644
--- a/src/api/memory_visualizer_router.py
+++ b/src/api/memory_visualizer_router.py
@@ -218,7 +218,7 @@ async def get_full_graph():
             data = _format_graph_data_from_manager(memory_manager)
         else:
             # If the memory manager is unavailable, load from file
-            data = load_graph_data_from_file()
+            data = await load_graph_data_from_file()
 
         return JSONResponse(content={"success": True, "data": data})
     except Exception as e:
@@ -247,7 +247,7 @@ async def get_graph_summary():
                 "current_file": "memory_manager (live data)",
             }})
         else:
-            data = load_graph_data_from_file()
+            data = await load_graph_data_from_file()
             return JSONResponse(content={"success": True, "data": {
                 "stats": data.get("stats", {}),
                 "current_file": data.get("current_file", ""),
@@ -273,7 +273,7 @@ async def get_paginated_graph(
         if memory_manager and memory_manager._initialized:
             full_data = _format_graph_data_from_manager(memory_manager)
         else:
-            full_data = load_graph_data_from_file()
+            full_data = await load_graph_data_from_file()
 
         nodes = full_data.get("nodes", [])
         edges = full_data.get("edges", [])
@@ -349,7 +349,7 @@ async def get_clustered_graph(
         if memory_manager and memory_manager._initialized:
             full_data = _format_graph_data_from_manager(memory_manager)
         else:
-            full_data = load_graph_data_from_file()
+            full_data = await load_graph_data_from_file()
 
         nodes = full_data.get("nodes", [])
         edges = full_data.get("edges", [])
@@ -514,7 +514,7 @@ async def select_file(request: Request):
         graph_data_cache = None
         current_data_file = file_to_load
 
-        graph_data = load_graph_data_from_file(file_to_load)
+        graph_data = await load_graph_data_from_file(file_to_load)
 
         return JSONResponse(
             content={
@@ -532,7 +532,7 @@ async def reload_data():
     """Reload data"""
     global graph_data_cache
     graph_data_cache = None
-    data = load_graph_data_from_file()
+    data = await load_graph_data_from_file()
 
     return JSONResponse(content={"success": True, "message": "Data reloaded", "stats": data.get("stats", {})})
 
@@ -560,7 +560,7 @@ async def search_memories(q: str, limit: int = 50):
             )
         else:
             # Search in the data loaded from file (fallback)
-            data = load_graph_data_from_file()
+            data = await load_graph_data_from_file()
             for memory in data.get("memories", []):
                 if q.lower() in memory.get("text", "").lower():
                     results.append(memory)
@@ -582,7 +582,7 @@ async def search_memories(q: str, limit: int = 50):
 async def get_statistics():
     """Get statistics"""
     try:
-        data = load_graph_data_from_file()
+        data = await load_graph_data_from_file()
 
         node_types = {}
         memory_types = {}
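Note: every call site above now awaits load_graph_data_from_file(), which implies the helper itself was converted to a coroutine; the diff does not show that conversion. The following is only a minimal sketch of what an awaitable, cache-aware loader could look like, reusing the graph_data_cache and current_data_file globals visible in the hunks. The body is an assumption for illustration, not the project's actual implementation.

```python
# Hypothetical sketch of an awaitable loader (the real function body is not
# shown in this diff). Assumes aiofiles and orjson, which the patch already
# uses elsewhere, and Python 3.10+ union syntax.
import aiofiles
import orjson

graph_data_cache: dict | None = None          # mirrors the router's global cache
current_data_file: str = "memory_graph.json"  # placeholder default path

async def load_graph_data_from_file(file_path: str | None = None) -> dict:
    """Read the graph JSON off the event loop's executor via aiofiles."""
    global graph_data_cache
    path = file_path or current_data_file
    # Serve from cache when no explicit file is requested
    if graph_data_cache is not None and file_path is None:
        return graph_data_cache
    async with aiofiles.open(path, "rb") as f:
        raw = await f.read()
    graph_data_cache = orjson.loads(raw)
    return graph_data_cache
```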
diff --git a/src/memory_graph/manager.py b/src/memory_graph/manager.py
index 32fe76f20..ac43ff954 100644
--- a/src/memory_graph/manager.py
+++ b/src/memory_graph/manager.py
@@ -327,8 +327,8 @@ class MemoryManager:
         memory.updated_at = datetime.now()
 
-        # Save updates
-        await self.persistence.save_graph_store(self.graph_store)
+        # Save updates asynchronously (does not block the current operation)
+        asyncio.create_task(self._async_save_graph_store("update memory"))
 
         logger.info(f"Memory updated successfully: {memory_id}")
         return True
@@ -363,8 +363,8 @@ class MemoryManager:
         # Remove the memory from the graph store
         self.graph_store.remove_memory(memory_id)
 
-        # Save updates
-        await self.persistence.save_graph_store(self.graph_store)
+        # Save updates asynchronously (does not block the current operation)
+        asyncio.create_task(self._async_save_graph_store("delete memory"))
 
         logger.info(f"Memory deleted successfully: {memory_id}")
         return True
@@ -595,8 +595,8 @@ class MemoryManager:
             for related_id in related_memories[:max_related]:
                 await self.activate_memory(related_id, propagation_strength)
 
-            # Save updates
-            await self.persistence.save_graph_store(self.graph_store)
+            # Save updates asynchronously (does not block the current operation)
+            asyncio.create_task(self._async_save_graph_store("activate memory"))
 
             logger.debug(f"Memory activated: {memory_id} (level={new_activation:.3f})")
             return True
@@ -663,9 +663,9 @@ class MemoryManager:
                     "strength": strength
                 })
 
-        # Batch save to the database
+        # Batch save to the database (runs asynchronously)
         if activation_updates:
-            await self.persistence.save_graph_store(self.graph_store)
+            asyncio.create_task(self._async_save_graph_store("batch activation update"))
 
         # Activation propagation (runs asynchronously without blocking the main flow)
         asyncio.create_task(self._batch_propagate_activation(memories_to_activate, base_strength))
@@ -752,8 +752,8 @@ class MemoryManager:
             base_strength: Base activation strength
         """
         try:
-            # Batch save to the database
-            await self.persistence.save_graph_store(self.graph_store)
+            # Batch save to the database (runs asynchronously)
+            asyncio.create_task(self._async_save_graph_store("background activation update"))
 
             # Simplified activation propagation (runs only when strength is sufficient)
             if base_strength > 0.08:  # Higher propagation threshold to reduce propagation frequency
@@ -952,8 +952,8 @@ class MemoryManager:
             else:
                 logger.debug(f"Memory deleted: {memory_id} ({deleted_vectors} vectors removed)")
 
-            # 4. Save updates
-            await self.persistence.save_graph_store(self.graph_store)
+            # 4. Save updates asynchronously (does not block the current operation)
+            asyncio.create_task(self._async_save_graph_store("delete related memories"))
 
             return True
         else:
             logger.error(f"Failed to delete memory from graph store: {memory_id}")
@@ -1375,9 +1375,9 @@ class MemoryManager:
             except Exception as e:
                 logger.warning(f"Failed to delete memory {memory.id[:8]}: {e}")
 
-        # Batch save (single write to reduce I/O)
-        await self.persistence.save_graph_store(self.graph_store)
-        logger.info("💾 Deduplication save complete")
+        # Batch save (single write to reduce I/O, runs asynchronously)
+        asyncio.create_task(self._async_save_graph_store("memory deduplication"))
+        logger.info("💾 Deduplication save task started")
 
         # ===== Step 4: Vector retrieval of related memories + LLM relationship analysis =====
         # Filter out memories that have already been deleted
@@ -1486,9 +1486,9 @@ class MemoryManager:
             except Exception as e:
                 logger.warning(f"Failed to add edge to graph: {e}")
 
-        # Batch save updates
-        await self.persistence.save_graph_store(self.graph_store)
-        logger.info("💾 Association edges saved")
+        # Batch save updates (runs asynchronously)
+        asyncio.create_task(self._async_save_graph_store("memory association edges"))
+        logger.info("💾 Association edge save task started")
 
         logger.info(f"✅ Memory consolidation complete: {result}")
@@ -1636,10 +1636,10 @@ class MemoryManager:
                 logger.warning(f"Failed to create association: {e}")
                 continue
 
-        # Save the updated graph data
+        # Save the updated graph data asynchronously
         if result["linked_count"] > 0:
-            await self.persistence.save_graph_store(self.graph_store)
-            logger.info(f"Saved {result['linked_count']} auto-linked edges")
+            asyncio.create_task(self._async_save_graph_store("auto-linking"))
+            logger.info(f"Save task started for {result['linked_count']} auto-linked edges")
 
         logger.info(f"Auto-linking complete: {result}")
         return result
@@ -2335,3 +2335,28 @@ class MemoryManager:
         finally:
             self._maintenance_running = False
             logger.debug("Maintenance loop cleaned up")
+
+    async def _async_save_graph_store(self, operation_name: str = "unknown operation") -> None:
+        """
+        Asynchronously save the graph store to disk.
+
+        This method is designed to run in a background task and includes error handling.
+
+        Args:
+            operation_name: Name of the operation, used for logging
+        """
+        try:
+            # Make sure the graph store exists and is initialized
+            if self.graph_store is None:
+                logger.warning(f"Graph store not initialized, skipping async save: {operation_name}")
+                return
+
+            if self.persistence is None:
+                logger.warning(f"Persistence manager not initialized, skipping async save: {operation_name}")
+                return
+
+            await self.persistence.save_graph_store(self.graph_store)
+            logger.debug(f"Async save of graph data succeeded: {operation_name}")
+        except Exception as e:
+            logger.error(f"Async save of graph data failed ({operation_name}): {e}", exc_info=True)
+            # A retry or notification mechanism could be added here
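Note: the manager now fires these saves with asyncio.create_task() and returns immediately. One caveat worth keeping in mind: the event loop holds only a weak reference to tasks, so a fire-and-forget task that nothing references can be garbage-collected before it finishes. A common defensive pattern is to keep each task in a module-level set until it completes. The helper below is a hypothetical sketch of that pattern, not part of this patch.

```python
import asyncio

# Strong references to in-flight background saves; the done-callback removes
# each task once it finishes, so the set never grows without bound.
_background_tasks: set[asyncio.Task] = set()

def spawn_background_save(coro) -> asyncio.Task:
    """Schedule a coroutine fire-and-forget while keeping it alive until done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)                      # hold a strong reference
    task.add_done_callback(_background_tasks.discard)  # drop it on completion
    return task

# Hypothetical usage inside MemoryManager:
#     spawn_background_save(self._async_save_graph_store("update memory"))
```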
diff --git a/src/memory_graph/storage/persistence.py b/src/memory_graph/storage/persistence.py
index 23b6a0735..70e745099 100644
--- a/src/memory_graph/storage/persistence.py
+++ b/src/memory_graph/storage/persistence.py
@@ -56,6 +56,7 @@ class PersistenceManager:
         self.auto_save_interval = auto_save_interval
         self._auto_save_task: asyncio.Task | None = None
         self._running = False
+        self._file_lock = asyncio.Lock()  # File-operation lock
 
         logger.info(f"Initializing persistence manager: data_dir={data_dir}")
 
@@ -66,34 +67,53 @@ class PersistenceManager:
         Args:
             graph_store: Graph store object
         """
-        try:
-            # Convert to a dictionary
-            data = graph_store.to_dict()
+        async with self._file_lock:  # File lock to prevent concurrent access
+            try:
+                # Convert to a dictionary
+                data = graph_store.to_dict()
 
-            # Add metadata
-            data["metadata"] = {
-                "version": "0.1.0",
-                "saved_at": datetime.now().isoformat(),
-                "statistics": graph_store.get_statistics(),
-            }
+                # Add metadata
+                data["metadata"] = {
+                    "version": "0.1.0",
+                    "saved_at": datetime.now().isoformat(),
+                    "statistics": graph_store.get_statistics(),
+                }
 
-            # Serialize with orjson (faster)
-            json_data = orjson.dumps(
-                data,
-                option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY,
-            )
+                # Serialize with orjson (faster)
+                json_data = orjson.dumps(
+                    data,
+                    option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY,
+                )
 
-            # Atomic write (write a temp file first, then rename)
-            temp_file = self.graph_file.with_suffix(".tmp")
-            async with aiofiles.open(temp_file, "wb") as f:
-                await f.write(json_data)
-            temp_file.replace(self.graph_file)
+                # Atomic write (write a temp file first, then rename)
+                temp_file = self.graph_file.with_suffix(".tmp")
+                async with aiofiles.open(temp_file, "wb") as f:
+                    await f.write(json_data)
 
-            logger.info(f"Graph data saved: {self.graph_file}, size: {len(json_data) / 1024:.2f} KB")
+                # On Windows, make sure the target file is not in use
+                if self.graph_file.exists():
+                    import os
+                    try:
+                        os.unlink(self.graph_file)
+                    except OSError:
+                        # If it cannot be deleted, wait briefly and retry
+                        await asyncio.sleep(0.1)
+                        try:
+                            os.unlink(self.graph_file)
+                        except OSError:
+                            # If it still fails, fall back to a backup strategy
+                            backup_file = self.graph_file.with_suffix(".bak")
+                            if backup_file.exists():
+                                os.unlink(backup_file)
+                            self.graph_file.rename(backup_file)
 
-        except Exception as e:
-            logger.error(f"Failed to save graph data: {e}", exc_info=True)
-            raise
+                temp_file.replace(self.graph_file)
+
+                logger.info(f"Graph data saved: {self.graph_file}, size: {len(json_data) / 1024:.2f} KB")
+
+            except Exception as e:
+                logger.error(f"Failed to save graph data: {e}", exc_info=True)
+                raise
 
     async def load_graph_store(self) -> GraphStore | None:
         """
@@ -106,26 +126,36 @@ class PersistenceManager:
             logger.info("Graph data file does not exist, returning empty graph")
             return None
 
-        try:
-            # Read the file
-            async with aiofiles.open(self.graph_file, "rb") as f:
-                json_data = await f.read()
-            data = orjson.loads(json_data)
+        async with self._file_lock:  # File lock to prevent concurrent access
+            try:
+                # Read the file, with retries to handle possible file locking
+                max_retries = 3
+                for attempt in range(max_retries):
+                    try:
+                        async with aiofiles.open(self.graph_file, "rb") as f:
+                            json_data = await f.read()
+                        data = orjson.loads(json_data)
+                        break
+                    except OSError as e:
+                        if attempt == max_retries - 1:
+                            raise
+                        logger.warning(f"Failed to read graph data file (attempt {attempt + 1}/{max_retries}): {e}")
+                        await asyncio.sleep(0.1 * (attempt + 1))
 
-            # Check version (data migration may be needed in the future)
-            version = data.get("metadata", {}).get("version", "unknown")
-            logger.info(f"Loading graph data: version={version}")
+                # Check version (data migration may be needed in the future)
+                version = data.get("metadata", {}).get("version", "unknown")
+                logger.info(f"Loading graph data: version={version}")
 
-            # Restore the graph store
-            graph_store = GraphStore.from_dict(data)
+                # Restore the graph store
+                graph_store = GraphStore.from_dict(data)
 
-            logger.info(f"Graph data loaded: {graph_store.get_statistics()}")
-            return graph_store
+                logger.info(f"Graph data loaded: {graph_store.get_statistics()}")
+                return graph_store
 
-        except Exception as e:
-            logger.error(f"Failed to load graph data: {e}", exc_info=True)
-            # Try loading a backup
-            return await self._load_from_backup()
+            except Exception as e:
+                logger.error(f"Failed to load graph data: {e}", exc_info=True)
+                # Try loading a backup
+                return await self._load_from_backup()
 
     async def save_staged_memories(self, staged_memories: list[StagedMemory]) -> None:
         """
@@ -134,28 +164,47 @@ class PersistenceManager:
         Args:
             staged_memories: List of staged memories
         """
-        try:
-            data = {
-                "metadata": {
-                    "version": "0.1.0",
-                    "saved_at": datetime.now().isoformat(),
-                    "count": len(staged_memories),
-                },
-                "staged_memories": [sm.to_dict() for sm in staged_memories],
-            }
+        async with self._file_lock:  # File lock to prevent concurrent access
+            try:
+                data = {
+                    "metadata": {
+                        "version": "0.1.0",
+                        "saved_at": datetime.now().isoformat(),
+                        "count": len(staged_memories),
+                    },
+                    "staged_memories": [sm.to_dict() for sm in staged_memories],
+                }
 
-            json_data = orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY)
+                json_data = orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY)
 
-            temp_file = self.staged_file.with_suffix(".tmp")
-            async with aiofiles.open(temp_file, "wb") as f:
-                await f.write(json_data)
-            temp_file.replace(self.staged_file)
+                temp_file = self.staged_file.with_suffix(".tmp")
+                async with aiofiles.open(temp_file, "wb") as f:
+                    await f.write(json_data)
 
-            logger.info(f"Staged memories saved: {len(staged_memories)} entries")
+                # On Windows, make sure the target file is not in use
+                if self.staged_file.exists():
+                    import os
+                    try:
+                        os.unlink(self.staged_file)
+                    except OSError:
+                        # If it cannot be deleted, wait briefly and retry
+                        await asyncio.sleep(0.1)
+                        try:
+                            os.unlink(self.staged_file)
+                        except OSError:
+                            # If it still fails, fall back to a backup strategy
+                            backup_file = self.staged_file.with_suffix(".bak")
+                            if backup_file.exists():
+                                os.unlink(backup_file)
+                            self.staged_file.rename(backup_file)
 
-        except Exception as e:
-            logger.error(f"Failed to save staged memories: {e}", exc_info=True)
-            raise
+                temp_file.replace(self.staged_file)
+
+                logger.info(f"Staged memories saved: {len(staged_memories)} entries")
+
+            except Exception as e:
+                logger.error(f"Failed to save staged memories: {e}", exc_info=True)
+                raise
 
     async def load_staged_memories(self) -> list[StagedMemory]:
         """
@@ -168,19 +217,30 @@ class PersistenceManager:
             logger.info("Staged memory file does not exist, returning empty list")
             return []
 
-        try:
-            async with aiofiles.open(self.staged_file, "rb") as f:
-                json_data = await f.read()
-            data = orjson.loads(json_data)
+        async with self._file_lock:  # File lock to prevent concurrent access
+            try:
+                # Read the file, with retries to handle possible file locking
+                max_retries = 3
+                for attempt in range(max_retries):
+                    try:
+                        async with aiofiles.open(self.staged_file, "rb") as f:
+                            json_data = await f.read()
+                        data = orjson.loads(json_data)
+                        break
+                    except OSError as e:
+                        if attempt == max_retries - 1:
+                            raise
+                        logger.warning(f"Failed to read staged memory file (attempt {attempt + 1}/{max_retries}): {e}")
+                        await asyncio.sleep(0.1 * (attempt + 1))
 
-            staged_memories = [StagedMemory.from_dict(sm) for sm in data.get("staged_memories", [])]
+                staged_memories = [StagedMemory.from_dict(sm) for sm in data.get("staged_memories", [])]
 
-            logger.info(f"Staged memories loaded: {len(staged_memories)} entries")
-            return staged_memories
+                logger.info(f"Staged memories loaded: {len(staged_memories)} entries")
+                return staged_memories
 
-        except Exception as e:
-            logger.error(f"Failed to load staged memories: {e}", exc_info=True)
-            return []
+            except Exception as e:
+                logger.error(f"Failed to load staged memories: {e}", exc_info=True)
+                return []
 
     async def create_backup(self) -> Path | None:
         """
@@ -225,9 +285,19 @@ class PersistenceManager:
             latest_backup = backup_files[0]
             logger.warning(f"Attempting to restore from backup: {latest_backup}")
 
-            async with aiofiles.open(latest_backup, "rb") as f:
-                json_data = await f.read()
-            data = orjson.loads(json_data)
+            # Read the backup file, with a retry mechanism
+            max_retries = 3
+            for attempt in range(max_retries):
+                try:
+                    async with aiofiles.open(latest_backup, "rb") as f:
+                        json_data = await f.read()
+                    data = orjson.loads(json_data)
+                    break
+                except OSError as e:
+                    if attempt == max_retries - 1:
+                        raise
+                    logger.warning(f"Failed to read backup file (attempt {attempt + 1}/{max_retries}): {e}")
+                    await asyncio.sleep(0.1 * (attempt + 1))
 
             graph_store = GraphStore.from_dict(data)
             logger.info(f"Restored from backup: {graph_store.get_statistics()}")
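Note: the Windows-specific unlink/backup dance above works around sharing violations that occur when another handle still has the target file open. Since Path.replace() (os.replace) already overwrites the destination atomically on both POSIX and Windows, an alternative is to retry the rename itself. The sketch below shows that variant, under the stated assumption that the failure mode is a transient PermissionError from a Windows sharing violation; it is not the patch's code.

```python
import asyncio
import os
from pathlib import Path

async def atomic_replace_with_retry(temp_file: Path, target: Path,
                                    retries: int = 3, delay: float = 0.1) -> None:
    """Replace target with temp_file atomically, retrying transient Windows
    sharing violations instead of deleting the target first."""
    for attempt in range(retries):
        try:
            os.replace(temp_file, target)  # atomic rename-over on POSIX and Windows
            return
        except PermissionError:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(delay * (attempt + 1))  # linear backoff
```

A design note on the patch as written: the unlink-then-replace sequence briefly leaves no file at the target path, so a crash in that window loses the durable copy; retrying the rename keeps the old file intact until the new one lands.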
diff --git a/src/memory_graph/tools/memory_tools.py b/src/memory_graph/tools/memory_tools.py
index 20a9022f3..88e77b34b 100644
--- a/src/memory_graph/tools/memory_tools.py
+++ b/src/memory_graph/tools/memory_tools.py
@@ -4,6 +4,7 @@ LLM tool interface: defines the memory system's tool schemas and execution logic
 
 from __future__ import annotations
 
+import asyncio
 from typing import Any
 
 from src.common.logger import get_logger
@@ -376,8 +377,8 @@ class MemoryTools:
         # 3. Add to storage (staged state)
         await self._add_memory_to_stores(memory)
 
-        # 4. Save to disk
-        await self.persistence_manager.save_graph_store(self.graph_store)
+        # 4. Save to disk asynchronously (does not block the current operation)
+        asyncio.create_task(self._async_save_graph_store())
 
         logger.info(f"Memory created successfully: {memory.id}")
 
@@ -456,8 +457,8 @@ class MemoryTools:
                 **edge.metadata
             )
 
-        # 5. Save
-        await self.persistence_manager.save_graph_store(self.graph_store)
+        # 5. Save asynchronously (does not block the current operation)
+        asyncio.create_task(self._async_save_graph_store())
 
         logger.info(f"Memories linked successfully: {source_memory.id} -> {target_memory.id}")
 
@@ -918,7 +919,6 @@ class MemoryTools:
         # Get context information
         chat_history = context.get("chat_history", "") if context else ""
-        sender = context.get("sender", "") if context else ""
 
         # Process the chat history, extracting roughly the last 5 exchanges
         recent_chat = ""
@@ -937,6 +937,7 @@ class MemoryTools:
 **Recent chat history (last 10 messages):**
 {recent_chat if recent_chat else 'No chat history'}
 
+**Target message:** {query}
 
 ---
 
 ## Step 1: Analyze query intent and memory type
@@ -1313,3 +1314,24 @@ class MemoryTools:
             MemoryTools.get_link_memories_schema(),
             MemoryTools.get_search_memories_schema(),
         ]
+
+    async def _async_save_graph_store(self) -> None:
+        """
+        Asynchronously save the graph store to disk.
+
+        This method is designed to run in a background task and includes error handling.
+        """
+        try:
+            # Make sure the components are initialized
+            if self.graph_store is None:
+                logger.warning("Graph store not initialized, skipping async save")
+                return
+
+            if self.persistence_manager is None:
+                logger.warning("Persistence manager not initialized, skipping async save")
+                return
+
+            await self.persistence_manager.save_graph_store(self.graph_store)
+            logger.debug("Async save of graph data succeeded")
+        except Exception as e:
+            logger.error(f"Async save of graph data failed: {e}", exc_info=True)
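Note: taken together, the patch trades synchronous durability for responsiveness. Writers return before data hits disk, and the new _file_lock serializes whoever does touch the files. The toy program below, with a stand-in FakePersistence class, illustrates how a shared asyncio.Lock keeps a fire-and-forget save from interleaving with a concurrent load; it demonstrates the pattern only and is not project code.

```python
import asyncio

class FakePersistence:
    """Stand-in for PersistenceManager: a lock guards both save and load."""

    def __init__(self) -> None:
        self._file_lock = asyncio.Lock()
        self.data: dict = {}

    async def save(self, data: dict) -> None:
        async with self._file_lock:      # writers exclude readers
            await asyncio.sleep(0.05)    # simulate disk I/O
            self.data = dict(data)

    async def load(self) -> dict:
        async with self._file_lock:      # readers wait out in-flight writes
            return dict(self.data)

async def main() -> None:
    p = FakePersistence()
    save_task = asyncio.create_task(p.save({"nodes": 3}))  # fire-and-forget save
    # Depending on scheduling this prints {} or {'nodes': 3},
    # but never a partially written state.
    print(await p.load())
    await save_task  # drain the background task before exiting

asyncio.run(main())
```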