diff --git a/src/chat/energy_system/energy_manager.py b/src/chat/energy_system/energy_manager.py
index 6dc5ad8e4..b07222b4b 100644
--- a/src/chat/energy_system/energy_manager.py
+++ b/src/chat/energy_system/energy_manager.py
@@ -351,7 +351,7 @@ class EnergyManager:
                 / total_calculations
             )
 
-        logger.info(f"Chat stream {stream_id} final energy: {final_energy:.3f} (raw: {total_energy:.3f}, elapsed: {calculation_time:.3f}s)")
+        logger.debug(f"Chat stream {stream_id} final energy: {final_energy:.3f} (raw: {total_energy:.3f}, elapsed: {calculation_time:.3f}s)")
 
         return final_energy
 
     def _apply_threshold_adjustment(self, energy: float) -> float:
diff --git a/src/chat/memory_system/enhanced_memory_activator.py b/src/chat/memory_system/enhanced_memory_activator.py
index 64bbdd64e..26e5de6c4 100644
--- a/src/chat/memory_system/enhanced_memory_activator.py
+++ b/src/chat/memory_system/enhanced_memory_activator.py
@@ -104,7 +104,6 @@ class EnhancedMemoryActivator:
         response, (reasoning_content, model_name, _) = await self.key_words_model.generate_response_async(
             prompt, temperature=0.5
         )
-
         keywords = list(get_keywords_from_json(response))
 
         # Update the keyword cache
diff --git a/src/chat/memory_system/enhanced_memory_adapter.py b/src/chat/memory_system/enhanced_memory_adapter.py
index b48a723fb..7d34df8d9 100644
--- a/src/chat/memory_system/enhanced_memory_adapter.py
+++ b/src/chat/memory_system/enhanced_memory_adapter.py
@@ -12,6 +12,7 @@ from dataclasses import dataclass
 from src.common.logger import get_logger
 from src.chat.memory_system.integration_layer import MemoryIntegrationLayer, IntegrationConfig, IntegrationMode
 from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
+from src.chat.memory_system.memory_formatter import MemoryFormatter, FormatterConfig, format_memories_for_llm
 from src.llm_models.utils_model import LLMRequest
 
 logger = get_logger(__name__)
@@ -189,15 +190,21 @@ class EnhancedMemoryAdapter:
         if not memories:
             return ""
 
-        # Format memories into a prompt-friendly Markdown structure
-        lines: List[str] = ["### 🧠 相关记忆 (Relevant Memories)", ""]
-
-        for memory in memories:
-            type_label = MEMORY_TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
-            display_text = memory.display or memory.text_content
-            lines.append(f"- **[{type_label}]** {display_text}")
-
-        return "\n".join(lines)
+        # Use the new memory formatter
+        formatter_config = FormatterConfig(
+            include_timestamps=True,
+            include_memory_types=True,
+            include_confidence=False,
+            use_emoji_icons=True,
+            group_by_type=False,
+            max_display_length=150
+        )
+
+        return format_memories_for_llm(
+            memories=memories,
+            query_context=query,
+            config=formatter_config
+        )
 
     async def get_enhanced_memory_summary(self, user_id: str) -> Dict[str, Any]:
         """Return a summary of the enhanced memory system."""
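The adapter change above replaces hand-rolled Markdown with a single call into the new `memory_formatter` module (introduced later in this diff). A minimal sketch of that call path, with placeholder inputs where the real recall flow would supply `MemoryChunk` objects:

```python
from src.chat.memory_system.memory_formatter import FormatterConfig, format_memories_for_llm

config = FormatterConfig(
    include_timestamps=True,    # prefix entries with their creation date
    include_memory_types=True,  # label entries with their MemoryType
    include_confidence=False,
    use_emoji_icons=True,       # emoji icons instead of "[label]" prefixes
    group_by_type=False,
    max_display_length=150,     # truncate long display texts
)

memories = []  # in the adapter this is the list of recalled MemoryChunk objects
prompt_block = format_memories_for_llm(memories=memories, query_context="昨天聊了什么?", config=config)
print(prompt_block)  # "" for an empty list; a "## 🧠 相关记忆回顾" block otherwise
```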
diff --git a/src/chat/memory_system/enhanced_memory_core.py b/src/chat/memory_system/enhanced_memory_core.py
index 08bebe2f0..06749b961 100644
--- a/src/chat/memory_system/enhanced_memory_core.py
+++ b/src/chat/memory_system/enhanced_memory_core.py
@@ -22,7 +22,7 @@ from src.chat.memory_system.memory_chunk import MemoryChunk
 from src.chat.memory_system.memory_builder import MemoryBuilder, MemoryExtractionError
 from src.chat.memory_system.memory_fusion import MemoryFusionEngine
 from src.chat.memory_system.vector_storage import VectorStorageManager, VectorStorageConfig
-from src.chat.memory_system.metadata_index import MetadataIndexManager
+from src.chat.memory_system.metadata_index import MetadataIndexManager, IndexType
 from src.chat.memory_system.multi_stage_retrieval import MultiStageRetrieval, RetrievalConfig
 from src.chat.memory_system.memory_query_planner import MemoryQueryPlanner
@@ -199,6 +199,22 @@ class EnhancedMemorySystem:
             similarity_threshold=self.config.similarity_threshold
         )
         self.vector_storage = VectorStorageManager(vector_config)
+
+        # Try to load existing vector data
+        try:
+            await self.vector_storage.load_storage()
+            loaded_count = self.vector_storage.storage_stats.get("total_vectors", 0)
+            logger.info(f"✅ Vector storage loaded, vector count: {loaded_count}")
+
+            # If no vectors were loaded, try to rebuild the index
+            if loaded_count == 0:
+                logger.info("Vector storage is empty, attempting to rebuild from cache...")
+                await self._rebuild_vector_storage_if_needed()
+
+        except Exception as e:
+            logger.warning(f"Failed to load vector storage data: {e}; falling back to an empty index")
+            await self._rebuild_vector_storage_if_needed()
+
         self.metadata_index = MetadataIndexManager()
 
         # Create the retrieval configuration
         retrieval_config = RetrievalConfig(
@@ -354,8 +370,12 @@ class EnhancedMemorySystem:
                 self.status = original_status
                 return []
 
-            # 3. Memory fusion and deduplication
-            fused_chunks = await self.fusion_engine.fuse_memories(memory_chunks)
+            # 3. Memory fusion and deduplication (including fusion with historical memories)
+            existing_candidates = await self._collect_fusion_candidates(memory_chunks)
+            fused_chunks = await self.fusion_engine.fuse_memories(
+                memory_chunks,
+                existing_candidates
+            )
 
             # 4. Store memories
             stored_count = await self._store_memories(fused_chunks)
@@ -375,6 +395,11 @@ class EnhancedMemorySystem:
                 len(fused_chunks),
                 stored_count,
                 build_time,
+                extra={
+                    "generated_count": len(fused_chunks),
+                    "stored_count": stored_count,
+                    "build_duration_seconds": round(build_time, 4),
+                },
             )
 
             self.status = original_status
@@ -415,6 +440,101 @@ class EnhancedMemorySystem:
             f"置信度={memory.metadata.confidence.name} | 内容={text}"
         )
 
+    async def _collect_fusion_candidates(self, new_memories: List[MemoryChunk]) -> List[MemoryChunk]:
+        """Collect existing memories similar to the new ones, to support fusion and deduplication."""
+        if not new_memories:
+            return []
+
+        candidate_ids: Set[str] = set()
+        new_memory_ids = {
+            memory.memory_id
+            for memory in new_memories
+            if memory and getattr(memory, "memory_id", None)
+        }
+
+        # Direct matches based on fingerprints
+        for memory in new_memories:
+            try:
+                fingerprint = self._build_memory_fingerprint(memory)
+                fingerprint_key = self._fingerprint_key(memory.user_id, fingerprint)
+                existing_id = self._memory_fingerprints.get(fingerprint_key)
+                if existing_id and existing_id not in new_memory_ids:
+                    candidate_ids.add(existing_id)
+            except Exception as exc:
+                logger.debug("Failed to build memory fingerprint, skipping candidate collection: %s", exc)
+
+        # Candidates from the subject index
+        subject_index = None
+        if self.metadata_index and hasattr(self.metadata_index, "indices"):
+            subject_index = self.metadata_index.indices.get(IndexType.SUBJECT)
+
+        if subject_index:
+            for memory in new_memories:
+                for subject in memory.subjects:
+                    normalized = subject.strip().lower() if isinstance(subject, str) else ""
+                    if not normalized:
+                        continue
+                    subject_candidates = subject_index.get(normalized)
+                    if subject_candidates:
+                        candidate_ids.update(subject_candidates)
+
+        # Candidates from vector search
+        total_vectors = 0
+        if self.vector_storage and hasattr(self.vector_storage, "storage_stats"):
+            total_vectors = self.vector_storage.storage_stats.get("total_vectors", 0) or 0
+
+        if self.vector_storage and total_vectors > 0:
+            search_tasks = []
+            for memory in new_memories:
+                display_text = (memory.display or "").strip()
+                if not display_text:
+                    continue
+                search_tasks.append(
+                    self.vector_storage.search_similar_memories(
+                        query_text=display_text,
+                        limit=8,
+                        scope_id=GLOBAL_MEMORY_SCOPE
+                    )
+                )
+
+            if search_tasks:
+                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
+                similarity_threshold = getattr(
+                    self.fusion_engine,
+                    "similarity_threshold",
+                    self.config.similarity_threshold,
+                )
+                min_threshold = max(0.0, min(1.0, similarity_threshold * 0.8))
+
+                for result in search_results:
+                    if isinstance(result, Exception):
+                        logger.warning("Vector search for fusion candidates failed: %s", result)
+                        continue
+                    for memory_id, similarity in result:
+                        if memory_id in new_memory_ids:
+                            continue
+                        if similarity is None or similarity < min_threshold:
+                            continue
+                        candidate_ids.add(memory_id)
+
+        existing_candidates: List[MemoryChunk] = []
+        cache = self.vector_storage.memory_cache if self.vector_storage else {}
+        for candidate_id in candidate_ids:
+            if candidate_id in new_memory_ids:
+                continue
+            candidate_memory = cache.get(candidate_id)
+            if candidate_memory:
+                existing_candidates.append(candidate_memory)
+
+        if existing_candidates:
+            logger.debug(
+                "Fusion candidate collection finished: new=%d, candidates=%d",
+                len(new_memories),
+                len(existing_candidates),
+            )
+
+        return existing_candidates
+
     async def process_conversation_memory(
         self,
         context: Dict[str, Any]
@@ -527,6 +647,29 @@ class EnhancedMemorySystem:
         effective_limit = max(1, min(effective_limit, self.config.final_recall_limit))
         normalized_context["resolved_query_text"] = resolved_query_text
 
+        query_debug_payload = {
+            "raw_query": raw_query,
+            "semantic_query": resolved_query_text,
+            "limit": effective_limit,
+            "planner_used": planner_ran,
+            "memory_types": [mt.value for mt in (query_plan.memory_types if query_plan else [])],
+            "subjects": getattr(query_plan, "subject_includes", []) if query_plan else [],
+            "objects": getattr(query_plan, "object_includes", []) if query_plan else [],
+            "recency": getattr(query_plan, "recency_preference", None) if query_plan else None,
+            "optional_keywords": getattr(query_plan, "optional_keywords", []) if query_plan else [],
+        }
+
+        try:
+            logger.info(
+                f"🔍 Memory retrieval request | raw='{raw_query}' | semantic='{resolved_query_text}' | limit={effective_limit}",
+                extra={"memory_query": query_debug_payload},
+            )
+        except Exception:
+            logger.info(
+                "🔍 Memory retrieval request: %s",
+                orjson.dumps(query_debug_payload).decode("utf-8"),
+            )
+
         if normalized_context.get("__memory_building__"):
             logger.debug("Currently inside the memory-building flow; skipping query planning and degrading retrieval")
             self.status = MemorySystemStatus.BUILDING
@@ -1108,6 +1251,57 @@ class EnhancedMemorySystem:
         except Exception as e:
             logger.error(f"❌ Memory system shutdown failed: {e}", exc_info=True)
 
+    async def _rebuild_vector_storage_if_needed(self):
+        """Rebuild the vector storage (if needed)."""
+        try:
+            # Check whether there is cached memory data
+            if not hasattr(self.vector_storage, 'memory_cache') or not self.vector_storage.memory_cache:
+                logger.info("No cached memory data, skipping vector storage rebuild")
+                return
+
+            logger.info(f"Starting vector storage rebuild, memory count: {len(self.vector_storage.memory_cache)}")
+
+            # Collect the memories whose vectors need rebuilding
+            memories_to_rebuild = []
+            for memory_id, memory in self.vector_storage.memory_cache.items():
+                # Check whether the memory has a usable display text
+                if memory.display and memory.display.strip():
+                    memories_to_rebuild.append(memory)
+                elif memory.text_content and memory.text_content.strip():
+                    memories_to_rebuild.append(memory)
+
+            if not memories_to_rebuild:
+                logger.warning("No memories eligible for vector rebuild were found")
+                return
+
+            logger.info(f"Preparing to rebuild vectors for {len(memories_to_rebuild)} memories")
+
+            # Rebuild vectors in batches
+            batch_size = 10
+            rebuild_count = 0
+
+            for i in range(0, len(memories_to_rebuild), batch_size):
+                batch = memories_to_rebuild[i:i + batch_size]
+                try:
+                    await self.vector_storage.store_memories(batch)
+                    rebuild_count += len(batch)
+
+                    if rebuild_count % 50 == 0:
+                        logger.info(f"Vectors rebuilt: {rebuild_count}/{len(memories_to_rebuild)}")
+
+                except Exception as e:
+                    logger.error(f"Batch vector rebuild failed: {e}")
+                    continue
+
+            # Persist the rebuilt vector storage
+            await self.vector_storage.save_storage()
+
+            final_count = self.vector_storage.storage_stats.get("total_vectors", 0)
+            logger.info(f"✅ Vector storage rebuild finished, final vector count: {final_count}")
+
+        except Exception as e:
+            logger.error(f"❌ Vector storage rebuild failed: {e}", exc_info=True)
+
 
 # Global memory system instance
 enhanced_memory_system: EnhancedMemorySystem = None
diff --git a/src/chat/memory_system/enhanced_reranker.py b/src/chat/memory_system/enhanced_reranker.py
new file mode 100644
index 000000000..a6dfafb01
--- /dev/null
+++ b/src/chat/memory_system/enhanced_reranker.py
@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+"""
+Enhanced re-ranker
+Implements the multi-dimensional scoring model from the design document
+"""
+
+import math
+import time
+from typing import Dict, List, Optional, Any, Tuple
+from dataclasses import dataclass
+from enum import Enum
+
+from src.common.logger import get_logger
+from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
+
+logger = get_logger(__name__)
+
+
+class IntentType(Enum):
+    """Conversation intent types"""
+    FACT_QUERY = "fact_query"              # factual query
+    EVENT_RECALL = "event_recall"          # event recall
+    PREFERENCE_CHECK = "preference_check"  # preference check
+    GENERAL_CHAT = "general_chat"          # general conversation
+    UNKNOWN = "unknown"                    # unknown intent
+
+
+@dataclass
+class ReRankingConfig:
+    """Re-ranking configuration"""
+    # Weights (w1 + w2 + w3 + w4 = 1.0)
+    semantic_weight: float = 0.5    # semantic similarity weight
+    recency_weight: float = 0.2     # recency weight
+    usage_freq_weight: float = 0.2  # usage frequency weight
+    type_match_weight: float = 0.1  # type match weight
+
+    # Recency decay parameters
+    recency_decay_rate: float = 0.1  # decay rate (per day)
+
+    # Usage frequency parameters
+    freq_log_base: float = 2.0   # logarithm base
+    freq_max_score: float = 5.0  # maximum frequency score
+
+    # Type match weight mapping
+    type_match_weights: Dict[str, Dict[str, float]] = None
+
+    def __post_init__(self):
+        """Initialize the type match weights"""
+        if self.type_match_weights is None:
+            self.type_match_weights = {
+                IntentType.FACT_QUERY.value: {
+                    MemoryType.PERSONAL_FACT.value: 1.0,
+                    MemoryType.KNOWLEDGE.value: 0.8,
+                    MemoryType.PREFERENCE.value: 0.5,
+                    MemoryType.EVENT.value: 0.3,
+                    "default": 0.3
+                },
+                IntentType.EVENT_RECALL.value: {
+                    MemoryType.EVENT.value: 1.0,
+                    MemoryType.EXPERIENCE.value: 0.8,
+                    MemoryType.EMOTION.value: 0.6,
+                    MemoryType.PERSONAL_FACT.value: 0.5,
+                    "default": 0.5
+                },
+                IntentType.PREFERENCE_CHECK.value: {
+                    MemoryType.PREFERENCE.value: 1.0,
+                    MemoryType.OPINION.value: 0.8,
+                    MemoryType.GOAL.value: 0.6,
+                    MemoryType.PERSONAL_FACT.value: 0.4,
+                    "default": 0.4
+                },
+                IntentType.GENERAL_CHAT.value: {
+                    "default": 0.8
+                },
+                IntentType.UNKNOWN.value: {
+                    "default": 0.8
+                }
+            }
+
+
+class IntentClassifier:
+    """Lightweight intent classifier"""
+
+    def __init__(self):
+        # Keyword pattern-matching rules
+        self.patterns = {
+            IntentType.FACT_QUERY: [
+                # Chinese patterns
+                "我是", "我的", "我叫", "我在", "我住在", "我的职业", "我的工作",
+                "什么时候", "在哪里", "是什么", "多少", "几岁", "年龄",
+                # English patterns
+                "what is", "where is", "when is", "how old", "my name", "i am", "i live"
+            ],
+            IntentType.EVENT_RECALL: [
+                # Chinese patterns
+                "记得", "想起", "还记得", "那次", "上次", "之前", "以前", "曾经",
+                "发生过", "经历", "做过", "去过", "见过",
+                # English patterns
+                "remember", "recall", "last time", "before", "previously", "happened", "experience"
+            ],
+            IntentType.PREFERENCE_CHECK: [
+                # Chinese patterns
+                "喜欢", "不喜欢", "偏好", "爱好", "兴趣", "讨厌", "最爱", "最喜欢",
+                "习惯", "通常", "一般", "倾向于", "更喜欢",
+                # English patterns
+                "like", "love", "hate", "prefer", "favorite", "usually", "tend to", "interest"
+            ]
+        }
+
+    def classify_intent(self, query: str, context: Dict[str, Any]) -> IntentType:
+        """Classify the conversational intent"""
+        if not query:
+            return IntentType.UNKNOWN
+
+        query_lower = query.lower()
+
+        # Tally the match score for each intent
+        intent_scores = {intent: 0 for intent in IntentType}
+
+        for intent, patterns in self.patterns.items():
+            for pattern in patterns:
+                if pattern in query_lower:
+                    intent_scores[intent] += 1
+
+        # Return the intent with the highest score
+        max_score = max(intent_scores.values())
+        if max_score == 0:
+            return IntentType.GENERAL_CHAT
+
+        for intent, score in intent_scores.items():
+            if score == max_score:
+                return intent
+
+        return IntentType.GENERAL_CHAT
+
+
+class EnhancedReRanker:
+    """Enhanced re-ranker implementing the design document's multi-dimensional scoring model"""
+
+    def __init__(self, config: Optional[ReRankingConfig] = None):
+        self.config = config or ReRankingConfig()
+        self.intent_classifier = IntentClassifier()
+
+        # Verify that the weights sum to 1.0
+        total_weight = (
+            self.config.semantic_weight +
+            self.config.recency_weight +
+            self.config.usage_freq_weight +
+            self.config.type_match_weight
+        )
+
+        if abs(total_weight - 1.0) > 0.01:
+            logger.warning(f"Re-ranking weights do not sum to 1.0: {total_weight}; normalizing")
+            # Normalize the weights
+            self.config.semantic_weight /= total_weight
+            self.config.recency_weight /= total_weight
+            self.config.usage_freq_weight /= total_weight
+            self.config.type_match_weight /= total_weight
+
+    def rerank_memories(
+        self,
+        query: str,
+        candidate_memories: List[Tuple[str, MemoryChunk, float]],  # (memory_id, memory, vector_similarity)
+        context: Dict[str, Any],
+        limit: int = 10
+    ) -> List[Tuple[str, MemoryChunk, float]]:
+        """
+        Re-rank the candidate memories
+
+        Args:
+            query: query text
+            candidate_memories: candidate list [(memory_id, memory, vector_similarity)]
+            context: context information
+            limit: maximum number of results
+
+        Returns:
+            Re-ranked memories [(memory_id, memory, final_score)]
+        """
+        if not candidate_memories:
+            return []
+
+        # Classify the query intent
+        intent = self.intent_classifier.classify_intent(query, context)
+        logger.debug(f"Detected query intent: {intent.value}")
+
+        # Compute the final score for each candidate
+        scored_memories = []
+        current_time = time.time()
+
+        for memory_id, memory, vector_sim in candidate_memories:
+            try:
+                # 1. Semantic similarity score (normalized to [0, 1])
+                semantic_score = self._normalize_similarity(vector_sim)
+
+                # 2. Recency score
+                recency_score = self._calculate_recency_score(memory, current_time)
+
+                # 3. Usage frequency score
+                usage_freq_score = self._calculate_usage_frequency_score(memory)
+
+                # 4. Type match score
+                type_match_score = self._calculate_type_match_score(memory, intent)
+
+                # Compute the final score
+                final_score = (
+                    self.config.semantic_weight * semantic_score +
+                    self.config.recency_weight * recency_score +
+                    self.config.usage_freq_weight * usage_freq_score +
+                    self.config.type_match_weight * type_match_score
+                )
+
+                scored_memories.append((memory_id, memory, final_score))
+
+                # Record debug information
+                logger.debug(
+                    f"Memory score {memory_id[:8]}: semantic={semantic_score:.3f}, "
+                    f"recency={recency_score:.3f}, freq={usage_freq_score:.3f}, "
+                    f"type={type_match_score:.3f}, final={final_score:.3f}"
+                )
+
+            except Exception as e:
+                logger.error(f"Error while scoring memory {memory_id}: {e}")
+                # Fall back to the vector similarity as the score
+                scored_memories.append((memory_id, memory, vector_sim))
+
+        # Sort by final score, descending
+        scored_memories.sort(key=lambda x: x[2], reverse=True)
+
+        # Return the top N results
+        result = scored_memories[:limit]
+
+        highest_score = result[0][2] if result else 0.0
+        logger.info(
+            f"Re-ranking finished: candidates={len(candidate_memories)}, returned={len(result)}, "
+            f"intent={intent.value}, top score={highest_score:.3f}"
+        )
+
+        return result
+
+    def _normalize_similarity(self, raw_similarity: float) -> float:
+        """Normalize a similarity into the [0, 1] range"""
+        # Assume the raw similarity is already within [-1, 1] or [0, 1]
+        if raw_similarity < 0:
+            return (raw_similarity + 1) / 2  # map from [-1, 1] to [0, 1]
+        return min(1.0, max(0.0, raw_similarity))  # clamp into [0, 1]
+
+    def _calculate_recency_score(self, memory: MemoryChunk, current_time: float) -> float:
+        """
+        Compute the recency score
+        Formula: Recency = 1 / (1 + decay_rate * days_old)
+        """
+        last_accessed = memory.metadata.last_accessed or memory.metadata.created_at
+        days_old = (current_time - last_accessed) / (24 * 3600)  # convert to days
+
+        if days_old < 0:
+            days_old = 0  # guard against clock anomalies
+
+        score = 1 / (1 + self.config.recency_decay_rate * days_old)
+        return min(1.0, max(0.0, score))
+
+    def _calculate_usage_frequency_score(self, memory: MemoryChunk) -> float:
+        """
+        Compute the usage frequency score
+        Formula: Usage_Freq = min(1.0, log2(access_count + 1) / max_score)
+        """
+        access_count = memory.metadata.access_count
+        if access_count <= 0:
+            return 0.0
+
+        log_count = math.log2(access_count + 1)
+        score = log_count / self.config.freq_max_score
+        return min(1.0, max(0.0, score))
+
+    def _calculate_type_match_score(self, memory: MemoryChunk, intent: IntentType) -> float:
+        """Compute the type match score"""
+        memory_type = memory.memory_type.value
+        intent_value = intent.value
+
+        # Fetch the type-weight mapping for the intent
+        type_weights = self.config.type_match_weights.get(intent_value, {})
+
+        # Look up the weight for the concrete type, falling back to the default
+        score = type_weights.get(memory_type, type_weights.get("default", 0.8))
+
+        return min(1.0, max(0.0, score))
+
+
+# Default re-ranker instance
+default_reranker = EnhancedReRanker()
+
+
+def rerank_candidate_memories(
+    query: str,
+    candidate_memories: List[Tuple[str, MemoryChunk, float]],
+    context: Dict[str, Any],
+    limit: int = 10,
+    config: Optional[ReRankingConfig] = None
+) -> List[Tuple[str, MemoryChunk, float]]:
+    """
+    Convenience function: re-rank candidate memories
+    """
+    if config:
+        reranker = EnhancedReRanker(config)
+    else:
+        reranker = default_reranker
+
+    return reranker.rerank_memories(query, candidate_memories, context, limit)
\ No newline at end of file
diff --git a/src/chat/memory_system/memory_builder.py b/src/chat/memory_system/memory_builder.py
index 592172e33..774c26f10 100644
--- a/src/chat/memory_system/memory_builder.py
+++ b/src/chat/memory_system/memory_builder.py
@@ -3,11 +3,11 @@
 Memory building module
 Extracts high-quality, structured memory units from the conversation stream
 Required output format:
-{{
+{
   "memories": [
-    {{
+    {
       "type": "memory type",
-      "display": "natural-language description used directly for display and retrieval",
+      "display": "an elegant, natural Chinese sentence for display and prompt assembly",
       "subject": ["subject 1", "subject 2"],
       "predicate": "predicate (action/state)",
       "object": "object (target/attribute or structure)",
@@ -15,16 +15,17 @@
       "importance": "importance level (1-4)",
       "confidence": "confidence (1-4)",
       "reasoning": "extraction rationale"
-    }}
+    }
   ]
-}}
+}
 Notes:
 1. `subject` may contain multiple subjects, given as an array; if the subject is unclear, pick the most reasonable name from context
-2. `display` must be one complete, fluent Chinese sentence, directly usable for user-facing display and vector search
-3. Only extract information genuinely worth remembering; do not extract trivia
-4. Make sure the information is accurate, specific, and valuable
-5. Importance: 1=low, 2=normal, 3=high, 4=critical; confidence: 1=low, 2=medium, 3=high, 4=verified
+2. `display` is required and must be complete, fluent natural language; never rely on string concatenation
+3. The subject-predicate-object triple is for indexing and structured retrieval; prompt assembly uses only `display`
+4. Only extract information genuinely worth remembering; do not extract trivia
+5. Make sure the information is accurate, specific, and valuable
+6. Importance: 1=low, 2=normal, 3=high, 4=critical; confidence: 1=low, 2=medium, 3=high, 4=verified
 """
 
 import re
@@ -397,6 +398,7 @@ class MemoryBuilder:
   "memories": [
     {{
       "type": "记忆类型",
+      "display": "一句自然流畅的中文描述,用于直接展示和提示词构建",
      "subject": "主语(通常是用户)",
       "predicate": "谓语(动作/状态)",
       "object": "宾语(对象/属性)",
@@ -409,11 +411,16 @@ class MemoryBuilder:
 }}
 
 注意:
-1. 只提取确实值得记忆的信息,不要提取琐碎内容
-2. 确保提取的信息准确、具体、有价值
-3. 使用主谓宾结构确保信息清晰
-4. 重要性等级: 1=低, 2=一般, 3=高, 4=关键
-5. 置信度: 1=低, 2=中等, 3=高, 4=已验证
+1. `display` 字段必填,必须是完整顺畅的自然语言,禁止依赖字符串拼接
+2. **display 字段格式要求**: 使用自然流畅的中文描述,格式示例:
+   - 用户养了一只名叫Whiskers的猫。
+   - 用户特别喜欢拿铁咖啡。
+   - 在2024年5月15日,用户提到对新项目感到很有压力。
+   - 用户认为这个电影很有趣。
+3. 主谓宾用于索引和检索,提示词构建仅使用 `display` 的自然语言描述
+4. 只提取确实值得记忆的信息,不要提取琐碎内容
+5. 确保提取的信息准确、具体、有价值
+6. 重要性等级: 1=低, 2=一般, 3=高, 4=关键;置信度: 1=低, 2=中等, 3=高, 4=已验证
 
 ## 🚨 时间处理要求(强制):
 - **绝对时间优先**:任何涉及时间的记忆都必须使用绝对日期格式
@@ -532,19 +539,38 @@ class MemoryBuilder:
                     "confidence"
                 )
 
+                predicate_value = mem_data.get("predicate", "")
+                object_value = mem_data.get("object", "")
+
+                display_text = self._sanitize_display_text(mem_data.get("display"))
+                used_fallback_display = False
+                if not display_text:
+                    display_text = self._compose_display_text(normalized_subject, predicate_value, object_value)
+                    used_fallback_display = True
+
                 memory = create_memory_chunk(
                     user_id=user_id,
                     subject=normalized_subject,
-                    predicate=mem_data.get("predicate", ""),
-                    obj=mem_data.get("object", ""),
+                    predicate=predicate_value,
+                    obj=object_value,
                     memory_type=MemoryType(mem_data.get("type", "contextual")),
                     chat_id=context.get("chat_id"),
                     source_context=mem_data.get("reasoning", ""),
                     importance=importance_level,
                     confidence=confidence_level,
-                    display=mem_data.get("display")
+                    display=display_text
                 )
 
+                if used_fallback_display:
+                    logger.warning(
+                        "LLM memory lacks a natural-language display field; generated a description from the subject-predicate-object triple",
+                        fallback_generated=True,
+                        memory_type=memory.memory_type.value,
+                        subjects=memory.content.to_subject_list(),
+                        predicate=predicate_value,
+                        object_payload=object_value,
+                    )
+
                 # Attach keywords
                 keywords = mem_data.get("keywords", [])
                 for keyword in keywords:
@@ -755,6 +781,23 @@ class MemoryBuilder:
         cleaned = re.sub(r"[、,,;;]+$", "", cleaned)
         return cleaned
 
+    def _sanitize_display_text(self, value: Any) -> str:
+        if value is None:
+            return ""
+
+        if isinstance(value, (list, dict)):
+            try:
+                value = orjson.dumps(value).decode("utf-8")
+            except Exception:
+                value = str(value)
+
+        text = str(value).strip()
+        if not text or text.lower() in {"null", "none", "undefined"}:
+            return ""
+
+        text = re.sub(r"[\s\u3000]+", " ", text)
+        return text.strip("\n ")
+
     def _looks_like_system_identifier(self, value: str) -> bool:
         if not value:
             return False
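To make the re-ranker's scoring model concrete, here is a small worked example of the weighted sum using the default `ReRankingConfig` weights; the similarity, age, and access count are invented for illustration:

```python
import math

# Default weights from ReRankingConfig: semantic 0.5, recency 0.2, usage 0.2, type 0.1.
w_sem, w_rec, w_freq, w_type = 0.5, 0.2, 0.2, 0.1
decay_rate, freq_max = 0.1, 5.0

semantic = 0.82                                # normalized vector similarity (assumed)
recency = 1 / (1 + decay_rate * 3)             # last accessed 3 days ago -> ~0.769
usage = min(1.0, math.log2(7 + 1) / freq_max)  # accessed 7 times -> 0.6
type_match = 1.0                               # e.g. a PREFERENCE memory under a preference_check intent

final = w_sem * semantic + w_rec * recency + w_freq * usage + w_type * type_match
print(round(final, 3))  # 0.784
```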
diff --git a/src/chat/memory_system/memory_formatter.py b/src/chat/memory_system/memory_formatter.py
new file mode 100644
index 000000000..bc626d6d6
--- /dev/null
+++ b/src/chat/memory_system/memory_formatter.py
@@ -0,0 +1,337 @@
+# -*- coding: utf-8 -*-
+"""
+Memory formatter
+Converts recalled memories into LLM-friendly Markdown
+"""
+
+from typing import List, Dict, Any, Optional
+from datetime import datetime
+from dataclasses import dataclass
+
+from src.common.logger import get_logger
+from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
+
+logger = get_logger(__name__)
+
+
+@dataclass
+class FormatterConfig:
+    """Formatter configuration"""
+    include_timestamps: bool = True        # include time information
+    include_memory_types: bool = True      # include memory types
+    include_confidence: bool = False       # include confidence information
+    max_display_length: int = 200          # maximum display length per memory
+    datetime_format: str = "%Y年%m月%d日"  # time format
+    use_emoji_icons: bool = True           # use emoji icons
+    group_by_type: bool = False            # group memories by type
+    use_bracket_format: bool = False       # use the bracket format: [type] content
+    compact_format: bool = False           # use the compact format
+
+
+class MemoryFormatter:
+    """Memory formatter - renders memories in a prompt-friendly format"""
+
+    # Emoji icon for each memory type
+    TYPE_EMOJI_MAP = {
+        MemoryType.PERSONAL_FACT: "👤",
+        MemoryType.EVENT: "📅",
+        MemoryType.PREFERENCE: "❤️",
+        MemoryType.OPINION: "💭",
+        MemoryType.RELATIONSHIP: "👥",
+        MemoryType.EMOTION: "😊",
+        MemoryType.KNOWLEDGE: "📚",
+        MemoryType.SKILL: "🛠️",
+        MemoryType.GOAL: "🎯",
+        MemoryType.EXPERIENCE: "🌟",
+        MemoryType.CONTEXTUAL: "💬"
+    }
+
+    # Chinese label for each memory type
+    TYPE_LABELS = {
+        MemoryType.PERSONAL_FACT: "个人事实",
+        MemoryType.EVENT: "事件",
+        MemoryType.PREFERENCE: "偏好",
+        MemoryType.OPINION: "观点",
+        MemoryType.RELATIONSHIP: "关系",
+        MemoryType.EMOTION: "情感",
+        MemoryType.KNOWLEDGE: "知识",
+        MemoryType.SKILL: "技能",
+        MemoryType.GOAL: "目标",
+        MemoryType.EXPERIENCE: "经验",
+        MemoryType.CONTEXTUAL: "上下文"
+    }
+
+    def __init__(self, config: Optional[FormatterConfig] = None):
+        self.config = config or FormatterConfig()
+
+    def format_memories_for_prompt(
+        self,
+        memories: List[MemoryChunk],
+        query_context: Optional[str] = None
+    ) -> str:
+        """
+        Format a list of memories into an LLM prompt block
+
+        Args:
+            memories: list of memories
+            query_context: query context (optional)
+
+        Returns:
+            Formatted Markdown text
+        """
+        if not memories:
+            return ""
+
+        lines = ["## 🧠 相关记忆回顾", ""]
+
+        if query_context:
+            lines.extend([
+                f"*查询上下文: {query_context}*",
+                ""
+            ])
+
+        if self.config.group_by_type:
+            lines.extend(self._format_memories_by_type(memories))
+        else:
+            lines.extend(self._format_memories_chronologically(memories))
+
+        return "\n".join(lines)
+
+    def _format_memories_by_type(self, memories: List[MemoryChunk]) -> List[str]:
+        """Format memories grouped by type"""
+        # Group by type
+        grouped_memories = {}
+        for memory in memories:
+            memory_type = memory.memory_type
+            if memory_type not in grouped_memories:
+                grouped_memories[memory_type] = []
+            grouped_memories[memory_type].append(memory)
+
+        lines = []
+
+        # Emit a formatted section per type
+        for memory_type, type_memories in grouped_memories.items():
+            emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝")
+            label = self.TYPE_LABELS.get(memory_type, memory_type.value)
+
+            lines.extend([
+                f"### {emoji} {label}",
+                ""
+            ])
+
+            for memory in type_memories:
+                formatted_item = self._format_single_memory(memory, include_type=False)
+                lines.append(formatted_item)
+
+            lines.append("")  # blank line between types
+
+        return lines
+
+    def _format_memories_chronologically(self, memories: List[MemoryChunk]) -> List[str]:
+        """Format memories in sequence order"""
+        lines = []
+
+        for i, memory in enumerate(memories, 1):
+            formatted_item = self._format_single_memory(memory, include_type=True, index=i)
+            lines.append(formatted_item)
+
+        return lines
+
+    def _format_single_memory(
+        self,
+        memory: MemoryChunk,
+        include_type: bool = True,
+        index: Optional[int] = None
+    ) -> str:
+        """Format a single memory"""
+        # Use the bracket format when enabled
+        if self.config.use_bracket_format:
+            return self._format_single_memory_bracket(memory)
+
+        # Fetch the display text
+        display_text = memory.display or memory.text_content
+        if len(display_text) > self.config.max_display_length:
+            display_text = display_text[:self.config.max_display_length - 3] + "..."
+
+        # Build the prefix
+        prefix_parts = []
+        bold_open = False
+
+        # Add the index
+        if index is not None:
+            prefix_parts.append(f"{index}.")
+
+        # Add the type label
+        if include_type and self.config.include_memory_types:
+            if self.config.use_emoji_icons:
+                emoji = self.TYPE_EMOJI_MAP.get(memory.memory_type, "📝")
+                prefix_parts.append(f"**{emoji}")
+                bold_open = True  # the bold marker is closed when the prefix is assembled below
+            else:
+                label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
+                prefix_parts.append(f"**[{label}]**")
+
+        # Add the time information
+        if self.config.include_timestamps:
+            timestamp = memory.metadata.created_at
+            if timestamp > 0:
+                dt = datetime.fromtimestamp(timestamp)
+                time_str = dt.strftime(self.config.datetime_format)
+                if self.config.use_emoji_icons:
+                    prefix_parts.append(f"⏰ {time_str}")
+                else:
+                    prefix_parts.append(f"({time_str})")
+
+        # Add the confidence information
+        if self.config.include_confidence:
+            confidence = memory.metadata.confidence.value
+            confidence_stars = "★" * confidence + "☆" * (4 - confidence)
+            prefix_parts.append(f"信度:{confidence_stars}")
+
+        # Assemble the full line (closing the bold marker only if one was opened)
+        if prefix_parts:
+            if bold_open:
+                prefix = " ".join(prefix_parts) + "** "
+            else:
+                prefix = " ".join(prefix_parts) + " "
+            return f"- {prefix}{display_text}"
+        else:
+            return f"- {display_text}"
+
+    def _format_single_memory_bracket(self, memory: MemoryChunk) -> str:
+        """Format a single memory in the bracket format: [type] content"""
+        # Fetch the display text
+        display_text = memory.display or memory.text_content
+
+        # The compact format shows only the core content
+        if self.config.compact_format:
+            if len(display_text) > self.config.max_display_length:
+                display_text = display_text[:self.config.max_display_length - 3] + "..."
+        else:
+            # The non-compact format may include time information
+            if self.config.include_timestamps:
+                timestamp = memory.metadata.created_at
+                if timestamp > 0:
+                    dt = datetime.fromtimestamp(timestamp)
+                    time_str = dt.strftime("%Y年%m月%d日")
+                    # Weave the time naturally into the content
+                    if "在" not in display_text and "当" not in display_text:
+                        display_text = f"在{time_str},{display_text}"
+
+        # Fetch the type label
+        label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
+
+        # Build the bracket format: **[type]** content
+        return f"- **[{label}]** {display_text}"
+
+    def format_memory_summary(self, memories: List[MemoryChunk]) -> str:
+        """Produce summary statistics for the memories"""
+        if not memories:
+            return "暂无相关记忆。"
+
+        # Statistics
+        total_count = len(memories)
+        type_counts = {}
+
+        for memory in memories:
+            memory_type = memory.memory_type
+            type_counts[memory_type] = type_counts.get(memory_type, 0) + 1
+
+        # Build the summary
+        lines = [f"**记忆摘要**: 共找到 {total_count} 条相关记忆"]
+
+        if len(type_counts) > 1:
+            type_summaries = []
+            for memory_type, count in type_counts.items():
+                emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝")
+                label = self.TYPE_LABELS.get(memory_type, memory_type.value)
+                type_summaries.append(f"{emoji}{label} {count}条")
+
+            lines.append(f"包括: {', '.join(type_summaries)}")
+
+        return " | ".join(lines)
+
+    def format_for_debug(self, memories: List[MemoryChunk]) -> str:
+        """Render the memory list in a debug format"""
+        if not memories:
+            return "无记忆数据"
+
+        lines = ["### 记忆调试信息", ""]
+
+        for i, memory in enumerate(memories, 1):
+            lines.extend([
+                f"**记忆 {i}** (ID: {memory.memory_id[:8]})",
+                f"- 类型: {memory.memory_type.value}",
+                f"- 内容: {memory.display[:100]}{'...' if len(memory.display) > 100 else ''}",
+                f"- 访问次数: {memory.metadata.access_count}",
+                f"- 置信度: {memory.metadata.confidence.value}/4",
+                f"- 重要性: {memory.metadata.importance.value}/4",
+                f"- 创建时间: {datetime.fromtimestamp(memory.metadata.created_at).strftime('%Y-%m-%d %H:%M')}",
+                ""
+            ])
+
+        return "\n".join(lines)
+
+
+# Default formatter instance
+default_formatter = MemoryFormatter()
+
+
+def format_memories_for_llm(
+    memories: List[MemoryChunk],
+    query_context: Optional[str] = None,
+    config: Optional[FormatterConfig] = None
+) -> str:
+    """
+    Convenience function: format memories into an LLM prompt block
+    """
+    if config:
+        formatter = MemoryFormatter(config)
+    else:
+        formatter = default_formatter
+
+    return formatter.format_memories_for_prompt(memories, query_context)
+
+
+def format_memory_summary(
+    memories: List[MemoryChunk],
+    config: Optional[FormatterConfig] = None
+) -> str:
+    """
+    Convenience function: produce a memory summary
+    """
+    if config:
+        formatter = MemoryFormatter(config)
+    else:
+        formatter = default_formatter
+
+    return formatter.format_memory_summary(memories)
+
+
+def format_memories_bracket_style(
+    memories: List[MemoryChunk],
+    query_context: Optional[str] = None,
+    compact: bool = True,
+    include_timestamps: bool = True
+) -> str:
+    """
+    Convenience function: format memories in the bracket style
+
+    Args:
+        memories: list of memories
+        query_context: query context
+        compact: use the compact format
+        include_timestamps: include time information
+
+    Returns:
+        Formatted Markdown text
+    """
+    config = FormatterConfig(
+        use_bracket_format=True,
+        compact_format=compact,
+        include_timestamps=include_timestamps,
+        include_memory_types=True,
+        use_emoji_icons=False,
+        group_by_type=False
+    )
+
+    formatter = MemoryFormatter(config)
+    return formatter.format_memories_for_prompt(memories, query_context)
\ No newline at end of file
diff --git a/src/chat/memory_system/memory_query_planner.py b/src/chat/memory_system/memory_query_planner.py
index d2e80a4a5..e6f64c97b 100644
--- a/src/chat/memory_system/memory_query_planner.py
+++ b/src/chat/memory_system/memory_query_planner.py
@@ -136,25 +136,23 @@ class MemoryQueryPlanner:
         persona = context.get("bot_personality") or context.get("bot_identity") or "未知"
 
         return f"""
-你是一名记忆检索分析师,将根据对话查询生成结构化的检索计划。
-请结合提供的上下文,输出一个JSON对象,字段含义如下:
-- semantic_query: 提供给向量检索的自然语言查询,要求清晰具体;
-- memory_types: 建议检索的记忆类型数组,取值范围参见 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual);
-- subject_includes: 需要出现在记忆主语中的人或角色列表;
-- object_includes: 记忆中需要提到的重要对象或主题关键词列表;
-- required_keywords: 检索时必须包含的关键词;
-- optional_keywords: 可以提升相关性的附加关键词;
-- owner_filters: 如果需要限制检索所属主体,请列出用户ID或其它标识;
-- recency: 建议的时间偏好,可选 recent/any/historical;
-- emphasis: 检索策略倾向,可选 precision/recall/balanced;
-- limit: 推荐的最大返回数量(1-15之间);
-- notes: 额外说明,可选。
+你是一名记忆检索规划助手,请基于输入生成一个简洁的 JSON 检索计划。
+仅需提供以下字段:
+- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询;
+- memory_types: 建议检索的记忆类型列表,取值范围来自 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual);
+- subject_includes: 建议出现在记忆主语中的人物或角色;
+- object_includes: 建议关注的对象、主题或关键信息;
+- recency: 推荐的时间偏好,可选 recent/any/historical;
+- limit: 推荐的最大返回数量 (1-15);
+- notes: 额外补充说明(可选)。
+
+请不要生成谓语字段,也不要额外补充其它参数。
 
 当前查询: "{query_text}"
 已知的对话参与者: {participant_preview}
 机器人设定: {persona}
 
-请输出符合要求的JSON,禁止添加额外说明或Markdown代码块。
+请直接输出符合要求的 JSON 对象,禁止添加额外文本或 Markdown 代码块。
 """
 
     def _extract_json_payload(self, response: str) -> Optional[str]:
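For reference, a plan object that satisfies the trimmed-down planner prompt above might look like the following; all field values are illustrative, not taken from a real run:

```python
# Hypothetical example of the JSON plan the planner prompt asks the LLM to emit.
example_plan = {
    "semantic_query": "用户养的猫的名字和习惯",
    "memory_types": ["personal_fact", "preference"],
    "subject_includes": ["用户"],
    "object_includes": ["猫"],
    "recency": "any",
    "limit": 8,
    "notes": "偏向宠物相关的事实记忆",
}
```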
diff --git a/src/chat/memory_system/multi_stage_retrieval.py b/src/chat/memory_system/multi_stage_retrieval.py
index 44bb5e62e..21b99f7f4 100644
--- a/src/chat/memory_system/multi_stage_retrieval.py
+++ b/src/chat/memory_system/multi_stage_retrieval.py
@@ -7,12 +7,14 @@
 import time
 import asyncio
 from typing import Dict, List, Optional, Tuple, Set, Any
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from enum import Enum
 import numpy as np
+import orjson
 
 from src.common.logger import get_logger
 from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType, ConfidenceLevel, ImportanceLevel
+from src.chat.memory_system.enhanced_reranker import EnhancedReRanker, ReRankingConfig
 
 logger = get_logger(__name__)
@@ -28,15 +30,15 @@ class RetrievalStage(Enum):
 @dataclass
 class RetrievalConfig:
     """Retrieval configuration"""
-    # Per-stage configuration
-    metadata_filter_limit: int = 100    # results returned by the metadata filtering stage
-    vector_search_limit: int = 50       # results returned by the vector search stage
-    semantic_rerank_limit: int = 20     # results returned by the semantic re-ranking stage
+    # Per-stage configuration - tuned for recall
+    metadata_filter_limit: int = 150    # results returned by the metadata filtering stage (increased)
+    vector_search_limit: int = 80       # results returned by the vector search stage (increased)
+    semantic_rerank_limit: int = 30     # results returned by the semantic re-ranking stage (increased)
     final_result_limit: int = 10        # final result count
 
-    # Similarity thresholds
-    vector_similarity_threshold: float = 0.7    # vector similarity threshold
-    semantic_similarity_threshold: float = 0.6  # semantic similarity threshold
+    # Similarity thresholds - tuned for recall
+    vector_similarity_threshold: float = 0.5    # vector similarity threshold (lowered to improve recall)
+    semantic_similarity_threshold: float = 0.05 # semantic similarity threshold (kept low to admit more related memories)
 
     # Weight configuration
     vector_weight: float = 0.4  # vector similarity weight
@@ -50,15 +52,15 @@ class RetrievalConfig:
         from src.config.config import global_config
 
         return cls(
-            # Per-stage configuration
-            metadata_filter_limit=global_config.memory.metadata_filter_limit,
-            vector_search_limit=global_config.memory.vector_search_limit,
-            semantic_rerank_limit=global_config.memory.semantic_rerank_limit,
+            # Per-stage configuration - tuned for recall
+            metadata_filter_limit=max(150, global_config.memory.metadata_filter_limit),  # enlarge the candidate pool
+            vector_search_limit=max(80, global_config.memory.vector_search_limit),       # more vector search results
+            semantic_rerank_limit=max(30, global_config.memory.semantic_rerank_limit),   # more re-ranking candidates
            final_result_limit=global_config.memory.final_result_limit,
 
-            # Similarity thresholds
-            vector_similarity_threshold=global_config.memory.vector_similarity_threshold,
-            semantic_similarity_threshold=0.6,  # keep the default
+            # Similarity thresholds - tuned for recall
+            vector_similarity_threshold=max(0.5, global_config.memory.vector_similarity_threshold),  # ensure at least 0.5
+            semantic_similarity_threshold=0.05,  # lowered further to improve recall
 
             # Weight configuration
             vector_weight=global_config.memory.vector_weight,
@@ -76,6 +78,7 @@ class StageResult:
     processing_time: float
     filtered_count: int
     score_threshold: float
+    details: List[Dict[str, Any]] = field(default_factory=list)
 
 
 @dataclass
@@ -95,6 +98,16 @@ class MultiStageRetrieval:
 
     def __init__(self, config: Optional[RetrievalConfig] = None):
         self.config = config or RetrievalConfig.from_global_config()
+
+        # Initialize the enhanced re-ranker
+        reranker_config = ReRankingConfig(
+            semantic_weight=self.config.vector_weight,
+            recency_weight=self.config.recency_weight,
+            usage_freq_weight=0.2,  # new usage-frequency weight
+            type_match_weight=0.1   # new type-match weight
+        )
+        self.reranker = EnhancedReRanker(reranker_config)
+
         self.retrieval_stats = {
             "total_queries": 0,
             "average_retrieval_time": 0.0,
@@ -102,7 +115,8 @@
                 "metadata_filtering": {"calls": 0, "avg_time": 0.0},
                 "vector_search": {"calls": 0, "avg_time": 0.0},
                 "semantic_reranking": {"calls": 0, "avg_time": 0.0},
-                "contextual_filtering": {"calls": 0, "avg_time": 0.0}
+                "contextual_filtering": {"calls": 0, "avg_time": 0.0},
+                "enhanced_reranking": {"calls": 0, "avg_time": 0.0}  # new statistic
             }
         }
@@ -122,41 +136,68 @@
 
         stage_results = []
         current_memory_ids = set()
+        memory_debug_info: Dict[str, Dict[str, Any]] = {}
 
         try:
            logger.debug(f"Starting multi-stage retrieval: query='{query}', user_id='{user_id}'")
 
             # Stage 1: metadata filtering
             stage1_result = await self._metadata_filtering_stage(
-                query, user_id, context, metadata_index, all_memories_cache
+                query, user_id, context, metadata_index, all_memories_cache,
+                debug_log=memory_debug_info
             )
             stage_results.append(stage1_result)
             current_memory_ids.update(stage1_result.memory_ids)
 
             # Stage 2: vector search
             stage2_result = await self._vector_search_stage(
-                query, user_id, context, vector_storage, current_memory_ids, all_memories_cache
+                query, user_id, context, vector_storage, current_memory_ids, all_memories_cache,
+                debug_log=memory_debug_info
             )
             stage_results.append(stage2_result)
             current_memory_ids.update(stage2_result.memory_ids)
 
             # Stage 3: semantic re-ranking
             stage3_result = await self._semantic_reranking_stage(
-                query, user_id, context, current_memory_ids, all_memories_cache
+                query, user_id, context, current_memory_ids, all_memories_cache,
+                debug_log=memory_debug_info
             )
             stage_results.append(stage3_result)
 
             # Stage 4: contextual filtering
             stage4_result = await self._contextual_filtering_stage(
-                query, user_id, context, stage3_result.memory_ids, all_memories_cache, limit
+                query, user_id, context, stage3_result.memory_ids, all_memories_cache, limit,
+                debug_log=memory_debug_info
             )
             stage_results.append(stage4_result)
 
+            # Check whether the fallback mechanism is needed
+            if len(stage4_result.memory_ids) < min(3, limit):
+                logger.debug(f"Contextual filtering returned too few results ({len(stage4_result.memory_ids)}); enabling fallback")
+                # Fall back to a more permissive retrieval strategy
+                fallback_result = await self._fallback_retrieval_stage(
+                    query, user_id, context, all_memories_cache, limit,
+                    excluded_ids=set(stage4_result.memory_ids),
+                    debug_log=memory_debug_info
+                )
+                if fallback_result.memory_ids:
+                    stage4_result.memory_ids.extend(fallback_result.memory_ids[:limit - len(stage4_result.memory_ids)])
+                    logger.debug(f"Fallback contributed {len(fallback_result.memory_ids)} memories")
+
+            # Stage 5: enhanced re-ranking (new)
+            stage5_result = await self._enhanced_reranking_stage(
+                query, user_id, context, stage4_result.memory_ids, all_memories_cache, limit,
+                debug_log=memory_debug_info
+            )
+            stage_results.append(stage5_result)
+
             # Fetch the final memory objects
             final_memories = []
-            for memory_id in stage4_result.memory_ids:
+            for memory_id in stage5_result.memory_ids:  # use the re-ranked result
                 if memory_id in all_memories_cache:
-                    final_memories.append(all_memories_cache[memory_id])
+                    memory = all_memories_cache[memory_id]
+                    memory.update_access()  # update access statistics
+                    final_memories.append(memory)
 
             # Update statistics
             total_time = time.time() - start_time
@@ -166,6 +207,58 @@
 
             logger.debug(f"Multi-stage retrieval finished: returned {len(final_memories)} memories in {total_time:.3f}s")
 
+            if memory_debug_info:
+                final_ids_set = set(stage5_result.memory_ids)  # use the re-ranked result
+                debug_entries = []
+                for memory_id, trace in memory_debug_info.items():
+                    memory_obj = all_memories_cache.get(memory_id)
+                    display_text = ""
+                    if memory_obj:
+                        display_text = (memory_obj.display or memory_obj.text_content or "").strip()
+                        if len(display_text) > 80:
+                            display_text = display_text[:77] + "..."
+
+                    entry = {
+                        "memory_id": memory_id,
+                        "display": display_text,
+                        "memory_type": memory_obj.memory_type.value if memory_obj else None,
+                        "vector_similarity": trace.get("vector_stage", {}).get("similarity"),
+                        "semantic_score": trace.get("semantic_stage", {}).get("score"),
+                        "context_score": trace.get("context_stage", {}).get("context_score"),
+                        "final_score": trace.get("context_stage", {}).get("final_score"),
+                        "status": trace.get("context_stage", {}).get("status") or trace.get("vector_stage", {}).get("status") or trace.get("semantic_stage", {}).get("status"),
+                        "is_final": memory_id in final_ids_set,
+                    }
+                    debug_entries.append(entry)
+
+                # Cap the number of logged entries
+                debug_entries.sort(key=lambda item: (item.get("is_final", False), item.get("final_score") or item.get("vector_similarity") or 0.0), reverse=True)
+                debug_payload = {
+                    "query": query,
+                    "semantic_query": context.get("resolved_query_text", query),
+                    "user_id": user_id,
+                    "stage_summaries": [
+                        {
+                            "stage": result.stage.value,
+                            "returned": len(result.memory_ids),
+                            "filtered": result.filtered_count,
+                            "duration": round(result.processing_time, 4),
+                            "details": result.details,
+                        }
+                        for result in stage_results
+                    ],
+                    "candidates": debug_entries[:20],
+                }
+                try:
+                    logger.info(
+                        f"🧭 Memory retrieval debug | query='{query}' | final={len(stage5_result.memory_ids)}",
+                        extra={"memory_debug": debug_payload},
+                    )
+                except Exception:
+                    logger.info(
+                        f"🧭 Memory retrieval debug details: {orjson.dumps(debug_payload).decode('utf-8')}",
+                    )
+
             return RetrievalResult(
                 query=query,
                 user_id=user_id,
@@ -195,7 +288,9 @@
         user_id: str,
         context: Dict[str, Any],
         metadata_index,
-        all_memories_cache: Dict[str, MemoryChunk]
+        all_memories_cache: Dict[str, MemoryChunk],
+        *,
+        debug_log: Optional[Dict[str, Dict[str, Any]]] = None
     ) -> StageResult:
         """Stage 1: metadata filtering"""
         start_time = time.time()
@@ -223,6 +318,7 @@
             result = await metadata_index.query_memories(index_query)
             result_ids = list(result.memory_ids)
             filtered_count = max(0, len(all_memories_cache) - len(result_ids))
+            details: List[Dict[str, Any]] = []
 
             # If nothing was hit in the index and no owner filter was specified, fall back to recently accessed memories
             if not result_ids:
@@ -271,6 +367,12 @@
                     bool(subjects),
                     bool(keywords),
                 )
+                details.append({
+                    "note": "fallback_recent",
+                    "requested_types": [mt.value for mt in memory_types] if memory_types else [],
+                    "subjects": subjects or [],
+                    "keywords": keywords or [],
+                })
 
             logger.debug(
                 "Metadata filtering: candidates=%d, returned=%d",
@@ -278,12 +380,23 @@
                 len(result_ids),
             )
 
+            for memory_id in result_ids[:20]:
+                detail_entry = {
+                    "memory_id": memory_id,
+                    "status": "candidate",
+                }
+                details.append(detail_entry)
+                if debug_log is not None:
+                    stage_entry = debug_log.setdefault(memory_id, {}).setdefault("metadata_stage", {})
+                    stage_entry["status"] = "candidate"
+
             return StageResult(
                 stage=RetrievalStage.METADATA_FILTERING,
                 memory_ids=result_ids,
                 processing_time=time.time() - start_time,
                 filtered_count=filtered_count,
-                score_threshold=0.0
+                score_threshold=0.0,
+                details=details,
             )
 
         except Exception as e:
@@ -293,7 +406,8 @@
                 memory_ids=[],
                 processing_time=time.time() - start_time,
                 filtered_count=0,
-                score_threshold=0.0
+                score_threshold=0.0,
+                details=[{"error": str(e)}],
             )
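Each retrieval stage records its verdict for a memory under its own key in `memory_debug_info` (the metadata stage above writes `metadata_stage`; the stages that follow use the same `setdefault` pattern). An illustrative, entirely made-up trace for one candidate:

```python
# Hypothetical trace for one memory after a full retrieval pass.
memory_debug_info = {
    "a1b2c3d4": {
        "metadata_stage": {"status": "candidate"},
        "vector_stage": {"similarity": 0.7312, "status": "kept"},
        "semantic_stage": {"score": 0.41, "status": "kept"},
        "context_stage": {"context_score": 0.52, "final_score": 0.61, "status": "final"},
    }
}
```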
     async def _vector_search_stage(
         self,
         query: str,
         user_id: str,
         context: Dict[str, Any],
         vector_storage,
         candidate_ids: Set[str],
-        all_memories_cache: Dict[str, MemoryChunk]
+        all_memories_cache: Dict[str, MemoryChunk],
+        *,
+        debug_log: Optional[Dict[str, Dict[str, Any]]] = None
     ) -> StageResult:
         """Stage 2: vector search"""
         start_time = time.time()
@@ -313,12 +429,14 @@
             query_embedding = await self._generate_query_embedding(query, context, vector_storage)
 
             if not query_embedding:
+                logger.warning("Vector search stage: failed to generate the query embedding")
                 return StageResult(
                     stage=RetrievalStage.VECTOR_SEARCH,
                     memory_ids=[],
                     processing_time=time.time() - start_time,
                     filtered_count=0,
-                    score_threshold=self.config.vector_similarity_threshold
+                    score_threshold=self.config.vector_similarity_threshold,
+                    details=[{"note": "query_embedding_unavailable"}],
                 )
 
             # Run the vector search
@@ -327,17 +445,69 @@
                 limit=self.config.vector_search_limit
             )
 
+            if not search_result:
+                logger.warning("Vector search stage: search returned no results; trying text-match fallback")
+                # Fallback strategy when the vector search fails
+                return self._create_text_search_fallback(candidate_ids, all_memories_cache, query, start_time)
+
             candidate_pool = candidate_ids or set(all_memories_cache.keys())
 
             # Filter the candidate memories
             filtered_memories = []
+            details: List[Dict[str, Any]] = []
+            raw_details: List[Dict[str, Any]] = []
+            threshold = self.config.vector_similarity_threshold
+
             for memory_id, similarity in search_result:
-                if memory_id in candidate_pool and similarity >= self.config.vector_similarity_threshold:
+                in_metadata_candidates = memory_id in candidate_pool
+                above_threshold = similarity >= threshold
+                if in_metadata_candidates and above_threshold:
                     filtered_memories.append((memory_id, similarity))
 
+                raw_details.append({
+                    "memory_id": memory_id,
+                    "similarity": similarity,
+                    "in_metadata": in_metadata_candidates,
+                    "above_threshold": above_threshold,
+                })
+
             # Sort by similarity
             filtered_memories.sort(key=lambda x: x[1], reverse=True)
             result_ids = [memory_id for memory_id, _ in filtered_memories[:self.config.vector_search_limit]]
+            kept_ids = set(result_ids)
+
+            for entry in raw_details:
+                memory_id = entry["memory_id"]
+                similarity = entry["similarity"]
+                in_metadata = entry["in_metadata"]
+                above_threshold = entry["above_threshold"]
+
+                status = "kept"
+                reason = None
+                if not in_metadata:
+                    status = "excluded"
+                    reason = "not_in_metadata_candidates"
+                elif not above_threshold:
+                    status = "excluded"
+                    reason = "below_threshold"
+                elif memory_id not in kept_ids:
+                    status = "excluded"
+                    reason = "limit_pruned"
+
+                detail_entry = {
+                    "memory_id": memory_id,
+                    "similarity": round(similarity, 4),
+                    "status": status,
+                    "reason": reason,
+                }
+                details.append(detail_entry)
+
+                if debug_log is not None:
+                    stage_entry = debug_log.setdefault(memory_id, {}).setdefault("vector_stage", {})
+                    stage_entry["similarity"] = round(similarity, 4)
+                    stage_entry["status"] = status
+                    if reason:
+                        stage_entry["reason"] = reason
 
             filtered_count = max(0, len(candidate_pool) - len(result_ids))
 
@@ -348,7 +518,8 @@
                 memory_ids=result_ids,
                 processing_time=time.time() - start_time,
                 filtered_count=filtered_count,
-                score_threshold=self.config.vector_similarity_threshold
+                score_threshold=self.config.vector_similarity_threshold,
+                details=details,
             )
 
         except Exception as e:
@@ -358,7 +529,68 @@
                 memory_ids=[],
                 processing_time=time.time() - start_time,
                 filtered_count=0,
-                score_threshold=self.config.vector_similarity_threshold
+                score_threshold=self.config.vector_similarity_threshold,
+                details=[{"error": str(e)}],
+            )
+
+    def _create_text_search_fallback(
+        self,
+        candidate_ids: Set[str],
+        all_memories_cache: Dict[str, MemoryChunk],
+        query_text: str,
+        start_time: float
+    ) -> StageResult:
+        """Fall back to plain text search when the vector search fails"""
+        try:
+            query_lower = query_text.lower()
+            query_words = set(query_lower.split())
+
+            text_matches = []
+            for memory_id in candidate_ids:
+                if memory_id not in all_memories_cache:
+                    continue
+
+                memory = all_memories_cache[memory_id]
+                memory_text = (memory.display or memory.text_content or "").lower()
+
+                # Simple text-match scoring
+                word_matches = sum(1 for word in query_words if word in memory_text)
+                if word_matches > 0:
+                    score = word_matches / len(query_words)
+                    text_matches.append((memory_id, score))
+
+            # Sort by match score
+            text_matches.sort(key=lambda x: x[1], reverse=True)
+            result_ids = [memory_id for memory_id, _ in text_matches[:self.config.vector_search_limit]]
+
+            details = []
+            for memory_id, score in text_matches[:self.config.vector_search_limit]:
+                details.append({
+                    "memory_id": memory_id,
+                    "text_match_score": round(score, 4),
+                    "status": "text_match_fallback"
+                })
+
+            logger.debug(f"Vector search fell back to text matching: found {len(result_ids)} matching memories")
+
+            return StageResult(
+                stage=RetrievalStage.VECTOR_SEARCH,
+                memory_ids=result_ids,
+                processing_time=time.time() - start_time,
+                filtered_count=len(candidate_ids) - len(result_ids),
+                score_threshold=0.0,  # no hard threshold for text matching
+                details=details
+            )
+
+        except Exception as e:
+            logger.error(f"Text search fallback failed: {e}")
+            return StageResult(
+                stage=RetrievalStage.VECTOR_SEARCH,
+                memory_ids=list(candidate_ids)[:self.config.vector_search_limit],
+                processing_time=time.time() - start_time,
+                filtered_count=0,
+                score_threshold=0.0,
+                details=[{"error": str(e), "note": "text_fallback_failed"}]
             )
 
     async def _semantic_reranking_stage(
@@ -367,13 +599,17 @@
         user_id: str,
         context: Dict[str, Any],
         candidate_ids: Set[str],
-        all_memories_cache: Dict[str, MemoryChunk]
+        all_memories_cache: Dict[str, MemoryChunk],
+        *,
+        debug_log: Optional[Dict[str, Dict[str, Any]]] = None
     ) -> StageResult:
         """Stage 3: semantic re-ranking"""
         start_time = time.time()
 
         try:
             reranked_memories = []
+            details: List[Dict[str, Any]] = []
+            threshold = self.config.semantic_similarity_threshold
 
             for memory_id in candidate_ids:
                 if memory_id not in all_memories_cache:
@@ -384,15 +620,43 @@
                 # Compute the combined semantic similarity
                 semantic_score = await self._calculate_semantic_similarity(query, memory, context)
 
-                if semantic_score >= self.config.semantic_similarity_threshold:
+                if semantic_score >= threshold:
                     reranked_memories.append((memory_id, semantic_score))
 
+                status = "kept" if semantic_score >= threshold else "excluded"
+                reason = None if status == "kept" else "below_threshold"
+
+                detail_entry = {
+                    "memory_id": memory_id,
+                    "score": round(semantic_score, 4),
+                    "status": status,
+                    "reason": reason,
+                }
+                details.append(detail_entry)
+
+                if debug_log is not None:
+                    stage_entry = debug_log.setdefault(memory_id, {}).setdefault("semantic_stage", {})
+                    stage_entry["score"] = round(semantic_score, 4)
+                    stage_entry["status"] = status
+                    if reason:
+                        stage_entry["reason"] = reason
+
             # Sort by semantic similarity
             reranked_memories.sort(key=lambda x: x[1], reverse=True)
             result_ids = [memory_id for memory_id, _ in reranked_memories[:self.config.semantic_rerank_limit]]
+            kept_ids = set(result_ids)
 
             filtered_count = len(candidate_ids) - len(result_ids)
 
+            for detail in details:
+                if detail["status"] == "kept" and detail["memory_id"] not in kept_ids:
+                    detail["status"] = "excluded"
+                    detail["reason"] = "limit_pruned"
+                    if debug_log is not None:
+                        stage_entry = debug_log.setdefault(detail["memory_id"], {}).setdefault("semantic_stage", {})
+                        stage_entry["status"] = "excluded"
+                        stage_entry["reason"] = "limit_pruned"
+
             logger.debug(f"Semantic re-ranking: {len(candidate_ids)} -> {len(result_ids)} memories")
 
             return StageResult(
@@ -400,7 +664,8 @@ class MultiStageRetrieval:
                 memory_ids=result_ids,
                 processing_time=time.time() - start_time,
                 filtered_count=filtered_count,
-                score_threshold=self.config.semantic_similarity_threshold
+                score_threshold=self.config.semantic_similarity_threshold,
+                details=details,
             )
 
         except Exception as e:
@@ -410,7 +675,8 @@
                 memory_ids=list(candidate_ids),  # on failure, return the original candidate set
                 processing_time=time.time() - start_time,
                 filtered_count=0,
-                score_threshold=self.config.semantic_similarity_threshold
+                score_threshold=self.config.semantic_similarity_threshold,
+                details=[{"error": str(e)}],
             )
 
     async def _contextual_filtering_stage(
@@ -420,13 +686,16 @@
         context: Dict[str, Any],
         candidate_ids: List[str],
         all_memories_cache: Dict[str, MemoryChunk],
-        limit: int
+        limit: int,
+        *,
+        debug_log: Optional[Dict[str, Dict[str, Any]]] = None
     ) -> StageResult:
         """Stage 4: contextual filtering"""
         start_time = time.time()
 
         try:
             final_memories = []
+            details: List[Dict[str, Any]] = []
 
             for memory_id in candidate_ids:
                 if memory_id not in all_memories_cache:
@@ -442,9 +711,38 @@
                 final_memories.append((memory_id, final_score))
 
+                detail_entry = {
+                    "memory_id": memory_id,
+                    "context_score": round(context_score, 4),
+                    "final_score": round(final_score, 4),
+                    "status": "candidate",
+                }
+                details.append(detail_entry)
+
+                if debug_log is not None:
+                    stage_entry = debug_log.setdefault(memory_id, {}).setdefault("context_stage", {})
+                    stage_entry["context_score"] = round(context_score, 4)
+                    stage_entry["final_score"] = round(final_score, 4)
+
             # Sort by final score
             final_memories.sort(key=lambda x: x[1], reverse=True)
             result_ids = [memory_id for memory_id, _ in final_memories[:limit]]
+            kept_ids = set(result_ids)
+
+            for detail in details:
+                memory_id = detail["memory_id"]
+                if memory_id in kept_ids:
+                    detail["status"] = "final"
+                    if debug_log is not None:
+                        stage_entry = debug_log.setdefault(memory_id, {}).setdefault("context_stage", {})
+                        stage_entry["status"] = "final"
+                else:
+                    detail["status"] = "excluded"
+                    detail["reason"] = "ranked_out"
+                    if debug_log is not None:
+                        stage_entry = debug_log.setdefault(memory_id, {}).setdefault("context_stage", {})
+                        stage_entry["status"] = "excluded"
+                        stage_entry["reason"] = "ranked_out"
 
             filtered_count = len(candidate_ids) - len(result_ids)
 
@@ -455,7 +753,8 @@
                 memory_ids=result_ids,
                 processing_time=time.time() - start_time,
                 filtered_count=filtered_count,
-                score_threshold=0.0  # dynamic threshold
+                score_threshold=0.0,  # dynamic threshold
+                details=details,
             )
 
         except Exception as e:
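The word-overlap score used by `_create_text_search_fallback` above (and reused by the fallback stage in the next hunk) is simply the fraction of query words found in the memory text. A dependency-free sketch of that scoring:

```python
def text_match_score(query: str, memory_text: str) -> float:
    """Fraction of whitespace-delimited query words that occur in the memory text."""
    words = set(query.lower().split())
    if not words:
        return 0.0
    hits = sum(1 for word in words if word in memory_text.lower())
    return hits / len(words)

print(text_match_score("dinner plans tonight", "we made dinner plans for tonight"))  # 1.0
print(text_match_score("dinner plans tonight", "the weather was sunny"))             # 0.0
```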
query_words if word in memory_text) + if word_matches > 0: + score = word_matches / len(query_words) + fallback_candidates.append((memory_id, score)) + + # 策略2:如果没有关键词匹配,使用时序最近的原则 + if not fallback_candidates: + logger.debug("关键词匹配无结果,使用时序最近策略") + recent_memories = sorted( + [(mid, mem.metadata.last_accessed or mem.metadata.created_at) + for mid, mem in all_memories_cache.items() + if mid not in excluded_ids], + key=lambda x: x[1], + reverse=True + ) + fallback_candidates = [(mid, 0.5) for mid, _ in recent_memories[:limit*2]] + + # 按分数排序 + fallback_candidates.sort(key=lambda x: x[1], reverse=True) + result_ids = [memory_id for memory_id, _ in fallback_candidates[:limit]] + + # 记录调试信息 + details = [] + for memory_id, score in fallback_candidates[:limit]: + detail_entry = { + "memory_id": memory_id, + "fallback_score": round(score, 4), + "status": "fallback_candidate", + } + details.append(detail_entry) + + if debug_log is not None: + stage_entry = debug_log.setdefault(memory_id, {}).setdefault("fallback_stage", {}) + stage_entry["score"] = round(score, 4) + stage_entry["status"] = "fallback_candidate" + + filtered_count = len(all_memories_cache) - len(result_ids) + + logger.debug(f"回退检索完成:返回 {len(result_ids)} 条记忆") + + return StageResult( + stage=RetrievalStage.CONTEXTUAL_FILTERING, # 复用现有枚举 + memory_ids=result_ids, + processing_time=time.time() - start_time, + filtered_count=filtered_count, + score_threshold=0.0, # 回退机制无阈值 + details=details, + ) + + except Exception as e: + logger.error(f"回退检索阶段失败: {e}") + return StageResult( + stage=RetrievalStage.CONTEXTUAL_FILTERING, + memory_ids=[], + processing_time=time.time() - start_time, + filtered_count=0, + score_threshold=0.0, + details=[{"error": str(e)}], ) async def _generate_query_embedding(self, query: str, context: Dict[str, Any], vector_storage) -> Optional[List[float]]: @@ -477,37 +866,162 @@ class MultiStageRetrieval: query_text = query_plan.semantic_query if not query_text: + logger.debug("查询文本为空,无法生成查询向量") return None - if hasattr(vector_storage, "generate_query_embedding"): - return await vector_storage.generate_query_embedding(query_text) + if not hasattr(vector_storage, "generate_query_embedding"): + logger.warning("向量存储对象缺少 generate_query_embedding 方法") + return None + + logger.debug(f"正在生成查询向量,文本: '{query_text[:100]}'") + embedding = await vector_storage.generate_query_embedding(query_text) + + if embedding is None: + logger.warning("向量存储返回空的查询向量") + return None + + if len(embedding) == 0: + logger.warning("向量存储返回空列表作为查询向量") + return None + + logger.debug(f"查询向量生成成功,维度: {len(embedding)}") + return embedding - return None except Exception as e: - logger.warning(f"生成查询向量失败: {e}") + logger.error(f"生成查询向量时发生异常: {e}", exc_info=True) return None async def _calculate_semantic_similarity(self, query: str, memory: MemoryChunk, context: Dict[str, Any]) -> float: - """计算语义相似度""" + """计算语义相似度 - 简化优化版本,提升召回率""" try: query_plan = context.get("query_plan") query_text = query if query_plan and getattr(query_plan, "semantic_query", None): query_text = query_plan.semantic_query - # 简单的文本相似度计算 - query_words = set(query_text.lower().split()) - memory_text = (memory.display or memory.text_content or "").lower() - memory_words = set(memory_text.split()) + # 预处理:清理和标准化文本 + memory_text = (memory.display or memory.text_content or "").strip() + query_text = query_text.strip() - if not query_words or not memory_words: + if not query_text or not memory_text: return 0.0 - intersection = query_words & memory_words - union = query_words | memory_words + # 
+            # 创建小写版本用于匹配
+            query_lower = query_text.lower()
+            memory_lower = memory_text.lower()

-            jaccard_similarity = len(intersection) / len(union)
-            return jaccard_similarity
+            # 核心匹配策略1:精确子串匹配(最重要)
+            exact_score = 0.0
+            if query_text in memory_text:
+                exact_score = 1.0
+            elif query_lower in memory_lower:
+                exact_score = 0.9
+            elif any(word in memory_lower for word in query_lower.split() if len(word) > 1):
+                exact_score = 0.4
+
+            # 核心匹配策略2:词汇匹配
+            word_score = 0.0
+            try:
+                import jieba
+                import re
+
+                # 分词处理
+                query_words = list(jieba.cut(query_text)) + re.findall(r'[a-zA-Z]+', query_text)
+                memory_words = list(jieba.cut(memory_text)) + re.findall(r'[a-zA-Z]+', memory_text)
+
+                # 清理和标准化
+                query_words = [w.strip().lower() for w in query_words if w.strip() and len(w.strip()) > 1]
+                memory_words = [w.strip().lower() for w in memory_words if w.strip() and len(w.strip()) > 1]
+
+                if query_words and memory_words:
+                    query_set = set(query_words)
+                    memory_set = set(memory_words)
+
+                    # 精确匹配
+                    exact_matches = query_set & memory_set
+                    exact_ratio = len(exact_matches) / len(query_set) if query_set else 0
+
+                    # 部分匹配(包含关系)
+                    partial_matches = 0
+                    for q_word in query_set:
+                        if any(q_word in m_word or m_word in q_word for m_word in memory_set if len(q_word) >= 2):
+                            partial_matches += 1
+
+                    partial_ratio = partial_matches / len(query_set) if query_set else 0
+                    word_score = exact_ratio * 0.8 + partial_ratio * 0.3
+
+            except ImportError:
+                # 如果jieba不可用,使用简单分词
+                import re
+                query_words = re.findall(r'[\w\u4e00-\u9fa5]+', query_lower)
+                memory_words = re.findall(r'[\w\u4e00-\u9fa5]+', memory_lower)
+
+                if query_words and memory_words:
+                    query_set = set(w for w in query_words if len(w) > 1)
+                    memory_set = set(w for w in memory_words if len(w) > 1)
+
+                    if query_set:
+                        intersection = query_set & memory_set
+                        word_score = len(intersection) / len(query_set)
+
+            # 核心匹配策略3:语义概念匹配
+            concept_score = 0.0
+            concept_groups = {
+                "饮食": ["吃", "饭", "菜", "餐", "饿", "饱", "食", "dinner", "eat", "food", "meal"],
+                "天气": ["天气", "阳光", "雨", "晴", "阴", "温度", "weather", "sunny", "rain"],
+                "编程": ["编程", "代码", "程序", "开发", "语言", "programming", "code", "develop", "python"],
+                "时间": ["今天", "昨天", "明天", "现在", "时间", "today", "yesterday", "tomorrow", "time"],
+                "情感": ["好", "坏", "开心", "难过", "有趣", "good", "bad", "happy", "sad", "fun"]
+            }
+
+            query_concepts = {concept for concept, keywords in concept_groups.items()
+                              if any(keyword in query_lower for keyword in keywords)}
+            memory_concepts = {concept for concept, keywords in concept_groups.items()
+                               if any(keyword in memory_lower for keyword in keywords)}
+
+            if query_concepts and memory_concepts:
+                concept_overlap = query_concepts & memory_concepts
+                concept_score = len(concept_overlap) / len(query_concepts) * 0.5
+
+            # 核心匹配策略4:查询计划增强
+            plan_bonus = 0.0
+            if query_plan:
+                # 主体匹配
+                if hasattr(query_plan, 'subjects') and query_plan.subjects:
+                    for subject in query_plan.subjects:
+                        if subject.lower() in memory_lower:
+                            plan_bonus += 0.15
+
+                # 对象匹配
+                if hasattr(query_plan, 'objects') and query_plan.objects:
+                    for obj in query_plan.objects:
+                        if obj.lower() in memory_lower:
+                            plan_bonus += 0.1
+
+                # 记忆类型匹配
+                if hasattr(query_plan, 'memory_types') and query_plan.memory_types:
+                    if memory.memory_type in query_plan.memory_types:
+                        plan_bonus += 0.1
+
+            # 综合评分计算 - 简化权重分配
+            if exact_score >= 0.9:
+                # 精确匹配为主
+                final_score = exact_score * 0.6 + word_score * 0.2 + concept_score + plan_bonus
+            else:
+                # 综合评分
+                final_score = exact_score * 0.3 + word_score * 0.3 + concept_score + plan_bonus
+
+            # 基础分数保障:避免过低分数
+            if final_score > 0:
+                if exact_score > 0 or word_score > 0.1:
+                    final_score = max(final_score, 0.1)  # 有实际匹配的最小分数
+                else:
+                    final_score = max(final_score, 0.05)  # 仅概念匹配的最小分数
+
+            # 确保分数在合理范围
+            final_score = min(1.0, max(0.0, final_score))
+
+            return final_score

         except Exception as e:
             logger.warning(f"计算语义相似度失败: {e}")
@@ -761,6 +1275,123 @@ class MultiStageRetrieval:
             "metadata_filtering": {"calls": 0, "avg_time": 0.0},
             "vector_search": {"calls": 0, "avg_time": 0.0},
             "semantic_reranking": {"calls": 0, "avg_time": 0.0},
-            "contextual_filtering": {"calls": 0, "avg_time": 0.0}
+            "contextual_filtering": {"calls": 0, "avg_time": 0.0},
+            "enhanced_reranking": {"calls": 0, "avg_time": 0.0}
         }
-    }
\ No newline at end of file
+    }
+
+    async def _enhanced_reranking_stage(
+        self,
+        query: str,
+        user_id: str,
+        context: Dict[str, Any],
+        candidate_ids: List[str],
+        all_memories_cache: Dict[str, MemoryChunk],
+        limit: int,
+        *,
+        debug_log: Optional[Dict[str, Dict[str, Any]]] = None
+    ) -> StageResult:
+        """阶段5:增强重排序 - 使用多维度评分模型"""
+        start_time = time.time()
+
+        try:
+            if not candidate_ids:
+                return StageResult(
+                    stage=RetrievalStage.CONTEXTUAL_FILTERING,  # 保持与原有枚举兼容
+                    memory_ids=[],
+                    processing_time=time.time() - start_time,
+                    filtered_count=0,
+                    score_threshold=0.0,
+                    details=[{"note": "no_candidates"}],
+                )
+
+            # 准备候选记忆数据
+            candidate_memories = []
+            for memory_id in candidate_ids:
+                memory = all_memories_cache.get(memory_id)
+                if memory:
+                    # 使用原始向量相似度作为基础分数
+                    vector_similarity = 0.8  # 默认分数,实际应该从前面阶段传递
+                    candidate_memories.append((memory_id, memory, vector_similarity))
+
+            if not candidate_memories:
+                return StageResult(
+                    stage=RetrievalStage.CONTEXTUAL_FILTERING,
+                    memory_ids=[],
+                    processing_time=time.time() - start_time,
+                    filtered_count=len(candidate_ids),
+                    score_threshold=0.0,
+                    details=[{"note": "candidates_not_found_in_cache"}],
+                )
+
+            # 使用增强重排序器
+            reranked_memories = self.reranker.rerank_memories(
+                query=query,
+                candidate_memories=candidate_memories,
+                context=context,
+                limit=limit
+            )
+
+            # 提取重排序后的记忆ID
+            result_ids = [memory_id for memory_id, _, _ in reranked_memories]
+
+            # 生成调试详情
+            details = []
+            for memory_id, memory, final_score in reranked_memories:
+                detail_entry = {
+                    "memory_id": memory_id,
+                    "final_score": round(final_score, 4),
+                    "status": "reranked",
+                    "memory_type": memory.memory_type.value,
+                    "access_count": memory.metadata.access_count,
+                }
+                details.append(detail_entry)
+
+                if debug_log is not None:
+                    stage_entry = debug_log.setdefault(memory_id, {}).setdefault("enhanced_rerank_stage", {})
+                    stage_entry["final_score"] = round(final_score, 4)
+                    stage_entry["status"] = "reranked"
+                    stage_entry["rank"] = len(details)
+
+            # 记录被过滤的记忆
+            kept_ids = set(result_ids)
+            for memory_id in candidate_ids:
+                if memory_id not in kept_ids:
+                    detail_entry = {
+                        "memory_id": memory_id,
+                        "status": "filtered_out",
+                        "reason": "ranked_below_limit"
+                    }
+                    details.append(detail_entry)
+
+                    if debug_log is not None:
+                        stage_entry = debug_log.setdefault(memory_id, {}).setdefault("enhanced_rerank_stage", {})
+                        stage_entry["status"] = "filtered_out"
+                        stage_entry["reason"] = "ranked_below_limit"
+
+            filtered_count = len(candidate_ids) - len(result_ids)
+
+            logger.debug(
+                f"增强重排序完成:候选={len(candidate_ids)}, 返回={len(result_ids)}, "
+                f"过滤={filtered_count}"
+            )
+
+            return StageResult(
+                stage=RetrievalStage.CONTEXTUAL_FILTERING,  # 保持与原有枚举兼容
+                memory_ids=result_ids,
+                processing_time=time.time() - start_time,
+                filtered_count=filtered_count,
+                score_threshold=0.0,  # 动态阈值,由重排序器决定
+                details=details,
+            )
+
+        except Exception as e:
+            logger.error(f"增强重排序阶段失败: {e}", exc_info=True)
+            return StageResult(
+                stage=RetrievalStage.CONTEXTUAL_FILTERING,
+                memory_ids=candidate_ids[:limit],  # 失败时返回前limit个
+                processing_time=time.time() - start_time,
+                filtered_count=0,
+                score_threshold=0.0,
+                details=[{"error": str(e)}],
+            )
\ No newline at end of file
diff --git a/src/chat/memory_system/vector_storage.py b/src/chat/memory_system/vector_storage.py
index 5fd2357f3..73ddbb6a6 100644
--- a/src/chat/memory_system/vector_storage.py
+++ b/src/chat/memory_system/vector_storage.py
@@ -130,24 +130,32 @@ class VectorStorageManager:
     async def generate_query_embedding(self, query_text: str) -> Optional[List[float]]:
         """生成查询向量,用于记忆召回"""
         if not query_text:
+            logger.warning("查询文本为空,无法生成向量")
             return None

         try:
             await self.initialize_embedding_model()

+            logger.debug(f"开始生成查询向量,文本: '{query_text[:50]}{'...' if len(query_text) > 50 else ''}'")
+
             embedding, _ = await self.embedding_model.get_embedding(query_text)

             if not embedding:
+                logger.warning("嵌入模型返回空向量")
                 return None

+            logger.debug(f"生成的向量维度: {len(embedding)}, 期望维度: {self.config.dimension}")
+
             if len(embedding) != self.config.dimension:
-                logger.warning(
+                logger.error(
                     "查询向量维度不匹配: 期望 %d, 实际 %d",
                     self.config.dimension,
                     len(embedding)
                 )
                 return None

-            return self._normalize_vector(embedding)
+            normalized_vector = self._normalize_vector(embedding)
+            logger.debug(f"查询向量生成成功,向量范围: [{min(normalized_vector):.4f}, {max(normalized_vector):.4f}]")
+            return normalized_vector

         except Exception as exc:
             logger.error(f"❌ 生成查询向量失败: {exc}", exc_info=True)
@@ -195,19 +203,39 @@ class VectorStorageManager:
             logger.error(f"❌ 向量存储失败: {e}", exc_info=True)

     def _prepare_embedding_text(self, memory: MemoryChunk) -> str:
-        """准备用于嵌入的文本"""
-        # 构建包含丰富信息的文本
-        text_parts = [
-            memory.text_content,
-            f"类型: {memory.memory_type.value}",
-            f"关键词: {', '.join(memory.keywords)}",
-            f"标签: {', '.join(memory.tags)}"
-        ]
+        """准备用于嵌入的文本,仅使用自然语言展示内容"""
+        display_text = (memory.display or "").strip()
+        if display_text:
+            return display_text

-        if memory.metadata.emotional_context:
-            text_parts.append(f"情感: {memory.metadata.emotional_context}")
+        fallback_text = (memory.text_content or "").strip()
+        if fallback_text:
+            return fallback_text

-        return " | ".join(text_parts)
+        subjects = "、".join(s.strip() for s in memory.subjects if s and isinstance(s, str))
+        predicate = (memory.content.predicate or "").strip()
+
+        obj = memory.content.object
+        if isinstance(obj, dict):
+            object_parts = []
+            for key, value in obj.items():
+                if value is None:
+                    continue
+                if isinstance(value, (list, tuple)):
+                    preview = "、".join(str(item) for item in value[:3])
+                    object_parts.append(f"{key}:{preview}")
+                else:
+                    object_parts.append(f"{key}:{value}")
+            object_text = ", ".join(object_parts)
+        else:
+            object_text = str(obj or "").strip()
+
+        composite_parts = [part for part in [subjects, predicate, object_text] if part]
+        if composite_parts:
+            return " ".join(composite_parts)
+
+        logger.debug("记忆 %s 缺少可用展示文本,使用占位符生成嵌入输入", memory.memory_id)
+        return memory.memory_id

     async def _batch_generate_and_store_embeddings(self, memory_texts: List[Tuple[str, str]]):
         """批量生成和存储嵌入向量"""
@@ -345,12 +373,16 @@ class VectorStorageManager:
         start_time = time.time()

         try:
+            logger.debug(f"开始向量搜索: query_text='{query_text[:30] if query_text else 'None'}', limit={limit}")
+
             if query_vector is None:
                 if not query_text:
+                    logger.warning("查询向量和查询文本都为空")
                     return []

                 query_vector = await self.generate_query_embedding(query_text)
                 if not query_vector:
logger.warning("查询向量生成失败") return [] scope_filter: Optional[str] = None @@ -363,6 +395,25 @@ class VectorStorageManager: # 规范化查询向量 query_vector = self._normalize_vector(query_vector) + + logger.debug(f"查询向量维度: {len(query_vector)}, 存储总向量数: {self.storage_stats['total_vectors']}") + + # 检查向量索引状态 + if not self.vector_index: + logger.error("向量索引未初始化") + return [] + + total_vectors = 0 + if hasattr(self.vector_index, 'ntotal'): + total_vectors = self.vector_index.ntotal + elif hasattr(self.vector_index, 'vectors'): + total_vectors = len(self.vector_index.vectors) + + logger.debug(f"向量索引中实际向量数: {total_vectors}") + + if total_vectors == 0: + logger.warning("向量索引为空,无法执行搜索") + return [] # 执行向量搜索 with self._lock: @@ -377,31 +428,55 @@ class VectorStorageManager: # 设置IVF搜索参数 nprobe = min(self.vector_index.nlist, 10) self.vector_index.nprobe = nprobe + logger.debug(f"IVF搜索参数: nprobe={nprobe}") - distances, indices = self.vector_index.search(query_array, min(limit, self.storage_stats["total_vectors"])) + search_limit = min(limit, total_vectors) + logger.debug(f"执行FAISS搜索,搜索限制: {search_limit}") + + distances, indices = self.vector_index.search(query_array, search_limit) distances = distances.flatten().tolist() indices = indices.flatten().tolist() + + logger.debug(f"FAISS搜索结果: {len(distances)} 个距离值, {len(indices)} 个索引") else: # 简单索引 + logger.debug("使用简单向量索引执行搜索") results = self.vector_index.search(query_vector, limit) distances = [score for _, score in results] indices = [idx for idx, _ in results] + logger.debug(f"简单索引搜索结果: {len(results)} 个结果") # 处理搜索结果 results = [] + valid_results = 0 + invalid_indices = 0 + filtered_by_scope = 0 + for distance, index in zip(distances, indices): if index == -1: # FAISS的无效索引标记 + invalid_indices += 1 continue memory_id = self.index_to_memory_id.get(index) - if memory_id: - if scope_filter: - memory = self.memory_cache.get(memory_id) - if memory and str(memory.user_id) != scope_filter: - continue + if not memory_id: + logger.debug(f"索引 {index} 没有对应的记忆ID") + invalid_indices += 1 + continue + + if scope_filter: + memory = self.memory_cache.get(memory_id) + if memory and str(memory.user_id) != scope_filter: + filtered_by_scope += 1 + continue - similarity = max(0.0, min(1.0, distance)) # 确保在0-1范围内 - results.append((memory_id, similarity)) + similarity = max(0.0, min(1.0, distance)) # 确保在0-1范围内 + results.append((memory_id, similarity)) + valid_results += 1 + + logger.debug( + f"搜索结果处理: 总距离={len(distances)}, 有效结果={valid_results}, " + f"无效索引={invalid_indices}, 作用域过滤={filtered_by_scope}" + ) # 更新统计 search_time = time.time() - start_time @@ -411,10 +486,16 @@ class VectorStorageManager: self.storage_stats["total_searches"] ) - return results[:limit] + final_results = results[:limit] + logger.info( + f"向量搜索完成: 查询='{query_text[:20] if query_text else 'vector'}' " + f"耗时={search_time:.3f}s, 返回={len(final_results)}个结果" + ) + + return final_results except Exception as e: - logger.error(f"❌ 向量搜索失败: {e}") + logger.error(f"❌ 向量搜索失败: {e}", exc_info=True) return [] async def get_memory_by_id(self, memory_id: str) -> Optional[MemoryChunk]: @@ -601,21 +682,28 @@ class VectorStorageManager: } # 加载FAISS索引(如果可用) + index_loaded = False if FAISS_AVAILABLE: index_file = self.storage_path / "vector_index.faiss" - if index_file.exists() and hasattr(self.vector_index, 'load'): + if index_file.exists(): try: loaded_index = faiss.read_index(str(index_file)) # 如果索引类型匹配,则替换 if type(loaded_index) == type(self.vector_index): self.vector_index = loaded_index - logger.info("✅ FAISS索引加载完成") + index_loaded = 
+                        logger.info("✅ FAISS索引文件加载完成")
                     else:
                         logger.warning("索引类型不匹配,重新构建索引")
-                        await self._rebuild_index()
                 except Exception as e:
                     logger.warning(f"加载FAISS索引失败: {e},重新构建")
-                    await self._rebuild_index()
+            else:
+                logger.info("FAISS索引文件不存在,将重新构建")
+
+        # 如果索引没有成功加载且有向量数据,则重建索引
+        if not index_loaded and self.vector_cache:
+            logger.info(f"检测到 {len(self.vector_cache)} 个向量缓存,重建索引")
+            await self._rebuild_index()

         # 加载统计信息
         stats_file = self.storage_path / "storage_stats.json"
@@ -634,22 +722,69 @@ class VectorStorageManager:
     async def _rebuild_index(self):
         """重建向量索引"""
         try:
-            logger.info("正在重建向量索引...")
+            logger.info(f"正在重建向量索引...向量数量: {len(self.vector_cache)}")

             # 重新初始化索引
             self._initialize_index()

-            # 重新添加所有向量
-            for memory_id, embedding in self.vector_cache.items():
-                if embedding:
-                    memory = self.memory_cache.get(memory_id)
-                    if memory:
-                        await self._add_single_memory(memory, embedding)
+            # 清空映射关系
+            self.memory_id_to_index.clear()
+            self.index_to_memory_id.clear()

-            logger.info("✅ 向量索引重建完成")
+            if not self.vector_cache:
+                logger.warning("没有向量缓存数据,跳过重建")
+                return
+
+            # 准备向量数据
+            memory_ids = []
+            vectors = []
+
+            for memory_id, embedding in self.vector_cache.items():
+                if embedding and len(embedding) == self.config.dimension:
+                    memory_ids.append(memory_id)
+                    vectors.append(self._normalize_vector(embedding))
+                else:
+                    logger.debug(f"跳过无效向量: {memory_id}, 维度: {len(embedding) if embedding else 0}")
+
+            if not vectors:
+                logger.warning("没有有效的向量数据")
+                return
+
+            logger.info(f"准备重建 {len(vectors)} 个向量到索引")
+
+            # 批量添加向量到FAISS索引
+            if hasattr(self.vector_index, 'add'):
+                # FAISS索引
+                vector_array = np.array(vectors, dtype='float32')
+
+                # 特殊处理IVF索引
+                if self.config.index_type == "ivf" and hasattr(self.vector_index, 'train'):
+                    logger.info("训练IVF索引...")
+                    self.vector_index.train(vector_array)
+
+                # 添加向量
+                self.vector_index.add(vector_array)
+
+                # 重建映射关系
+                for i, memory_id in enumerate(memory_ids):
+                    self.memory_id_to_index[memory_id] = i
+                    self.index_to_memory_id[i] = memory_id
+
+            else:
+                # 简单索引
+                for i, (memory_id, vector) in enumerate(zip(memory_ids, vectors)):
+                    index_id = self.vector_index.add_vector(vector)
+                    self.memory_id_to_index[memory_id] = index_id
+                    self.index_to_memory_id[index_id] = memory_id
+
+            # 更新统计
+            self.storage_stats["total_vectors"] = len(self.memory_id_to_index)
+
+            final_count = getattr(self.vector_index, 'ntotal', len(self.memory_id_to_index))
+            logger.info(f"✅ 向量索引重建完成,索引中向量数: {final_count}")

         except Exception as e:
-            logger.error(f"❌ 重建向量索引失败: {e}")
+            logger.error(f"❌ 重建向量索引失败: {e}", exc_info=True)

     async def optimize_storage(self):
         """优化存储"""
diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py
index c05a2ecb6..707830fef 100644
--- a/src/chat/message_manager/context_manager.py
+++ b/src/chat/message_manager/context_manager.py
@@ -60,7 +60,7 @@ class SingleStreamContextManager:
             self.last_access_time = time.time()
             # 启动流的循环任务(如果还未启动)
             await stream_loop_manager.start_stream_loop(self.stream_id)
-            logger.info(f"添加消息到单流上下文: {self.stream_id} (兴趣度待计算)")
+            logger.info(f"添加消息{message.processed_plain_text}到单流上下文: {self.stream_id}")
             return True
         except Exception as e:
             logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True)
diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py
index 5aaef4a3b..c7bb53da3 100644
--- a/src/chat/replyer/default_generator.py
+++ b/src/chat/replyer/default_generator.py
@@ -329,6 +329,13 @@ class DefaultReplyer:
                 logger.error(f"LLM 生成失败: {llm_e}")
                 return False, None, prompt  # LLM 调用失败则无法生成回复
+
+            # 回复生成成功后,异步存储聊天记忆(不阻塞返回)
+            try:
+                await self._store_chat_memory_async(reply_to, reply_message)
+            except Exception as memory_e:
+                # 记忆存储失败不应该影响回复生成的成功返回
+                logger.warning(f"记忆存储失败,但不影响回复生成: {memory_e}")
+
             return True, llm_response, prompt

         except UserWarning as uw:
@@ -571,15 +578,7 @@ class DefaultReplyer:
                 context=memory_context
             )

-            # 异步存储聊天历史(非阻塞)
-            asyncio.create_task(
-                remember_message(
-                    message=chat_history,
-                    user_id=memory_user_id,
-                    chat_id=stream.stream_id,
-                    context=memory_context
-                )
-            )
+            # 注意:记忆存储已迁移到回复生成完成后进行,不在查询阶段执行

             # 转换格式以兼容现有代码
             running_memories = []
@@ -1676,6 +1675,143 @@ class DefaultReplyer:
             logger.error(f"获取AFC关系信息失败: {e}")
             return f"你与{sender}是普通朋友关系。"

+    async def _store_chat_memory_async(self, reply_to: str, reply_message: Optional[Dict[str, Any]] = None):
+        """
+        异步存储聊天记忆(从build_memory_block迁移而来)
+
+        Args:
+            reply_to: 回复对象
+            reply_message: 回复的原始消息
+        """
+        try:
+            if not global_config.memory.enable_memory or not global_config.memory.enable_instant_memory:
+                return
+
+            # 使用增强记忆系统存储记忆
+            from src.chat.memory_system.enhanced_memory_integration import remember_message
+
+            stream = self.chat_stream
+            user_info_obj = getattr(stream, "user_info", None)
+            group_info_obj = getattr(stream, "group_info", None)
+
+            memory_user_id = str(stream.stream_id)
+            memory_user_display = None
+            memory_aliases = []
+            user_info_dict = {}
+
+            if user_info_obj is not None:
+                raw_user_id = getattr(user_info_obj, "user_id", None)
+                if raw_user_id:
+                    memory_user_id = str(raw_user_id)
+
+                if hasattr(user_info_obj, "to_dict"):
+                    try:
+                        user_info_dict = user_info_obj.to_dict()  # type: ignore[attr-defined]
+                    except Exception:
+                        user_info_dict = {}
+
+                candidate_keys = [
+                    "user_cardname",
+                    "user_nickname",
+                    "nickname",
+                    "remark",
+                    "display_name",
+                    "user_name",
+                ]
+
+                for key in candidate_keys:
+                    value = user_info_dict.get(key)
+                    if isinstance(value, str) and value.strip():
+                        stripped = value.strip()
+                        if memory_user_display is None:
+                            memory_user_display = stripped
+                        elif stripped not in memory_aliases:
+                            memory_aliases.append(stripped)
+
+                attr_keys = [
+                    "user_cardname",
+                    "user_nickname",
+                    "nickname",
+                    "remark",
+                    "display_name",
+                    "name",
+                ]
+
+                for attr in attr_keys:
+                    value = getattr(user_info_obj, attr, None)
+                    if isinstance(value, str) and value.strip():
+                        stripped = value.strip()
+                        if memory_user_display is None:
+                            memory_user_display = stripped
+                        elif stripped not in memory_aliases:
+                            memory_aliases.append(stripped)
+
+                alias_values = (
+                    user_info_dict.get("aliases")
+                    or user_info_dict.get("alias_names")
+                    or user_info_dict.get("alias")
+                )
+                if isinstance(alias_values, (list, tuple, set)):
+                    for alias in alias_values:
+                        if isinstance(alias, str) and alias.strip():
+                            stripped = alias.strip()
+                            if stripped not in memory_aliases and stripped != memory_user_display:
+                                memory_aliases.append(stripped)
+
+            memory_context = {
+                "user_id": memory_user_id,
+                "user_display_name": memory_user_display or "",
+                "user_name": memory_user_display or "",
+                "nickname": memory_user_display or "",
+                "sender_name": memory_user_display or "",
+                "platform": getattr(stream, "platform", None),
+                "chat_id": stream.stream_id,
+                "stream_id": stream.stream_id,
+            }
+
+            if memory_aliases:
+                memory_context["user_aliases"] = memory_aliases
+
+            if group_info_obj is not None:
+                group_name = getattr(group_info_obj, "group_name", None) or getattr(group_info_obj, "group_nickname", None)
+                if group_name:
+                    memory_context["group_name"] = str(group_name)
+                group_id = getattr(group_info_obj, "group_id", None)
+                if group_id:
+                    memory_context["group_id"] = str(group_id)
+
+            memory_context = {key: value for key, value in memory_context.items() if value}
+
+            # 构建聊天历史用于存储
+            message_list_before_short = await get_raw_msg_before_timestamp_with_chat(
+                chat_id=stream.stream_id,
+                timestamp=time.time(),
+                limit=int(global_config.chat.max_context_size * 0.33),
+            )
+            chat_history = await build_readable_messages(
+                message_list_before_short,
+                replace_bot_name=True,
+                merge_messages=False,
+                timestamp_mode="relative",
+                read_mark=0.0,
+                show_actions=True,
+            )
+
+            # 异步存储聊天历史(完全非阻塞)
+            asyncio.create_task(
+                remember_message(
+                    message=chat_history,
+                    user_id=memory_user_id,
+                    chat_id=stream.stream_id,
+                    context=memory_context
+                )
+            )
+
+            logger.debug(f"已启动记忆存储任务,用户: {memory_user_display or memory_user_id}")
+
+        except Exception as e:
+            logger.error(f"存储聊天记忆失败: {e}")

 def weighted_sample_no_replacement(items, weights, k) -> list:
     """
diff --git a/src/chat/utils/prompt.py b/src/chat/utils/prompt.py
index 3a499766a..7f241ad86 100644
--- a/src/chat/utils/prompt.py
+++ b/src/chat/utils/prompt.py
@@ -339,6 +339,7 @@ class Prompt:
             pre_built_params["relation_info_block"] = self.parameters.relation_info_block
         if self.parameters.memory_block:
             pre_built_params["memory_block"] = self.parameters.memory_block
+            logger.debug("使用预构建的memory_block,跳过实时构建")
         if self.parameters.tool_info_block:
             pre_built_params["tool_info_block"] = self.parameters.tool_info_block
         if self.parameters.knowledge_prompt:
@@ -351,8 +352,11 @@ class Prompt:
             tasks.append(self._build_expression_habits())
             task_names.append("expression_habits")

+        # 记忆块应该在回复前预构建,这里优先使用预构建的结果
         if self.parameters.enable_memory and not pre_built_params.get("memory_block"):
-            tasks.append(self._build_memory_block())
+            # 如果没有预构建的记忆块,则快速构建一个简化版本
+            logger.debug("memory_block未预构建,执行快速构建作为后备方案")
+            tasks.append(self._build_memory_block_fast())
             task_names.append("memory_block")

         if self.parameters.enable_relation and not pre_built_params.get("relation_info_block"):
@@ -373,7 +377,7 @@ class Prompt:

         # 性能优化 - 为不同任务设置不同的超时时间
         task_timeouts = {
-            "memory_block": 15.0,  # 记忆系统
+            "memory_block": 15.0,  # 记忆系统 - 优先使用预构建结果,此处仅作后备
             "tool_info": 15.0,  # 工具信息
             "relation_info": 10.0,  # 关系信息
             "knowledge_info": 10.0,  # 知识库查询
@@ -575,21 +579,42 @@ class Prompt:
                 instant_memory = None

             # 构建记忆块
-            memory_parts = []
-            existing_contents = set()
             if running_memories:
-                memory_parts.append("以下是当前在聊天中,你回忆起的记忆:")
-                for memory in running_memories:
-                    content = memory["content"]
-                    memory_parts.append(f"- {content}")
-                    existing_contents.add(content)
+                try:
+                    # 使用记忆格式化器进行格式化
+                    from src.chat.memory_system.memory_formatter import format_memories_bracket_style
+
+                    # 转换记忆数据格式
+                    formatted_memories = []
+                    for memory in running_memories:
+                        formatted_memories.append({
+                            "display": memory.get("display", memory.get("content", "")),
+                            "memory_type": memory.get("memory_type", "personal_fact"),
+                            "metadata": memory.get("metadata", {})
+                        })
+
+                    # 使用方括号格式格式化记忆
+                    memory_block = format_memories_bracket_style(
+                        formatted_memories,
+                        query_context=self.parameters.target
+                    )
+                except Exception as e:
+                    logger.warning(f"记忆格式化失败,使用简化格式: {e}")
+                    # 备用简化格式
+                    memory_parts = ["以下是当前在聊天中,你回忆起的记忆:"]
+                    for memory in running_memories:
+                        content = memory["content"]
+                        memory_parts.append(f"- {content}")
+                    memory_block = "\n".join(memory_parts)
+            else:
+                memory_block = ""

+            # 添加即时记忆
             if instant_memory:
-                if instant_memory not in existing_contents:
-                    memory_parts.append(f"- 最相关记忆:{instant_memory}")
-
-            memory_block = "\n".join(memory_parts) if memory_parts else ""
"" + if memory_block: + memory_block += f"\n- 最相关记忆:{instant_memory}" + else: + memory_block = f"- 最相关记忆:{instant_memory}" return {"memory_block": memory_block} @@ -597,6 +622,30 @@ class Prompt: logger.error(f"构建记忆块失败: {e}") return {"memory_block": ""} + async def _build_memory_block_fast(self) -> Dict[str, Any]: + """快速构建记忆块(简化版本,用于未预构建时的后备方案)""" + if not global_config.memory.enable_memory: + return {"memory_block": ""} + + try: + from src.chat.memory_system.enhanced_memory_activator import enhanced_memory_activator + + # 简化的快速查询,只获取即时记忆 + instant_memory = await enhanced_memory_activator.get_instant_memory( + target_message=self.parameters.target, chat_id=self.parameters.chat_id + ) + + if instant_memory: + memory_block = f"- 相关记忆:{instant_memory}" + else: + memory_block = "" + + return {"memory_block": memory_block} + + except Exception as e: + logger.warning(f"快速构建记忆块失败: {e}") + return {"memory_block": ""} + async def _build_relation_info(self) -> Dict[str, Any]: """构建关系信息""" try: diff --git a/src/main.py b/src/main.py index cbb4ddc7b..e0066e137 100644 --- a/src/main.py +++ b/src/main.py @@ -76,59 +76,78 @@ class MainSystem: def signal_handler(signum, frame): logger.info("收到退出信号,正在优雅关闭系统...") - self._cleanup() - sys.exit(0) + + import asyncio + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # 如果事件循环正在运行,创建任务并设置回调 + async def cleanup_and_exit(): + await self._async_cleanup() + sys.exit(0) + + task = asyncio.create_task(cleanup_and_exit()) + # 添加任务完成回调,确保程序退出 + task.add_done_callback(lambda t: None) + else: + # 如果事件循环未运行,使用同步清理 + self._cleanup() + sys.exit(0) + except Exception as e: + logger.error(f"信号处理失败: {e}") + sys.exit(1) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - def _cleanup(self): - """清理资源""" + async def _async_cleanup(self): + """异步清理资源""" try: # 停止消息管理器 - from src.chat.message_manager import message_manager - import asyncio + try: + from src.chat.message_manager import message_manager + await message_manager.stop() + logger.info("🛑 消息管理器已停止") + except Exception as e: + logger.error(f"停止消息管理器时出错: {e}") - loop = asyncio.get_event_loop() - if loop.is_running(): - asyncio.create_task(message_manager.stop()) - else: - loop.run_until_complete(message_manager.stop()) - logger.info("🛑 消息管理器已停止") - except Exception as e: - logger.error(f"停止消息管理器时出错: {e}") - - try: # 停止消息重组器 - from src.plugin_system.core.event_manager import event_manager - from src.plugin_system import EventType - asyncio.run(event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM")) - - from src.utils.message_chunker import reassembler + try: + from src.plugin_system.core.event_manager import event_manager + from src.plugin_system import EventType + from src.utils.message_chunker import reassembler + await event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM") + await reassembler.stop_cleanup_task() + logger.info("🛑 消息重组器已停止") + except Exception as e: + logger.error(f"停止消息重组器时出错: {e}") + + # 停止增强记忆系统 + try: + if global_config.memory.enable_memory: + await self.enhanced_memory_manager.shutdown() + logger.info("🛑 增强记忆系统已停止") + except Exception as e: + logger.error(f"停止增强记忆系统时出错: {e}") + + except Exception as e: + logger.error(f"异步清理资源时出错: {e}") + + def _cleanup(self): + """同步清理资源(向后兼容)""" + import asyncio + + try: loop = asyncio.get_event_loop() if loop.is_running(): - asyncio.create_task(reassembler.stop_cleanup_task()) + # 如果循环正在运行,创建异步清理任务 + asyncio.create_task(self._async_cleanup()) else: - 
-                loop.run_until_complete(reassembler.stop_cleanup_task())
-            logger.info("🛑 消息重组器已停止")
+                # 如果循环未运行,直接运行异步清理
+                loop.run_until_complete(self._async_cleanup())
         except Exception as e:
-            logger.error(f"停止消息重组器时出错: {e}")
-
-
-        try:
-            # 停止增强记忆系统
-            if global_config.memory.enable_memory:
-                import asyncio
-
-                loop = asyncio.get_event_loop()
-                if loop.is_running():
-                    asyncio.create_task(self.enhanced_memory_manager.shutdown())
-                else:
-                    loop.run_until_complete(self.enhanced_memory_manager.shutdown())
-                logger.info("🛑 增强记忆系统已停止")
-        except Exception as e:
-            logger.error(f"停止增强记忆系统时出错: {e}")
+            logger.error(f"同步清理资源时出错: {e}")

     async def _message_process_wrapper(self, message_data: Dict[str, Any]):
         """并行处理消息的包装器"""
diff --git a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py
index 44566b86d..3a652e2f4 100644
--- a/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py
+++ b/src/plugins/built_in/affinity_flow_chatter/affinity_chatter.py
@@ -77,7 +77,7 @@ class AffinityChatter(BaseChatter):
         # 执行动作(如果规划器返回了动作)
         execution_result = {"executed_count": len(actions) if actions else 0}
         if actions:
-            logger.info(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
+            logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")

         # 更新统计
         self.stats["messages_processed"] += 1
diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml
index a8755c09e..c4f2b00e1 100644
--- a/template/bot_config_template.toml
+++ b/template/bot_config_template.toml
@@ -263,9 +263,9 @@ enhanced_memory_auto_save = true # 是否自动保存增强记忆
 min_memory_length = 10 # 最小记忆长度
 max_memory_length = 500 # 最大记忆长度

-memory_value_threshold = 0.7 # 记忆价值阈值,低于该值的记忆会被丢弃
-vector_similarity_threshold = 0.8 # 向量相似度阈值
-semantic_similarity_threshold = 0.6 # 语义重排阶段的最低匹配阈值
+memory_value_threshold = 0.5 # 记忆价值阈值,低于该值的记忆会被丢弃
+vector_similarity_threshold = 0.4 # 向量相似度阈值
+semantic_similarity_threshold = 0.4 # 语义重排阶段的最低匹配阈值

 metadata_filter_limit = 100 # 元数据过滤阶段返回数量上限
 vector_search_limit = 50 # 向量搜索阶段返回数量上限
@@ -277,7 +277,7 @@ semantic_weight = 0.3 # 综合评分中语义匹配的权重
 context_weight = 0.2 # 综合评分中上下文关联的权重
 recency_weight = 0.1 # 综合评分中时效性的权重

-fusion_similarity_threshold = 0.85 # 记忆融合时的相似度阈值
+fusion_similarity_threshold = 0.6 # 记忆融合时的相似度阈值
 deduplication_window_hours = 24 # 记忆去重窗口(小时)
 enable_memory_cache = true # 是否启用本地记忆缓存
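
The per-memory details entries and the shared debug_log dict added across the retrieval stages all rely on the same nested-setdefault idiom, so later stages can annotate the same memory without clobbering earlier fields. A minimal sketch of that pattern, assuming nothing beyond the standard library (record_stage is an illustrative helper, not part of the module):

    from typing import Any, Dict

    def record_stage(debug_log: Dict[str, Dict[str, Any]], memory_id: str,
                     stage: str, **fields: Any) -> None:
        """Merge per-stage fields into debug_log[memory_id][stage] without clobbering."""
        debug_log.setdefault(memory_id, {}).setdefault(stage, {}).update(fields)

    log: Dict[str, Dict[str, Any]] = {}
    record_stage(log, "mem-1", "context_stage", context_score=0.42, status="candidate")
    record_stage(log, "mem-1", "context_stage", status="final")  # later update keeps context_score
    # log == {"mem-1": {"context_stage": {"context_score": 0.42, "status": "final"}}}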
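
The new _fallback_retrieval_stage combines two strategies: score each memory by the fraction of query words it contains, then fall back to most-recently-touched memories with a flat 0.5 score when nothing matches. A self-contained sketch of that ranking logic over plain dicts instead of MemoryChunk objects (field names here are assumptions for illustration):

    from typing import Dict, List, Tuple

    def fallback_rank(query: str, memories: Dict[str, dict], limit: int = 5,
                      excluded: frozenset = frozenset()) -> List[Tuple[str, float]]:
        """Keyword-overlap ranking with a recency fallback, mirroring the patch's two strategies."""
        query_words = set(query.lower().split())
        candidates: List[Tuple[str, float]] = []

        # Strategy 1: score = fraction of query words found in the memory text.
        for mem_id, mem in memories.items():
            if mem_id in excluded:
                continue
            text = (mem.get("display") or mem.get("text") or "").lower()
            matches = sum(1 for w in query_words if w in text)
            if matches:
                candidates.append((mem_id, matches / len(query_words)))

        # Strategy 2: no keyword hits -> newest memories first, with a flat score.
        if not candidates:
            recent = sorted(
                ((mid, m.get("last_accessed") or m.get("created_at") or 0.0)
                 for mid, m in memories.items() if mid not in excluded),
                key=lambda x: x[1], reverse=True,
            )
            candidates = [(mid, 0.5) for mid, _ in recent[: limit * 2]]

        candidates.sort(key=lambda x: x[1], reverse=True)
        return candidates[:limit]

An empty query degrades cleanly here: no word ever matches, so the recency branch takes over, which is the behaviour the stage wants when the query planner produces nothing usable.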
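
The rewritten _calculate_semantic_similarity layers cheap signals (exact substring, token overlap, concept groups, query-plan bonuses) instead of a single Jaccard score. The token-overlap layer with its jieba fallback is the least obvious part; a condensed standalone version, with the 0.8/0.3 weights copied from the patch and an illustrative helper name:

    import re

    def word_overlap_score(query: str, memory: str) -> float:
        """Token-overlap score in [0, 1]; uses jieba when available, regex otherwise."""
        try:
            import jieba  # optional dependency, as in the patch
            q_tokens = list(jieba.cut(query)) + re.findall(r"[a-zA-Z]+", query)
            m_tokens = list(jieba.cut(memory)) + re.findall(r"[a-zA-Z]+", memory)
        except ImportError:
            # Fallback tokenizer: runs of word characters or CJK ideographs.
            q_tokens = re.findall(r"[\w\u4e00-\u9fa5]+", query.lower())
            m_tokens = re.findall(r"[\w\u4e00-\u9fa5]+", memory.lower())

        q_set = {t.strip().lower() for t in q_tokens if len(t.strip()) > 1}
        m_set = {t.strip().lower() for t in m_tokens if len(t.strip()) > 1}
        if not q_set or not m_set:
            return 0.0

        exact_ratio = len(q_set & m_set) / len(q_set)
        # Partial credit when one token contains the other ("python" vs "python3").
        partial = sum(
            1 for q in q_set
            if len(q) >= 2 and any(q in m or m in q for m in m_set)
        )
        return min(1.0, exact_ratio * 0.8 + (partial / len(q_set)) * 0.3)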
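
_prepare_embedding_text now embeds natural language only, cascading from display to text_content to a stitched subject/predicate/object line, and finally to the memory id as a placeholder. The same cascade over a simplified dict record, so the fallback order is easy to see (field names are abridged assumptions, not the module's schema):

    def prepare_embedding_text(memory: dict) -> str:
        """Pick the most natural text available for embedding, best option first."""
        display = (memory.get("display") or "").strip()
        if display:
            return display

        fallback = (memory.get("text_content") or "").strip()
        if fallback:
            return fallback

        # Structured last resort: stitch subjects / predicate / object into one line.
        subjects = "、".join(s.strip() for s in memory.get("subjects", [])
                             if isinstance(s, str) and s.strip())
        predicate = (memory.get("predicate") or "").strip()
        obj = memory.get("object")
        if isinstance(obj, dict):
            object_text = ", ".join(
                f"{k}:{'、'.join(map(str, v[:3])) if isinstance(v, (list, tuple)) else v}"
                for k, v in obj.items() if v is not None
            )
        else:
            object_text = str(obj or "").strip()

        parts = [p for p in (subjects, predicate, object_text) if p]
        # Placeholder keeps the vector store consistent even for empty memories.
        return " ".join(parts) if parts else memory.get("memory_id", "unknown-memory")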
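
The _rebuild_index rewrite replaces per-memory re-insertion with one batched FAISS add (training first for IVF indices) and then regenerates both id-to-row mappings. A minimal standalone sketch of that flow, assuming a flat inner-product index and unit-normalized vectors; the real manager also handles IVF training, stats, and locking:

    import numpy as np
    import faiss  # pip install faiss-cpu

    def rebuild_index(vector_cache: dict, dimension: int):
        """Rebuild a FAISS index plus id mappings from a {memory_id: embedding} cache."""
        index = faiss.IndexFlatIP(dimension)  # inner product == cosine for unit vectors
        memory_ids, vectors = [], []

        for memory_id, emb in vector_cache.items():
            if emb and len(emb) == dimension:  # skip stale or mis-sized entries
                v = np.asarray(emb, dtype="float32")
                norm = np.linalg.norm(v)
                memory_ids.append(memory_id)
                vectors.append(v / norm if norm else v)

        if vectors:
            index.add(np.stack(vectors))  # one batched insert, not per-memory calls

        # FAISS search returns row numbers; keep both directions of the mapping.
        id_to_index = {mid: i for i, mid in enumerate(memory_ids)}
        index_to_id = {i: mid for i, mid in enumerate(memory_ids)}
        return index, id_to_index, index_to_id

Batching matters for IVF in particular: train needs the whole matrix up front, which the old one-vector-at-a-time loop could never provide.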
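
The signal-handler change in src/main.py works around the fact that a synchronous handler cannot await: with a running loop it schedules the async cleanup as a task, otherwise it falls back to a blocking path. A minimal reproduction of the pattern (the cleanup body is a stand-in; exact shutdown semantics vary by Python version, so treat this as a sketch rather than the project's implementation):

    import asyncio
    import signal
    import sys

    async def cleanup() -> None:
        """Stand-in for the real shutdown work (stop managers, flush memory, ...)."""
        await asyncio.sleep(0)

    def install_handlers() -> None:
        def handler(signum, frame):
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is not None:
                # Inside a running loop: schedule cleanup; SystemExit is raised
                # from within the task once the awaits have finished.
                async def _cleanup_and_exit():
                    await cleanup()
                    sys.exit(0)
                loop.create_task(_cleanup_and_exit())
            else:
                # No running loop: safe to drive the coroutine to completion here.
                asyncio.run(cleanup())
                sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)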