feat(perceptual_manager): 添加向量化相似度计算,优化记忆块召回逻辑(用 numpy 向量化计算与 argpartition 替换逐块相似度计算;性能提升尚未经基准测试验证)

This commit is contained in:
LuiKlee
2025-12-13 14:20:07 +08:00
parent cf227d2fb0
commit 1b0acc3188

View File

@@ -21,7 +21,6 @@ import numpy as np
from src.common.logger import get_logger from src.common.logger import get_logger
from src.memory_graph.models import MemoryBlock, PerceptualMemory from src.memory_graph.models import MemoryBlock, PerceptualMemory
from src.memory_graph.utils.embeddings import EmbeddingGenerator from src.memory_graph.utils.embeddings import EmbeddingGenerator
from src.memory_graph.utils.similarity import batch_cosine_similarity_async
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -208,6 +207,7 @@ class PerceptualMemoryManager:
# 生成向量 # 生成向量
embedding = await self._generate_embedding(combined_text) embedding = await self._generate_embedding(combined_text)
embedding_norm = float(np.linalg.norm(embedding)) if embedding is not None else 0.0
# 创建记忆块 # 创建记忆块
block = MemoryBlock( block = MemoryBlock(
@@ -215,7 +215,10 @@ class PerceptualMemoryManager:
messages=messages, messages=messages,
combined_text=combined_text, combined_text=combined_text,
embedding=embedding, embedding=embedding,
metadata={"stream_id": stream_id} # 添加 stream_id 元数据 metadata={
"stream_id": stream_id,
"embedding_norm": embedding_norm,
}, # stream_id 便于调试embedding_norm 用于快速相似度
) )
# 添加到记忆堆顶部 # 添加到记忆堆顶部
@@ -395,6 +398,55 @@ class PerceptualMemoryManager:
logger.error(f"批量生成向量失败: {e}") logger.error(f"批量生成向量失败: {e}")
return [None] * len(texts) return [None] * len(texts)
async def _compute_similarities(
    self,
    query_embedding: np.ndarray,
    block_embeddings: list[np.ndarray],
    block_norms: list[float] | None = None,
) -> np.ndarray:
    """Offload the vectorized similarity computation to a worker thread.

    Delegates to ``_compute_similarities_sync`` via ``asyncio.to_thread`` so
    the numpy work never blocks the event loop.

    Args:
        query_embedding: Query vector.
        block_embeddings: Candidate block vectors.
        block_norms: Optional precomputed L2 norms of the block vectors.

    Returns:
        float32 array of per-block similarity scores.
    """
    worker = self._compute_similarities_sync
    return await asyncio.to_thread(
        worker,
        query_embedding,
        block_embeddings,
        block_norms,
    )
@staticmethod
def _compute_similarities_sync(
    query_embedding: np.ndarray,
    block_embeddings: list[np.ndarray],
    block_norms: list[float] | None = None,
) -> np.ndarray:
    """Vectorized cosine similarity between a query vector and block vectors.

    Runs synchronously; callers wrap it in ``asyncio.to_thread`` to keep the
    event loop responsive.

    Args:
        query_embedding: 1-D query vector.
        block_embeddings: Block vectors; assumed to share the query's
            dimensionality (np.asarray would raise otherwise — presumably
            guaranteed upstream, verify against the embedding generator).
        block_norms: Optional precomputed L2 norms, one per block. Recomputed
            from scratch when absent or when the count does not match.

    Returns:
        float32 array of shape ``(len(block_embeddings),)`` with similarities
        clipped to ``[0, 1]``; zero-norm vectors (query or block) score 0.
    """
    # NOTE(review): the original re-imported numpy locally; the module-level
    # `import numpy as np` already covers this, so the shadowing import is gone.
    if not block_embeddings:
        return np.zeros(0, dtype=np.float32)

    query = np.asarray(query_embedding, dtype=np.float32)
    blocks = np.asarray(block_embeddings, dtype=np.float32)
    if blocks.ndim == 1:
        # A single flat vector arrived; treat it as one row.
        blocks = blocks.reshape(1, -1)

    query_norm = np.linalg.norm(query)
    if query_norm == 0.0:
        # A zero query vector has no direction — nothing can match it.
        return np.zeros(blocks.shape[0], dtype=np.float32)

    if block_norms is None:
        block_norms_array = np.linalg.norm(blocks, axis=1)
    else:
        block_norms_array = np.asarray(block_norms, dtype=np.float32)
        if block_norms_array.shape[0] != blocks.shape[0]:
            # Stale or mismatched cache — fall back to recomputing.
            block_norms_array = np.linalg.norm(blocks, axis=1)

    valid_mask = block_norms_array > 0
    similarities = np.zeros(blocks.shape[0], dtype=np.float32)
    if valid_mask.any():
        normalized_blocks = blocks[valid_mask] / block_norms_array[valid_mask][:, None]
        normalized_query = query / query_norm
        similarities[valid_mask] = normalized_blocks @ normalized_query

    # Negative cosine values are treated as "no similarity" (clipped to 0),
    # matching the downstream threshold semantics.
    return np.clip(similarities, 0.0, 1.0)
async def recall_blocks( async def recall_blocks(
self, self,
query_text: str, query_text: str,
@@ -425,7 +477,7 @@ class PerceptualMemoryManager:
logger.warning("查询向量生成失败,返回空列表") logger.warning("查询向量生成失败,返回空列表")
return [] return []
# 批量计算所有块的相似度(使用异步版本 # 批量计算所有块的相似度(使用向量化计算 + 后台线程
blocks_with_embeddings = [ blocks_with_embeddings = [
block for block in self.perceptual_memory.blocks block for block in self.perceptual_memory.blocks
if block.embedding is not None if block.embedding is not None
@@ -434,26 +486,39 @@ class PerceptualMemoryManager:
if not blocks_with_embeddings: if not blocks_with_embeddings:
return [] return []
# 批量计算相似度 block_embeddings: list[np.ndarray] = []
block_embeddings = [block.embedding for block in blocks_with_embeddings] block_norms: list[float] = []
similarities = await batch_cosine_similarity_async(query_embedding, block_embeddings)
# 过滤和排序 for block in blocks_with_embeddings:
scored_blocks = [] block_embeddings.append(block.embedding)
for block, similarity in zip(blocks_with_embeddings, similarities): norm = block.metadata.get("embedding_norm") if block.metadata else None
# 过滤低于阈值的块 if norm is None and block.embedding is not None:
if similarity >= similarity_threshold: norm = float(np.linalg.norm(block.embedding))
scored_blocks.append((block, similarity)) block.metadata["embedding_norm"] = norm
block_norms.append(norm if norm is not None else 0.0)
# 按相似度降序排序 similarities = await self._compute_similarities(query_embedding, block_embeddings, block_norms)
scored_blocks.sort(key=lambda x: x[1], reverse=True) similarities = np.asarray(similarities, dtype=np.float32)
# 取 TopK candidate_indices = np.nonzero(similarities >= similarity_threshold)[0]
top_blocks = scored_blocks[:top_k] if candidate_indices.size == 0:
return []
if candidate_indices.size > top_k:
# argpartition 将复杂度降为 O(n)
top_indices = candidate_indices[
np.argpartition(similarities[candidate_indices], -top_k)[-top_k:]
]
else:
top_indices = candidate_indices
# 保持按相似度降序
top_indices = top_indices[np.argsort(similarities[top_indices])[::-1]]
# 更新召回计数和位置 # 更新召回计数和位置
recalled_blocks = [] recalled_blocks = []
for block, similarity in top_blocks: for idx in top_indices[:top_k]:
block = blocks_with_embeddings[int(idx)]
block.increment_recall() block.increment_recall()
recalled_blocks.append(block) recalled_blocks.append(block)
@@ -663,6 +728,7 @@ class PerceptualMemoryManager:
for block, embedding in zip(blocks_to_process, embeddings): for block, embedding in zip(blocks_to_process, embeddings):
if embedding is not None: if embedding is not None:
block.embedding = embedding block.embedding = embedding
block.metadata["embedding_norm"] = float(np.linalg.norm(embedding))
success_count += 1 success_count += 1
logger.debug(f"向量重新生成完成(成功: {success_count}/{len(blocks_to_process)}") logger.debug(f"向量重新生成完成(成功: {success_count}/{len(blocks_to_process)}")