From 1b0acc31880c5f4adc738bd2fa7fef1381c40c2b Mon Sep 17 00:00:00 2001
From: LuiKlee
Date: Sat, 13 Dec 2025 14:20:07 +0800
Subject: [PATCH] feat(perceptual_manager): add vectorized similarity
 computation and optimize memory block recall (changed the computation
 method; not yet sure whether it actually improves runtime speed)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/memory_graph/perceptual_manager.py | 100 ++++++++++++++++++++-----
 1 file changed, 83 insertions(+), 17 deletions(-)

diff --git a/src/memory_graph/perceptual_manager.py b/src/memory_graph/perceptual_manager.py
index 76085d193..6d58f8c26 100644
--- a/src/memory_graph/perceptual_manager.py
+++ b/src/memory_graph/perceptual_manager.py
@@ -21,7 +21,6 @@
 import numpy as np
 from src.common.logger import get_logger
 from src.memory_graph.models import MemoryBlock, PerceptualMemory
 from src.memory_graph.utils.embeddings import EmbeddingGenerator
-from src.memory_graph.utils.similarity import batch_cosine_similarity_async
 
 logger = get_logger(__name__)
@@ -208,6 +207,7 @@ class PerceptualMemoryManager:
 
         # Generate the embedding
         embedding = await self._generate_embedding(combined_text)
+        embedding_norm = float(np.linalg.norm(embedding)) if embedding is not None else 0.0
 
         # Create the memory block
         block = MemoryBlock(
@@ -215,7 +215,10 @@
             messages=messages,
             combined_text=combined_text,
             embedding=embedding,
-            metadata={"stream_id": stream_id}  # add stream_id metadata
+            metadata={
+                "stream_id": stream_id,
+                "embedding_norm": embedding_norm,
+            },  # stream_id for debugging; embedding_norm for fast similarity
         )
 
         # Add to the top of the memory stack
@@ -395,6 +398,55 @@
             logger.error(f"Batch embedding generation failed: {e}")
             return [None] * len(texts)
 
+    async def _compute_similarities(
+        self,
+        query_embedding: np.ndarray,
+        block_embeddings: list[np.ndarray],
+        block_norms: list[float] | None = None,
+    ) -> np.ndarray:
+        """Vectorized similarity computation in a worker thread, so the event loop is not blocked."""
+        return await asyncio.to_thread(
+            self._compute_similarities_sync, query_embedding, block_embeddings, block_norms
+        )
+
+    @staticmethod
+    def _compute_similarities_sync(
+        query_embedding: np.ndarray,
+        block_embeddings: list[np.ndarray],
+        block_norms: list[float] | None = None,
+    ) -> np.ndarray:
+        import numpy as np
+
+        if not block_embeddings:
+            return np.zeros(0, dtype=np.float32)
+
+        query = np.asarray(query_embedding, dtype=np.float32)
+        blocks = np.asarray(block_embeddings, dtype=np.float32)
+
+        if blocks.ndim == 1:
+            blocks = blocks.reshape(1, -1)
+
+        query_norm = np.linalg.norm(query)
+        if query_norm == 0.0:
+            return np.zeros(blocks.shape[0], dtype=np.float32)
+
+        if block_norms is None:
+            block_norms_array = np.linalg.norm(blocks, axis=1)
+        else:
+            block_norms_array = np.asarray(block_norms, dtype=np.float32)
+            if block_norms_array.shape[0] != blocks.shape[0]:
+                block_norms_array = np.linalg.norm(blocks, axis=1)
+
+        valid_mask = block_norms_array > 0
+        similarities = np.zeros(blocks.shape[0], dtype=np.float32)
+
+        if valid_mask.any():
+            normalized_blocks = blocks[valid_mask] / block_norms_array[valid_mask][:, None]
+            normalized_query = query / query_norm
+            similarities[valid_mask] = normalized_blocks @ normalized_query
+
+        return np.clip(similarities, 0.0, 1.0)
+
     async def recall_blocks(
         self,
         query_text: str,
@@ -425,7 +477,7 @@
             logger.warning("Query embedding generation failed, returning an empty list")
             return []
 
-        # Batch-compute similarities for all blocks (async version)
+        # Batch-compute similarities for all blocks (vectorized computation + background thread)
         blocks_with_embeddings = [
             block for block in self.perceptual_memory.blocks if block.embedding is not None
         ]
@@ -434,26 +486,39 @@
         if not blocks_with_embeddings:
             return []
 
-        # Batch-compute similarities
-        block_embeddings = [block.embedding for block in blocks_with_embeddings]
-        similarities = await batch_cosine_similarity_async(query_embedding, block_embeddings)
+        block_embeddings: list[np.ndarray] = []
+        block_norms: list[float] = []
 
-        # Filter and sort
-        scored_blocks = []
-        for block, similarity in zip(blocks_with_embeddings, similarities):
-            # Skip blocks below the threshold
-            if similarity >= similarity_threshold:
-                scored_blocks.append((block, similarity))
+        for block in blocks_with_embeddings:
+            block_embeddings.append(block.embedding)
+            norm = block.metadata.get("embedding_norm") if block.metadata else None
+            if norm is None and block.embedding is not None:
+                norm = float(np.linalg.norm(block.embedding))
+                block.metadata["embedding_norm"] = norm
+            block_norms.append(norm if norm is not None else 0.0)
 
-        # Sort by similarity, descending
-        scored_blocks.sort(key=lambda x: x[1], reverse=True)
+        similarities = await self._compute_similarities(query_embedding, block_embeddings, block_norms)
+        similarities = np.asarray(similarities, dtype=np.float32)
 
-        # Take the top K
-        top_blocks = scored_blocks[:top_k]
+        candidate_indices = np.nonzero(similarities >= similarity_threshold)[0]
+        if candidate_indices.size == 0:
+            return []
+
+        if candidate_indices.size > top_k:
+            # argpartition keeps the top-k selection at O(n)
+            top_indices = candidate_indices[
+                np.argpartition(similarities[candidate_indices], -top_k)[-top_k:]
+            ]
+        else:
+            top_indices = candidate_indices
+
+        # Keep results ordered by similarity, descending
+        top_indices = top_indices[np.argsort(similarities[top_indices])[::-1]]
 
         # Update recall counts and positions
         recalled_blocks = []
-        for block, similarity in top_blocks:
+        for idx in top_indices[:top_k]:
+            block = blocks_with_embeddings[int(idx)]
             block.increment_recall()
             recalled_blocks.append(block)
 
@@ -663,6 +728,7 @@
         for block, embedding in zip(blocks_to_process, embeddings):
             if embedding is not None:
                 block.embedding = embedding
+                block.metadata["embedding_norm"] = float(np.linalg.norm(embedding))
                 success_count += 1
 
         logger.debug(f"Embedding regeneration finished (succeeded: {success_count}/{len(blocks_to_process)})")
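
Reviewer note (not part of the patch): the subject says the speed-up is unverified, so below is a minimal standalone sketch for checking it. It re-implements the vectorized cosine-similarity math of _compute_similarities_sync in plain numpy and compares it against a simple per-block loop (a stand-in for the removed batch_cosine_similarity_async, whose internals are not shown in this diff) for agreement and rough timing. The names cosine_loop and cosine_vectorized and the sizes dim and n_blocks are illustrative assumptions, not values taken from the repository.

import time

import numpy as np


def cosine_loop(query: np.ndarray, blocks: list[np.ndarray]) -> np.ndarray:
    # Reference path: one cosine similarity per block, approximating the
    # per-pair computation that the patch replaces.
    out = np.zeros(len(blocks), dtype=np.float32)
    query_norm = np.linalg.norm(query)
    for i, block in enumerate(blocks):
        block_norm = np.linalg.norm(block)
        if query_norm > 0 and block_norm > 0:
            out[i] = float(np.dot(query, block) / (query_norm * block_norm))
    return np.clip(out, 0.0, 1.0)


def cosine_vectorized(query: np.ndarray, blocks: list[np.ndarray]) -> np.ndarray:
    # Same math as _compute_similarities_sync: stack the embeddings into a
    # matrix, normalize once, and do a single matrix-vector product.
    matrix = np.asarray(blocks, dtype=np.float32)
    norms = np.linalg.norm(matrix, axis=1)
    query_norm = np.linalg.norm(query)
    sims = np.zeros(matrix.shape[0], dtype=np.float32)
    valid = norms > 0
    if query_norm > 0 and valid.any():
        sims[valid] = (matrix[valid] / norms[valid][:, None]) @ (query / query_norm)
    return np.clip(sims, 0.0, 1.0)


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    dim, n_blocks = 1024, 2000  # illustrative embedding size / memory stack size
    query = rng.normal(size=dim).astype(np.float32)
    blocks = [rng.normal(size=dim).astype(np.float32) for _ in range(n_blocks)]

    t0 = time.perf_counter()
    loop_result = cosine_loop(query, blocks)
    t1 = time.perf_counter()
    vec_result = cosine_vectorized(query, blocks)
    t2 = time.perf_counter()

    assert np.allclose(loop_result, vec_result, atol=1e-4)  # both paths agree
    print(f"per-block loop: {t1 - t0:.4f}s, vectorized: {t2 - t1:.4f}s")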