diff --git a/src/memory_graph/perceptual_manager.py b/src/memory_graph/perceptual_manager.py
index 76085d193..6d58f8c26 100644
--- a/src/memory_graph/perceptual_manager.py
+++ b/src/memory_graph/perceptual_manager.py
@@ -21,7 +21,6 @@ import numpy as np
 from src.common.logger import get_logger
 from src.memory_graph.models import MemoryBlock, PerceptualMemory
 from src.memory_graph.utils.embeddings import EmbeddingGenerator
-from src.memory_graph.utils.similarity import batch_cosine_similarity_async
 
 logger = get_logger(__name__)
 
@@ -208,6 +207,7 @@ class PerceptualMemoryManager:
 
         # Generate the embedding
         embedding = await self._generate_embedding(combined_text)
+        embedding_norm = float(np.linalg.norm(embedding)) if embedding is not None else 0.0
 
         # Create the memory block
         block = MemoryBlock(
@@ -215,7 +215,10 @@ class PerceptualMemoryManager:
             messages=messages,
             combined_text=combined_text,
             embedding=embedding,
-            metadata={"stream_id": stream_id}  # add stream_id metadata
+            metadata={
+                "stream_id": stream_id,
+                "embedding_norm": embedding_norm,
+            },  # stream_id aids debugging, embedding_norm enables fast similarity
         )
 
         # Add to the top of the memory stack
@@ -395,6 +398,55 @@ class PerceptualMemoryManager:
             logger.error(f"Batch embedding generation failed: {e}")
             return [None] * len(texts)
 
+    async def _compute_similarities(
+        self,
+        query_embedding: np.ndarray,
+        block_embeddings: list[np.ndarray],
+        block_norms: list[float] | None = None,
+    ) -> np.ndarray:
+        """Compute similarities vectorized on a background thread to avoid blocking the event loop."""
+        return await asyncio.to_thread(
+            self._compute_similarities_sync, query_embedding, block_embeddings, block_norms
+        )
+
+    @staticmethod
+    def _compute_similarities_sync(
+        query_embedding: np.ndarray,
+        block_embeddings: list[np.ndarray],
+        block_norms: list[float] | None = None,
+    ) -> np.ndarray:
+        import numpy as np
+
+        if not block_embeddings:
+            return np.zeros(0, dtype=np.float32)
+
+        query = np.asarray(query_embedding, dtype=np.float32)
+        blocks = np.asarray(block_embeddings, dtype=np.float32)
+
+        if blocks.ndim == 1:
+            blocks = blocks.reshape(1, -1)
+
+        query_norm = np.linalg.norm(query)
+        if query_norm == 0.0:
+            return np.zeros(blocks.shape[0], dtype=np.float32)
+
+        if block_norms is None:
+            block_norms_array = np.linalg.norm(blocks, axis=1)
+        else:
+            block_norms_array = np.asarray(block_norms, dtype=np.float32)
+            if block_norms_array.shape[0] != blocks.shape[0]:
+                block_norms_array = np.linalg.norm(blocks, axis=1)
+
+        valid_mask = block_norms_array > 0
+        similarities = np.zeros(blocks.shape[0], dtype=np.float32)
+
+        if valid_mask.any():
+            normalized_blocks = blocks[valid_mask] / block_norms_array[valid_mask][:, None]
+            normalized_query = query / query_norm
+            similarities[valid_mask] = normalized_blocks @ normalized_query
+
+        return np.clip(similarities, 0.0, 1.0)
+
     async def recall_blocks(
         self,
         query_text: str,
@@ -425,7 +477,7 @@ class PerceptualMemoryManager:
             logger.warning("Query embedding generation failed, returning an empty list")
             return []
 
-        # Batch-compute similarity for all blocks (using the async version)
+        # Batch-compute similarity for all blocks (vectorized computation + background thread)
         blocks_with_embeddings = [
             block for block in self.perceptual_memory.blocks
             if block.embedding is not None
@@ -434,26 +486,39 @@ class PerceptualMemoryManager:
         if not blocks_with_embeddings:
             return []
 
-        # Batch-compute similarities
-        block_embeddings = [block.embedding for block in blocks_with_embeddings]
-        similarities = await batch_cosine_similarity_async(query_embedding, block_embeddings)
+        block_embeddings: list[np.ndarray] = []
+        block_norms: list[float] = []
 
-        # Filter and sort
-        scored_blocks = []
-        for block, similarity in zip(blocks_with_embeddings, similarities):
-            # Skip blocks below the threshold
-            if similarity >= similarity_threshold:
-                scored_blocks.append((block, similarity))
+        for block in blocks_with_embeddings:
+            block_embeddings.append(block.embedding)
+            norm = block.metadata.get("embedding_norm") if block.metadata else None
+            if norm is None and block.embedding is not None:
+                norm = float(np.linalg.norm(block.embedding))
+                block.metadata["embedding_norm"] = norm
+            block_norms.append(norm if norm is not None else 0.0)
 
-        # Sort by similarity, descending
-        scored_blocks.sort(key=lambda x: x[1], reverse=True)
+        similarities = await self._compute_similarities(query_embedding, block_embeddings, block_norms)
+        similarities = np.asarray(similarities, dtype=np.float32)
 
-        # Take the top K
-        top_blocks = scored_blocks[:top_k]
+        candidate_indices = np.nonzero(similarities >= similarity_threshold)[0]
+        if candidate_indices.size == 0:
+            return []
+
+        if candidate_indices.size > top_k:
+            # argpartition reduces the selection to O(n)
+            top_indices = candidate_indices[
+                np.argpartition(similarities[candidate_indices], -top_k)[-top_k:]
+            ]
+        else:
+            top_indices = candidate_indices
+
+        # Keep descending order by similarity
+        top_indices = top_indices[np.argsort(similarities[top_indices])[::-1]]
 
         # Update recall count and position
         recalled_blocks = []
-        for block, similarity in top_blocks:
+        for idx in top_indices[:top_k]:
+            block = blocks_with_embeddings[int(idx)]
             block.increment_recall()
             recalled_blocks.append(block)
 
@@ -663,6 +728,7 @@ class PerceptualMemoryManager:
         for block, embedding in zip(blocks_to_process, embeddings):
            if embedding is not None:
                block.embedding = embedding
+               block.metadata["embedding_norm"] = float(np.linalg.norm(embedding))
               success_count += 1
 
        logger.debug(f"Embedding regeneration complete (success: {success_count}/{len(blocks_to_process)})")
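
Note (not part of the patch): a minimal standalone sketch of the math that _compute_similarities_sync vectorizes, written in plain NumPy with an illustrative helper name. It shows the cached-norm normalization, the zero-norm guard, and the clipping to [0, 1]; row norms are precomputed the same way the patch caches metadata["embedding_norm"].

    import numpy as np

    def cosine_similarities(query, blocks, block_norms):
        """Cosine similarity of one query against many rows, using precomputed row norms."""
        query = np.asarray(query, dtype=np.float32)
        blocks = np.asarray(blocks, dtype=np.float32)
        norms = np.asarray(block_norms, dtype=np.float32)
        query_norm = np.linalg.norm(query)
        if query_norm == 0.0:
            # a zero query matches nothing
            return np.zeros(blocks.shape[0], dtype=np.float32)
        sims = np.zeros(blocks.shape[0], dtype=np.float32)
        valid = norms > 0
        sims[valid] = (blocks[valid] / norms[valid][:, None]) @ (query / query_norm)
        return np.clip(sims, 0.0, 1.0)

    rng = np.random.default_rng(0)
    blocks = rng.normal(size=(5, 8)).astype(np.float32)
    blocks[2] = 0.0                                   # zero-norm row stays at similarity 0.0
    norms = np.linalg.norm(blocks, axis=1)            # analogue of the cached embedding_norm values
    query = 2.0 * blocks[0]                           # colinear with row 0, so its similarity is 1.0
    print(cosine_similarities(query, blocks, norms))  # index 0 ~= 1.0, index 2 == 0.0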
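
Similarly, a small sketch (values invented for illustration) of the threshold-then-select path in recall_blocks: filter by similarity_threshold, use np.argpartition to pull out the top_k candidates without fully sorting, then sort only those few for the final descending order.

    import numpy as np

    similarities = np.array([0.31, 0.92, 0.15, 0.77, 0.88, 0.42], dtype=np.float32)
    similarity_threshold, top_k = 0.3, 3

    candidates = np.nonzero(similarities >= similarity_threshold)[0]   # -> [0, 1, 3, 4, 5]
    if candidates.size > top_k:
        # argpartition places the top_k largest scores last, in arbitrary order, in O(n)
        top = candidates[np.argpartition(similarities[candidates], -top_k)[-top_k:]]
    else:
        top = candidates
    top = top[np.argsort(similarities[top])[::-1]]                     # descending over at most top_k items
    print(top, similarities[top])                                      # -> [1 4 3] [0.92 0.88 0.77]

A full sort of all candidates would cost O(n log n); here the final argsort only runs over at most top_k survivors, which is what the in-diff comment about O(n) selection refers to.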