feat(memory): 集成 unified_scheduler 实现定期记忆维护

重大改进:
- 在 MemoryManager 中添加调度器集成
- 使用 unified_scheduler 定期执行维护任务
- 实现 consolidate_memories 记忆整合方法
- 优化 shutdown 流程,确保调度任务正确停止

技术实现:
- start_maintenance_scheduler: 创建定期维护任务
- stop_maintenance_scheduler: 停止调度任务
- maintenance: 整合、遗忘、保存数据
- consolidate_memories: 合并相似记忆

调度配置:
- 触发类型: TriggerType.TIME
- 默认间隔: 1小时 (3600秒)
- 任务类型: 循环任务 (is_recurring=True)
- 任务名称: memory_maintenance

维护流程:
1. 记忆整理: 合并相似度0.85的记忆
2. 自动遗忘: 遗忘激活度<0.1的记忆
3. 数据保存: 持久化图存储
4. 统计报告: 返回维护结果

测试验证:
- 调度任务成功注册
- 任务信息正确 (循环、TIME类型)
- 初始化时自动启动
- shutdown时自动停止

完成 Step 3: 实现定期记忆整理
This commit is contained in:
Windpicker-owo
2025-11-05 20:13:46 +08:00
parent c3ca811e46
commit 4d44b18ac8

View File

@@ -67,6 +67,9 @@ class MemoryManager:
# 状态
self._initialized = False
self._last_maintenance = datetime.now()
self._maintenance_task: Optional[asyncio.Task] = None
self._maintenance_interval_hours = 1 # 默认每小时执行一次维护
self._maintenance_schedule_id: Optional[str] = None # 调度任务ID
logger.info(f"记忆管理器已创建 (data_dir={data_dir})")
@@ -132,24 +135,41 @@ class MemoryManager:
self._initialized = True
logger.info("✅ 记忆管理器初始化完成")
# 启动后台维护调度任务
await self.start_maintenance_scheduler()
except Exception as e:
logger.error(f"记忆管理器初始化失败: {e}", exc_info=True)
raise
async def shutdown(self) -> None:
"""
关闭记忆管理器,保存所有数据
关闭记忆管理器
执行清理操作:
- 停止维护调度任务
- 保存所有数据
- 关闭存储组件
"""
if not self._initialized:
logger.warning("记忆管理器未初始化,无需关闭")
return
try:
logger.info("正在关闭记忆管理器...")
# 保存图数据
# 1. 停止调度任务
await self.stop_maintenance_scheduler()
# 2. 执行最后一次维护(保存数据)
if self.graph_store and self.persistence:
logger.info("执行最终数据保存...")
await self.persistence.save_graph_store(self.graph_store)
logger.info("图数据已保存")
# 3. 关闭存储组件
if self.vector_store:
# VectorStore 使用 chromadb无需显式关闭
pass
self._initialized = False
logger.info("✅ 记忆管理器已关闭")
@@ -722,11 +742,118 @@ class MemoryManager:
return stats
async def consolidate_memories(
self,
similarity_threshold: float = 0.85,
time_window_hours: int = 24,
) -> Dict[str, Any]:
"""
整理记忆:合并相似记忆
Args:
similarity_threshold: 相似度阈值
time_window_hours: 时间窗口(小时)
Returns:
整理结果
"""
if not self._initialized:
await self.initialize()
try:
logger.info(f"开始记忆整理 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h)...")
result = {
"merged_count": 0,
"checked_count": 0,
}
# 获取最近创建的记忆
cutoff_time = datetime.now() - timedelta(hours=time_window_hours)
all_memories = self.graph_store.get_all_memories()
recent_memories = [
mem for mem in all_memories
if mem.created_at >= cutoff_time and not mem.metadata.get("forgotten", False)
]
if not recent_memories:
logger.info("没有需要整理的记忆")
return result
logger.info(f"找到 {len(recent_memories)} 条待整理记忆")
result["checked_count"] = len(recent_memories)
# 按记忆类型分组
memories_by_type: Dict[str, List[Memory]] = {}
for mem in recent_memories:
mem_type = mem.metadata.get("memory_type", "")
if mem_type not in memories_by_type:
memories_by_type[mem_type] = []
memories_by_type[mem_type].append(mem)
# 对每个类型的记忆进行相似度检测
for mem_type, memories in memories_by_type.items():
if len(memories) < 2:
continue
logger.debug(f"检查类型 '{mem_type}'{len(memories)} 条记忆")
# 使用向量相似度检测
for i in range(len(memories)):
for j in range(i + 1, len(memories)):
mem_i = memories[i]
mem_j = memories[j]
# 获取主题节点的向量
topic_i = next((n for n in mem_i.nodes if n.node_type == NodeType.TOPIC), None)
topic_j = next((n for n in mem_j.nodes if n.node_type == NodeType.TOPIC), None)
if not topic_i or not topic_j:
continue
if topic_i.embedding is None or topic_j.embedding is None:
continue
# 计算余弦相似度
import numpy as np
similarity = np.dot(topic_i.embedding, topic_j.embedding) / (
np.linalg.norm(topic_i.embedding) * np.linalg.norm(topic_j.embedding)
)
if similarity >= similarity_threshold:
# 合并记忆:保留重要性高的,删除另一个
if mem_i.importance >= mem_j.importance:
keep_mem, remove_mem = mem_i, mem_j
else:
keep_mem, remove_mem = mem_j, mem_i
logger.info(
f"合并相似记忆 (similarity={similarity:.3f}): "
f"保留 {keep_mem.id}, 删除 {remove_mem.id}"
)
# 增加保留记忆的重要性
keep_mem.importance = min(1.0, keep_mem.importance + 0.1)
keep_mem.activation = min(1.0, keep_mem.activation + 0.1)
# 删除相似记忆
await self.delete_memory(remove_mem.id)
result["merged_count"] += 1
logger.info(f"记忆整理完成: {result}")
return result
except Exception as e:
logger.error(f"记忆整理失败: {e}", exc_info=True)
return {"error": str(e), "merged_count": 0, "checked_count": 0}
async def maintenance(self) -> Dict[str, Any]:
"""
执行维护任务
包括:
- 记忆整理(合并相似记忆)
- 清理过期记忆
- 自动遗忘低激活度记忆
- 保存数据
@@ -741,19 +868,27 @@ class MemoryManager:
logger.info("开始执行记忆系统维护...")
result = {
"consolidated": 0,
"forgotten": 0,
"deleted": 0,
"saved": False,
}
# 1. 自动遗忘
# 1. 记忆整理(合并相似记忆)
consolidate_result = await self.consolidate_memories(
similarity_threshold=0.85,
time_window_hours=24
)
result["consolidated"] = consolidate_result.get("merged_count", 0)
# 2. 自动遗忘
forgotten_count = await self.auto_forget_memories(threshold=0.1)
result["forgotten"] = forgotten_count
# 2. 清理非常旧的已遗忘记忆(可选)
# 3. 清理非常旧的已遗忘记忆(可选)
# TODO: 实现清理逻辑
# 3. 保存数据
# 4. 保存数据
await self.persistence.save_graph_store(self.graph_store)
result["saved"] = True
@@ -764,3 +899,70 @@ class MemoryManager:
except Exception as e:
logger.error(f"维护失败: {e}", exc_info=True)
return {"error": str(e)}
async def start_maintenance_scheduler(self) -> None:
"""
启动记忆维护调度任务
使用 unified_scheduler 定期执行维护任务:
- 记忆整合(合并相似记忆)
- 自动遗忘低激活度记忆
- 保存数据
默认间隔1小时
"""
try:
from src.schedule.unified_scheduler import TriggerType, unified_scheduler
# 如果已有调度任务,先移除
if self._maintenance_schedule_id:
await unified_scheduler.remove_schedule(self._maintenance_schedule_id)
logger.info("移除旧的维护调度任务")
# 创建新的调度任务
interval_seconds = self._maintenance_interval_hours * 3600
self._maintenance_schedule_id = await unified_scheduler.create_schedule(
callback=self.maintenance,
trigger_type=TriggerType.TIME,
trigger_config={
"delay_seconds": interval_seconds, # 首次延迟启动后1小时
"interval_seconds": interval_seconds, # 循环间隔
},
is_recurring=True,
task_name="memory_maintenance",
)
logger.info(
f"✅ 记忆维护调度任务已启动 "
f"(间隔={self._maintenance_interval_hours}小时, "
f"schedule_id={self._maintenance_schedule_id[:8]}...)"
)
except ImportError:
logger.warning("无法导入 unified_scheduler维护调度功能不可用")
except Exception as e:
logger.error(f"启动维护调度任务失败: {e}", exc_info=True)
async def stop_maintenance_scheduler(self) -> None:
"""
停止记忆维护调度任务
"""
if not self._maintenance_schedule_id:
return
try:
from src.schedule.unified_scheduler import unified_scheduler
success = await unified_scheduler.remove_schedule(self._maintenance_schedule_id)
if success:
logger.info(f"✅ 记忆维护调度任务已停止 (schedule_id={self._maintenance_schedule_id[:8]}...)")
else:
logger.warning(f"停止维护调度任务失败 (schedule_id={self._maintenance_schedule_id[:8]}...)")
self._maintenance_schedule_id = None
except ImportError:
logger.warning("无法导入 unified_scheduler")
except Exception as e:
logger.error(f"停止维护调度任务失败: {e}", exc_info=True)