feat: implement short-term memory manager and unified memory manager
- Added ShortTermMemoryManager to manage short-term memory, including extraction, decision-making, and memory operations. - Integrated a large language model (LLM) for structured memory extraction and the decision process. - Implemented short-term to long-term memory transfer logic based on an importance threshold. - Created UnifiedMemoryManager to consolidate sensory, short-term, and long-term memory management behind a unified interface. - Strengthened the adequacy of the memory extraction process via judge-model evaluation. - Added automatic and manual memory transfer. - Included comprehensive logging of memory management operations and decisions.
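
The short-term to long-term transfer mentioned above can be pictured as an importance gate. A minimal sketch, assuming importance scores in [0, 1] and a configurable threshold; the actual ShortTermMemoryManager API is not shown in this commit's diff, so these names are hypothetical:

def select_for_long_term(short_term_memories: list, threshold: float = 0.7) -> list:
    # Hypothetical helper: pick memories whose importance clears the transfer threshold.
    return [m for m in short_term_memories if m.importance >= threshold]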
@@ -1,230 +0,0 @@
"""
Graph expansion utilities (optimized)

Provides the memory-graph expansion algorithm: starting from an initial set of
memories, walk the graph structure to find related memories.

Optimization highlights:
1. More efficient BFS traversal
2. Batched vector retrieval to reduce database calls
3. Early stopping to avoid unnecessary expansion
4. Clearer log output
"""

import asyncio
from typing import TYPE_CHECKING

from src.common.logger import get_logger
from src.memory_graph.utils.similarity import cosine_similarity

if TYPE_CHECKING:
    import numpy as np

    from src.memory_graph.storage.graph_store import GraphStore
    from src.memory_graph.storage.vector_store import VectorStore

logger = get_logger(__name__)


async def expand_memories_with_semantic_filter(
    graph_store: "GraphStore",
    vector_store: "VectorStore",
    initial_memory_ids: list[str],
    query_embedding: "np.ndarray",
    max_depth: int = 2,
    semantic_threshold: float = 0.5,
    max_expanded: int = 20,
) -> list[tuple[str, float]]:
    """
    Expand outward from an initial set of memories along the graph structure,
    filtering by semantic similarity (optimized).

    This recovers memories that pure vector search can miss: those that are
    semantically related and also connected through the graph structure.

    Optimizations:
    - BFS at the memory level rather than the node level (more direct)
    - Neighbor memories fetched in batches to reduce traversal passes
    - Early stopping: halt as soon as max_expanded is reached
    - More detailed debug logging

    Args:
        graph_store: graph store
        vector_store: vector store
        initial_memory_ids: initial memory IDs (from vector search)
        query_embedding: query vector
        max_depth: maximum expansion depth (1-3 recommended)
        semantic_threshold: semantic similarity threshold (0.5 recommended)
        max_expanded: maximum number of memories to expand

    Returns:
        List[(memory_id, relevance_score)] sorted by relevance
    """
    if not initial_memory_ids or query_embedding is None:
        return []

    try:
        import time

        start_time = time.time()

        # Track visited memories to avoid duplicates
        visited_memories = set(initial_memory_ids)
        # Expanded memories and their scores
        expanded_memories: dict[str, float] = {}

        # BFS expansion (memory-based rather than node-based)
        current_level_memories = initial_memory_ids
        depth_stats = []  # per-level statistics

        for depth in range(max_depth):
            next_level_memories = []
            candidates_checked = 0
            candidates_passed = 0

            logger.debug(f"🔍 Graph expansion - depth {depth+1}/{max_depth}, memories at this level: {len(current_level_memories)}")

            # Walk the memories at the current level
            for memory_id in current_level_memories:
                memory = graph_store.get_memory_by_id(memory_id)
                if not memory:
                    continue

                # Collect this memory's neighbor memories (via edge relations)
                neighbor_memory_ids = set()

                # 🆕 Walk all edges of the memory, collecting neighbors with edge-type weights
                edge_weights = {}  # best weight per memory reached through different edge types

                for edge in memory.edges:
                    # Edge endpoints
                    target_node_id = edge.target_id
                    source_node_id = edge.source_id

                    # 🆕 Weight by edge type (prefer expanding REFERENCE and attribute edges)
                    edge_type_str = edge.edge_type.value if hasattr(edge.edge_type, "value") else str(edge.edge_type)
                    if edge_type_str == "REFERENCE":
                        edge_weight = 1.3  # REFERENCE edges rank highest (citation links)
                    elif edge_type_str in ["ATTRIBUTE", "HAS_PROPERTY"]:
                        edge_weight = 1.2  # attribute edges come next
                    elif edge_type_str == "TEMPORAL":
                        edge_weight = 0.7  # downweight temporal links (avoid drifting to unrelated points in time)
                    elif edge_type_str == "RELATION":
                        edge_weight = 0.9  # generic relations slightly downweighted
                    else:
                        edge_weight = 1.0  # default weight

                    # Reach other memories through the edge's endpoint nodes
                    for node_id in [target_node_id, source_node_id]:
                        if node_id in graph_store.node_to_memories:
                            for neighbor_id in graph_store.node_to_memories[node_id]:
                                if neighbor_id not in edge_weights or edge_weights[neighbor_id] < edge_weight:
                                    edge_weights[neighbor_id] = edge_weight

                # Add the weighted neighbors as candidates
                for neighbor_id, edge_weight in edge_weights.items():
                    neighbor_memory_ids.add((neighbor_id, edge_weight))

                # Filter out already-visited memories and the memory itself
                filtered_neighbors = []
                for neighbor_id, edge_weight in neighbor_memory_ids:
                    if neighbor_id != memory_id and neighbor_id not in visited_memories:
                        filtered_neighbors.append((neighbor_id, edge_weight))

                # Evaluate the batch of neighbor memories
                for neighbor_mem_id, edge_weight in filtered_neighbors:
                    candidates_checked += 1

                    neighbor_memory = graph_store.get_memory_by_id(neighbor_mem_id)
                    if not neighbor_memory:
                        continue

                    # Topic-node embedding of the neighbor memory
                    topic_node = next(
                        (n for n in neighbor_memory.nodes if n.has_embedding()),
                        None
                    )

                    if not topic_node or topic_node.embedding is None:
                        continue

                    # Semantic similarity to the query
                    semantic_sim = cosine_similarity(query_embedding, topic_node.embedding)

                    # 🆕 Edge importance (edge-type weight combined with memory importance)
                    edge_importance = neighbor_memory.importance * edge_weight * 0.5

                    # 🆕 Composite score: semantic similarity (60%) + edge weight (20%) + importance (10%) + depth decay (10%)
                    depth_decay = 1.0 / (depth + 2)  # depth decay
                    relevance_score = (
                        semantic_sim * 0.60 +     # semantic similarity dominates ⬆️
                        edge_weight * 0.20 +      # edge-type weight 🆕
                        edge_importance * 0.10 +  # importance downweighted ⬇️
                        depth_decay * 0.10        # depth decay
                    )

                    # Keep only candidates above the threshold
                    if relevance_score < semantic_threshold:
                        continue

                    candidates_passed += 1

                    # Record the expanded memory
                    if neighbor_mem_id not in expanded_memories:
                        expanded_memories[neighbor_mem_id] = relevance_score
                        visited_memories.add(neighbor_mem_id)
                        next_level_memories.append(neighbor_mem_id)
                    else:
                        # Already present: keep the highest score
                        expanded_memories[neighbor_mem_id] = max(
                            expanded_memories[neighbor_mem_id], relevance_score
                        )

                    # Early stop: maximum expansion count reached
                    if len(expanded_memories) >= max_expanded:
                        logger.debug(f"⏹️ Early stop: reached max expansion count {max_expanded}")
                        break

                # Early-stop check
                if len(expanded_memories) >= max_expanded:
                    break

            # Record per-level statistics
            depth_stats.append({
                "depth": depth + 1,
                "checked": candidates_checked,
                "passed": candidates_passed,
                "expanded_total": len(expanded_memories)
            })

            # Terminate early if there are no new memories or the limit is reached
            if not next_level_memories or len(expanded_memories) >= max_expanded:
                logger.debug(f"⏹️ Stopping expansion: {'no new memories' if not next_level_memories else 'limit reached'}")
                break

            # Cap the next level's size to avoid explosive growth
            current_level_memories = next_level_memories[:max_expanded]

            # Yield control between levels
            await asyncio.sleep(0.001)

        # Sort and return
        sorted_results = sorted(expanded_memories.items(), key=lambda x: x[1], reverse=True)[:max_expanded]

        elapsed = time.time() - start_time
        logger.info(
            f"✅ Graph expansion done: {len(initial_memory_ids)} initial → "
            f"{len(sorted_results)} newly expanded memories "
            f"(depth={max_depth}, threshold={semantic_threshold:.2f}, took {elapsed:.3f}s)"
        )

        # Per-level statistics
        for stat in depth_stats:
            logger.debug(
                f"  depth {stat['depth']}: checked {stat['checked']}, "
                f"passed {stat['passed']}, expanded total {stat['expanded_total']}"
            )

        return sorted_results

    except Exception as e:
        logger.error(f"Semantic graph expansion failed: {e}", exc_info=True)
        return []


__all__ = ["expand_memories_with_semantic_filter"]
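
A minimal calling sketch follows. As a concrete check of the composite score at depth 0: with semantic_sim 0.7, edge_weight 1.3, and importance 0.8, edge_importance is 0.8 × 1.3 × 0.5 = 0.52 and depth_decay is 0.5, giving 0.7·0.60 + 1.3·0.20 + 0.52·0.10 + 0.5·0.10 ≈ 0.78, which clears the default 0.5 threshold. The embed() helper and the vector_store.search() call below are assumptions standing in for the project's real embedding and vector-search entry points, which this diff does not show:

async def retrieve_with_expansion(graph_store, vector_store, query: str):
    # embed() and vector_store.search() are hypothetical stand-ins.
    query_embedding = await embed(query)
    seed_ids = await vector_store.search(query_embedding, top_k=10)
    return await expand_memories_with_semantic_filter(
        graph_store=graph_store,
        vector_store=vector_store,
        initial_memory_ids=seed_ids,
        query_embedding=query_embedding,
        max_depth=2,             # 1-3 recommended by the docstring
        semantic_threshold=0.5,
        max_expanded=20,
    )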
@@ -1,223 +0,0 @@
"""
Memory deduplication and aggregation utilities

Identify and merge similar memories in retrieval results to improve result quality.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from src.common.logger import get_logger
from src.memory_graph.utils.similarity import cosine_similarity

if TYPE_CHECKING:
    pass

logger = get_logger(__name__)


async def deduplicate_memories_by_similarity(
    memories: list[tuple[Any, float, Any]],  # [(Memory, score, extra_data), ...]
    similarity_threshold: float = 0.85,
    keep_top_n: int | None = None,
) -> list[tuple[Any, float, Any]]:
    """
    Deduplicate and aggregate memories by similarity.

    Strategy:
    1. Compute the pairwise similarity between all memories
    2. When similarity > threshold, merge them into one memory
    3. Keep the higher-scoring memory and drop the lower-scoring ones
    4. The merged score is a weighted average of the original scores

    Args:
        memories: memory list [(Memory, score, extra_data), ...]
        similarity_threshold: similarity threshold (0.85 means 85% similar counts as duplicate)
        keep_top_n: maximum number to keep after deduplication (None means no limit)

    Returns:
        Deduplicated memory list [(Memory, adjusted_score, extra_data), ...]
    """
    if len(memories) <= 1:
        return memories

    logger.info(f"Starting memory deduplication: {len(memories)} memories (threshold={similarity_threshold})")

    # Prepare data structures
    memory_embeddings = []
    for memory, score, extra in memories:
        # Vector representation of the memory
        embedding = await _get_memory_embedding(memory)
        memory_embeddings.append((memory, score, extra, embedding))

    # Build the similarity matrix and find duplicate groups
    duplicate_groups = _find_duplicate_groups(memory_embeddings, similarity_threshold)

    # Merge each duplicate group
    deduplicated = []
    processed_indices = set()

    for group_indices in duplicate_groups:
        if any(i in processed_indices for i in group_indices):
            continue  # already handled

        # Mark as processed
        processed_indices.update(group_indices)

        # Merge the memories in this group
        group_memories = [memory_embeddings[i] for i in group_indices]
        merged_memory = _merge_memory_group(group_memories)
        deduplicated.append(merged_memory)

    # Add the memories that were not merged
    for i, (memory, score, extra, _) in enumerate(memory_embeddings):
        if i not in processed_indices:
            deduplicated.append((memory, score, extra))

    # Sort by score
    deduplicated.sort(key=lambda x: x[1], reverse=True)

    # Apply the limit
    if keep_top_n is not None:
        deduplicated = deduplicated[:keep_top_n]

    logger.info(
        f"Deduplication done: {len(memories)} → {len(deduplicated)} memories "
        f"(merged {len(memories) - len(deduplicated)} duplicates)"
    )

    return deduplicated


async def _get_memory_embedding(memory: Any) -> list[float] | None:
    """
    Get the vector representation of a memory.

    Strategy:
    1. If the memory has nodes, use the first node's ID to query the vector store
    2. Return the node's embedding
    3. Return None when no embedding can be obtained
    """
    # Try to get the embedding from the nodes
    if hasattr(memory, "nodes") and memory.nodes:
        # nodes is a list of MemoryNode objects
        first_node = memory.nodes[0]
        node_id = getattr(first_node, "id", None)

        if node_id:
            # Read the embedding attribute directly (if present)
            if hasattr(first_node, "embedding") and first_node.embedding is not None:
                embedding = first_node.embedding
                # Convert to a plain list
                if hasattr(embedding, "tolist"):
                    return embedding.tolist()
                elif isinstance(embedding, list):
                    return embedding

    # No embedding available
    return None


def _find_duplicate_groups(
    memory_embeddings: list[tuple[Any, float, Any, list[float] | None]],
    threshold: float
) -> list[list[int]]:
    """
    Find groups of memories whose similarity exceeds the threshold.

    Returns:
        List of groups, each group is a list of indices.
        For example, [[0, 3, 7], [1, 4], [2, 5, 6]] denotes 3 duplicate groups.
    """
    n = len(memory_embeddings)
    similarity_matrix = [[0.0] * n for _ in range(n)]

    # Compute the similarity matrix
    for i in range(n):
        for j in range(i + 1, n):
            embedding_i = memory_embeddings[i][3]
            embedding_j = memory_embeddings[j][3]

            # Skip None or zero vectors
            if (embedding_i is None or embedding_j is None or
                    all(x == 0.0 for x in embedding_i) or all(x == 0.0 for x in embedding_j)):
                similarity = 0.0
            else:
                # cosine_similarity converts to numpy arrays internally
                similarity = float(cosine_similarity(embedding_i, embedding_j))  # type: ignore

            similarity_matrix[i][j] = similarity
            similarity_matrix[j][i] = similarity

    # Union-find to extract connected components
    parent = list(range(n))

    def find(x):
        if parent[x] != x:
            parent[x] = find(parent[x])
        return parent[x]

    def union(x, y):
        px, py = find(x), find(y)
        if px != py:
            parent[px] = py

    # Union similar memories
    for i in range(n):
        for j in range(i + 1, n):
            if similarity_matrix[i][j] >= threshold:
                union(i, j)

    # Build the groups
    groups_dict: dict[int, list[int]] = {}
    for i in range(n):
        root = find(i)
        if root not in groups_dict:
            groups_dict[root] = []
        groups_dict[root].append(i)

    # Return only groups of size > 1 (true duplicate groups)
    duplicate_groups = [group for group in groups_dict.values() if len(group) > 1]

    return duplicate_groups
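

# Illustrative aside (editor's sketch, not part of the original module): the
# union-find grouping above is transitive — if sim(0,1) and sim(1,2) both clear
# the threshold, memories 0, 1, and 2 merge into one duplicate group even when
# sim(0,2) falls below it. A self-contained toy mirroring that behavior:
def _toy_group_by_threshold(sim: list[list[float]], threshold: float) -> list[list[int]]:
    """Group indices whose pairwise similarity chains past the threshold."""
    parent = list(range(len(sim)))

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for i in range(len(sim)):
        for j in range(i + 1, len(sim)):
            if sim[i][j] >= threshold:
                parent[find(i)] = find(j)

    groups: dict[int, list[int]] = {}
    for i in range(len(sim)):
        groups.setdefault(find(i), []).append(i)
    return [g for g in groups.values() if len(g) > 1]


# With threshold 0.85: sim(0,1)=0.90 and sim(1,2)=0.88 chain 0-1-2 together,
# although sim(0,2)=0.70 alone would not qualify:
#   _toy_group_by_threshold(
#       [[1.0, 0.90, 0.70], [0.90, 1.0, 0.88], [0.70, 0.88, 1.0]], 0.85
#   ) == [[0, 1, 2]]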


def _merge_memory_group(
    group: list[tuple[Any, float, Any, list[float] | None]]
) -> tuple[Any, float, Any]:
    """
    Merge a group of similar memories.

    Strategy:
    1. Keep the highest-scoring memory as the representative
    2. Merged score = weighted average of all scores (weight decays with rank)
    3. Record the merge information in extra_data
    """
    # Sort by score
    sorted_group = sorted(group, key=lambda x: x[1], reverse=True)

    # Keep the highest-scoring memory
    best_memory, best_score, best_extra, _ = sorted_group[0]

    # Merged score: weighted average with decaying weights
    total_weight = 0.0
    weighted_sum = 0.0
    for i, (_, score, _, _) in enumerate(sorted_group):
        weight = 1.0 / (i + 1)  # rank 1 weight 1.0, rank 2 weight 0.5, rank 3 weight 0.33...
        weighted_sum += score * weight
        total_weight += weight

    merged_score = weighted_sum / total_weight if total_weight > 0 else best_score

    # Enrich extra_data
    merged_extra = best_extra if isinstance(best_extra, dict) else {}
    merged_extra["merged_count"] = len(sorted_group)
    merged_extra["original_scores"] = [score for _, score, _, _ in sorted_group]

    logger.debug(
        f"Merged {len(sorted_group)} similar memories: "
        f"score {best_score:.3f} → {merged_score:.3f}"
    )

    return (best_memory, merged_score, merged_extra)
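
To make the rank-decayed weighted average in _merge_memory_group concrete, here is a short numeric sketch; the scores are invented purely for illustration:

scores = [0.9, 0.8, 0.6]                               # sorted descending, as in sorted_group
weights = [1.0 / (i + 1) for i in range(len(scores))]  # 1.0, 0.5, 0.333...
merged = sum(s * w for s, w in zip(scores, weights)) / sum(weights)
print(round(merged, 3))  # 0.818 — below the best score, pulled down by the weaker duplicates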
@@ -1,320 +0,0 @@
"""
Memory formatting utilities

Convert Memory objects from the memory-graph system into natural-language
descriptions suitable for prompts.
"""

import logging
from datetime import datetime

from src.memory_graph.models import EdgeType, Memory, MemoryType, NodeType

logger = logging.getLogger(__name__)


def format_memory_for_prompt(memory: Memory, include_metadata: bool = False) -> str:
    """
    Format a memory object as a natural-language description suitable for prompts.

    Based on the memory's graph structure, build a full subject-predicate-object
    description containing:
    - subject (subject node)
    - predicate/action (topic node)
    - object (object node, if present)
    - attribute information (e.g., time, location)
    - relation information (relations between memories)

    Args:
        memory: memory object
        include_metadata: whether to include metadata (time, importance, etc.)

    Returns:
        The formatted natural-language description
    """
    try:
        # 1. Subject node
        subject_node = memory.get_subject_node()
        if not subject_node:
            logger.warning(f"Memory {memory.id} is missing a subject node")
            return "(记忆格式错误:缺少主体)"

        subject_text = subject_node.content

        # 2. Topic node (predicate/action)
        topic_node = None
        for edge in memory.edges:
            if edge.edge_type == EdgeType.MEMORY_TYPE and edge.source_id == memory.subject_id:
                topic_node = memory.get_node_by_id(edge.target_id)
                break

        if not topic_node:
            logger.warning(f"Memory {memory.id} is missing a topic node")
            return f"{subject_text}(记忆格式错误:缺少主题)"

        topic_text = topic_node.content

        # 3. Object node and core relation
        object_node = None
        core_relation = None
        for edge in memory.edges:
            if edge.edge_type == EdgeType.CORE_RELATION and edge.source_id == topic_node.id:
                object_node = memory.get_node_by_id(edge.target_id)
                core_relation = edge.relation if edge.relation else ""
                break

        # 4. Collect attribute nodes
        attributes: dict[str, str] = {}
        for edge in memory.edges:
            if edge.edge_type == EdgeType.ATTRIBUTE:
                # Find the attribute node and its value node
                attr_node = memory.get_node_by_id(edge.target_id)
                if attr_node and attr_node.node_type == NodeType.ATTRIBUTE:
                    # Look up this attribute's value
                    for value_edge in memory.edges:
                        if (value_edge.edge_type == EdgeType.ATTRIBUTE
                                and value_edge.source_id == attr_node.id):
                            value_node = memory.get_node_by_id(value_edge.target_id)
                            if value_node and value_node.node_type == NodeType.VALUE:
                                attributes[attr_node.content] = value_node.content
                            break

        # 5. Build the natural-language description
        parts = []

        # Subject-predicate-object structure
        if object_node is not None:
            # Full subject-predicate-object
            if core_relation:
                parts.append(f"{subject_text}-{topic_text}{core_relation}{object_node.content}")
            else:
                parts.append(f"{subject_text}-{topic_text}{object_node.content}")
        else:
            # Subject-predicate only
            parts.append(f"{subject_text}-{topic_text}")

        # Attribute information (the "时间"/"地点" keys are Chinese prompt data, kept as-is)
        if attributes:
            attr_parts = []
            # Show time and location first
            if "时间" in attributes:
                attr_parts.append(f"于{attributes['时间']}")
            if "地点" in attributes:
                attr_parts.append(f"在{attributes['地点']}")
            # Remaining attributes
            for key, value in attributes.items():
                if key not in ["时间", "地点"]:
                    attr_parts.append(f"{key}:{value}")

            if attr_parts:
                parts.append(f"({' '.join(attr_parts)})")

        description = "".join(parts)

        # 6. Optional metadata
        if include_metadata:
            metadata_parts = []

            # Memory type
            if memory.memory_type:
                metadata_parts.append(f"类型:{memory.memory_type.value}")

            # Importance tier (Chinese labels are part of the prompt output)
            if memory.importance >= 0.8:
                metadata_parts.append("重要")
            elif memory.importance >= 0.6:
                metadata_parts.append("一般")

            # Time (when not already among the attributes)
            if "时间" not in attributes:
                time_str = _format_relative_time(memory.created_at)
                if time_str:
                    metadata_parts.append(time_str)

            if metadata_parts:
                description += f" [{', '.join(metadata_parts)}]"

        return description

    except Exception as e:
        logger.error(f"Failed to format memory: {e}", exc_info=True)
        return f"(记忆格式化错误: {str(e)[:50]})"


def format_memories_for_prompt(
    memories: list[Memory],
    max_count: int | None = None,
    include_metadata: bool = False,
    group_by_type: bool = False
) -> str:
    """
    Format multiple memories into prompt text in one batch.

    Args:
        memories: memory list
        max_count: maximum number of memories (optional)
        include_metadata: whether to include metadata
        group_by_type: whether to group by type

    Returns:
        Formatted text with a heading and a list
    """
    if not memories:
        return ""

    # Apply the limit
    if max_count:
        memories = memories[:max_count]

    # Group by type
    if group_by_type:
        type_groups: dict[MemoryType, list[Memory]] = {}
        for memory in memories:
            if memory.memory_type not in type_groups:
                type_groups[memory.memory_type] = []
            type_groups[memory.memory_type].append(memory)

        # Build the grouped text
        parts = ["### 🧠 相关记忆 (Relevant Memories)", ""]

        type_order = [MemoryType.FACT, MemoryType.EVENT, MemoryType.RELATION, MemoryType.OPINION]
        for mem_type in type_order:
            if mem_type in type_groups:
                parts.append(f"#### {mem_type.value}")
                for memory in type_groups[mem_type]:
                    desc = format_memory_for_prompt(memory, include_metadata)
                    parts.append(f"- {desc}")
                parts.append("")

        return "\n".join(parts)

    else:
        # No grouping: list directly
        parts = ["### 🧠 相关记忆 (Relevant Memories)", ""]

        for memory in memories:
            # Type label
            type_label = memory.memory_type.value if memory.memory_type else "未知"

            # Format the memory content
            desc = format_memory_for_prompt(memory, include_metadata)

            # Prepend the type label
            parts.append(f"- **[{type_label}]** {desc}")

        return "\n".join(parts)


def get_memory_type_label(memory_type: str) -> str:
    """
    Get the Chinese label for a memory type.

    Args:
        memory_type: memory type (may be English or Chinese)

    Returns:
        The Chinese label
    """
    # Mapping table
    type_mapping = {
        # English to Chinese
        "event": "事件",
        "fact": "事实",
        "relation": "关系",
        "opinion": "观点",
        "preference": "偏好",
        "emotion": "情绪",
        "knowledge": "知识",
        "skill": "技能",
        "goal": "目标",
        "experience": "经历",
        "contextual": "情境",
        # Chinese (unchanged)
        "事件": "事件",
        "事实": "事实",
        "关系": "关系",
        "观点": "观点",
        "偏好": "偏好",
        "情绪": "情绪",
        "知识": "知识",
        "技能": "技能",
        "目标": "目标",
        "经历": "经历",
        "情境": "情境",
    }

    # Lowercase for matching
    memory_type_lower = memory_type.lower() if memory_type else ""

    return type_mapping.get(memory_type_lower, "未知")


def _format_relative_time(timestamp: datetime) -> str | None:
    """
    Format a relative time, such as "2天前" (2 days ago) or "刚才" (just now).

    Args:
        timestamp: the timestamp

    Returns:
        A relative-time description, or None if it is too long ago
    """
    try:
        now = datetime.now()
        delta = now - timestamp

        if delta.total_seconds() < 60:
            return "刚才"
        elif delta.total_seconds() < 3600:
            minutes = int(delta.total_seconds() / 60)
            return f"{minutes}分钟前"
        elif delta.total_seconds() < 86400:
            hours = int(delta.total_seconds() / 3600)
            return f"{hours}小时前"
        elif delta.days < 7:
            return f"{delta.days}天前"
        elif delta.days < 30:
            weeks = delta.days // 7
            return f"{weeks}周前"
        elif delta.days < 365:
            months = delta.days // 30
            return f"{months}个月前"
        else:
            # No relative time beyond one year
            return None
    except Exception:
        return None


def format_memory_summary(memory: Memory) -> str:
    """
    Generate a short summary of a memory (for logging and debugging).

    Args:
        memory: memory object

    Returns:
        A short summary
    """
    try:
        subject_node = memory.get_subject_node()
        subject_text = subject_node.content if subject_node else "?"

        topic_text = "?"
        for edge in memory.edges:
            if edge.edge_type == EdgeType.MEMORY_TYPE and edge.source_id == memory.subject_id:
                topic_node = memory.get_node_by_id(edge.target_id)
                if topic_node:
                    topic_text = topic_node.content
                break

        return f"{subject_text} - {memory.memory_type.value if memory.memory_type else '?'}: {topic_text}"
    except Exception:
        return f"记忆 {memory.id[:8]}"


# Main exported functions
__all__ = [
    "format_memories_for_prompt",
    "format_memory_for_prompt",
    "format_memory_summary",
    "get_memory_type_label",
]
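
A short sketch of feeding the formatter's output into a prompt. The import path and the surrounding variables are assumptions for illustration, not APIs confirmed by this diff:

from src.memory_graph.utils.formatter import format_memories_for_prompt  # assumed module path

def build_prompt(system_text: str, user_text: str, memories: list) -> str:
    # Render up to 10 memories, grouped under FACT/EVENT/RELATION/OPINION
    # headings, with metadata tags appended to each line.
    memory_block = format_memories_for_prompt(
        memories,
        max_count=10,
        include_metadata=True,
        group_by_type=True,
    )
    return f"{system_text}\n\n{memory_block}\n\nUser: {user_text}"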