feat(cache): 增强嵌入向量处理的健壮性和验证

- 新增 `_validate_embedding` 方法,用于在存入缓存前对嵌入向量进行严格的格式检查、维度验证和数值有效性校验。
- 在缓存查询 (`get`) 和写入 (`set`) 流程中,集成此验证逻辑,确保只有合规的向量才能被处理和存储。
- 增加了在L1和L2向量索引操作中的异常捕获,防止因向量处理失败导致缓存功能中断,提升了系统的整体稳定性。
This commit is contained in:
minecraft1024a
2025-08-18 18:08:14 +08:00
parent ceb8150914
commit 584b552415

View File

@@ -65,6 +65,43 @@ class CacheManager:
""")
conn.commit()
def _validate_embedding(self, embedding_result: Any) -> Optional[np.ndarray]:
"""
验证和标准化嵌入向量格式
"""
try:
if embedding_result is None:
return None
# 确保embedding_result是一维数组或列表
if isinstance(embedding_result, (list, tuple, np.ndarray)):
# 转换为numpy数组进行处理
embedding_array = np.array(embedding_result)
# 如果是多维数组,展平它
if embedding_array.ndim > 1:
embedding_array = embedding_array.flatten()
# 检查维度是否符合预期
expected_dim = global_config.lpmm_knowledge.embedding_dimension
if embedding_array.shape[0] != expected_dim:
logger.warning(f"嵌入向量维度不匹配: 期望 {expected_dim}, 实际 {embedding_array.shape[0]}")
return None
# 检查是否包含有效的数值
if np.isnan(embedding_array).any() or np.isinf(embedding_array).any():
logger.warning("嵌入向量包含无效的数值 (NaN 或 Inf)")
return None
return embedding_array.astype('float32')
else:
logger.warning(f"嵌入结果格式不支持: {type(embedding_result)}")
return None
except Exception as e:
logger.error(f"验证嵌入向量时发生错误: {e}")
return None
def _generate_key(self, tool_name: str, function_args: Dict[str, Any], tool_class: Any) -> str:
"""生成确定性的缓存键,包含代码哈希以实现自动失效。"""
try:
@@ -102,7 +139,9 @@ class CacheManager:
if semantic_query and self.embedding_model:
embedding_result = await self.embedding_model.get_embedding(semantic_query)
if embedding_result:
query_embedding = np.array([embedding_result], dtype='float32')
validated_embedding = self._validate_embedding(embedding_result)
if validated_embedding is not None:
query_embedding = np.array([validated_embedding], dtype='float32')
# 步骤 2a: L1 语义缓存 (FAISS)
if query_embedding is not None and self.l1_vector_index.ntotal > 0:
@@ -153,10 +192,13 @@ class CacheManager:
# 回填 L1
self.l1_kv_cache[key] = {"data": data, "expires_at": expires_at}
if query_embedding is not None:
new_id = self.l1_vector_index.ntotal
faiss.normalize_L2(query_embedding)
self.l1_vector_index.add(x=query_embedding)
self.l1_vector_id_to_key[new_id] = key
try:
new_id = self.l1_vector_index.ntotal
faiss.normalize_L2(query_embedding)
self.l1_vector_index.add(x=query_embedding)
self.l1_vector_id_to_key[new_id] = key
except Exception as e:
logger.error(f"回填L1向量索引时发生错误: {e}")
return data
logger.debug(f"缓存未命中: {key}")
@@ -186,14 +228,20 @@ class CacheManager:
if semantic_query and self.embedding_model:
embedding_result = await self.embedding_model.get_embedding(semantic_query)
if embedding_result:
embedding = np.array([embedding_result], dtype='float32')
# 写入 L1 Vector
new_id = self.l1_vector_index.ntotal
faiss.normalize_L2(embedding)
self.l1_vector_index.add(x=embedding)
self.l1_vector_id_to_key[new_id] = key
# 写入 L2 Vector
self.chroma_collection.add(embeddings=embedding.tolist(), ids=[key])
validated_embedding = self._validate_embedding(embedding_result)
if validated_embedding is not None:
try:
embedding = np.array([validated_embedding], dtype='float32')
# 写入 L1 Vector
new_id = self.l1_vector_index.ntotal
faiss.normalize_L2(embedding)
self.l1_vector_index.add(x=embedding)
self.l1_vector_id_to_key[new_id] = key
# 写入 L2 Vector
self.chroma_collection.add(embeddings=embedding.tolist(), ids=[key])
except Exception as e:
logger.error(f"写入语义缓存时发生错误: {e}")
# 继续执行,不影响主要缓存功能
logger.info(f"已缓存条目: {key}, TTL: {ttl}s")