feat: 优化长期记忆转移间隔和最大延迟,增强短期记忆清理逻辑,改进三级记忆系统属性处理

This commit is contained in:
Windpicker-owo
2025-11-19 18:52:01 +08:00
parent 14133410e6
commit 5231404852
5 changed files with 100 additions and 30 deletions

View File

@@ -530,7 +530,7 @@ class ThreeTierMemoryConfig(ValidatedConfigBase):
# Long-term memory tier configuration
long_term_batch_size: int = Field(default=10, description="批量转移大小")
long_term_decay_factor: float = Field(default=0.95, description="衰减因子")
# NOTE(review): this commit lowers the auto-transfer interval from 600s to 60s;
# the superseded default=600 duplicate line was diff residue and is removed here.
long_term_auto_transfer_interval: int = Field(default=60, description="自动转移间隔(秒)")
# Judge model configuration
judge_model_name: str = Field(default="utils_small", description="用于决策的LLM模型")

View File

@@ -553,9 +553,17 @@ class LongTermMemoryManager:
return lowered.startswith(("new_", "temp_"))
def _register_temp_id(
self, placeholder: str | None, actual_id: str, temp_id_map: dict[str, str]
self,
placeholder: str | None,
actual_id: str,
temp_id_map: dict[str, str],
force: bool = False,
) -> None:
if actual_id and placeholder and self._is_placeholder_id(placeholder):
if not actual_id or not placeholder or not isinstance(placeholder, str):
return
if placeholder == actual_id:
return
if force or self._is_placeholder_id(placeholder):
temp_id_map[placeholder] = actual_id
def _resolve_id(self, raw_id: str | None, temp_id_map: dict[str, str]) -> str | None:
@@ -578,22 +586,36 @@ class LongTermMemoryManager:
return {k: self._resolve_value(v, temp_id_map) for k, v in params.items()}
def _register_aliases_from_params(
self, params: dict[str, Any], actual_id: str, temp_id_map: dict[str, str]
self,
params: dict[str, Any],
actual_id: str,
temp_id_map: dict[str, str],
*,
extra_keywords: tuple[str, ...] = (),
force: bool = False,
) -> None:
alias_keywords = ("alias", "placeholder", "temp_id", "register_as")
alias_keywords = ("alias", "placeholder", "temp_id", "register_as") + tuple(
extra_keywords
)
for key, value in params.items():
if isinstance(value, str):
lower_key = key.lower()
if any(keyword in lower_key for keyword in alias_keywords):
self._register_temp_id(value, actual_id, temp_id_map)
self._register_temp_id(value, actual_id, temp_id_map, force=force)
elif isinstance(value, list):
lower_key = key.lower()
if any(keyword in lower_key for keyword in alias_keywords):
for item in value:
if isinstance(item, str):
self._register_temp_id(item, actual_id, temp_id_map)
self._register_temp_id(item, actual_id, temp_id_map, force=force)
elif isinstance(value, dict):
self._register_aliases_from_params(value, actual_id, temp_id_map)
self._register_aliases_from_params(
value,
actual_id,
temp_id_map,
extra_keywords=extra_keywords,
force=force,
)
async def _execute_create_memory(
self,
@@ -620,7 +642,13 @@ class LongTermMemoryManager:
logger.info(f"✅ 创建长期记忆: {memory.id} (来自短期记忆 {source_stm.id})")
self._register_temp_id(op.target_id, memory.id, temp_id_map)
self._register_aliases_from_params(op.parameters, memory.id, temp_id_map)
self._register_aliases_from_params(
op.parameters,
memory.id,
temp_id_map,
extra_keywords=("memory_id", "memory_alias", "memory_placeholder"),
force=True,
)
else:
logger.error(f"创建长期记忆失败: {op}")
@@ -722,7 +750,13 @@ class LongTermMemoryManager:
asyncio.create_task(self._generate_node_embedding(node_id, content))
logger.info(f"✅ 创建节点: {content} ({node_type}) -> {memory_id}")
self._register_temp_id(op.target_id, node_id, temp_id_map)
self._register_aliases_from_params(op.parameters, node_id, temp_id_map)
self._register_aliases_from_params(
op.parameters,
node_id,
temp_id_map,
extra_keywords=("node_id", "node_alias", "node_placeholder"),
force=True,
)
else:
logger.error(f"创建节点失败: {op}")

View File

@@ -566,12 +566,43 @@ class ShortTermMemoryManager:
"""
获取需要转移到长期记忆的记忆
筛选条件:重要性 >= transfer_importance_threshold
Returns:
待转移的记忆列表
逻辑:
1. 优先选择重要性 >= 阈值的记忆
2. 如果剩余记忆数量仍超过 max_memories,直接清理最早的低重要性记忆,直到低于上限
"""
return [mem for mem in self.memories if mem.importance >= self.transfer_importance_threshold]
# 1. 正常筛选:重要性达标的记忆
candidates = [mem for mem in self.memories if mem.importance >= self.transfer_importance_threshold]
candidate_ids = {mem.id for mem in candidates}
# 2. 检查低重要性记忆是否积压
# 剩余的都是低重要性记忆
low_importance_memories = [mem for mem in self.memories if mem.id not in candidate_ids]
# 如果低重要性记忆数量超过了上限(说明积压严重)
# 我们需要清理掉一部分,而不是转移它们
if len(low_importance_memories) > self.max_memories:
# 目标保留数量(降至上限的 90%)
target_keep_count = int(self.max_memories * 0.9)
num_to_remove = len(low_importance_memories) - target_keep_count
if num_to_remove > 0:
# 按创建时间排序,删除最早的
low_importance_memories.sort(key=lambda x: x.created_at)
to_remove = low_importance_memories[:num_to_remove]
for mem in to_remove:
if mem in self.memories:
self.memories.remove(mem)
logger.info(
f"短期记忆清理: 移除了 {len(to_remove)} 条低重要性记忆 "
f"(保留 {len(self.memories)} 条)"
)
# 触发保存
asyncio.create_task(self._save_to_disk())
return candidates
async def clear_transferred_memories(self, memory_ids: list[str]) -> None:
"""

View File

@@ -108,7 +108,8 @@ class UnifiedMemoryManager:
self._initialized = False
self._auto_transfer_task: asyncio.Task | None = None
self._auto_transfer_interval = max(10.0, float(long_term_auto_transfer_interval))
self._max_transfer_delay = min(max(30.0, self._auto_transfer_interval), 300.0)
# 优化:降低最大延迟时间,加快转移节奏 (原为 300.0)
self._max_transfer_delay = min(max(30.0, self._auto_transfer_interval), 60.0)
self._transfer_wakeup_event: asyncio.Event | None = None
logger.info("统一记忆管理器已创建")
@@ -430,14 +431,15 @@ class UnifiedMemoryManager:
max_memories = max(1, getattr(self.short_term_manager, "max_memories", 1))
occupancy = len(self.short_term_manager.memories) / max_memories
if occupancy >= 0.9:
return max(5.0, base_interval * 0.1)
if occupancy >= 0.75:
return max(10.0, base_interval * 0.2)
# 优化:更激进的自适应间隔,加快高负载下的转移
if occupancy >= 0.8:
return max(2.0, base_interval * 0.1)
if occupancy >= 0.5:
return max(15.0, base_interval * 0.4)
return max(5.0, base_interval * 0.2)
if occupancy >= 0.3:
return max(20.0, base_interval * 0.6)
return max(10.0, base_interval * 0.4)
if occupancy >= 0.1:
return max(15.0, base_interval * 0.6)
return base_interval
@@ -470,7 +472,7 @@ class UnifiedMemoryManager:
if len(deduplicated) <= 1:
return []
manual_queries: list[dict[str, float]] = []
manual_queries: list[dict[str, Any]] = []
decay = 0.15
for idx, text in enumerate(deduplicated):
weight = max(0.3, 1.0 - idx * decay)
@@ -587,7 +589,7 @@ class UnifiedMemoryManager:
should_transfer = (
len(transfer_cache) >= cache_size_threshold
or occupancy_ratio >= 0.85
or occupancy_ratio >= 0.5 # 优化:降低触发阈值 (原为 0.85)
or (transfer_cache and time_since_last_transfer >= self._max_transfer_delay)
or len(self.short_term_manager.memories) >= self.short_term_manager.max_memories
)

View File

@@ -312,7 +312,7 @@ class ThreeTierMemoryFormatter:
# 查找客体和属性
objects = []
attributes = []
attributes = {}
for edge in memory.edges:
edge_type = edge.edge_type.value if hasattr(edge.edge_type, 'value') else str(edge.edge_type)
@@ -329,17 +329,18 @@ class ThreeTierMemoryFormatter:
attr_node = memory.get_node_by_id(edge.target_id)
if attr_node:
attr_name = edge.relation if edge.relation else "属性"
attributes.append(f"{attr_name}{attr_node.content}")
# 使用字典避免重复属性,后面的会覆盖前面的
attributes[attr_name] = attr_node.content
# 检查节点中的属性
# 检查节点中的属性(处理 "key=value" 格式)
for node in memory.nodes:
if hasattr(node, 'node_type') and str(node.node_type) == "属性":
# 处理 "key=value" 格式的属性
if "=" in node.content:
key, value = node.content.split("=", 1)
attributes.append(f"{key.strip()}{value.strip()}")
attributes[key.strip()] = value.strip()
else:
attributes.append(f"属性{node.content}")
attributes["属性"] = node.content
# 构建最终格式
result = f"[{type_label}] {subject}-{topic}"
@@ -348,7 +349,9 @@ class ThreeTierMemoryFormatter:
result += "-" + "-".join(objects)
if attributes:
result += "" + "".join(attributes) + ""
# 将属性字典格式化为简洁的字符串
attr_strs = [f"{key}{value}" for key, value in attributes.items()]
result += "" + "".join(attr_strs) + ""
return result