From 936d5d3a0e7a39a7be987cd0e241c8adc0dde978 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Tue, 16 Dec 2025 23:40:11 +0800 Subject: [PATCH] =?UTF-8?q?refactor(short=5Fterm=5Fmanager):=20=E7=AE=80?= =?UTF-8?q?=E5=8C=96=E7=9F=AD=E6=9C=9F=E8=AE=B0=E5=BF=86=E8=BD=AC=E7=A7=BB?= =?UTF-8?q?=E7=AD=96=E7=95=A5=EF=BC=8C=E4=BB=85=E5=9C=A8=E6=BB=A1=E9=A2=9D?= =?UTF-8?q?=E6=97=B6=E6=95=B4=E6=89=B9=E8=BD=AC=E7=A7=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/memory_graph/short_term_manager.py | 55 ++---------- src/memory_graph/short_term_pressure_patch.md | 13 ++- src/memory_graph/unified_manager.py | 86 +++++-------------- 3 files changed, 36 insertions(+), 118 deletions(-) diff --git a/src/memory_graph/short_term_manager.py b/src/memory_graph/short_term_manager.py index 45911547c..0c598d6d6 100644 --- a/src/memory_graph/short_term_manager.py +++ b/src/memory_graph/short_term_manager.py @@ -639,54 +639,17 @@ class ShortTermMemoryManager: def get_memories_for_transfer(self) -> list[ShortTermMemory]: """ - 获取需要转移到长期记忆的记忆(改进版:转移优先于删除) + 获取需要转移到长期记忆的记忆(简化版:满额整批转移) - 优化的转移策略: - 1. 优先选择重要性 >= 阈值的记忆进行转移 - 2. 如果高重要性记忆已清空但仍超过容量,则考虑转移低重要性记忆 - 3. 仅当转移不能解决容量问题时,才进行强制删除(由 force_cleanup_overflow 处理) - - 返回: - 需要转移的记忆列表(优先返回高重要性,次选低重要性) + 策略: + - 当短期记忆数量达到上限(>= max_memories)时,返回当前全部短期记忆; + - 没满则返回空列表,不触发转移。 """ - # 单次遍历:同时分类高重要性和低重要性记忆 - high_importance_memories = [] - low_importance_memories = [] - - for mem in self.memories: - if mem.importance >= self.transfer_importance_threshold: - high_importance_memories.append(mem) - else: - low_importance_memories.append(mem) - - # 策略1:优先返回高重要性记忆进行转移 - if high_importance_memories: - logger.debug( - f"转移候选: 发现 {len(high_importance_memories)} 条高重要性记忆待转移" - ) - return high_importance_memories - - # 策略2:如果没有高重要性记忆但总体超过容量上限, - # 返回一部分低重要性记忆用于转移(而非删除) - if len(self.memories) > self.max_memories: - # 计算需要转移的数量(目标:降到上限) - num_to_transfer = len(self.memories) - self.max_memories - - # 按创建时间排序低重要性记忆,优先转移最早的(可能包含过时信息) - low_importance_memories.sort(key=lambda x: x.created_at) - to_transfer = low_importance_memories[:num_to_transfer] - - if to_transfer: - logger.debug( - f"转移候选: 发现 {len(to_transfer)} 条低重要性记忆待转移 " - f"(当前容量 {len(self.memories)}/{self.max_memories})" - ) - return to_transfer - - # 策略3:容量充足,无需转移 - logger.debug( - f"转移检查: 无需转移 (当前容量 {len(self.memories)}/{self.max_memories})" - ) + if self.max_memories <= 0: + return [] + if len(self.memories) >= self.max_memories: + logger.debug(f"转移候选: 短期记忆已满,准备整批转移 {len(self.memories)} 条") + return list(self.memories) return [] def force_cleanup_overflow(self, keep_ratio: float | None = None) -> int: diff --git a/src/memory_graph/short_term_pressure_patch.md b/src/memory_graph/short_term_pressure_patch.md index 10b97a167..093ef6d8d 100644 --- a/src/memory_graph/short_term_pressure_patch.md +++ b/src/memory_graph/short_term_pressure_patch.md @@ -1,15 +1,12 @@ -# 短期记忆压力泄压补丁 +# 短期记忆压力泄压补丁(已弃用) ## 📋 概述 -在高频消息场景下,短期记忆层(`ShortTermMemoryManager`)可能在自动转移机制触发前快速堆积大量记忆,当达到容量上限(`max_memories`)时可能阻塞后续写入。本功能提供一个**可选的泄压开关**,在容量溢出时自动删除低优先级记忆,防止系统阻塞。 +该文档描述的“泄压删除”与“复杂自动转移”机制已不再作为默认策略使用:现在短期记忆采用最简单策略——**只有当短期记忆满额时,才整批转移全部短期记忆到长期记忆;没满就不处理**。因此,本补丁说明仅供历史参考。 -**关键特性**: -- ✅ 默认开启(在高频场景中保护系统),可关闭保持向后兼容 -- ✅ 基于重要性和时间的智能删除策略 -- ✅ 异步持久化,不阻塞主流程 -- ✅ 可通过配置文件或代码灵活控制 -- ✅ 支持自定义保留比例 +**当前行为(简化版)**: +- ✅ 短期记忆未满:不触发转移 +- ✅ 短期记忆满额:一次性整批转移全部短期记忆到长期记忆 --- diff --git a/src/memory_graph/unified_manager.py b/src/memory_graph/unified_manager.py index 170ce3eab..04a82b0db 100644 --- a/src/memory_graph/unified_manager.py +++ b/src/memory_graph/unified_manager.py @@ -112,8 +112,6 @@ class UnifiedMemoryManager: self._initialized = False self._auto_transfer_task: asyncio.Task | None = None self._auto_transfer_interval = max(10.0, float(long_term_auto_transfer_interval)) - # 优化:降低最大延迟时间,加快转移节奏 (原为 300.0) - self._max_transfer_delay = min(max(30.0, self._auto_transfer_interval), 60.0) self._transfer_wakeup_event: asyncio.Event | None = None logger.info("统一记忆管理器已创建") @@ -574,11 +572,7 @@ class UnifiedMemoryManager: logger.debug("自动转移任务已启动并触发首次检查") async def _auto_transfer_loop(self) -> None: - """自动转移循环(批量缓存模式,优化:更高效的缓存管理)""" - transfer_cache: list[ShortTermMemory] = [] - cached_ids: set[str] = set() - cache_size_threshold = max(1, self._config["long_term"].get("batch_size", 1)) - last_transfer_time = time.monotonic() + """自动转移循环(简化版:短期记忆满额时整批转移)""" while True: try: @@ -595,67 +589,25 @@ class UnifiedMemoryManager: else: await asyncio.sleep(sleep_interval) - memories_to_transfer = self.short_term_manager.get_memories_for_transfer() - - if memories_to_transfer: - # 优化:批量构建缓存而不是逐条添加 - new_memories = [] - for memory in memories_to_transfer: - mem_id = getattr(memory, "id", None) - if not (mem_id and mem_id in cached_ids): - new_memories.append(memory) - if mem_id: - cached_ids.add(mem_id) - - if new_memories: - transfer_cache.extend(new_memories) - logger.debug( - f"自动转移缓存: 新增{len(new_memories)}条, 当前缓存{len(transfer_cache)}/{cache_size_threshold}" - ) - + # 最简单策略:仅当短期记忆满额时,直接整批转移全部短期记忆;没满则不处理 max_memories = max(1, getattr(self.short_term_manager, "max_memories", 1)) - occupancy_ratio = len(self.short_term_manager.memories) / max_memories - time_since_last_transfer = time.monotonic() - last_transfer_time + if len(self.short_term_manager.memories) < max_memories: + continue - if occupancy_ratio >= 1.0 and not transfer_cache: - removed = self.short_term_manager.force_cleanup_overflow() - if removed > 0: - logger.warning( - f"短期记忆占用率 {occupancy_ratio:.0%},已强制删除 {removed} 条低重要性记忆泄压" - ) + batch = list(self.short_term_manager.memories) + if not batch: + continue - # 优化:优先级判断重构(早期 return) - should_transfer = ( - len(transfer_cache) >= cache_size_threshold - or occupancy_ratio >= 0.5 - or (transfer_cache and time_since_last_transfer >= self._max_transfer_delay) - or len(self.short_term_manager.memories) >= self.short_term_manager.max_memories + logger.info( + f"短期记忆已满({len(batch)}/{max_memories}),开始整批转移到长期记忆" ) + result = await self.long_term_manager.transfer_from_short_term(batch) - if should_transfer and transfer_cache: - logger.debug( - f"准备批量转移: {len(transfer_cache)}条短期记忆到长期记忆 (占用率 {occupancy_ratio:.0%})" + if result.get("transferred_memory_ids"): + await self.short_term_manager.clear_transferred_memories( + result["transferred_memory_ids"] ) - - # 优化:直接传递列表而不再复制 - result = await self.long_term_manager.transfer_from_short_term(transfer_cache) - - if result.get("transferred_memory_ids"): - transferred_ids = set(result["transferred_memory_ids"]) - await self.short_term_manager.clear_transferred_memories( - result["transferred_memory_ids"] - ) - - # 优化:使用生成器表达式保留未转移的记忆 - transfer_cache = [ - m - for m in transfer_cache - if getattr(m, "id", None) not in transferred_ids - ] - cached_ids.difference_update(transferred_ids) - - last_transfer_time = time.monotonic() - logger.debug(f"✅ 批量转移完成: {result}") + logger.debug(f"✅ 整批转移完成: {result}") except asyncio.CancelledError: logger.debug("自动转移循环被取消") @@ -674,10 +626,16 @@ class UnifiedMemoryManager: await self.initialize() try: - memories_to_transfer = self.short_term_manager.get_memories_for_transfer() + max_memories = max(1, getattr(self.short_term_manager, "max_memories", 1)) + if len(self.short_term_manager.memories) < max_memories: + return { + "message": f"短期记忆未满({len(self.short_term_manager.memories)}/{max_memories}),不触发转移", + "transferred_count": 0, + } + memories_to_transfer = list(self.short_term_manager.memories) if not memories_to_transfer: - return {"message": "没有需要转移的记忆", "transferred_count": 0} + return {"message": "短期记忆为空,无需转移", "transferred_count": 0} # 执行转移 result = await self.long_term_manager.transfer_from_short_term(memories_to_transfer)