refactor(short_term_manager): 简化短期记忆转移策略,仅在满额时整批转移
This commit is contained in:
@@ -639,54 +639,17 @@ class ShortTermMemoryManager:
|
||||
|
||||
def get_memories_for_transfer(self) -> list[ShortTermMemory]:
|
||||
"""
|
||||
获取需要转移到长期记忆的记忆(改进版:转移优先于删除)
|
||||
获取需要转移到长期记忆的记忆(简化版:满额整批转移)
|
||||
|
||||
优化的转移策略:
|
||||
1. 优先选择重要性 >= 阈值的记忆进行转移
|
||||
2. 如果高重要性记忆已清空但仍超过容量,则考虑转移低重要性记忆
|
||||
3. 仅当转移不能解决容量问题时,才进行强制删除(由 force_cleanup_overflow 处理)
|
||||
|
||||
返回:
|
||||
需要转移的记忆列表(优先返回高重要性,次选低重要性)
|
||||
策略:
|
||||
- 当短期记忆数量达到上限(>= max_memories)时,返回当前全部短期记忆;
|
||||
- 没满则返回空列表,不触发转移。
|
||||
"""
|
||||
# 单次遍历:同时分类高重要性和低重要性记忆
|
||||
high_importance_memories = []
|
||||
low_importance_memories = []
|
||||
|
||||
for mem in self.memories:
|
||||
if mem.importance >= self.transfer_importance_threshold:
|
||||
high_importance_memories.append(mem)
|
||||
else:
|
||||
low_importance_memories.append(mem)
|
||||
|
||||
# 策略1:优先返回高重要性记忆进行转移
|
||||
if high_importance_memories:
|
||||
logger.debug(
|
||||
f"转移候选: 发现 {len(high_importance_memories)} 条高重要性记忆待转移"
|
||||
)
|
||||
return high_importance_memories
|
||||
|
||||
# 策略2:如果没有高重要性记忆但总体超过容量上限,
|
||||
# 返回一部分低重要性记忆用于转移(而非删除)
|
||||
if len(self.memories) > self.max_memories:
|
||||
# 计算需要转移的数量(目标:降到上限)
|
||||
num_to_transfer = len(self.memories) - self.max_memories
|
||||
|
||||
# 按创建时间排序低重要性记忆,优先转移最早的(可能包含过时信息)
|
||||
low_importance_memories.sort(key=lambda x: x.created_at)
|
||||
to_transfer = low_importance_memories[:num_to_transfer]
|
||||
|
||||
if to_transfer:
|
||||
logger.debug(
|
||||
f"转移候选: 发现 {len(to_transfer)} 条低重要性记忆待转移 "
|
||||
f"(当前容量 {len(self.memories)}/{self.max_memories})"
|
||||
)
|
||||
return to_transfer
|
||||
|
||||
# 策略3:容量充足,无需转移
|
||||
logger.debug(
|
||||
f"转移检查: 无需转移 (当前容量 {len(self.memories)}/{self.max_memories})"
|
||||
)
|
||||
if self.max_memories <= 0:
|
||||
return []
|
||||
if len(self.memories) >= self.max_memories:
|
||||
logger.debug(f"转移候选: 短期记忆已满,准备整批转移 {len(self.memories)} 条")
|
||||
return list(self.memories)
|
||||
return []
|
||||
|
||||
def force_cleanup_overflow(self, keep_ratio: float | None = None) -> int:
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
# 短期记忆压力泄压补丁
|
||||
# 短期记忆压力泄压补丁(已弃用)
|
||||
|
||||
## 📋 概述
|
||||
|
||||
在高频消息场景下,短期记忆层(`ShortTermMemoryManager`)可能在自动转移机制触发前快速堆积大量记忆,当达到容量上限(`max_memories`)时可能阻塞后续写入。本功能提供一个**可选的泄压开关**,在容量溢出时自动删除低优先级记忆,防止系统阻塞。
|
||||
该文档描述的“泄压删除”与“复杂自动转移”机制已不再作为默认策略使用:现在短期记忆采用最简单策略——**只有当短期记忆满额时,才整批转移全部短期记忆到长期记忆;没满就不处理**。因此,本补丁说明仅供历史参考。
|
||||
|
||||
**关键特性**:
|
||||
- ✅ 默认开启(在高频场景中保护系统),可关闭保持向后兼容
|
||||
- ✅ 基于重要性和时间的智能删除策略
|
||||
- ✅ 异步持久化,不阻塞主流程
|
||||
- ✅ 可通过配置文件或代码灵活控制
|
||||
- ✅ 支持自定义保留比例
|
||||
**当前行为(简化版)**:
|
||||
- ✅ 短期记忆未满:不触发转移
|
||||
- ✅ 短期记忆满额:一次性整批转移全部短期记忆到长期记忆
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -112,8 +112,6 @@ class UnifiedMemoryManager:
|
||||
self._initialized = False
|
||||
self._auto_transfer_task: asyncio.Task | None = None
|
||||
self._auto_transfer_interval = max(10.0, float(long_term_auto_transfer_interval))
|
||||
# 优化:降低最大延迟时间,加快转移节奏 (原为 300.0)
|
||||
self._max_transfer_delay = min(max(30.0, self._auto_transfer_interval), 60.0)
|
||||
self._transfer_wakeup_event: asyncio.Event | None = None
|
||||
|
||||
logger.info("统一记忆管理器已创建")
|
||||
@@ -574,11 +572,7 @@ class UnifiedMemoryManager:
|
||||
logger.debug("自动转移任务已启动并触发首次检查")
|
||||
|
||||
async def _auto_transfer_loop(self) -> None:
|
||||
"""自动转移循环(批量缓存模式,优化:更高效的缓存管理)"""
|
||||
transfer_cache: list[ShortTermMemory] = []
|
||||
cached_ids: set[str] = set()
|
||||
cache_size_threshold = max(1, self._config["long_term"].get("batch_size", 1))
|
||||
last_transfer_time = time.monotonic()
|
||||
"""自动转移循环(简化版:短期记忆满额时整批转移)"""
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -595,67 +589,25 @@ class UnifiedMemoryManager:
|
||||
else:
|
||||
await asyncio.sleep(sleep_interval)
|
||||
|
||||
memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
|
||||
|
||||
if memories_to_transfer:
|
||||
# 优化:批量构建缓存而不是逐条添加
|
||||
new_memories = []
|
||||
for memory in memories_to_transfer:
|
||||
mem_id = getattr(memory, "id", None)
|
||||
if not (mem_id and mem_id in cached_ids):
|
||||
new_memories.append(memory)
|
||||
if mem_id:
|
||||
cached_ids.add(mem_id)
|
||||
|
||||
if new_memories:
|
||||
transfer_cache.extend(new_memories)
|
||||
logger.debug(
|
||||
f"自动转移缓存: 新增{len(new_memories)}条, 当前缓存{len(transfer_cache)}/{cache_size_threshold}"
|
||||
)
|
||||
|
||||
# 最简单策略:仅当短期记忆满额时,直接整批转移全部短期记忆;没满则不处理
|
||||
max_memories = max(1, getattr(self.short_term_manager, "max_memories", 1))
|
||||
occupancy_ratio = len(self.short_term_manager.memories) / max_memories
|
||||
time_since_last_transfer = time.monotonic() - last_transfer_time
|
||||
if len(self.short_term_manager.memories) < max_memories:
|
||||
continue
|
||||
|
||||
if occupancy_ratio >= 1.0 and not transfer_cache:
|
||||
removed = self.short_term_manager.force_cleanup_overflow()
|
||||
if removed > 0:
|
||||
logger.warning(
|
||||
f"短期记忆占用率 {occupancy_ratio:.0%},已强制删除 {removed} 条低重要性记忆泄压"
|
||||
batch = list(self.short_term_manager.memories)
|
||||
if not batch:
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f"短期记忆已满({len(batch)}/{max_memories}),开始整批转移到长期记忆"
|
||||
)
|
||||
|
||||
# 优化:优先级判断重构(早期 return)
|
||||
should_transfer = (
|
||||
len(transfer_cache) >= cache_size_threshold
|
||||
or occupancy_ratio >= 0.5
|
||||
or (transfer_cache and time_since_last_transfer >= self._max_transfer_delay)
|
||||
or len(self.short_term_manager.memories) >= self.short_term_manager.max_memories
|
||||
)
|
||||
|
||||
if should_transfer and transfer_cache:
|
||||
logger.debug(
|
||||
f"准备批量转移: {len(transfer_cache)}条短期记忆到长期记忆 (占用率 {occupancy_ratio:.0%})"
|
||||
)
|
||||
|
||||
# 优化:直接传递列表而不再复制
|
||||
result = await self.long_term_manager.transfer_from_short_term(transfer_cache)
|
||||
result = await self.long_term_manager.transfer_from_short_term(batch)
|
||||
|
||||
if result.get("transferred_memory_ids"):
|
||||
transferred_ids = set(result["transferred_memory_ids"])
|
||||
await self.short_term_manager.clear_transferred_memories(
|
||||
result["transferred_memory_ids"]
|
||||
)
|
||||
|
||||
# 优化:使用生成器表达式保留未转移的记忆
|
||||
transfer_cache = [
|
||||
m
|
||||
for m in transfer_cache
|
||||
if getattr(m, "id", None) not in transferred_ids
|
||||
]
|
||||
cached_ids.difference_update(transferred_ids)
|
||||
|
||||
last_transfer_time = time.monotonic()
|
||||
logger.debug(f"✅ 批量转移完成: {result}")
|
||||
logger.debug(f"✅ 整批转移完成: {result}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("自动转移循环被取消")
|
||||
@@ -674,10 +626,16 @@ class UnifiedMemoryManager:
|
||||
await self.initialize()
|
||||
|
||||
try:
|
||||
memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
|
||||
max_memories = max(1, getattr(self.short_term_manager, "max_memories", 1))
|
||||
if len(self.short_term_manager.memories) < max_memories:
|
||||
return {
|
||||
"message": f"短期记忆未满({len(self.short_term_manager.memories)}/{max_memories}),不触发转移",
|
||||
"transferred_count": 0,
|
||||
}
|
||||
|
||||
memories_to_transfer = list(self.short_term_manager.memories)
|
||||
if not memories_to_transfer:
|
||||
return {"message": "没有需要转移的记忆", "transferred_count": 0}
|
||||
return {"message": "短期记忆为空,无需转移", "transferred_count": 0}
|
||||
|
||||
# 执行转移
|
||||
result = await self.long_term_manager.transfer_from_short_term(memories_to_transfer)
|
||||
|
||||
Reference in New Issue
Block a user