From 4d44b18ac8f51a389b821c1a6aaa5b7b128ac079 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Wed, 5 Nov 2025 20:13:46 +0800 Subject: [PATCH] =?UTF-8?q?feat(memory):=20=E9=9B=86=E6=88=90=20unified=5F?= =?UTF-8?q?scheduler=20=E5=AE=9E=E7=8E=B0=E5=AE=9A=E6=9C=9F=E8=AE=B0?= =?UTF-8?q?=E5=BF=86=E7=BB=B4=E6=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重大改进: - 在 MemoryManager 中添加调度器集成 - 使用 unified_scheduler 定期执行维护任务 - 实现 consolidate_memories 记忆整合方法 - 优化 shutdown 流程,确保调度任务正确停止 技术实现: - start_maintenance_scheduler: 创建定期维护任务 - stop_maintenance_scheduler: 停止调度任务 - maintenance: 整合、遗忘、保存数据 - consolidate_memories: 合并相似记忆 调度配置: - 触发类型: TriggerType.TIME - 默认间隔: 1小时 (3600秒) - 任务类型: 循环任务 (is_recurring=True) - 任务名称: memory_maintenance 维护流程: 1. 记忆整理: 合并相似度0.85的记忆 2. 自动遗忘: 遗忘激活度<0.1的记忆 3. 数据保存: 持久化图存储 4. 统计报告: 返回维护结果 测试验证: - 调度任务成功注册 - 任务信息正确 (循环、TIME类型) - 初始化时自动启动 - shutdown时自动停止 完成 Step 3: 实现定期记忆整理 --- src/memory_graph/manager.py | 214 +++++++++++++++++++++++++++++++++++- 1 file changed, 208 insertions(+), 6 deletions(-) diff --git a/src/memory_graph/manager.py b/src/memory_graph/manager.py index 0895a3a60..3c54e46b9 100644 --- a/src/memory_graph/manager.py +++ b/src/memory_graph/manager.py @@ -67,6 +67,9 @@ class MemoryManager: # 状态 self._initialized = False self._last_maintenance = datetime.now() + self._maintenance_task: Optional[asyncio.Task] = None + self._maintenance_interval_hours = 1 # 默认每小时执行一次维护 + self._maintenance_schedule_id: Optional[str] = None # 调度任务ID logger.info(f"记忆管理器已创建 (data_dir={data_dir})") @@ -132,24 +135,41 @@ class MemoryManager: self._initialized = True logger.info("✅ 记忆管理器初始化完成") + # 启动后台维护调度任务 + await self.start_maintenance_scheduler() + except Exception as e: logger.error(f"记忆管理器初始化失败: {e}", exc_info=True) raise async def shutdown(self) -> None: """ - 关闭记忆管理器,保存所有数据 + 关闭记忆管理器 + + 执行清理操作: + - 停止维护调度任务 + - 保存所有数据 + - 关闭存储组件 """ if not self._initialized: + logger.warning("记忆管理器未初始化,无需关闭") return try: logger.info("正在关闭记忆管理器...") - # 保存图数据 + # 1. 停止调度任务 + await self.stop_maintenance_scheduler() + + # 2. 执行最后一次维护(保存数据) if self.graph_store and self.persistence: + logger.info("执行最终数据保存...") await self.persistence.save_graph_store(self.graph_store) - logger.info("图数据已保存") + + # 3. 关闭存储组件 + if self.vector_store: + # VectorStore 使用 chromadb,无需显式关闭 + pass self._initialized = False logger.info("✅ 记忆管理器已关闭") @@ -722,11 +742,118 @@ class MemoryManager: return stats + async def consolidate_memories( + self, + similarity_threshold: float = 0.85, + time_window_hours: int = 24, + ) -> Dict[str, Any]: + """ + 整理记忆:合并相似记忆 + + Args: + similarity_threshold: 相似度阈值 + time_window_hours: 时间窗口(小时) + + Returns: + 整理结果 + """ + if not self._initialized: + await self.initialize() + + try: + logger.info(f"开始记忆整理 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h)...") + + result = { + "merged_count": 0, + "checked_count": 0, + } + + # 获取最近创建的记忆 + cutoff_time = datetime.now() - timedelta(hours=time_window_hours) + all_memories = self.graph_store.get_all_memories() + + recent_memories = [ + mem for mem in all_memories + if mem.created_at >= cutoff_time and not mem.metadata.get("forgotten", False) + ] + + if not recent_memories: + logger.info("没有需要整理的记忆") + return result + + logger.info(f"找到 {len(recent_memories)} 条待整理记忆") + result["checked_count"] = len(recent_memories) + + # 按记忆类型分组 + memories_by_type: Dict[str, List[Memory]] = {} + for mem in recent_memories: + mem_type = mem.metadata.get("memory_type", "") + if mem_type not in memories_by_type: + memories_by_type[mem_type] = [] + memories_by_type[mem_type].append(mem) + + # 对每个类型的记忆进行相似度检测 + for mem_type, memories in memories_by_type.items(): + if len(memories) < 2: + continue + + logger.debug(f"检查类型 '{mem_type}' 的 {len(memories)} 条记忆") + + # 使用向量相似度检测 + for i in range(len(memories)): + for j in range(i + 1, len(memories)): + mem_i = memories[i] + mem_j = memories[j] + + # 获取主题节点的向量 + topic_i = next((n for n in mem_i.nodes if n.node_type == NodeType.TOPIC), None) + topic_j = next((n for n in mem_j.nodes if n.node_type == NodeType.TOPIC), None) + + if not topic_i or not topic_j: + continue + + if topic_i.embedding is None or topic_j.embedding is None: + continue + + # 计算余弦相似度 + import numpy as np + similarity = np.dot(topic_i.embedding, topic_j.embedding) / ( + np.linalg.norm(topic_i.embedding) * np.linalg.norm(topic_j.embedding) + ) + + if similarity >= similarity_threshold: + # 合并记忆:保留重要性高的,删除另一个 + if mem_i.importance >= mem_j.importance: + keep_mem, remove_mem = mem_i, mem_j + else: + keep_mem, remove_mem = mem_j, mem_i + + logger.info( + f"合并相似记忆 (similarity={similarity:.3f}): " + f"保留 {keep_mem.id}, 删除 {remove_mem.id}" + ) + + # 增加保留记忆的重要性 + keep_mem.importance = min(1.0, keep_mem.importance + 0.1) + keep_mem.activation = min(1.0, keep_mem.activation + 0.1) + + # 删除相似记忆 + await self.delete_memory(remove_mem.id) + result["merged_count"] += 1 + + logger.info(f"记忆整理完成: {result}") + return result + + except Exception as e: + logger.error(f"记忆整理失败: {e}", exc_info=True) + return {"error": str(e), "merged_count": 0, "checked_count": 0} + async def maintenance(self) -> Dict[str, Any]: """ 执行维护任务 包括: + - 记忆整理(合并相似记忆) - 清理过期记忆 - 自动遗忘低激活度记忆 - 保存数据 @@ -741,19 +868,27 @@ class MemoryManager: logger.info("开始执行记忆系统维护...") result = { + "consolidated": 0, "forgotten": 0, "deleted": 0, "saved": False, } - # 1. 自动遗忘 + # 1. 记忆整理(合并相似记忆) + consolidate_result = await self.consolidate_memories( + similarity_threshold=0.85, + time_window_hours=24 + ) + result["consolidated"] = consolidate_result.get("merged_count", 0) + + # 2. 自动遗忘 forgotten_count = await self.auto_forget_memories(threshold=0.1) result["forgotten"] = forgotten_count - # 2. 清理非常旧的已遗忘记忆(可选) + # 3. 清理非常旧的已遗忘记忆(可选) # TODO: 实现清理逻辑 - # 3. 保存数据 + # 4. 保存数据 await self.persistence.save_graph_store(self.graph_store) result["saved"] = True @@ -764,3 +899,70 @@ class MemoryManager: except Exception as e: logger.error(f"维护失败: {e}", exc_info=True) return {"error": str(e)} + + async def start_maintenance_scheduler(self) -> None: + """ + 启动记忆维护调度任务 + + 使用 unified_scheduler 定期执行维护任务: + - 记忆整合(合并相似记忆) + - 自动遗忘低激活度记忆 + - 保存数据 + + 默认间隔:1小时 + """ + try: + from src.schedule.unified_scheduler import TriggerType, unified_scheduler + + # 如果已有调度任务,先移除 + if self._maintenance_schedule_id: + await unified_scheduler.remove_schedule(self._maintenance_schedule_id) + logger.info("移除旧的维护调度任务") + + # 创建新的调度任务 + interval_seconds = self._maintenance_interval_hours * 3600 + + self._maintenance_schedule_id = await unified_scheduler.create_schedule( + callback=self.maintenance, + trigger_type=TriggerType.TIME, + trigger_config={ + "delay_seconds": interval_seconds, # 首次延迟(启动后1小时) + "interval_seconds": interval_seconds, # 循环间隔 + }, + is_recurring=True, + task_name="memory_maintenance", + ) + + logger.info( + f"✅ 记忆维护调度任务已启动 " + f"(间隔={self._maintenance_interval_hours}小时, " + f"schedule_id={self._maintenance_schedule_id[:8]}...)" + ) + + except ImportError: + logger.warning("无法导入 unified_scheduler,维护调度功能不可用") + except Exception as e: + logger.error(f"启动维护调度任务失败: {e}", exc_info=True) + + async def stop_maintenance_scheduler(self) -> None: + """ + 停止记忆维护调度任务 + """ + if not self._maintenance_schedule_id: + return + + try: + from src.schedule.unified_scheduler import unified_scheduler + + success = await unified_scheduler.remove_schedule(self._maintenance_schedule_id) + if success: + logger.info(f"✅ 记忆维护调度任务已停止 (schedule_id={self._maintenance_schedule_id[:8]}...)") + else: + logger.warning(f"停止维护调度任务失败 (schedule_id={self._maintenance_schedule_id[:8]}...)") + + self._maintenance_schedule_id = None + + except ImportError: + logger.warning("无法导入 unified_scheduler") + except Exception as e: + logger.error(f"停止维护调度任务失败: {e}", exc_info=True)