diff --git a/src/chat/message_manager/context_manager.py b/src/chat/message_manager/context_manager.py index 97b0792e3..f9e39f019 100644 --- a/src/chat/message_manager/context_manager.py +++ b/src/chat/message_manager/context_manager.py @@ -31,7 +31,7 @@ def _get_unified_memory_manager(): global _unified_memory_manager if _unified_memory_manager is None: try: - from src.memory_graph.three_tier.manager_singleton import get_unified_memory_manager + from src.memory_graph.manager_singleton import get_unified_memory_manager _unified_memory_manager = get_unified_memory_manager() except Exception as e: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 6818e44b9..953770a4b 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -564,7 +564,7 @@ class DefaultReplyer: return f"{expression_habits_title}\n{expression_habits_block}" async def build_memory_block(self, chat_history: str, target: str) -> str: - """构建记忆块 + """构建记忆块(使用三层记忆系统) Args: chat_history: 聊天历史记录 @@ -573,149 +573,13 @@ class DefaultReplyer: Returns: str: 记忆信息字符串 """ - # 使用新的记忆图系统检索记忆(带智能查询优化) - all_memories = [] - try: - from src.memory_graph.manager_singleton import get_memory_manager, is_initialized - - if is_initialized(): - manager = get_memory_manager() - if manager: - # 构建查询上下文 - stream = self.chat_stream - user_info_obj = getattr(stream, "user_info", None) - sender_name = "" - if user_info_obj: - sender_name = getattr(user_info_obj, "user_nickname", "") or getattr(user_info_obj, "user_cardname", "") - - # 格式化聊天历史为更友好的格式 - formatted_history = "" - if chat_history: - # 移除过长的历史记录,只保留最近部分 - lines = chat_history.strip().split("\n") - recent_lines = lines[-10:] if len(lines) > 10 else lines - formatted_history = "\n".join(recent_lines) - - query_context = { - "chat_history": formatted_history, - "sender": sender_name, - } - - # 使用记忆管理器的智能检索(多查询策略) - memories = [] - if global_config.memory: - memories = [] - if global_config.memory: - top_k = global_config.memory.search_top_k - min_importance = global_config.memory.search_min_importance - memories = await manager.search_memories( - query=target, - top_k=top_k, - min_importance=min_importance, - include_forgotten=False, - use_multi_query=True, - context=query_context, - ) - - if memories: - logger.info(f"[记忆图] 检索到 {len(memories)} 条相关记忆") - - # 使用新的格式化工具构建完整的记忆描述 - from src.memory_graph.utils.memory_formatter import ( - format_memory_for_prompt, - get_memory_type_label, - ) - - for memory in memories: - # 使用格式化工具生成完整的主谓宾描述 - content = format_memory_for_prompt(memory, include_metadata=False) - - # 获取记忆类型 - mem_type = memory.memory_type.value if memory.memory_type else "未知" - - if content: - all_memories.append({ - "content": content, - "memory_type": mem_type, - "importance": memory.importance, - "relevance": 0.7, - "source": "memory_graph", - }) - logger.debug(f"[记忆构建] 格式化记忆: [{mem_type}] {content[:50]}...") - else: - logger.debug("[记忆图] 未找到相关记忆") - except Exception as e: - logger.debug(f"[记忆图] 检索失败: {e}") - all_memories = [] - - # 构建记忆字符串,使用方括号格式 - memory_str = "" - has_any_memory = False - - # 添加长期记忆(来自记忆图系统) - if all_memories: - # 使用方括号格式 - memory_parts = ["### 🧠 相关记忆 (Relevant Memories)", ""] - - # 按相关度排序,并记录相关度信息用于调试 - sorted_memories = sorted(all_memories, key=lambda x: x.get("relevance", 0.0), reverse=True) - - # 调试相关度信息 - relevance_info = [(m.get("memory_type", "unknown"), m.get("relevance", 0.0)) for m in sorted_memories] - logger.debug(f"记忆相关度信息: {relevance_info}") - logger.debug(f"[记忆构建] 
准备将 {len(sorted_memories)} 条记忆添加到提示词") - - for idx, running_memory in enumerate(sorted_memories, 1): - content = running_memory.get("content", "") - memory_type = running_memory.get("memory_type", "unknown") - - # 跳过空内容 - if not content or not content.strip(): - logger.warning(f"[记忆构建] 跳过第 {idx} 条记忆:内容为空 (type={memory_type})") - logger.debug(f"[记忆构建] 空记忆详情: {running_memory}") - continue - - # 使用记忆图的类型映射(优先)或全局映射 - try: - from src.memory_graph.utils.memory_formatter import get_memory_type_label - chinese_type = get_memory_type_label(memory_type) - except ImportError: - # 回退到全局映射 - chinese_type = get_memory_type_chinese_label(memory_type) - - # 提取纯净内容(如果包含旧格式的元数据) - clean_content = content - if "(类型:" in content and ")" in content: - clean_content = content.split("(类型:")[0].strip() - - logger.debug(f"[记忆构建] 添加第 {idx} 条记忆: [{chinese_type}] {clean_content[:50]}...") - memory_parts.append(f"- **[{chinese_type}]** {clean_content}") - - memory_str = "\n".join(memory_parts) + "\n" - has_any_memory = True - logger.debug(f"[记忆构建] 成功构建记忆字符串,包含 {len(memory_parts) - 2} 条记忆") - - # 瞬时记忆由另一套系统处理,这里不再添加 - - # 只有当完全没有任何记忆时才返回空字符串 - return memory_str if has_any_memory else "" - - async def build_three_tier_memory_block(self, chat_history: str, target: str) -> str: - """构建三层记忆块(感知记忆 + 短期记忆 + 长期记忆) - - Args: - chat_history: 聊天历史记录 - target: 目标消息内容 - - Returns: - str: 三层记忆信息字符串 - """ # 检查是否启用三层记忆系统 if not (global_config.three_tier_memory and global_config.three_tier_memory.enable): return "" try: - from src.memory_graph.three_tier.manager_singleton import get_unified_memory_manager + from src.memory_graph.manager_singleton import get_unified_memory_manager + from src.memory_graph.utils.memory_formatter import format_memory_for_prompt unified_manager = get_unified_memory_manager() if not unified_manager: @@ -737,45 +601,45 @@ class DefaultReplyer: short_term_memories = search_result.get("short_term_memories", []) long_term_memories = search_result.get("long_term_memories", []) - memory_parts = ["### 🔮 三层记忆系统 (Three-Tier Memory)", ""] + memory_parts = ["### 🧠 相关记忆 (Relevant Memories)", ""] # 添加感知记忆(最近的消息块) if perceptual_blocks: - memory_parts.append("#### 🌊 感知记忆 (Perceptual Memory)") + memory_parts.append("#### 🌊 感知记忆") for block in perceptual_blocks[:2]: # 最多显示2个块 - # MemoryBlock 对象有 messages 属性(列表) messages = block.messages if hasattr(block, 'messages') else [] if messages: - block_content = " → ".join([f"{msg.get('sender_name', msg.get('sender_id', ''))}: {msg.get('content', '')[:30]}" for msg in messages[:3]]) + block_content = " → ".join([ + f"{msg.get('sender_name', msg.get('sender_id', ''))}: {msg.get('content', '')[:30]}" + for msg in messages[:3] + ]) memory_parts.append(f"- {block_content}") memory_parts.append("") # 添加短期记忆(结构化活跃记忆) if short_term_memories: - memory_parts.append("#### 💭 短期记忆 (Short-Term Memory)") + memory_parts.append("#### 💭 短期记忆") for mem in short_term_memories[:3]: # 最多显示3条 - # ShortTermMemory 对象有属性而非字典 - if hasattr(mem, 'subject') and hasattr(mem, 'topic') and hasattr(mem, 'object'): - subject = mem.subject or "" - topic = mem.topic or "" - obj = mem.object or "" - content = f"{subject} {topic} {obj}" if all([subject, topic, obj]) else (mem.content if hasattr(mem, 'content') else str(mem)) - else: - content = mem.content if hasattr(mem, 'content') else str(mem) - memory_parts.append(f"- {content}") + content = format_memory_for_prompt(mem, include_metadata=False) + if content: + memory_parts.append(f"- {content}") memory_parts.append("") # 添加长期记忆(图谱记忆) if long_term_memories: - 
memory_parts.append("#### 🧠 长期记忆 (Long-Term Memory)") + memory_parts.append("#### 🗄️ 长期记忆") for mem in long_term_memories[:3]: # 最多显示3条 - # Memory 对象有 content 属性 - content = mem.content if hasattr(mem, 'content') else str(mem) - memory_parts.append(f"- {content}") + content = format_memory_for_prompt(mem, include_metadata=False) + if content: + memory_parts.append(f"- {content}") memory_parts.append("") total_count = len(perceptual_blocks) + len(short_term_memories) + len(long_term_memories) - logger.info(f"[三层记忆] 检索到 {total_count} 条记忆 (感知:{len(perceptual_blocks)}, 短期:{len(short_term_memories)}, 长期:{len(long_term_memories)})") + if total_count > 0: + logger.info( + f"[三层记忆] 检索到 {total_count} 条记忆 " + f"(感知:{len(perceptual_blocks)}, 短期:{len(short_term_memories)}, 长期:{len(long_term_memories)})" + ) return "\n".join(memory_parts) if len(memory_parts) > 2 else "" @@ -783,6 +647,8 @@ class DefaultReplyer: logger.error(f"[三层记忆] 检索失败: {e}", exc_info=True) return "" + + async def build_tool_info(self, chat_history: str, sender: str, target: str, enable_tool: bool = True) -> str: """构建工具信息块 @@ -1405,9 +1271,6 @@ class DefaultReplyer: "memory_block": asyncio.create_task( self._time_and_run_task(self.build_memory_block(chat_talking_prompt_short, target), "memory_block") ), - "three_tier_memory": asyncio.create_task( - self._time_and_run_task(self.build_three_tier_memory_block(chat_talking_prompt_short, target), "three_tier_memory") - ), "tool_info": asyncio.create_task( self._time_and_run_task( self.build_tool_info(chat_talking_prompt_short, sender, target, enable_tool=enable_tool), diff --git a/src/config/api_ada_configs.py b/src/config/api_ada_configs.py index d5478f8b4..bc500413a 100644 --- a/src/config/api_ada_configs.py +++ b/src/config/api_ada_configs.py @@ -148,6 +148,12 @@ class ModelTaskConfig(ValidatedConfigBase): relationship_tracker: TaskConfig = Field(..., description="关系追踪模型配置") # 处理配置文件中命名不一致的问题 utils_video: TaskConfig = Field(..., description="视频分析模型配置(兼容配置文件中的命名)") + + # 记忆系统专用模型配置 + memory_short_term_builder: TaskConfig = Field(..., description="短期记忆构建模型配置(感知→短期格式化)") + memory_short_term_decider: TaskConfig = Field(..., description="短期记忆决策模型配置(合并/更新/新建/丢弃)") + memory_long_term_builder: TaskConfig = Field(..., description="长期记忆构建模型配置(短期→长期图结构)") + memory_judge: TaskConfig = Field(..., description="记忆检索裁判模型配置(判断检索是否充足)") @property def video_analysis(self) -> TaskConfig: diff --git a/src/main.py b/src/main.py index 4231b44e2..c6a02e1a7 100644 --- a/src/main.py +++ b/src/main.py @@ -249,7 +249,7 @@ class MainSystem: # 停止增强记忆系统 # 停止三层记忆系统 try: - from src.memory_graph.three_tier.manager_singleton import get_unified_memory_manager, shutdown_unified_memory_manager + from src.memory_graph.manager_singleton import get_unified_memory_manager, shutdown_unified_memory_manager if get_unified_memory_manager(): cleanup_tasks.append(("三层记忆系统", shutdown_unified_memory_manager())) @@ -480,7 +480,7 @@ MoFox_Bot(第三方修改版) # 初始化三层记忆系统(如果启用) try: if global_config.three_tier_memory and global_config.three_tier_memory.enable: - from src.memory_graph.three_tier.manager_singleton import initialize_unified_memory_manager + from src.memory_graph.manager_singleton import initialize_unified_memory_manager logger.info("三层记忆系统已启用,正在初始化...") await initialize_unified_memory_manager() logger.info("三层记忆系统初始化成功") diff --git a/src/memory_graph/three_tier/long_term_manager.py b/src/memory_graph/long_term_manager.py similarity index 98% rename from src/memory_graph/three_tier/long_term_manager.py rename to 
src/memory_graph/long_term_manager.py index 328d08e5d..9f6a2671f 100644 --- a/src/memory_graph/three_tier/long_term_manager.py +++ b/src/memory_graph/long_term_manager.py @@ -17,7 +17,7 @@ from typing import Any from src.common.logger import get_logger from src.memory_graph.manager import MemoryManager from src.memory_graph.models import Memory, MemoryType, NodeType -from src.memory_graph.three_tier.models import GraphOperation, GraphOperationType, ShortTermMemory +from src.memory_graph.models import GraphOperation, GraphOperationType, ShortTermMemory logger = get_logger(__name__) @@ -249,9 +249,9 @@ class LongTermMemoryManager: # 构建提示词 prompt = self._build_graph_operation_prompt(stm, similar_memories) - # 调用 LLM + # 调用长期记忆构建模型 llm = LLMRequest( - model_set=model_config.model_task_config.utils_small, + model_set=model_config.model_task_config.memory_long_term_builder, request_type="long_term_memory.graph_operations", ) @@ -509,6 +509,10 @@ class LongTermMemoryManager: async def _execute_update_memory(self, op: GraphOperation) -> None: """执行更新记忆操作""" memory_id = op.target_id + if not memory_id: + logger.error("更新操作缺少目标记忆ID") + return + updates = op.parameters.get("updated_fields", {}) success = await self.memory_manager.update_memory(memory_id, **updates) diff --git a/src/memory_graph/manager.py b/src/memory_graph/manager.py index cb8fe8185..5bc0c55d1 100644 --- a/src/memory_graph/manager.py +++ b/src/memory_graph/manager.py @@ -1134,47 +1134,47 @@ class MemoryManager: max_batch_size: int = 50, ) -> dict[str, Any]: """ - 整理记忆:直接合并去重相似记忆(不创建新边) - - 性能优化版本: - 1. 使用 asyncio.create_task 在后台执行,避免阻塞主流程 - 2. 向量计算批量处理,减少重复计算 - 3. 延迟保存,批量写入数据库 - 4. 更频繁的协作式多任务让出 + 简化的记忆整理:仅检查需要遗忘的记忆并清理孤立节点和边 + + 功能: + 1. 检查需要遗忘的记忆(低激活度) + 2. 清理孤立节点和边 + + 注意:记忆的创建、合并、关联等操作已由三级记忆系统自动处理 Args: - similarity_threshold: 相似度阈值(默认0.85,建议提高到0.9减少误判) - time_window_hours: 时间窗口(小时) - max_batch_size: 单次最多处理的记忆数量 + similarity_threshold: (已废弃,保留参数兼容性) + time_window_hours: (已废弃,保留参数兼容性) + max_batch_size: (已废弃,保留参数兼容性) Returns: - 整理结果(如果是异步执行,返回启动状态) + 整理结果 """ if not self._initialized: await self.initialize() try: - logger.info(f"🚀 启动记忆整理任务 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...") + logger.info("🧹 开始记忆整理:检查遗忘 + 清理孤立节点...") - # 创建后台任务执行整理 - task = asyncio.create_task( - self._consolidate_memories_background( - similarity_threshold=similarity_threshold, - time_window_hours=time_window_hours, - max_batch_size=max_batch_size - ) - ) + # 步骤1: 自动遗忘低激活度的记忆 + forgotten_count = await self.auto_forget() - # 返回任务启动状态,不等待完成 - return { - "task_started": True, - "task_id": id(task), - "message": "记忆整理任务已在后台启动" + # 步骤2: 清理孤立节点和边(auto_forget内部已执行,这里再次确保) + orphan_nodes, orphan_edges = await self._cleanup_orphan_nodes_and_edges() + + result = { + "forgotten_count": forgotten_count, + "orphan_nodes_cleaned": orphan_nodes, + "orphan_edges_cleaned": orphan_edges, + "message": "记忆整理完成(仅遗忘和清理孤立节点)" } + logger.info(f"✅ 记忆整理完成: {result}") + return result + except Exception as e: - logger.error(f"启动记忆整理任务失败: {e}", exc_info=True) - return {"error": str(e), "task_started": False} + logger.error(f"记忆整理失败: {e}", exc_info=True) + return {"error": str(e), "forgotten_count": 0} async def _consolidate_memories_background( self, @@ -1183,294 +1183,30 @@ class MemoryManager: max_batch_size: int, ) -> None: """ - 后台执行记忆整理的具体实现 (完整版) - - 流程: - 1. 获取时间窗口内的记忆 - 2. 重要性过滤 - 3. 向量检索关联记忆 - 4. 分批交给LLM分析关系 - 5. 
统一更新记忆数据 - - 这个方法会在独立任务中运行,不阻塞主流程 + 后台整理任务(已简化为调用consolidate_memories) + + 保留此方法用于向后兼容 """ - try: - result = { - "merged_count": 0, - "checked_count": 0, - "skipped_count": 0, - "linked_count": 0, - "importance_filtered": 0, - } + await self.consolidate_memories( + similarity_threshold=similarity_threshold, + time_window_hours=time_window_hours, + max_batch_size=max_batch_size + ) - # ===== 步骤1: 获取时间窗口内的记忆 ===== - cutoff_time = datetime.now() - timedelta(hours=time_window_hours) - all_memories = self.graph_store.get_all_memories() + # ==================== 以下方法已废弃 ==================== + # 旧的记忆整理逻辑(去重、自动关联等)已由三级记忆系统取代 + # 保留方法签名用于向后兼容,但不再执行复杂操作 - recent_memories = [ - mem for mem in all_memories - if mem.created_at >= cutoff_time and not mem.metadata.get("forgotten", False) - ] - - if not recent_memories: - logger.info("✅ 记忆整理完成: 没有需要整理的记忆") - return - - logger.info(f"📋 步骤1: 找到 {len(recent_memories)} 条时间窗口内的记忆") - - # ===== 步骤2: 重要性过滤 ===== - min_importance_for_consolidation = getattr(self.config, "consolidation_min_importance", 0.3) - important_memories = [ - mem for mem in recent_memories - if mem.importance >= min_importance_for_consolidation - ] - - result["importance_filtered"] = len(recent_memories) - len(important_memories) - logger.info( - f"📊 步骤2: 重要性过滤 (阈值={min_importance_for_consolidation:.2f}): " - f"{len(recent_memories)} → {len(important_memories)} 条记忆" - ) - - if not important_memories: - logger.info("✅ 记忆整理完成: 没有重要的记忆需要整理") - return - - # 限制批量处理数量 - if len(important_memories) > max_batch_size: - logger.info(f"📊 记忆数量 {len(important_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size} 条") - important_memories = sorted(important_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size] - result["skipped_count"] = len(important_memories) - max_batch_size - - result["checked_count"] = len(important_memories) - - # ===== 步骤3: 去重(相似记忆合并)===== - # 按记忆类型分组,减少跨类型比较 - memories_by_type: dict[str, list[Memory]] = {} - for mem in important_memories: - mem_type = mem.metadata.get("memory_type", "") - if mem_type not in memories_by_type: - memories_by_type[mem_type] = [] - memories_by_type[mem_type].append(mem) - - # 记录需要删除的记忆,延迟批量删除 - to_delete: list[tuple[Memory, str]] = [] # (memory, reason) - deleted_ids = set() - - # 对每个类型的记忆进行相似度检测(去重) - logger.info("📍 步骤3: 开始相似记忆去重...") - for mem_type, memories in memories_by_type.items(): - if len(memories) < 2: - continue - - logger.debug(f"🔍 检查类型 '{mem_type}' 的 {len(memories)} 条记忆") - - # 预提取所有主题节点的嵌入向量 - embeddings_map: dict[str, "np.ndarray"] = {} - valid_memories = [] - - for mem in memories: - topic_node = next((n for n in mem.nodes if n.node_type == NodeType.TOPIC), None) - if topic_node and topic_node.embedding is not None: - embeddings_map[mem.id] = topic_node.embedding - valid_memories.append(mem) - - # 批量计算相似度矩阵(比逐个计算更高效) - for i in range(len(valid_memories)): - # 更频繁的协作式多任务让出 - if i % 5 == 0: - await asyncio.sleep(0.001) # 1ms让出 - - mem_i = valid_memories[i] - if mem_i.id in deleted_ids: - continue - - for j in range(i + 1, len(valid_memories)): - if valid_memories[j].id in deleted_ids: - continue - - mem_j = valid_memories[j] - - # 快速向量相似度计算 - embedding_i = embeddings_map[mem_i.id] - embedding_j = embeddings_map[mem_j.id] - - # 优化的余弦相似度计算 - similarity = cosine_similarity(embedding_i, embedding_j) - - if similarity >= similarity_threshold: - # 决定保留哪个记忆 - if mem_i.importance >= mem_j.importance: - keep_mem, remove_mem = mem_i, mem_j - else: - keep_mem, remove_mem = mem_j, mem_i - - logger.debug( - f"🔄 标记相似记忆 
(similarity={similarity:.3f}): " - f"保留 {keep_mem.id[:8]}, 删除 {remove_mem.id[:8]}" - ) - - # 增强保留记忆的重要性 - keep_mem.importance = min(1.0, keep_mem.importance + 0.05) - - # 累加访问次数 - if hasattr(keep_mem, "access_count") and hasattr(remove_mem, "access_count"): - keep_mem.access_count += remove_mem.access_count - - # 标记为待删除(不立即删除) - to_delete.append((remove_mem, f"与记忆 {keep_mem.id[:8]} 相似度 {similarity:.3f}")) - deleted_ids.add(remove_mem.id) - result["merged_count"] += 1 - - # 每处理完一个类型就让出控制权 - await asyncio.sleep(0.005) # 5ms让出 - - # 批量删除标记的记忆 - if to_delete: - logger.info(f"🗑️ 批量删除 {len(to_delete)} 条相似记忆") - - for memory, reason in to_delete: - try: - # 从向量存储删除节点 - for node in memory.nodes: - if node.embedding is not None: - await self.vector_store.delete_node(node.id) - - # 从图存储删除记忆 - self.graph_store.remove_memory(memory.id) - - except Exception as e: - logger.warning(f"删除记忆 {memory.id[:8]} 失败: {e}") - - # 批量保存(一次性写入,减少I/O,异步执行) - asyncio.create_task(self._async_save_graph_store("记忆去重")) - logger.info("💾 去重保存任务已启动") - - # ===== 步骤4: 向量检索关联记忆 + LLM分析关系 ===== - # 过滤掉已删除的记忆 - remaining_memories = [m for m in important_memories if m.id not in deleted_ids] - - if not remaining_memories: - logger.info("✅ 记忆整理完成: 去重后无剩余记忆") - return - - logger.info(f"📍 步骤4: 开始关联分析 ({len(remaining_memories)} 条记忆)...") - - # 分批处理记忆关联 - llm_batch_size = getattr(self.config, "consolidation_llm_batch_size", 10) - max_candidates_per_memory = getattr(self.config, "consolidation_max_candidates", 5) - min_confidence = getattr(self.config, "consolidation_min_confidence", 0.6) - - all_new_edges = [] # 收集所有新建的边 - - for batch_start in range(0, len(remaining_memories), llm_batch_size): - batch_end = min(batch_start + llm_batch_size, len(remaining_memories)) - batch = remaining_memories[batch_start:batch_end] - - logger.debug(f"处理批次 {batch_start//llm_batch_size + 1}/{(len(remaining_memories)-1)//llm_batch_size + 1}") - - for memory in batch: - # 跳过已经有很多连接的记忆 - existing_edges = len([ - e for e in memory.edges - if e.edge_type == EdgeType.RELATION - ]) - if existing_edges >= 10: - continue - - # 使用向量搜索找候选关联记忆 - candidates = await self._find_link_candidates( - memory, - exclude_ids={memory.id} | deleted_ids, - max_results=max_candidates_per_memory - ) - - if not candidates: - continue - - # 使用LLM分析关系 - relations = await self._analyze_memory_relations( - source_memory=memory, - candidate_memories=candidates, - min_confidence=min_confidence - ) - - # 建立关联边 - for relation in relations: - try: - # 创建关联边 - edge = MemoryEdge( - id=f"edge_{uuid.uuid4().hex[:12]}", - source_id=memory.subject_id, - target_id=relation["target_memory"].subject_id, - relation=relation["relation_type"], - edge_type=EdgeType.RELATION, - importance=relation["confidence"], - metadata={ - "auto_linked": True, - "confidence": relation["confidence"], - "reasoning": relation["reasoning"], - "created_at": datetime.now().isoformat(), - "created_by": "consolidation", - } - ) - - all_new_edges.append((memory, edge, relation)) - result["linked_count"] += 1 - - except Exception as e: - logger.warning(f"创建关联边失败: {e}") - continue - - # 每个批次后让出控制权 - await asyncio.sleep(0.01) - - # ===== 步骤5: 统一更新记忆数据 ===== - if all_new_edges: - logger.info(f"📍 步骤5: 统一更新 {len(all_new_edges)} 条新关联边...") - - for memory, edge, relation in all_new_edges: - try: - # 添加到图 - self.graph_store.graph.add_edge( - edge.source_id, - edge.target_id, - edge_id=edge.id, - relation=edge.relation, - edge_type=edge.edge_type.value, - importance=edge.importance, - metadata=edge.metadata, - ) - - # 同时添加到记忆的边列表 - 
memory.edges.append(edge) - - logger.debug( - f"✓ {memory.id[:8]} --[{relation['relation_type']}]--> " - f"{relation['target_memory'].id[:8]} (置信度={relation['confidence']:.2f})" - ) - - except Exception as e: - logger.warning(f"添加边到图失败: {e}") - - # 批量保存更新(异步执行) - asyncio.create_task(self._async_save_graph_store("记忆关联边")) - logger.info("💾 关联边保存任务已启动") - - logger.info(f"✅ 记忆整理完成: {result}") - - except Exception as e: - logger.error(f"❌ 记忆整理失败: {e}", exc_info=True) - - async def auto_link_memories( + async def auto_link_memories( # 已废弃 self, time_window_hours: float | None = None, max_candidates: int | None = None, min_confidence: float | None = None, ) -> dict[str, Any]: """ - 自动关联记忆 + 自动关联记忆(已废弃) - 使用LLM分析记忆之间的关系,自动建立关联边。 + 该功能已由三级记忆系统取代。记忆之间的关联现在通过模型自动处理。 Args: time_window_hours: 分析时间窗口(小时) @@ -1478,196 +1214,35 @@ class MemoryManager: min_confidence: 最低置信度阈值 Returns: - 关联结果统计 + 空结果(向后兼容) """ - if not self._initialized: - await self.initialize() + logger.warning("auto_link_memories 已废弃,记忆关联由三级记忆系统自动处理") + return {"checked_count": 0, "linked_count": 0, "deprecated": True} - # 使用配置值或参数覆盖 - time_window_hours = time_window_hours if time_window_hours is not None else 24 - max_candidates = max_candidates if max_candidates is not None else getattr(self.config, "auto_link_max_candidates", 10) - min_confidence = min_confidence if min_confidence is not None else getattr(self.config, "auto_link_min_confidence", 0.7) - - try: - logger.info(f"开始自动关联记忆 (时间窗口={time_window_hours}h)...") - - result = { - "checked_count": 0, - "linked_count": 0, - "relation_stats": {}, # 关系类型统计 {类型: 数量} - "relations": {}, # 详细关系 {source_id: [关系列表]} - } - - # 1. 获取时间窗口内的记忆 - time_threshold = datetime.now() - timedelta(hours=time_window_hours) - all_memories = self.graph_store.get_all_memories() - - recent_memories = [ - mem for mem in all_memories - if mem.created_at >= time_threshold - and not mem.metadata.get("forgotten", False) - ] - - if len(recent_memories) < 2: - logger.info("记忆数量不足,跳过自动关联") - return result - - logger.info(f"找到 {len(recent_memories)} 条待关联记忆") - - # 2. 为每个记忆寻找关联候选 - for memory in recent_memories: - result["checked_count"] += 1 - - # 跳过已经有很多连接的记忆 - existing_edges = len([ - e for e in memory.edges - if e.edge_type == EdgeType.RELATION - ]) - if existing_edges >= 10: - continue - - # 3. 使用向量搜索找候选记忆 - candidates = await self._find_link_candidates( - memory, - exclude_ids={memory.id}, - max_results=max_candidates - ) - - if not candidates: - continue - - # 4. 使用LLM分析关系 - relations = await self._analyze_memory_relations( - source_memory=memory, - candidate_memories=candidates, - min_confidence=min_confidence - ) - - # 5. 
建立关联 - for relation in relations: - try: - # 创建关联边 - edge = MemoryEdge( - id=f"edge_{uuid.uuid4().hex[:12]}", - source_id=memory.subject_id, - target_id=relation["target_memory"].subject_id, - relation=relation["relation_type"], - edge_type=EdgeType.RELATION, - importance=relation["confidence"], - metadata={ - "auto_linked": True, - "confidence": relation["confidence"], - "reasoning": relation["reasoning"], - "created_at": datetime.now().isoformat(), - } - ) - - # 添加到图 - self.graph_store.graph.add_edge( - edge.source_id, - edge.target_id, - edge_id=edge.id, - relation=edge.relation, - edge_type=edge.edge_type.value, - importance=edge.importance, - metadata=edge.metadata, - ) - - # 同时添加到记忆的边列表 - memory.edges.append(edge) - - result["linked_count"] += 1 - - # 更新统计 - result["relation_stats"][relation["relation_type"]] = \ - result["relation_stats"].get(relation["relation_type"], 0) + 1 - - # 记录详细关系 - if memory.id not in result["relations"]: - result["relations"][memory.id] = [] - result["relations"][memory.id].append({ - "target_id": relation["target_memory"].id, - "relation_type": relation["relation_type"], - "confidence": relation["confidence"], - "reasoning": relation["reasoning"], - }) - - logger.info( - f"建立关联: {memory.id[:8]} --[{relation['relation_type']}]--> " - f"{relation['target_memory'].id[:8]} " - f"(置信度={relation['confidence']:.2f})" - ) - - except Exception as e: - logger.warning(f"建立关联失败: {e}") - continue - - # 异步保存更新后的图数据 - if result["linked_count"] > 0: - asyncio.create_task(self._async_save_graph_store("自动关联")) - logger.info(f"已启动保存任务: {result['linked_count']} 条自动关联边") - - logger.info(f"自动关联完成: {result}") - return result - - except Exception as e: - logger.error(f"自动关联失败: {e}", exc_info=True) - return {"error": str(e), "checked_count": 0, "linked_count": 0} - - async def _find_link_candidates( + async def _find_link_candidates( # 已废弃 self, memory: Memory, exclude_ids: set[str], max_results: int = 5, ) -> list[Memory]: """ - 为记忆寻找关联候选 + 为记忆寻找关联候选(已废弃) - 使用向量相似度 + 时间接近度找到潜在相关记忆 + 该功能已由三级记忆系统取代。 """ - try: - # 获取记忆的主题 - topic_node = next( - (n for n in memory.nodes if n.node_type == NodeType.TOPIC), - None - ) + logger.warning("_find_link_candidates 已废弃") + return [] - if not topic_node or not topic_node.content: - return [] - - # 使用主题内容搜索相似记忆 - candidates = await self.search_memories( - query=topic_node.content, - top_k=max_results * 2, - include_forgotten=False, - ) - - # 过滤:排除自己和已关联的 - existing_targets = { - e.target_id for e in memory.edges - if e.edge_type == EdgeType.RELATION - } - - filtered = [ - c for c in candidates - if c.id not in exclude_ids - and c.id not in existing_targets - ] - - return filtered[:max_results] - - except Exception as e: - logger.warning(f"查找候选失败: {e}") - return [] - - async def _analyze_memory_relations( + async def _analyze_memory_relations( # 已废弃 self, source_memory: Memory, candidate_memories: list[Memory], min_confidence: float = 0.7, ) -> list[dict[str, Any]]: """ - 使用LLM分析记忆之间的关系 + 使用LLM分析记忆之间的关系(已废弃) + + 该功能已由三级记忆系统取代。 Args: source_memory: 源记忆 @@ -1675,171 +1250,26 @@ class MemoryManager: min_confidence: 最低置信度 Returns: - 关系列表,每项包含: - - target_memory: 目标记忆 - - relation_type: 关系类型 - - confidence: 置信度 - - reasoning: 推理过程 + 空列表(向后兼容) """ - try: - from src.config.config import model_config - from src.llm_models.utils_model import LLMRequest + logger.warning("_analyze_memory_relations 已废弃") + return [] - # 构建LLM请求 - llm = LLMRequest( - model_set=model_config.model_task_config.utils_small, - request_type="memory.relation_analysis" - ) - - # 
格式化记忆信息 - source_desc = self._format_memory_for_llm(source_memory) - candidates_desc = "\n\n".join([ - f"记忆{i+1}:\n{self._format_memory_for_llm(mem)}" - for i, mem in enumerate(candidate_memories) - ]) - - # 构建提示词 - prompt = f"""你是一个记忆关系分析专家。请分析源记忆与候选记忆之间是否存在有意义的关系。 - -**关系类型说明:** -- 导致: A的发生导致了B的发生(因果关系) -- 引用: A提到或涉及B(引用关系) -- 相似: A和B描述相似的内容(相似关系) -- 相反: A和B表达相反的观点(对立关系) -- 关联: A和B存在某种关联但不属于以上类型(一般关联) - -**源记忆:** -{source_desc} - -**候选记忆:** -{candidates_desc} - -**任务要求:** -1. 对每个候选记忆,判断是否与源记忆存在关系 -2. 如果存在关系,指定关系类型和置信度(0.0-1.0) -3. 简要说明判断理由 -4. 只返回置信度 >= {min_confidence} 的关系 - -**输出格式(JSON):** -```json -[ - {{ - "candidate_id": 1, - "has_relation": true, - "relation_type": "导致", - "confidence": 0.85, - "reasoning": "记忆1是记忆源的结果" - }}, - {{ - "candidate_id": 2, - "has_relation": false, - "reasoning": "两者无明显关联" - }} -] -``` - -请分析并输出JSON结果:""" - - # 调用LLM - response, _ = await llm.generate_response_async( - prompt, - temperature=0.3, - max_tokens=1000, - ) - - # 解析响应 - import json - import re - - # 提取JSON - json_match = re.search(r"```json\s*(.*?)\s*```", response, re.DOTALL) - if json_match: - json_str = json_match.group(1) - else: - json_str = response.strip() - - try: - analysis_results = json.loads(json_str) - except json.JSONDecodeError: - logger.warning(f"LLM返回格式错误,尝试修复: {response[:200]}") - # 尝试简单修复 - json_str = re.sub(r"[\r\n\t]", "", json_str) - analysis_results = json.loads(json_str) - - # 转换为结果格式 - relations = [] - for result in analysis_results: - if not result.get("has_relation", False): - continue - - confidence = result.get("confidence", 0.0) - if confidence < min_confidence: - continue - - candidate_id = result.get("candidate_id", 0) - 1 - if 0 <= candidate_id < len(candidate_memories): - relations.append({ - "target_memory": candidate_memories[candidate_id], - "relation_type": result.get("relation_type", "关联"), - "confidence": confidence, - "reasoning": result.get("reasoning", ""), - }) - - logger.debug(f"LLM分析完成: 发现 {len(relations)} 个关系") - return relations - - except Exception as e: - logger.error(f"LLM关系分析失败: {e}", exc_info=True) - return [] - - def _format_memory_for_llm(self, memory: Memory) -> str: - """格式化记忆为LLM可读的文本""" - try: - # 获取关键节点 - subject_node = next( - (n for n in memory.nodes if n.node_type == NodeType.SUBJECT), - None - ) - topic_node = next( - (n for n in memory.nodes if n.node_type == NodeType.TOPIC), - None - ) - object_node = next( - (n for n in memory.nodes if n.node_type == NodeType.OBJECT), - None - ) - - parts = [] - parts.append(f"类型: {memory.memory_type.value}") - - if subject_node: - parts.append(f"主体: {subject_node.content}") - - if topic_node: - parts.append(f"主题: {topic_node.content}") - - if object_node: - parts.append(f"对象: {object_node.content}") - - parts.append(f"重要性: {memory.importance:.2f}") - parts.append(f"时间: {memory.created_at.strftime('%Y-%m-%d %H:%M')}") - - return " | ".join(parts) - - except Exception as e: - logger.warning(f"格式化记忆失败: {e}") - return f"记忆ID: {memory.id}" + def _format_memory_for_llm(self, memory: Memory) -> str: # 已废弃 + """格式化记忆为LLM可读的文本(已废弃)""" + logger.warning("_format_memory_for_llm 已废弃") + return f"记忆ID: {memory.id}" async def maintenance(self) -> dict[str, Any]: """ - 执行维护任务(优化版本) + 执行维护任务(简化版) - 包括: - - 记忆整理(异步后台执行) - - 自动关联记忆(轻量级执行) - - 自动遗忘低激活度记忆 + 只包括: + - 简化的记忆整理(检查遗忘+清理孤立节点) - 保存数据 + 注意:记忆的创建、合并、关联等操作已由三级记忆系统自动处理 + Returns: 维护结果 """ @@ -1847,53 +1277,28 @@ class MemoryManager: await self.initialize() try: - logger.info("🔧 开始执行记忆系统维护(优化版)...") + logger.info("🔧 开始执行记忆系统维护...") result = { - 
"consolidation_task": "none", - "linked": 0, "forgotten": 0, + "orphan_nodes_cleaned": 0, + "orphan_edges_cleaned": 0, "saved": False, "total_time": 0, } start_time = datetime.now() - # 1. 记忆整理(异步后台执行,不阻塞主流程) + # 1. 简化的记忆整理(只检查遗忘和清理孤立节点) if getattr(self.config, "consolidation_enabled", False): - logger.info("🚀 启动异步记忆整理任务...") - consolidate_result = await self.consolidate_memories( - similarity_threshold=getattr(self.config, "consolidation_deduplication_threshold", 0.93), - time_window_hours=getattr(self.config, "consolidation_time_window_hours", 2.0), # 统一时间窗口 - max_batch_size=getattr(self.config, "consolidation_max_batch_size", 30) - ) + consolidate_result = await self.consolidate_memories() + result["forgotten"] = consolidate_result.get("forgotten_count", 0) + result["orphan_nodes_cleaned"] = consolidate_result.get("orphan_nodes_cleaned", 0) + result["orphan_edges_cleaned"] = consolidate_result.get("orphan_edges_cleaned", 0) - if consolidate_result.get("task_started"): - result["consolidation_task"] = f"background_task_{consolidate_result.get('task_id', 'unknown')}" - logger.info("✅ 记忆整理任务已启动到后台执行") - else: - result["consolidation_task"] = "failed" - logger.warning("❌ 记忆整理任务启动失败") - - # 2. 自动关联记忆(使用统一的时间窗口) - if getattr(self.config, "consolidation_linking_enabled", True): - logger.info("🔗 执行轻量级自动关联...") - link_result = await self._lightweight_auto_link_memories() - result["linked"] = link_result.get("linked_count", 0) - - # 3. 自动遗忘(快速执行) - if getattr(self.config, "forgetting_enabled", True): - logger.info("🗑️ 执行自动遗忘...") - forgotten_count = await self.auto_forget_memories( - threshold=getattr(self.config, "forgetting_activation_threshold", 0.1) - ) - result["forgotten"] = forgotten_count - - # 4. 保存数据(如果记忆整理不在后台执行) - if result["consolidation_task"] == "none": - await self.persistence.save_graph_store(self.graph_store) - result["saved"] = True - logger.info("💾 数据保存完成") + # 2. 保存数据 + await self.persistence.save_graph_store(self.graph_store) + result["saved"] = True self._last_maintenance = datetime.now() @@ -1908,293 +1313,45 @@ class MemoryManager: logger.error(f"❌ 维护失败: {e}", exc_info=True) return {"error": str(e), "total_time": 0} - async def _lightweight_auto_link_memories( + async def _lightweight_auto_link_memories( # 已废弃 self, - time_window_hours: float | None = None, # 从配置读取 - max_candidates: int | None = None, # 从配置读取 - max_memories: int | None = None, # 从配置读取 + time_window_hours: float | None = None, + max_candidates: int | None = None, + max_memories: int | None = None, ) -> dict[str, Any]: """ - 智能轻量级自动关联记忆(保留LLM判断,优化性能) + 智能轻量级自动关联记忆(已废弃) - 优化策略: - 1. 从配置读取处理参数,尊重用户设置 - 2. 使用向量相似度预筛选,仅对高相似度记忆调用LLM - 3. 批量LLM调用,减少网络开销 - 4. 
异步执行,避免阻塞 + 该功能已由三级记忆系统取代。 + + Args: + time_window_hours: 从配置读取 + max_candidates: 从配置读取 + max_memories: 从配置读取 + + Returns: + 空结果(向后兼容) """ - try: - result = { - "checked_count": 0, - "linked_count": 0, - "llm_calls": 0, - } + logger.warning("_lightweight_auto_link_memories 已废弃") + return {"checked_count": 0, "linked_count": 0, "deprecated": True} - # 从配置读取参数,使用统一的时间窗口 - if time_window_hours is None: - time_window_hours = getattr(self.config, "consolidation_time_window_hours", 2.0) - if max_candidates is None: - max_candidates = getattr(self.config, "consolidation_linking_max_candidates", 10) - if max_memories is None: - max_memories = getattr(self.config, "consolidation_linking_max_memories", 20) - - # 获取用户配置时间窗口内的记忆 - time_threshold = datetime.now() - timedelta(hours=time_window_hours) - all_memories = self.graph_store.get_all_memories() - - recent_memories = [ - mem for mem in all_memories - if mem.created_at >= time_threshold - and not mem.metadata.get("forgotten", False) - and mem.importance >= getattr(self.config, "consolidation_linking_min_importance", 0.5) # 从配置读取重要性阈值 - ] - - if len(recent_memories) > max_memories: - recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_memories] - - if len(recent_memories) < 2: - logger.debug("记忆数量不足,跳过智能关联") - return result - - logger.debug(f"🧠 智能关联: 检查 {len(recent_memories)} 条重要记忆") - - # 第一步:向量相似度预筛选,找到潜在关联对 - candidate_pairs = [] - - for i, memory in enumerate(recent_memories): - # 获取主题节点 - topic_node = next( - (n for n in memory.nodes if n.node_type == NodeType.TOPIC), - None - ) - - if not topic_node or topic_node.embedding is None: - continue - - # 与其他记忆计算相似度 - for j, other_memory in enumerate(recent_memories[i+1:], i+1): - other_topic = next( - (n for n in other_memory.nodes if n.node_type == NodeType.TOPIC), - None - ) - - if not other_topic or other_topic.embedding is None: - continue - - # 快速相似度计算 - similarity = cosine_similarity( - topic_node.embedding, - other_topic.embedding - ) - - # 使用配置的预筛选阈值 - pre_filter_threshold = getattr(self.config, "consolidation_linking_pre_filter_threshold", 0.7) - if similarity >= pre_filter_threshold: - candidate_pairs.append((memory, other_memory, similarity)) - - # 让出控制权 - if i % 3 == 0: - await asyncio.sleep(0.001) - - logger.debug(f"🔍 预筛选找到 {len(candidate_pairs)} 个候选关联对") - - if not candidate_pairs: - return result - - # 第二步:批量LLM分析(使用配置的最大候选对数) - max_pairs_for_llm = getattr(self.config, "consolidation_linking_max_pairs_for_llm", 5) - if len(candidate_pairs) <= max_pairs_for_llm: - link_relations = await self._batch_analyze_memory_relations(candidate_pairs) - result["llm_calls"] = 1 - - # 第三步:建立LLM确认的关联 - for relation_info in link_relations: - try: - memory_a, memory_b = relation_info["memory_pair"] - relation_type = relation_info["relation_type"] - confidence = relation_info["confidence"] - - # 创建关联边 - edge = MemoryEdge( - id=f"smart_edge_{uuid.uuid4().hex[:12]}", - source_id=memory_a.subject_id, - target_id=memory_b.subject_id, - relation=relation_type, - edge_type=EdgeType.RELATION, - importance=confidence, - metadata={ - "auto_linked": True, - "method": "llm_analyzed", - "vector_similarity": relation_info.get("vector_similarity", 0.0), - "confidence": confidence, - "reasoning": relation_info.get("reasoning", ""), - "created_at": datetime.now().isoformat(), - } - ) - - # 添加到图 - self.graph_store.graph.add_edge( - edge.source_id, - edge.target_id, - edge_id=edge.id, - relation=edge.relation, - edge_type=edge.edge_type.value, - importance=edge.importance, - 
metadata=edge.metadata, - ) - - memory_a.edges.append(edge) - result["linked_count"] += 1 - - logger.debug(f"🧠 智能关联: {memory_a.id[:8]} --[{relation_type}]--> {memory_b.id[:8]} (置信度={confidence:.2f})") - - except Exception as e: - logger.warning(f"建立智能关联失败: {e}") - continue - - # 保存关联结果 - if result["linked_count"] > 0: - await self.persistence.save_graph_store(self.graph_store) - - logger.debug(f"✅ 智能关联完成: 建立了 {result['linked_count']} 个关联,LLM调用 {result['llm_calls']} 次") - return result - - except Exception as e: - logger.error(f"智能关联失败: {e}", exc_info=True) - return {"error": str(e), "checked_count": 0, "linked_count": 0} - - async def _batch_analyze_memory_relations( + async def _batch_analyze_memory_relations( # 已废弃 self, candidate_pairs: list[tuple[Memory, Memory, float]] ) -> list[dict[str, Any]]: """ - 批量分析记忆关系(优化LLM调用) + 批量分析记忆关系(已废弃) + + 该功能已由三级记忆系统取代。 Args: - candidate_pairs: 候选记忆对列表,每项包含 (memory_a, memory_b, vector_similarity) + candidate_pairs: 候选记忆对列表 Returns: - 关系分析结果列表 + 空列表(向后兼容) """ - try: - from src.config.config import model_config - from src.llm_models.utils_model import LLMRequest - - llm = LLMRequest( - model_set=model_config.model_task_config.utils_small, - request_type="memory.batch_relation_analysis" - ) - - # 格式化所有候选记忆对 - candidates_text = "" - for i, (mem_a, mem_b, similarity) in enumerate(candidate_pairs): - desc_a = self._format_memory_for_llm(mem_a) - desc_b = self._format_memory_for_llm(mem_b) - candidates_text += f""" -候选对 {i+1}: -记忆A: {desc_a} -记忆B: {desc_b} -向量相似度: {similarity:.3f} -""" - - # 构建批量分析提示词(使用配置的置信度阈值) - min_confidence = getattr(self.config, "consolidation_linking_min_confidence", 0.7) - - prompt = f"""你是记忆关系分析专家。请批量分析以下候选记忆对之间的关系。 - -**关系类型说明:** -- 导致: A的发生导致了B的发生(因果关系) -- 引用: A提到或涉及B(引用关系) -- 相似: A和B描述相似的内容(相似关系) -- 相反: A和B表达相反的观点(对立关系) -- 关联: A和B存在某种关联但不属于以上类型(一般关联) - -**候选记忆对:** -{candidates_text} - -**任务要求:** -1. 对每个候选对,判断是否存在有意义的关系 -2. 如果存在关系,指定关系类型和置信度(0.0-1.0) -3. 简要说明判断理由 -4. 只返回置信度 >= {min_confidence} 的关系 -5. 
优先考虑因果、引用等强关系,谨慎建立相似关系 - -**输出格式(JSON):** -```json -[ - {{ - "candidate_id": 1, - "has_relation": true, - "relation_type": "导致", - "confidence": 0.85, - "reasoning": "记忆A描述的原因导致记忆B的结果" - }}, - {{ - "candidate_id": 2, - "has_relation": false, - "reasoning": "两者无明显关联" - }} -] -``` - -请分析并输出JSON结果:""" - - # 调用LLM(使用配置的参数) - llm_temperature = getattr(self.config, "consolidation_linking_llm_temperature", 0.2) - llm_max_tokens = getattr(self.config, "consolidation_linking_llm_max_tokens", 1500) - - response, _ = await llm.generate_response_async( - prompt, - temperature=llm_temperature, - max_tokens=llm_max_tokens, - ) - - # 解析响应 - import json - import re - - # 提取JSON - json_match = re.search(r"```json\s*(.*?)\s*```", response, re.DOTALL) - if json_match: - json_str = json_match.group(1) - else: - json_str = response.strip() - - try: - analysis_results = json.loads(json_str) - except json.JSONDecodeError: - logger.warning(f"LLM返回格式错误,尝试修复: {response[:200]}") - # 尝试简单修复 - json_str = re.sub(r"[\r\n\t]", "", json_str) - analysis_results = json.loads(json_str) - - # 转换为结果格式 - relations = [] - for result in analysis_results: - if not result.get("has_relation", False): - continue - - confidence = result.get("confidence", 0.0) - if confidence < min_confidence: # 使用配置的置信度阈值 - continue - - candidate_id = result.get("candidate_id", 0) - 1 - if 0 <= candidate_id < len(candidate_pairs): - mem_a, mem_b, vector_similarity = candidate_pairs[candidate_id] - relations.append({ - "memory_pair": (mem_a, mem_b), - "relation_type": result.get("relation_type", "关联"), - "confidence": confidence, - "reasoning": result.get("reasoning", ""), - "vector_similarity": vector_similarity, - }) - - logger.debug(f"🧠 LLM批量分析完成: 发现 {len(relations)} 个关系") - return relations - - except Exception as e: - logger.error(f"LLM批量关系分析失败: {e}", exc_info=True) - return [] + logger.warning("_batch_analyze_memory_relations 已废弃") + return [] def _start_maintenance_task(self) -> None: """ diff --git a/src/memory_graph/manager_singleton.py b/src/memory_graph/manager_singleton.py index dc735a06b..60cf8a9d2 100644 --- a/src/memory_graph/manager_singleton.py +++ b/src/memory_graph/manager_singleton.py @@ -1,7 +1,7 @@ """ 记忆系统管理单例 -提供全局访问的 MemoryManager 实例 +提供全局访问的 MemoryManager 和 UnifiedMemoryManager 实例 """ from __future__ import annotations @@ -13,10 +13,18 @@ from src.memory_graph.manager import MemoryManager logger = get_logger(__name__) -# 全局 MemoryManager 实例 +# 全局 MemoryManager 实例(旧的单层记忆系统,已弃用) _memory_manager: MemoryManager | None = None _initialized: bool = False +# 全局 UnifiedMemoryManager 实例(新的三层记忆系统) +_unified_memory_manager = None + + +# ============================================================================ +# 旧的单层记忆系统 API(已弃用,保留用于向后兼容) +# ============================================================================ + async def initialize_memory_manager( data_dir: Path | str | None = None, @@ -104,3 +112,97 @@ async def shutdown_memory_manager(): def is_initialized() -> bool: """检查 MemoryManager 是否已初始化""" return _initialized and _memory_manager is not None + + +# ============================================================================ +# 新的三层记忆系统 API(推荐使用) +# ============================================================================ + + +async def initialize_unified_memory_manager(): + """ + 初始化统一记忆管理器(三层记忆系统) + + 从全局配置读取参数 + + Returns: + 初始化后的管理器实例,未启用返回 None + """ + global _unified_memory_manager + + if _unified_memory_manager is not None: + logger.warning("统一记忆管理器已经初始化") + return _unified_memory_manager + + try: + from 
src.config.config import global_config + from src.memory_graph.unified_manager import UnifiedMemoryManager + + # 检查是否启用三层记忆系统 + if not hasattr(global_config, "three_tier_memory") or not getattr( + global_config.three_tier_memory, "enable", False + ): + logger.warning("三层记忆系统未启用,跳过初始化") + return None + + config = global_config.three_tier_memory + + # 创建管理器实例 + _unified_memory_manager = UnifiedMemoryManager( + data_dir=Path(getattr(config, "data_dir", "data/memory_graph/three_tier")), + # 感知记忆配置 + perceptual_max_blocks=getattr(config, "perceptual_max_blocks", 50), + perceptual_block_size=getattr(config, "perceptual_block_size", 5), + perceptual_activation_threshold=getattr(config, "perceptual_activation_threshold", 3), + perceptual_recall_top_k=getattr(config, "perceptual_recall_top_k", 5), + perceptual_recall_threshold=getattr(config, "perceptual_recall_threshold", 0.55), + # 短期记忆配置 + short_term_max_memories=getattr(config, "short_term_max_memories", 30), + short_term_transfer_threshold=getattr(config, "short_term_transfer_threshold", 0.6), + # 长期记忆配置 + long_term_batch_size=getattr(config, "long_term_batch_size", 10), + long_term_search_top_k=getattr(config, "long_term_search_top_k", 5), + long_term_decay_factor=getattr(config, "long_term_decay_factor", 0.95), + # 智能检索配置 + judge_confidence_threshold=getattr(config, "judge_confidence_threshold", 0.7), + ) + + # 初始化 + await _unified_memory_manager.initialize() + + logger.info("✅ 统一记忆管理器单例已初始化") + return _unified_memory_manager + + except Exception as e: + logger.error(f"初始化统一记忆管理器失败: {e}", exc_info=True) + raise + + +def get_unified_memory_manager(): + """ + 获取统一记忆管理器实例(三层记忆系统) + + Returns: + 管理器实例,未初始化返回 None + """ + if _unified_memory_manager is None: + logger.warning("统一记忆管理器尚未初始化,请先调用 initialize_unified_memory_manager()") + return _unified_memory_manager + + +async def shutdown_unified_memory_manager() -> None: + """关闭统一记忆管理器""" + global _unified_memory_manager + + if _unified_memory_manager is None: + logger.warning("统一记忆管理器未初始化,无需关闭") + return + + try: + await _unified_memory_manager.shutdown() + _unified_memory_manager = None + logger.info("✅ 统一记忆管理器已关闭") + + except Exception as e: + logger.error(f"关闭统一记忆管理器失败: {e}", exc_info=True) + diff --git a/src/memory_graph/models.py b/src/memory_graph/models.py index 0441c9bf3..2c6f1d134 100644 --- a/src/memory_graph/models.py +++ b/src/memory_graph/models.py @@ -1,7 +1,7 @@ """ 记忆图系统核心数据模型 -定义节点、边、记忆等核心数据结构 +定义节点、边、记忆等核心数据结构(包含三层记忆系统) """ from __future__ import annotations @@ -15,6 +15,50 @@ from typing import Any import numpy as np +# ============================================================================ +# 三层记忆系统枚举 +# ============================================================================ + + +class MemoryTier(Enum): + """记忆层级枚举""" + + PERCEPTUAL = "perceptual" # 感知记忆层 + SHORT_TERM = "short_term" # 短期记忆层 + LONG_TERM = "long_term" # 长期记忆层 + + +class GraphOperationType(Enum): + """图操作类型枚举""" + + CREATE_NODE = "create_node" # 创建节点 + UPDATE_NODE = "update_node" # 更新节点 + DELETE_NODE = "delete_node" # 删除节点 + MERGE_NODES = "merge_nodes" # 合并节点 + CREATE_EDGE = "create_edge" # 创建边 + UPDATE_EDGE = "update_edge" # 更新边 + DELETE_EDGE = "delete_edge" # 删除边 + CREATE_MEMORY = "create_memory" # 创建记忆 + UPDATE_MEMORY = "update_memory" # 更新记忆 + DELETE_MEMORY = "delete_memory" # 删除记忆 + MERGE_MEMORIES = "merge_memories" # 合并记忆 + + +class ShortTermOperation(Enum): + """短期记忆操作类型枚举""" + + MERGE = "merge" # 合并到现有记忆 + UPDATE = "update" # 更新现有记忆 + CREATE_NEW = "create_new" # 创建新记忆 + DISCARD = "discard" # 
丢弃(低价值) + KEEP_SEPARATE = "keep_separate" # 保持独立(暂不合并) + + +# ============================================================================ +# 图谱系统枚举 +# ============================================================================ + + class NodeType(Enum): """节点类型枚举""" @@ -305,3 +349,329 @@ class StagedMemory: consolidated_at=datetime.fromisoformat(data["consolidated_at"]) if data.get("consolidated_at") else None, merge_history=data.get("merge_history", []), ) + + +# ============================================================================ +# 三层记忆系统数据模型 +# ============================================================================ + + +@dataclass +class MemoryBlock: + """ + 感知记忆块 + + 表示 n 条消息组成的一个语义单元,是感知记忆的基本单位。 + """ + + id: str # 记忆块唯一ID + messages: list[dict[str, Any]] # 原始消息列表(包含消息内容、发送者、时间等) + combined_text: str # 合并后的文本(用于生成向量) + embedding: np.ndarray | None = None # 整个块的向量表示 + created_at: datetime = field(default_factory=datetime.now) + recall_count: int = 0 # 被召回次数(用于判断是否激活) + last_recalled: datetime | None = None # 最后一次被召回的时间 + position_in_stack: int = 0 # 在记忆堆中的位置(0=最顶层) + metadata: dict[str, Any] = field(default_factory=dict) # 额外元数据 + + def __post_init__(self): + """后初始化处理""" + if not self.id: + self.id = f"block_{uuid.uuid4().hex[:12]}" + + def to_dict(self) -> dict[str, Any]: + """转换为字典(用于序列化)""" + return { + "id": self.id, + "messages": self.messages, + "combined_text": self.combined_text, + "created_at": self.created_at.isoformat(), + "recall_count": self.recall_count, + "last_recalled": self.last_recalled.isoformat() if self.last_recalled else None, + "position_in_stack": self.position_in_stack, + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> MemoryBlock: + """从字典创建记忆块""" + return cls( + id=data["id"], + messages=data["messages"], + combined_text=data["combined_text"], + embedding=None, # 向量数据需要单独加载 + created_at=datetime.fromisoformat(data["created_at"]), + recall_count=data.get("recall_count", 0), + last_recalled=datetime.fromisoformat(data["last_recalled"]) if data.get("last_recalled") else None, + position_in_stack=data.get("position_in_stack", 0), + metadata=data.get("metadata", {}), + ) + + def increment_recall(self) -> None: + """增加召回计数""" + self.recall_count += 1 + self.last_recalled = datetime.now() + + def __str__(self) -> str: + return f"MemoryBlock({self.id[:8]}, messages={len(self.messages)}, recalls={self.recall_count})" + + +@dataclass +class PerceptualMemory: + """ + 感知记忆(记忆堆的完整状态) + + 全局单例,管理所有感知记忆块 + """ + + blocks: list[MemoryBlock] = field(default_factory=list) # 记忆块列表(有序,新的在前) + max_blocks: int = 50 # 记忆堆最大容量 + block_size: int = 5 # 每个块包含的消息数量 + pending_messages: list[dict[str, Any]] = field(default_factory=list) # 等待组块的消息缓存 + created_at: datetime = field(default_factory=datetime.now) + metadata: dict[str, Any] = field(default_factory=dict) # 全局元数据 + + def to_dict(self) -> dict[str, Any]: + """转换为字典(用于序列化)""" + return { + "blocks": [block.to_dict() for block in self.blocks], + "max_blocks": self.max_blocks, + "block_size": self.block_size, + "pending_messages": self.pending_messages, + "created_at": self.created_at.isoformat(), + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> PerceptualMemory: + """从字典创建感知记忆""" + return cls( + blocks=[MemoryBlock.from_dict(b) for b in data.get("blocks", [])], + max_blocks=data.get("max_blocks", 50), + block_size=data.get("block_size", 5), + pending_messages=data.get("pending_messages", []), + 
created_at=datetime.fromisoformat(data["created_at"]), + metadata=data.get("metadata", {}), + ) + + +@dataclass +class ShortTermMemory: + """ + 短期记忆 + + 结构化的活跃记忆,介于感知记忆和长期记忆之间。 + 使用与长期记忆相同的 Memory 结构,但不包含图关系。 + """ + + id: str # 短期记忆唯一ID + content: str # 记忆的文本内容(LLM 结构化后的描述) + embedding: np.ndarray | None = None # 向量表示 + importance: float = 0.5 # 重要性评分 [0-1] + source_block_ids: list[str] = field(default_factory=list) # 来源感知记忆块ID列表 + created_at: datetime = field(default_factory=datetime.now) + last_accessed: datetime = field(default_factory=datetime.now) + access_count: int = 0 # 访问次数 + metadata: dict[str, Any] = field(default_factory=dict) # 额外元数据 + + # 记忆结构化字段(与长期记忆 Memory 兼容) + subject: str | None = None # 主体 + topic: str | None = None # 主题 + object: str | None = None # 客体 + memory_type: str | None = None # 记忆类型 + attributes: dict[str, str] = field(default_factory=dict) # 属性 + + def __post_init__(self): + """后初始化处理""" + if not self.id: + self.id = f"stm_{uuid.uuid4().hex[:12]}" + # 确保重要性在有效范围内 + self.importance = max(0.0, min(1.0, self.importance)) + + def to_dict(self) -> dict[str, Any]: + """转换为字典(用于序列化)""" + return { + "id": self.id, + "content": self.content, + "importance": self.importance, + "source_block_ids": self.source_block_ids, + "created_at": self.created_at.isoformat(), + "last_accessed": self.last_accessed.isoformat(), + "access_count": self.access_count, + "metadata": self.metadata, + "subject": self.subject, + "topic": self.topic, + "object": self.object, + "memory_type": self.memory_type, + "attributes": self.attributes, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ShortTermMemory: + """从字典创建短期记忆""" + return cls( + id=data["id"], + content=data["content"], + embedding=None, # 向量数据需要单独加载 + importance=data.get("importance", 0.5), + source_block_ids=data.get("source_block_ids", []), + created_at=datetime.fromisoformat(data["created_at"]), + last_accessed=datetime.fromisoformat(data.get("last_accessed", data["created_at"])), + access_count=data.get("access_count", 0), + metadata=data.get("metadata", {}), + subject=data.get("subject"), + topic=data.get("topic"), + object=data.get("object"), + memory_type=data.get("memory_type"), + attributes=data.get("attributes", {}), + ) + + def update_access(self) -> None: + """更新访问记录""" + self.last_accessed = datetime.now() + self.access_count += 1 + + def __str__(self) -> str: + return f"ShortTermMemory({self.id[:8]}, content={self.content[:30]}..., importance={self.importance:.2f})" + + +@dataclass +class GraphOperation: + """ + 图操作指令 + + 表示一个对长期记忆图的原子操作,由 LLM 生成。 + """ + + operation_type: GraphOperationType # 操作类型 + target_id: str | None = None # 目标对象ID(节点/边/记忆ID) + target_ids: list[str] = field(default_factory=list) # 多个目标ID(用于合并操作) + parameters: dict[str, Any] = field(default_factory=dict) # 操作参数 + reason: str = "" # 操作原因(LLM 的推理过程) + confidence: float = 1.0 # 操作置信度 [0-1] + + def __post_init__(self): + """后初始化处理""" + self.confidence = max(0.0, min(1.0, self.confidence)) + + def to_dict(self) -> dict[str, Any]: + """转换为字典""" + return { + "operation_type": self.operation_type.value, + "target_id": self.target_id, + "target_ids": self.target_ids, + "parameters": self.parameters, + "reason": self.reason, + "confidence": self.confidence, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> GraphOperation: + """从字典创建操作""" + return cls( + operation_type=GraphOperationType(data["operation_type"]), + target_id=data.get("target_id"), + target_ids=data.get("target_ids", []), + 
parameters=data.get("parameters", {}), + reason=data.get("reason", ""), + confidence=data.get("confidence", 1.0), + ) + + def __str__(self) -> str: + return f"GraphOperation({self.operation_type.value}, target={self.target_id}, confidence={self.confidence:.2f})" + + +@dataclass +class JudgeDecision: + """ + 裁判模型决策结果 + + 用于判断检索到的记忆是否充足 + """ + + is_sufficient: bool # 是否充足 + confidence: float = 0.5 # 置信度 [0-1] + reasoning: str = "" # 推理过程 + additional_queries: list[str] = field(default_factory=list) # 额外需要检索的 query + missing_aspects: list[str] = field(default_factory=list) # 缺失的信息维度 + + def __post_init__(self): + """后初始化处理""" + self.confidence = max(0.0, min(1.0, self.confidence)) + + def to_dict(self) -> dict[str, Any]: + """转换为字典""" + return { + "is_sufficient": self.is_sufficient, + "confidence": self.confidence, + "reasoning": self.reasoning, + "additional_queries": self.additional_queries, + "missing_aspects": self.missing_aspects, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> JudgeDecision: + """从字典创建决策""" + return cls( + is_sufficient=data["is_sufficient"], + confidence=data.get("confidence", 0.5), + reasoning=data.get("reasoning", ""), + additional_queries=data.get("additional_queries", []), + missing_aspects=data.get("missing_aspects", []), + ) + + def __str__(self) -> str: + status = "充足" if self.is_sufficient else "不足" + return f"JudgeDecision({status}, confidence={self.confidence:.2f}, extra_queries={len(self.additional_queries)})" + + +@dataclass +class ShortTermDecision: + """ + 短期记忆决策结果 + + LLM 对新短期记忆的处理决策 + """ + + operation: ShortTermOperation # 操作类型 + target_memory_id: str | None = None # 目标记忆ID(用于 MERGE/UPDATE) + merged_content: str | None = None # 合并后的内容 + reasoning: str = "" # 推理过程 + confidence: float = 1.0 # 置信度 [0-1] + updated_importance: float | None = None # 更新后的重要性 + updated_metadata: dict[str, Any] = field(default_factory=dict) # 更新后的元数据 + + def __post_init__(self): + """后初始化处理""" + self.confidence = max(0.0, min(1.0, self.confidence)) + if self.updated_importance is not None: + self.updated_importance = max(0.0, min(1.0, self.updated_importance)) + + def to_dict(self) -> dict[str, Any]: + """转换为字典""" + return { + "operation": self.operation.value, + "target_memory_id": self.target_memory_id, + "merged_content": self.merged_content, + "reasoning": self.reasoning, + "confidence": self.confidence, + "updated_importance": self.updated_importance, + "updated_metadata": self.updated_metadata, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ShortTermDecision: + """从字典创建决策""" + return cls( + operation=ShortTermOperation(data["operation"]), + target_memory_id=data.get("target_memory_id"), + merged_content=data.get("merged_content"), + reasoning=data.get("reasoning", ""), + confidence=data.get("confidence", 1.0), + updated_importance=data.get("updated_importance"), + updated_metadata=data.get("updated_metadata", {}), + ) + + def __str__(self) -> str: + return f"ShortTermDecision({self.operation.value}, target={self.target_memory_id}, confidence={self.confidence:.2f})" + diff --git a/src/memory_graph/three_tier/perceptual_manager.py b/src/memory_graph/perceptual_manager.py similarity index 98% rename from src/memory_graph/three_tier/perceptual_manager.py rename to src/memory_graph/perceptual_manager.py index e760a7519..a0fd2a694 100644 --- a/src/memory_graph/three_tier/perceptual_manager.py +++ b/src/memory_graph/perceptual_manager.py @@ -18,7 +18,7 @@ from typing import Any import numpy as np from src.common.logger import get_logger 
-from src.memory_graph.three_tier.models import MemoryBlock, PerceptualMemory +from src.memory_graph.models import MemoryBlock, PerceptualMemory from src.memory_graph.utils.embeddings import EmbeddingGenerator from src.memory_graph.utils.similarity import cosine_similarity @@ -75,6 +75,13 @@ class PerceptualMemoryManager: f"block_size={block_size}, activation_threshold={activation_threshold})" ) + @property + def memory(self) -> PerceptualMemory: + """获取感知记忆对象(保证非 None)""" + if self.perceptual_memory is None: + raise RuntimeError("感知记忆管理器未初始化") + return self.perceptual_memory + async def initialize(self) -> None: """初始化管理器""" if self._initialized: diff --git a/src/memory_graph/plugin_tools/memory_plugin_tools.py b/src/memory_graph/plugin_tools/memory_plugin_tools.py index 91a4c104f..0f44ede61 100644 --- a/src/memory_graph/plugin_tools/memory_plugin_tools.py +++ b/src/memory_graph/plugin_tools/memory_plugin_tools.py @@ -1,7 +1,7 @@ """ -记忆系统插件工具 +记忆系统插件工具(已废弃) -将 MemoryTools 适配为 BaseTool 格式,供 LLM 使用 +警告:记忆创建不再由工具负责,而是通过三级记忆系统自动处理 """ from __future__ import annotations @@ -15,7 +15,15 @@ from src.plugin_system.base.component_types import ToolParamType logger = get_logger(__name__) -class CreateMemoryTool(BaseTool): +# ========== 以下工具类已废弃 ========== +# 记忆系统现在采用三级记忆架构: +# 1. 感知记忆:自动收集消息块 +# 2. 短期记忆:激活后由模型格式化 +# 3. 长期记忆:定期转移到图结构 +# +# 不再需要LLM手动调用工具创建记忆 + +class _DeprecatedCreateMemoryTool(BaseTool): """创建记忆工具""" name = "create_memory" @@ -129,8 +137,8 @@ class CreateMemoryTool(BaseTool): } -class LinkMemoriesTool(BaseTool): - """关联记忆工具""" +class _DeprecatedLinkMemoriesTool(BaseTool): + """关联记忆工具(已废弃)""" name = "link_memories" description = "在两个记忆之间建立关联关系。用于连接相关的记忆,形成知识网络。" @@ -189,8 +197,8 @@ class LinkMemoriesTool(BaseTool): } -class SearchMemoriesTool(BaseTool): - """搜索记忆工具""" +class _DeprecatedSearchMemoriesTool(BaseTool): + """搜索记忆工具(已废弃)""" name = "search_memories" description = "搜索相关的记忆。根据查询词搜索记忆库,返回最相关的记忆。" diff --git a/src/memory_graph/three_tier/short_term_manager.py b/src/memory_graph/short_term_manager.py similarity index 98% rename from src/memory_graph/three_tier/short_term_manager.py rename to src/memory_graph/short_term_manager.py index 77c5c31ff..46eeb4c8e 100644 --- a/src/memory_graph/three_tier/short_term_manager.py +++ b/src/memory_graph/short_term_manager.py @@ -18,7 +18,7 @@ from typing import Any import numpy as np from src.common.logger import get_logger -from src.memory_graph.three_tier.models import ( +from src.memory_graph.models import ( MemoryBlock, ShortTermDecision, ShortTermMemory, @@ -194,9 +194,9 @@ class ShortTermMemoryManager: 请输出JSON:""" - # 调用 LLM + # 调用短期记忆构建模型 llm = LLMRequest( - model_set=model_config.model_task_config.utils_small, + model_set=model_config.model_task_config.memory_short_term_builder, request_type="short_term_memory.extract", ) @@ -299,9 +299,9 @@ class ShortTermMemoryManager: 请输出JSON:""" - # 调用 LLM + # 调用短期记忆决策模型 llm = LLMRequest( - model_set=model_config.model_task_config.utils_small, + model_set=model_config.model_task_config.memory_short_term_decider, request_type="short_term_memory.decide", ) diff --git a/src/memory_graph/three_tier/__init__.py b/src/memory_graph/three_tier/__init__.py deleted file mode 100644 index 70a104ada..000000000 --- a/src/memory_graph/three_tier/__init__.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -三层记忆系统 (Three-Tier Memory System) - -分层架构: -1. 感知记忆层 (Perceptual Memory Layer) - 消息块的短期缓存 -2. 短期记忆层 (Short-term Memory Layer) - 结构化的活跃记忆 -3. 
diff --git a/src/memory_graph/three_tier/__init__.py b/src/memory_graph/three_tier/__init__.py
deleted file mode 100644
index 70a104ada..000000000
--- a/src/memory_graph/three_tier/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-"""
-三层记忆系统 (Three-Tier Memory System)
-
-分层架构:
-1. 感知记忆层 (Perceptual Memory Layer) - 消息块的短期缓存
-2. 短期记忆层 (Short-term Memory Layer) - 结构化的活跃记忆
-3. 长期记忆层 (Long-term Memory Layer) - 持久化的图结构记忆
-
-设计灵感来源于人脑的记忆机制和 Mem0 项目。
-"""
-
-from .models import (
-    MemoryBlock,
-    PerceptualMemory,
-    ShortTermMemory,
-    GraphOperation,
-    GraphOperationType,
-    JudgeDecision,
-)
-from .perceptual_manager import PerceptualMemoryManager
-from .short_term_manager import ShortTermMemoryManager
-from .long_term_manager import LongTermMemoryManager
-from .unified_manager import UnifiedMemoryManager
-
-__all__ = [
-    # 数据模型
-    "MemoryBlock",
-    "PerceptualMemory",
-    "ShortTermMemory",
-    "GraphOperation",
-    "GraphOperationType",
-    "JudgeDecision",
-    # 管理器
-    "PerceptualMemoryManager",
-    "ShortTermMemoryManager",
-    "LongTermMemoryManager",
-    "UnifiedMemoryManager",
-]
diff --git a/src/memory_graph/three_tier/manager_singleton.py b/src/memory_graph/three_tier/manager_singleton.py
deleted file mode 100644
index a7bf096cc..000000000
--- a/src/memory_graph/three_tier/manager_singleton.py
+++ /dev/null
@@ -1,101 +0,0 @@
-"""
-三层记忆系统单例管理器
-
-提供全局访问点
-"""
-
-from pathlib import Path
-
-from src.common.logger import get_logger
-from src.config.config import global_config
-from src.memory_graph.three_tier.unified_manager import UnifiedMemoryManager
-
-logger = get_logger(__name__)
-
-# 全局单例
-_unified_memory_manager: UnifiedMemoryManager | None = None
-
-
-async def initialize_unified_memory_manager() -> UnifiedMemoryManager:
-    """
-    初始化统一记忆管理器
-
-    从全局配置读取参数
-
-    Returns:
-        初始化后的管理器实例
-    """
-    global _unified_memory_manager
-
-    if _unified_memory_manager is not None:
-        logger.warning("统一记忆管理器已经初始化")
-        return _unified_memory_manager
-
-    try:
-        # 检查是否启用三层记忆系统
-        if not hasattr(global_config, "three_tier_memory") or not getattr(
-            global_config.three_tier_memory, "enable", False
-        ):
-            logger.warning("三层记忆系统未启用,跳过初始化")
-            return None
-
-        config = global_config.three_tier_memory
-
-        # 创建管理器实例
-        _unified_memory_manager = UnifiedMemoryManager(
-            data_dir=Path(getattr(config, "data_dir", "data/memory_graph/three_tier")),
-            # 感知记忆配置
-            perceptual_max_blocks=getattr(config, "perceptual_max_blocks", 50),
-            perceptual_block_size=getattr(config, "perceptual_block_size", 5),
-            perceptual_activation_threshold=getattr(config, "perceptual_activation_threshold", 3),
-            perceptual_recall_top_k=getattr(config, "perceptual_recall_top_k", 5),
-            perceptual_recall_threshold=getattr(config, "perceptual_recall_threshold", 0.55),
-            # 短期记忆配置
-            short_term_max_memories=getattr(config, "short_term_max_memories", 30),
-            short_term_transfer_threshold=getattr(config, "short_term_transfer_threshold", 0.6),
-            # 长期记忆配置
-            long_term_batch_size=getattr(config, "long_term_batch_size", 10),
-            long_term_search_top_k=getattr(config, "long_term_search_top_k", 5),
-            long_term_decay_factor=getattr(config, "long_term_decay_factor", 0.95),
-            # 智能检索配置
-            judge_confidence_threshold=getattr(config, "judge_confidence_threshold", 0.7),
-        )
-
-        # 初始化
-        await _unified_memory_manager.initialize()
-
-        logger.info("✅ 统一记忆管理器单例已初始化")
-        return _unified_memory_manager
-
-    except Exception as e:
-        logger.error(f"初始化统一记忆管理器失败: {e}", exc_info=True)
-        raise
-
-
-def get_unified_memory_manager() -> UnifiedMemoryManager | None:
-    """
-    获取统一记忆管理器实例
-
-    Returns:
-        管理器实例,未初始化返回 None
-    """
-    if _unified_memory_manager is None:
-        logger.warning("统一记忆管理器尚未初始化,请先调用 initialize_unified_memory_manager()")
-    return _unified_memory_manager
-
-
-async def shutdown_unified_memory_manager() -> None:
-    """关闭统一记忆管理器"""
-    global _unified_memory_manager
-
-    if _unified_memory_manager is None:
-        logger.warning("统一记忆管理器未初始化,无需关闭")
-        return
-
-    try:
-        await _unified_memory_manager.shutdown()
-        _unified_memory_manager = None
-        logger.info("✅ 统一记忆管理器已关闭")
-
-    except Exception as e:
-        logger.error(f"关闭统一记忆管理器失败: {e}", exc_info=True)
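The deleted singleton above survives in flattened form: the context_manager.py hunk at the top of this patch imports it from src.memory_graph.manager_singleton. A sketch of the guarded access pattern callers are expected to keep using, assuming the function names carry over from the deleted module unchanged:

from src.memory_graph.manager_singleton import get_unified_memory_manager

manager = get_unified_memory_manager()
if manager is None:
    # Not initialized yet, or three_tier_memory.enable = false:
    # callers should degrade gracefully instead of assuming the tiers exist.
    pass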
diff --git a/src/memory_graph/three_tier/models.py b/src/memory_graph/three_tier/models.py
deleted file mode 100644
index c691a862a..000000000
--- a/src/memory_graph/three_tier/models.py
+++ /dev/null
@@ -1,369 +0,0 @@
-"""
-三层记忆系统的核心数据模型
-
-定义感知记忆块、短期记忆、图操作语言等数据结构
-"""
-
-from __future__ import annotations
-
-import uuid
-from dataclasses import dataclass, field
-from datetime import datetime
-from enum import Enum
-from typing import Any
-
-import numpy as np
-
-
-class MemoryTier(Enum):
-    """记忆层级枚举"""
-
-    PERCEPTUAL = "perceptual"  # 感知记忆层
-    SHORT_TERM = "short_term"  # 短期记忆层
-    LONG_TERM = "long_term"  # 长期记忆层
-
-
-class GraphOperationType(Enum):
-    """图操作类型枚举"""
-
-    CREATE_NODE = "create_node"  # 创建节点
-    UPDATE_NODE = "update_node"  # 更新节点
-    DELETE_NODE = "delete_node"  # 删除节点
-    MERGE_NODES = "merge_nodes"  # 合并节点
-    CREATE_EDGE = "create_edge"  # 创建边
-    UPDATE_EDGE = "update_edge"  # 更新边
-    DELETE_EDGE = "delete_edge"  # 删除边
-    CREATE_MEMORY = "create_memory"  # 创建记忆
-    UPDATE_MEMORY = "update_memory"  # 更新记忆
-    DELETE_MEMORY = "delete_memory"  # 删除记忆
-    MERGE_MEMORIES = "merge_memories"  # 合并记忆
-
-
-class ShortTermOperation(Enum):
-    """短期记忆操作类型枚举"""
-
-    MERGE = "merge"  # 合并到现有记忆
-    UPDATE = "update"  # 更新现有记忆
-    CREATE_NEW = "create_new"  # 创建新记忆
-    DISCARD = "discard"  # 丢弃(低价值)
-    KEEP_SEPARATE = "keep_separate"  # 保持独立(暂不合并)
-
-
-@dataclass
-class MemoryBlock:
-    """
-    感知记忆块
-
-    表示 n 条消息组成的一个语义单元,是感知记忆的基本单位。
-    """
-
-    id: str  # 记忆块唯一ID
-    messages: list[dict[str, Any]]  # 原始消息列表(包含消息内容、发送者、时间等)
-    combined_text: str  # 合并后的文本(用于生成向量)
-    embedding: np.ndarray | None = None  # 整个块的向量表示
-    created_at: datetime = field(default_factory=datetime.now)
-    recall_count: int = 0  # 被召回次数(用于判断是否激活)
-    last_recalled: datetime | None = None  # 最后一次被召回的时间
-    position_in_stack: int = 0  # 在记忆堆中的位置(0=最顶层)
-    metadata: dict[str, Any] = field(default_factory=dict)  # 额外元数据
-
-    def __post_init__(self):
-        """后初始化处理"""
-        if not self.id:
-            self.id = f"block_{uuid.uuid4().hex[:12]}"
-
-    def to_dict(self) -> dict[str, Any]:
-        """转换为字典(用于序列化)"""
-        return {
-            "id": self.id,
-            "messages": self.messages,
-            "combined_text": self.combined_text,
-            "created_at": self.created_at.isoformat(),
-            "recall_count": self.recall_count,
-            "last_recalled": self.last_recalled.isoformat() if self.last_recalled else None,
-            "position_in_stack": self.position_in_stack,
-            "metadata": self.metadata,
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> MemoryBlock:
-        """从字典创建记忆块"""
-        return cls(
-            id=data["id"],
-            messages=data["messages"],
-            combined_text=data["combined_text"],
-            embedding=None,  # 向量数据需要单独加载
-            created_at=datetime.fromisoformat(data["created_at"]),
-            recall_count=data.get("recall_count", 0),
-            last_recalled=datetime.fromisoformat(data["last_recalled"]) if data.get("last_recalled") else None,
-            position_in_stack=data.get("position_in_stack", 0),
-            metadata=data.get("metadata", {}),
-        )
-
-    def increment_recall(self) -> None:
-        """增加召回计数"""
-        self.recall_count += 1
-        self.last_recalled = datetime.now()
-
-    def __str__(self) -> str:
-        return f"MemoryBlock({self.id[:8]}, messages={len(self.messages)}, recalls={self.recall_count})"
-
-
-@dataclass
-class PerceptualMemory:
-    """
-    感知记忆(记忆堆的完整状态)
-
-    全局单例,管理所有感知记忆块
-    """
-
-    blocks: list[MemoryBlock] = field(default_factory=list)  # 记忆块列表(有序,新的在前)
-    max_blocks: int = 50  # 记忆堆最大容量
-    block_size: int = 5  # 每个块包含的消息数量
-    pending_messages: list[dict[str, Any]] = field(default_factory=list)  # 等待组块的消息缓存
-    created_at: datetime = field(default_factory=datetime.now)
-    metadata: dict[str, Any] = field(default_factory=dict)  # 全局元数据
-
-    def to_dict(self) -> dict[str, Any]:
-        """转换为字典(用于序列化)"""
-        return {
-            "blocks": [block.to_dict() for block in self.blocks],
-            "max_blocks": self.max_blocks,
-            "block_size": self.block_size,
-            "pending_messages": self.pending_messages,
-            "created_at": self.created_at.isoformat(),
-            "metadata": self.metadata,
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> PerceptualMemory:
-        """从字典创建感知记忆"""
-        return cls(
-            blocks=[MemoryBlock.from_dict(b) for b in data.get("blocks", [])],
-            max_blocks=data.get("max_blocks", 50),
-            block_size=data.get("block_size", 5),
-            pending_messages=data.get("pending_messages", []),
-            created_at=datetime.fromisoformat(data["created_at"]),
-            metadata=data.get("metadata", {}),
-        )
-
-
-@dataclass
-class ShortTermMemory:
-    """
-    短期记忆
-
-    结构化的活跃记忆,介于感知记忆和长期记忆之间。
-    使用与长期记忆相同的 Memory 结构,但不包含图关系。
-    """
-
-    id: str  # 短期记忆唯一ID
-    content: str  # 记忆的文本内容(LLM 结构化后的描述)
-    embedding: np.ndarray | None = None  # 向量表示
-    importance: float = 0.5  # 重要性评分 [0-1]
-    source_block_ids: list[str] = field(default_factory=list)  # 来源感知记忆块ID列表
-    created_at: datetime = field(default_factory=datetime.now)
-    last_accessed: datetime = field(default_factory=datetime.now)
-    access_count: int = 0  # 访问次数
-    metadata: dict[str, Any] = field(default_factory=dict)  # 额外元数据
-
-    # 记忆结构化字段(与长期记忆 Memory 兼容)
-    subject: str | None = None  # 主体
-    topic: str | None = None  # 主题
-    object: str | None = None  # 客体
-    memory_type: str | None = None  # 记忆类型
-    attributes: dict[str, str] = field(default_factory=dict)  # 属性
-
-    def __post_init__(self):
-        """后初始化处理"""
-        if not self.id:
-            self.id = f"stm_{uuid.uuid4().hex[:12]}"
-        # 确保重要性在有效范围内
-        self.importance = max(0.0, min(1.0, self.importance))
-
-    def to_dict(self) -> dict[str, Any]:
-        """转换为字典(用于序列化)"""
-        return {
-            "id": self.id,
-            "content": self.content,
-            "importance": self.importance,
-            "source_block_ids": self.source_block_ids,
-            "created_at": self.created_at.isoformat(),
-            "last_accessed": self.last_accessed.isoformat(),
-            "access_count": self.access_count,
-            "metadata": self.metadata,
-            "subject": self.subject,
-            "topic": self.topic,
-            "object": self.object,
-            "memory_type": self.memory_type,
-            "attributes": self.attributes,
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> ShortTermMemory:
-        """从字典创建短期记忆"""
-        return cls(
-            id=data["id"],
-            content=data["content"],
-            embedding=None,  # 向量数据需要单独加载
-            importance=data.get("importance", 0.5),
-            source_block_ids=data.get("source_block_ids", []),
-            created_at=datetime.fromisoformat(data["created_at"]),
-            last_accessed=datetime.fromisoformat(data.get("last_accessed", data["created_at"])),
-            access_count=data.get("access_count", 0),
-            metadata=data.get("metadata", {}),
-            subject=data.get("subject"),
-            topic=data.get("topic"),
-            object=data.get("object"),
-            memory_type=data.get("memory_type"),
-            attributes=data.get("attributes", {}),
-        )
-
-    def update_access(self) -> None:
-        """更新访问记录"""
-        self.last_accessed = datetime.now()
-        self.access_count += 1
-
-    def __str__(self) -> str:
-        return f"ShortTermMemory({self.id[:8]}, content={self.content[:30]}..., importance={self.importance:.2f})"
-
-
-@dataclass
-class GraphOperation:
-    """
-    图操作指令
-
-    表示一个对长期记忆图的原子操作,由 LLM 生成。
-    """
-
-    operation_type: GraphOperationType  # 操作类型
-    target_id: str | None = None  # 目标对象ID(节点/边/记忆ID)
-    target_ids: list[str] = field(default_factory=list)  # 多个目标ID(用于合并操作)
-    parameters: dict[str, Any] = field(default_factory=dict)  # 操作参数
-    reason: str = ""  # 操作原因(LLM 的推理过程)
-    confidence: float = 1.0  # 操作置信度 [0-1]
-
-    def __post_init__(self):
-        """后初始化处理"""
-        self.confidence = max(0.0, min(1.0, self.confidence))
-
-    def to_dict(self) -> dict[str, Any]:
-        """转换为字典"""
-        return {
-            "operation_type": self.operation_type.value,
-            "target_id": self.target_id,
-            "target_ids": self.target_ids,
-            "parameters": self.parameters,
-            "reason": self.reason,
-            "confidence": self.confidence,
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> GraphOperation:
-        """从字典创建操作"""
-        return cls(
-            operation_type=GraphOperationType(data["operation_type"]),
-            target_id=data.get("target_id"),
-            target_ids=data.get("target_ids", []),
-            parameters=data.get("parameters", {}),
-            reason=data.get("reason", ""),
-            confidence=data.get("confidence", 1.0),
-        )
-
-    def __str__(self) -> str:
-        return f"GraphOperation({self.operation_type.value}, target={self.target_id}, confidence={self.confidence:.2f})"
-
-
-@dataclass
-class JudgeDecision:
-    """
-    裁判模型决策结果
-
-    用于判断检索到的记忆是否充足
-    """
-
-    is_sufficient: bool  # 是否充足
-    confidence: float = 0.5  # 置信度 [0-1]
-    reasoning: str = ""  # 推理过程
-    additional_queries: list[str] = field(default_factory=list)  # 额外需要检索的 query
-    missing_aspects: list[str] = field(default_factory=list)  # 缺失的信息维度
-
-    def __post_init__(self):
-        """后初始化处理"""
-        self.confidence = max(0.0, min(1.0, self.confidence))
-
-    def to_dict(self) -> dict[str, Any]:
-        """转换为字典"""
-        return {
-            "is_sufficient": self.is_sufficient,
-            "confidence": self.confidence,
-            "reasoning": self.reasoning,
-            "additional_queries": self.additional_queries,
-            "missing_aspects": self.missing_aspects,
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> JudgeDecision:
-        """从字典创建决策"""
-        return cls(
-            is_sufficient=data["is_sufficient"],
-            confidence=data.get("confidence", 0.5),
-            reasoning=data.get("reasoning", ""),
-            additional_queries=data.get("additional_queries", []),
-            missing_aspects=data.get("missing_aspects", []),
-        )
-
-    def __str__(self) -> str:
-        status = "充足" if self.is_sufficient else "不足"
-        return f"JudgeDecision({status}, confidence={self.confidence:.2f}, extra_queries={len(self.additional_queries)})"
-
-
-@dataclass
-class ShortTermDecision:
-    """
-    短期记忆决策结果
-
-    LLM 对新短期记忆的处理决策
-    """
-
-    operation: ShortTermOperation  # 操作类型
-    target_memory_id: str | None = None  # 目标记忆ID(用于 MERGE/UPDATE)
-    merged_content: str | None = None  # 合并后的内容
-    reasoning: str = ""  # 推理过程
-    confidence: float = 1.0  # 置信度 [0-1]
-    updated_importance: float | None = None  # 更新后的重要性
-    updated_metadata: dict[str, Any] = field(default_factory=dict)  # 更新后的元数据
-
-    def __post_init__(self):
-        """后初始化处理"""
-        self.confidence = max(0.0, min(1.0, self.confidence))
-        if self.updated_importance is not None:
-            self.updated_importance = max(0.0, min(1.0, self.updated_importance))
-
-    def to_dict(self) -> dict[str, Any]:
-        """转换为字典"""
-        return {
-            "operation": self.operation.value,
-            "target_memory_id": self.target_memory_id,
-            "merged_content": self.merged_content,
-            "reasoning": self.reasoning,
-            "confidence": self.confidence,
-            "updated_importance": self.updated_importance,
-            "updated_metadata": self.updated_metadata,
-        }
-
-    @classmethod
-    def from_dict(cls, data: dict[str, Any]) -> ShortTermDecision:
-        """从字典创建决策"""
-        return cls(
-            operation=ShortTermOperation(data["operation"]),
-            target_memory_id=data.get("target_memory_id"),
-            merged_content=data.get("merged_content"),
-            reasoning=data.get("reasoning", ""),
-            confidence=data.get("confidence", 1.0),
-            updated_importance=data.get("updated_importance"),
-            updated_metadata=data.get("updated_metadata", {}),
-        )
-
-    def __str__(self) -> str:
-        return f"ShortTermDecision({self.operation.value}, target={self.target_memory_id}, confidence={self.confidence:.2f})"
diff --git a/src/memory_graph/three_tier/unified_manager.py b/src/memory_graph/three_tier/unified_manager.py
index 15a5be671..a11506e90 100644
--- a/src/memory_graph/three_tier/unified_manager.py
+++ b/src/memory_graph/three_tier/unified_manager.py
@@ -74,12 +74,12 @@ class UnifiedMemoryManager:
         self.judge_confidence_threshold = judge_confidence_threshold
 
         # 三层管理器
-        self.perceptual_manager: PerceptualMemoryManager | None = None
-        self.short_term_manager: ShortTermMemoryManager | None = None
-        self.long_term_manager: LongTermMemoryManager | None = None
+        self.perceptual_manager: PerceptualMemoryManager
+        self.short_term_manager: ShortTermMemoryManager
+        self.long_term_manager: LongTermMemoryManager
 
         # 底层 MemoryManager(长期记忆)
-        self.memory_manager: MemoryManager | None = None
+        self.memory_manager: MemoryManager
 
         # 配置参数存储(用于初始化)
         self._config = {
@@ -313,15 +313,29 @@ class UnifiedMemoryManager:
         try:
             from src.config.config import model_config
             from src.llm_models.utils_model import LLMRequest
+            from src.memory_graph.utils.memory_formatter import format_memory_for_prompt
 
-            # 构建提示词
-            perceptual_desc = "\n\n".join(
-                [f"记忆块{i+1}:\n{block.combined_text}" for i, block in enumerate(perceptual_blocks)]
-            )
+            # 构建提示词 - 使用优化的格式
+            # 防御性处理:确保 combined_text 是字符串
+            perceptual_texts = []
+            for i, block in enumerate(perceptual_blocks):
+                text = block.combined_text
+                if isinstance(text, list):
+                    text = " ".join(str(item) for item in text)
+                elif not isinstance(text, str):
+                    text = str(text)
+                perceptual_texts.append(f"记忆块{i+1}:\n{text}")
+
+            perceptual_desc = "\n\n".join(perceptual_texts)
 
-            short_term_desc = "\n\n".join(
-                [f"记忆{i+1}:\n{mem.content}" for i, mem in enumerate(short_term_memories)]
-            )
+            # 短期记忆使用 "主体-主题(属性)" 格式
+            short_term_texts = []
+            for mem in short_term_memories:
+                formatted = format_memory_for_prompt(mem, include_metadata=False)
+                if formatted:  # 只添加非空的格式化结果
+                    short_term_texts.append(f"- {formatted}")
+
+            short_term_desc = "\n".join(short_term_texts)
 
             prompt = f"""你是一个记忆检索评估专家。请判断检索到的记忆是否足以回答用户的问题。
 
@@ -331,7 +345,7 @@ class UnifiedMemoryManager:
 **检索到的感知记忆块:**
 {perceptual_desc or '(无)'}
 
-**检索到的短期记忆:**
+**检索到的短期记忆(结构化记忆,格式:主体-主题(属性)):**
 {short_term_desc or '(无)'}
 
 **任务要求:**
@@ -352,16 +366,16 @@ class UnifiedMemoryManager:
 
 请输出JSON:"""
 
-            # 调用 LLM
+            # 调用记忆裁判模型
             llm = LLMRequest(
-                model_set=model_config.model_task_config.utils_small,
+                model_set=model_config.model_task_config.memory_judge,
                 request_type="unified_memory.judge",
             )
 
             response, _ = await llm.generate_response_async(
                 prompt,
-                temperature=0.2,
-                max_tokens=800,
+                temperature=0.1,
+                max_tokens=600,
             )
 
             # 解析响应
@@ -407,32 +421,53 @@
         logger.info("自动转移任务已启动")
 
     async def _auto_transfer_loop(self) -> None:
-        """自动转移循环"""
+        """自动转移循环(批量缓存模式)"""
+        transfer_cache = []  # 缓存待转移的短期记忆
+        cache_size_threshold = self._config["long_term"]["batch_size"]  # 使用配置的批量大小
+
         while True:
             try:
                 # 每 10 分钟检查一次
                 await asyncio.sleep(600)
 
-                # 检查短期记忆是否达到上限
-                if len(self.short_term_manager.memories) >= self.short_term_manager.max_memories:
-                    logger.info("短期记忆已达上限,开始转移到长期记忆")
+                # 检查短期记忆是否有需要转移的
+                memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
+
+                if memories_to_transfer:
+                    # 添加到缓存
+                    transfer_cache.extend(memories_to_transfer)
+                    logger.info(
+                        f"缓存待转移记忆: 新增{len(memories_to_transfer)}条, "
+                        f"缓存总数{len(transfer_cache)}/{cache_size_threshold}"
+                    )
+
+                # 检查是否达到批量转移阈值或短期记忆已满
+                should_transfer = (
+                    len(transfer_cache) >= cache_size_threshold or
+                    len(self.short_term_manager.memories) >= self.short_term_manager.max_memories
+                )
+
+                if should_transfer and transfer_cache:
+                    logger.info(f"触发批量转移: {len(transfer_cache)}条短期记忆→长期记忆")
+
+                    # 执行批量转移
+                    result = await self.long_term_manager.transfer_from_short_term(
+                        transfer_cache
+                    )
 
-                    # 获取待转移的记忆
-                    memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
-
-                    if memories_to_transfer:
-                        # 执行转移
-                        result = await self.long_term_manager.transfer_from_short_term(
-                            memories_to_transfer
-                        )
+                    # 清除已转移的记忆
+                    if result.get("transferred_memory_ids"):
+                        await self.short_term_manager.clear_transferred_memories(
+                            result["transferred_memory_ids"]
+                        )
+                        # 从缓存中移除已转移的
+                        transferred_ids = set(result["transferred_memory_ids"])
+                        transfer_cache = [
+                            m for m in transfer_cache
+                            if m.id not in transferred_ids
+                        ]
 
-                        # 清除已转移的记忆
-                        if result.get("transferred_memory_ids"):
-                            await self.short_term_manager.clear_transferred_memories(
-                                result["transferred_memory_ids"]
-                            )
-
-                        logger.info(f"自动转移完成: {result}")
+                    logger.info(f"✅ 批量转移完成: {result}")
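The rewritten loop above replaces "transfer only when full" with a small accumulation buffer. The trigger condition is worth spelling out; the values below are the template defaults, used purely for illustration:

# should_transfer fires when EITHER the cache holds a full batch OR the
# short-term layer is saturated (forcing an early, smaller flush).
def should_transfer(cache_len: int, batch_size: int = 10,
                    short_term_len: int = 0, max_memories: int = 30) -> bool:
    return cache_len >= batch_size or short_term_len >= max_memories

assert should_transfer(10) is True                      # full batch
assert should_transfer(7, short_term_len=30) is True    # layer saturated
assert should_transfer(7, short_term_len=12) is False   # keep buffering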
diff --git a/src/memory_graph/unified_manager.py b/src/memory_graph/unified_manager.py
new file mode 100644
index 000000000..d2792bd65
--- /dev/null
+++ b/src/memory_graph/unified_manager.py
@@ -0,0 +1,561 @@
+"""
+统一记忆管理器 (Unified Memory Manager)
+
+整合三层记忆系统:
+- 感知记忆层
+- 短期记忆层
+- 长期记忆层
+
+提供统一的接口供外部调用
+"""
+
+import asyncio
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from src.common.logger import get_logger
+from src.memory_graph.manager import MemoryManager
+from src.memory_graph.long_term_manager import LongTermMemoryManager
+from src.memory_graph.models import JudgeDecision, MemoryBlock, ShortTermMemory
+from src.memory_graph.perceptual_manager import PerceptualMemoryManager
+from src.memory_graph.short_term_manager import ShortTermMemoryManager
+
+logger = get_logger(__name__)
+
+
+class UnifiedMemoryManager:
+    """
+    统一记忆管理器
+
+    整合三层记忆系统,提供统一接口
+    """
+
+    def __init__(
+        self,
+        data_dir: Path | None = None,
+        # 感知记忆配置
+        perceptual_max_blocks: int = 50,
+        perceptual_block_size: int = 5,
+        perceptual_activation_threshold: int = 3,
+        perceptual_recall_top_k: int = 5,
+        perceptual_recall_threshold: float = 0.55,
+        # 短期记忆配置
+        short_term_max_memories: int = 30,
+        short_term_transfer_threshold: float = 0.6,
+        # 长期记忆配置
+        long_term_batch_size: int = 10,
+        long_term_search_top_k: int = 5,
+        long_term_decay_factor: float = 0.95,
+        # 智能检索配置
+        judge_confidence_threshold: float = 0.7,
+    ):
+        """
+        初始化统一记忆管理器
+
+        Args:
+            data_dir: 数据存储目录
+            perceptual_max_blocks: 感知记忆堆最大容量
+            perceptual_block_size: 每个记忆块的消息数量
+            perceptual_activation_threshold: 激活阈值(召回次数)
+            perceptual_recall_top_k: 召回时返回的最大块数
+            perceptual_recall_threshold: 召回的相似度阈值
+            short_term_max_memories: 短期记忆最大数量
+            short_term_transfer_threshold: 转移到长期记忆的重要性阈值
+            long_term_batch_size: 批量处理的短期记忆数量
+            long_term_search_top_k: 检索相似记忆的数量
+            long_term_decay_factor: 长期记忆的衰减因子
+            judge_confidence_threshold: 裁判模型的置信度阈值
+        """
+        self.data_dir = data_dir or Path("data/memory_graph/three_tier")
+        self.data_dir.mkdir(parents=True, exist_ok=True)
+
+        # 配置参数
+        self.judge_confidence_threshold = judge_confidence_threshold
+
+        # 三层管理器
+        self.perceptual_manager: PerceptualMemoryManager
+        self.short_term_manager: ShortTermMemoryManager
+        self.long_term_manager: LongTermMemoryManager
+
+        # 底层 MemoryManager(长期记忆)
+        self.memory_manager: MemoryManager
+
+        # 配置参数存储(用于初始化)
+        self._config = {
+            "perceptual": {
+                "max_blocks": perceptual_max_blocks,
+                "block_size": perceptual_block_size,
+                "activation_threshold": perceptual_activation_threshold,
+                "recall_top_k": perceptual_recall_top_k,
+                "recall_similarity_threshold": perceptual_recall_threshold,
+            },
+            "short_term": {
+                "max_memories": short_term_max_memories,
+                "transfer_importance_threshold": short_term_transfer_threshold,
+            },
+            "long_term": {
+                "batch_size": long_term_batch_size,
+                "search_top_k": long_term_search_top_k,
+                "long_term_decay_factor": long_term_decay_factor,
+            },
+        }
+
+        # 状态
+        self._initialized = False
+        self._auto_transfer_task: asyncio.Task | None = None
+
+        logger.info("统一记忆管理器已创建")
+
+    async def initialize(self) -> None:
+        """初始化统一记忆管理器"""
+        if self._initialized:
+            logger.warning("统一记忆管理器已经初始化")
+            return
+
+        try:
+            logger.info("开始初始化统一记忆管理器...")
+
+            # 初始化底层 MemoryManager(长期记忆)
+            self.memory_manager = MemoryManager(data_dir=self.data_dir.parent)
+            await self.memory_manager.initialize()
+
+            # 初始化感知记忆层
+            self.perceptual_manager = PerceptualMemoryManager(
+                data_dir=self.data_dir,
+                **self._config["perceptual"],
+            )
+            await self.perceptual_manager.initialize()
+
+            # 初始化短期记忆层
+            self.short_term_manager = ShortTermMemoryManager(
+                data_dir=self.data_dir,
+                **self._config["short_term"],
+            )
+            await self.short_term_manager.initialize()
+
+            # 初始化长期记忆层
+            self.long_term_manager = LongTermMemoryManager(
+                memory_manager=self.memory_manager,
+                **self._config["long_term"],
+            )
+            await self.long_term_manager.initialize()
+
+            self._initialized = True
+            logger.info("✅ 统一记忆管理器初始化完成")
+
+            # 启动自动转移任务
+            self._start_auto_transfer_task()
+
+        except Exception as e:
+            logger.error(f"统一记忆管理器初始化失败: {e}", exc_info=True)
+            raise
+
+    async def add_message(self, message: dict[str, Any]) -> MemoryBlock | None:
+        """
+        添加消息到感知记忆层
+
+        Args:
+            message: 消息字典
+
+        Returns:
+            如果创建了新块,返回 MemoryBlock
+        """
+        if not self._initialized:
+            await self.initialize()
+
+        new_block = await self.perceptual_manager.add_message(message)
+
+        # 注意:感知→短期的转移由召回触发,不是由添加消息触发
+        # 转移逻辑在 search_memories 中处理
+
+        return new_block
+
+    # 已移除 _process_activated_blocks 方法
+    # 转移逻辑现在在 search_memories 中处理:
+    # 当召回某个记忆块时,如果其 recall_count >= activation_threshold,
+    # 立即将该块转移到短期记忆
+
+    async def search_memories(
+        self, query_text: str, use_judge: bool = True
+    ) -> dict[str, Any]:
+        """
+        智能检索记忆
+
+        流程:
+        1. 优先检索感知记忆和短期记忆
+        2. 使用裁判模型评估是否充足
+        3. 如果不充足,生成补充 query 并检索长期记忆
+
+        Args:
+            query_text: 查询文本
+            use_judge: 是否使用裁判模型
+
+        Returns:
+            检索结果字典,包含:
+            - perceptual_blocks: 感知记忆块列表
+            - short_term_memories: 短期记忆列表
+            - long_term_memories: 长期记忆列表
+            - judge_decision: 裁判决策(如果使用)
+        """
+        if not self._initialized:
+            await self.initialize()
+
+        try:
+            result = {
+                "perceptual_blocks": [],
+                "short_term_memories": [],
+                "long_term_memories": [],
+                "judge_decision": None,
+            }
+
+            # 步骤1: 检索感知记忆和短期记忆
+            perceptual_blocks = await self.perceptual_manager.recall_blocks(query_text)
+            short_term_memories = await self.short_term_manager.search_memories(query_text)
+
+            # 步骤1.5: 检查并处理需要转移的记忆块
+            # 当某个块的召回次数达到阈值时,立即转移到短期记忆
+            blocks_to_transfer = [
+                block for block in perceptual_blocks
+                if block.metadata.get("needs_transfer", False)
+            ]
+
+            if blocks_to_transfer:
+                logger.info(f"检测到 {len(blocks_to_transfer)} 个记忆块需要转移到短期记忆")
+                for block in blocks_to_transfer:
+                    # 转换为短期记忆
+                    stm = await self.short_term_manager.add_from_block(block)
+                    if stm:
+                        # 从感知记忆中移除
+                        await self.perceptual_manager.remove_block(block.id)
+                        logger.info(f"✅ 记忆块 {block.id} 已转为短期记忆 {stm.id}")
+                        # 将新创建的短期记忆加入结果
+                        short_term_memories.append(stm)
+
+            result["perceptual_blocks"] = perceptual_blocks
+            result["short_term_memories"] = short_term_memories
+
+            logger.info(
+                f"初步检索: 感知记忆 {len(perceptual_blocks)} 块, "
+                f"短期记忆 {len(short_term_memories)} 条"
+            )
+
+            # 步骤2: 裁判模型评估
+            if use_judge:
+                judge_decision = await self._judge_retrieval_sufficiency(
+                    query_text, perceptual_blocks, short_term_memories
+                )
+                result["judge_decision"] = judge_decision
+
+                # 步骤3: 如果不充足,检索长期记忆
+                if not judge_decision.is_sufficient:
+                    logger.info("裁判判定记忆不充足,启动长期记忆检索")
+
+                    # 使用额外的 query 检索
+                    long_term_memories = []
+                    queries = [query_text] + judge_decision.additional_queries
+
+                    for q in queries:
+                        memories = await self.memory_manager.search_memories(
+                            query=q,
+                            top_k=5,
+                            use_multi_query=False,
+                        )
+                        long_term_memories.extend(memories)
+
+                    # 去重
+                    seen_ids = set()
+                    unique_memories = []
+                    for mem in long_term_memories:
+                        if mem.id not in seen_ids:
+                            unique_memories.append(mem)
+                            seen_ids.add(mem.id)
+
+                    result["long_term_memories"] = unique_memories
+                    logger.info(f"长期记忆检索: {len(unique_memories)} 条")
+            else:
+                # 不使用裁判,直接检索长期记忆
+                long_term_memories = await self.memory_manager.search_memories(
+                    query=query_text,
+                    top_k=5,
+                    use_multi_query=False,
+                )
+                result["long_term_memories"] = long_term_memories
+
+            return result
+
+        except Exception as e:
+            logger.error(f"智能检索失败: {e}", exc_info=True)
+            return {
+                "perceptual_blocks": [],
+                "short_term_memories": [],
+                "long_term_memories": [],
+                "error": str(e),
+            }
+
+    async def _judge_retrieval_sufficiency(
+        self,
+        query: str,
+        perceptual_blocks: list[MemoryBlock],
+        short_term_memories: list[ShortTermMemory],
+    ) -> JudgeDecision:
+        """
+        使用裁判模型评估检索结果是否充足
+
+        Args:
+            query: 原始查询
+            perceptual_blocks: 感知记忆块
+            short_term_memories: 短期记忆
+
+        Returns:
+            裁判决策
+        """
+        try:
+            from src.config.config import model_config
+            from src.llm_models.utils_model import LLMRequest
+            from src.memory_graph.utils.memory_formatter import format_memory_for_prompt
+
+            # 构建提示词 - 使用优化的格式
+            # 防御性处理:确保 combined_text 是字符串
+            perceptual_texts = []
+            for i, block in enumerate(perceptual_blocks):
+                text = block.combined_text
+                if isinstance(text, list):
+                    text = " ".join(str(item) for item in text)
+                elif not isinstance(text, str):
+                    text = str(text)
+                perceptual_texts.append(f"记忆块{i+1}:\n{text}")
+
+            perceptual_desc = "\n\n".join(perceptual_texts)
+
+            # 短期记忆使用 "主体-主题(属性)" 格式
+            short_term_texts = []
+            for mem in short_term_memories:
+                formatted = format_memory_for_prompt(mem, include_metadata=False)
+                if formatted:  # 只添加非空的格式化结果
+                    short_term_texts.append(f"- {formatted}")
+
+            short_term_desc = "\n".join(short_term_texts)
+
+            prompt = f"""你是一个记忆检索评估专家。请判断检索到的记忆是否足以回答用户的问题。
+
+**用户查询:**
+{query}
+
+**检索到的感知记忆块:**
+{perceptual_desc or '(无)'}
+
+**检索到的短期记忆(结构化记忆,格式:主体-主题(属性)):**
+{short_term_desc or '(无)'}
+
+**任务要求:**
+1. 判断这些记忆是否足以回答用户的问题
+2. 如果不充足,分析缺少哪些方面的信息
+3. 生成额外需要检索的 query(用于在长期记忆中检索)
+
+**输出格式(JSON):**
+```json
+{{
+    "is_sufficient": true/false,
+    "confidence": 0.85,
+    "reasoning": "判断理由",
+    "missing_aspects": ["缺失的信息1", "缺失的信息2"],
+    "additional_queries": ["补充query1", "补充query2"]
+}}
+```
+
+请输出JSON:"""
+
+            # 调用记忆裁判模型
+            llm = LLMRequest(
+                model_set=model_config.model_task_config.memory_judge,
+                request_type="unified_memory.judge",
+            )
+
+            response, _ = await llm.generate_response_async(
+                prompt,
+                temperature=0.1,
+                max_tokens=600,
+            )
+
+            # 解析响应
+            import json
+            import re
+
+            json_match = re.search(r"```json\s*(.*?)\s*```", response, re.DOTALL)
+            if json_match:
+                json_str = json_match.group(1)
+            else:
+                json_str = response.strip()
+
+            data = json.loads(json_str)
+
+            decision = JudgeDecision(
+                is_sufficient=data.get("is_sufficient", False),
+                confidence=data.get("confidence", 0.5),
+                reasoning=data.get("reasoning", ""),
+                additional_queries=data.get("additional_queries", []),
+                missing_aspects=data.get("missing_aspects", []),
+            )
+
+            logger.info(f"裁判决策: {decision}")
+            return decision
+
+        except Exception as e:
+            logger.error(f"裁判模型评估失败: {e}", exc_info=True)
+            # 默认判定为不充足,需要检索长期记忆
+            return JudgeDecision(
+                is_sufficient=False,
+                confidence=0.3,
+                reasoning=f"裁判模型失败: {e}",
+                additional_queries=[query],
+            )
+
+    def _start_auto_transfer_task(self) -> None:
+        """启动自动转移任务"""
+        if self._auto_transfer_task and not self._auto_transfer_task.done():
+            logger.warning("自动转移任务已在运行")
+            return
+
+        self._auto_transfer_task = asyncio.create_task(self._auto_transfer_loop())
+        logger.info("自动转移任务已启动")
+
+    async def _auto_transfer_loop(self) -> None:
+        """自动转移循环(批量缓存模式)"""
+        transfer_cache = []  # 缓存待转移的短期记忆
+        cache_size_threshold = self._config["long_term"]["batch_size"]  # 使用配置的批量大小
+
+        while True:
+            try:
+                # 每 10 分钟检查一次
+                await asyncio.sleep(600)
+
+                # 检查短期记忆是否有需要转移的
+                memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
+
+                if memories_to_transfer:
+                    # 添加到缓存
+                    transfer_cache.extend(memories_to_transfer)
+                    logger.info(
+                        f"缓存待转移记忆: 新增{len(memories_to_transfer)}条, "
+                        f"缓存总数{len(transfer_cache)}/{cache_size_threshold}"
+                    )
+
+                # 检查是否达到批量转移阈值或短期记忆已满
+                should_transfer = (
+                    len(transfer_cache) >= cache_size_threshold or
+                    len(self.short_term_manager.memories) >= self.short_term_manager.max_memories
+                )
+
+                if should_transfer and transfer_cache:
+                    logger.info(f"触发批量转移: {len(transfer_cache)}条短期记忆→长期记忆")
+
+                    # 执行批量转移
+                    result = await self.long_term_manager.transfer_from_short_term(
+                        transfer_cache
+                    )
+
+                    # 清除已转移的记忆
+                    if result.get("transferred_memory_ids"):
+                        await self.short_term_manager.clear_transferred_memories(
+                            result["transferred_memory_ids"]
+                        )
+                        # 从缓存中移除已转移的
+                        transferred_ids = set(result["transferred_memory_ids"])
+                        transfer_cache = [
+                            m for m in transfer_cache
+                            if m.id not in transferred_ids
+                        ]
+
+                    logger.info(f"✅ 批量转移完成: {result}")
+
+            except asyncio.CancelledError:
+                logger.info("自动转移任务已取消")
+                break
+            except Exception as e:
+                logger.error(f"自动转移任务错误: {e}", exc_info=True)
+                # 继续运行
+
+    async def manual_transfer(self) -> dict[str, Any]:
+        """
+        手动触发短期记忆到长期记忆的转移
+
+        Returns:
+            转移结果
+        """
+        if not self._initialized:
+            await self.initialize()
+
+        try:
+            memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
+
+            if not memories_to_transfer:
+                logger.info("没有需要转移的短期记忆")
+                return {"message": "没有需要转移的记忆", "transferred_count": 0}
+
+            # 执行转移
+            result = await self.long_term_manager.transfer_from_short_term(memories_to_transfer)
+
+            # 清除已转移的记忆
+            if result.get("transferred_memory_ids"):
+                await self.short_term_manager.clear_transferred_memories(
+                    result["transferred_memory_ids"]
+                )
+
+            logger.info(f"手动转移完成: {result}")
+            return result
+
+        except Exception as e:
+            logger.error(f"手动转移失败: {e}", exc_info=True)
+            return {"error": str(e), "transferred_count": 0}
+
+    def get_statistics(self) -> dict[str, Any]:
+        """获取三层记忆系统的统计信息"""
+        if not self._initialized:
+            return {}
+
+        return {
+            "perceptual": self.perceptual_manager.get_statistics(),
+            "short_term": self.short_term_manager.get_statistics(),
+            "long_term": self.long_term_manager.get_statistics(),
+            "total_system_memories": (
+                self.perceptual_manager.get_statistics().get("total_messages", 0)
+                + self.short_term_manager.get_statistics().get("total_memories", 0)
+                + self.long_term_manager.get_statistics().get("total_memories", 0)
+            ),
+        }
+
+    async def shutdown(self) -> None:
+        """关闭统一记忆管理器"""
+        if not self._initialized:
+            return
+
+        try:
+            logger.info("正在关闭统一记忆管理器...")
+
+            # 取消自动转移任务
+            if self._auto_transfer_task and not self._auto_transfer_task.done():
+                self._auto_transfer_task.cancel()
+                try:
+                    await self._auto_transfer_task
+                except asyncio.CancelledError:
+                    pass
+
+            # 关闭各层管理器
+            if self.perceptual_manager:
+                await self.perceptual_manager.shutdown()
+
+            if self.short_term_manager:
+                await self.short_term_manager.shutdown()
+
+            if self.long_term_manager:
+                await self.long_term_manager.shutdown()
+
+            if self.memory_manager:
+                await self.memory_manager.shutdown()
+
+            self._initialized = False
+            logger.info("✅ 统一记忆管理器已关闭")
+
+        except Exception as e:
+            logger.error(f"关闭统一记忆管理器失败: {e}", exc_info=True)
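A sketch of consuming the tiered retrieval result defined above; the dict keys follow the search_memories docstring in this file (illustrative, not part of the patch):

import asyncio

from src.memory_graph.unified_manager import UnifiedMemoryManager

async def demo_search() -> None:
    manager = UnifiedMemoryManager()
    await manager.initialize()
    result = await manager.search_memories("小明在学什么语言?", use_judge=True)
    decision = result.get("judge_decision")
    if decision and not decision.is_sufficient:
        # The judge found the perceptual/short-term hits lacking, so
        # long_term_memories was filled via its additional_queries.
        print("follow-up queries:", decision.additional_queries)
    print(len(result["long_term_memories"]), "long-term hits")
    await manager.shutdown()

asyncio.run(demo_search())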
diff --git a/src/memory_graph/utils/memory_formatter.py b/src/memory_graph/utils/memory_formatter.py
new file mode 100644
index 000000000..c546e0614
--- /dev/null
+++ b/src/memory_graph/utils/memory_formatter.py
@@ -0,0 +1,234 @@
+"""
+记忆格式化工具
+
+提供将记忆对象格式化为提示词的功能,使用 "主体-主题(属性)" 格式。
+"""
+
+from src.memory_graph.models import Memory, MemoryNode, NodeType
+from src.memory_graph.models import ShortTermMemory
+
+
+def get_memory_type_label(memory_type: str) -> str:
+    """
+    获取记忆类型的中文标签
+
+    Args:
+        memory_type: 记忆类型(英文)
+
+    Returns:
+        中文标签
+    """
+    type_mapping = {
+        "事实": "事实",
+        "事件": "事件",
+        "观点": "观点",
+        "关系": "关系",
+        "目标": "目标",
+        "计划": "计划",
+        "fact": "事实",
+        "event": "事件",
+        "opinion": "观点",
+        "relation": "关系",
+        "goal": "目标",
+        "plan": "计划",
+        "unknown": "未知",
+    }
+    return type_mapping.get(memory_type.lower(), memory_type)
+
+
+def format_memory_for_prompt(memory: Memory | ShortTermMemory, include_metadata: bool = True) -> str:
+    """
+    格式化记忆为提示词文本
+
+    使用 "主体-主题(属性)" 格式,例如:
+    - "张三-职业(程序员, 公司=MoFox)"
+    - "小明-喜欢(Python, 原因=简洁优雅)"
+    - "拾风-地址(https://mofox.com)"
+
+    Args:
+        memory: Memory 或 ShortTermMemory 对象
+        include_metadata: 是否包含元数据(如重要性、时间等)
+
+    Returns:
+        格式化后的记忆文本
+    """
+    if isinstance(memory, ShortTermMemory):
+        return _format_short_term_memory(memory, include_metadata)
+    elif isinstance(memory, Memory):
+        return _format_long_term_memory(memory, include_metadata)
+    else:
+        return str(memory)
+
+
+def _format_short_term_memory(mem: ShortTermMemory, include_metadata: bool) -> str:
+    """
+    格式化短期记忆
+
+    Args:
+        mem: ShortTermMemory 对象
+        include_metadata: 是否包含元数据
+
+    Returns:
+        格式化后的文本
+    """
+    parts = []
+
+    # 主体
+    subject = mem.subject or ""
+    # 主题
+    topic = mem.topic or ""
+    # 客体
+    obj = mem.object or ""
+
+    # 构建基础格式:主体-主题
+    if subject and topic:
+        base = f"{subject}-{topic}"
+    elif subject:
+        base = subject
+    elif topic:
+        base = topic
+    else:
+        # 如果没有结构化字段,使用 content
+        # 防御性编程:确保 content 是字符串
+        if isinstance(mem.content, list):
+            return " ".join(str(item) for item in mem.content)
+        return str(mem.content) if mem.content else ""
+
+    # 添加客体和属性
+    attr_parts = []
+    if obj:
+        attr_parts.append(obj)
+
+    # 添加属性
+    if mem.attributes:
+        for key, value in mem.attributes.items():
+            if value:
+                attr_parts.append(f"{key}={value}")
+
+    # 组合
+    if attr_parts:
+        result = f"{base}({', '.join(attr_parts)})"
+    else:
+        result = base
+
+    # 添加元数据(可选)
+    if include_metadata:
+        metadata_parts = []
+        if mem.memory_type:
+            metadata_parts.append(f"类型:{get_memory_type_label(mem.memory_type)}")
+        if mem.importance > 0:
+            metadata_parts.append(f"重要性:{mem.importance:.2f}")
+
+        if metadata_parts:
+            result = f"{result} [{', '.join(metadata_parts)}]"
+
+    return result
+
+
+def _format_long_term_memory(mem: Memory, include_metadata: bool) -> str:
+    """
+    格式化长期记忆(Memory 对象)
+
+    Args:
+        mem: Memory 对象
+        include_metadata: 是否包含元数据
+
+    Returns:
+        格式化后的文本
+    """
+    from src.memory_graph.models import EdgeType
+
+    # 获取主体节点
+    subject_node = mem.get_subject_node()
+    if not subject_node:
+        return mem.to_text()
+
+    subject = subject_node.content
+
+    # 查找主题节点
+    topic_node = None
+    for edge in mem.edges:
+        edge_type = edge.edge_type.value if hasattr(edge.edge_type, 'value') else str(edge.edge_type)
+        if edge_type == "记忆类型" and edge.source_id == mem.subject_id:
+            topic_node = mem.get_node_by_id(edge.target_id)
+            break
+
+    if not topic_node:
+        return subject
+
+    topic = topic_node.content
+
+    # 基础格式:主体-主题
+    base = f"{subject}-{topic}"
+
+    # 收集客体和属性
+    attr_parts = []
+
+    # 查找客体节点(通过核心关系边)
+    for edge in mem.edges:
+        edge_type = edge.edge_type.value if hasattr(edge.edge_type, 'value') else str(edge.edge_type)
+        if edge_type == "核心关系" and edge.source_id == topic_node.id:
+            obj_node = mem.get_node_by_id(edge.target_id)
+            if obj_node:
+                # 如果有关系名称,使用关系名称
+                if edge.relation and edge.relation != "未知":
+                    attr_parts.append(f"{edge.relation}={obj_node.content}")
+                else:
+                    attr_parts.append(obj_node.content)
+
+    # 查找属性节点
+    for node in mem.nodes:
+        if node.node_type == NodeType.ATTRIBUTE:
+            # 属性节点的 content 格式可能是 "key=value" 或 "value"
+            attr_parts.append(node.content)
+
+    # 组合
+    if attr_parts:
+        result = f"{base}({', '.join(attr_parts)})"
+    else:
+        result = base
+
+    # 添加元数据(可选)
+    if include_metadata:
+        metadata_parts = []
+        if mem.memory_type:
+            type_value = mem.memory_type.value if hasattr(mem.memory_type, 'value') else str(mem.memory_type)
+            metadata_parts.append(f"类型:{get_memory_type_label(type_value)}")
+        if mem.importance > 0:
+            metadata_parts.append(f"重要性:{mem.importance:.2f}")
+
+        if metadata_parts:
+            result = f"{result} [{', '.join(metadata_parts)}]"
+
+    return result
+
+
+def format_memories_block(
+    memories: list[Memory | ShortTermMemory],
+    title: str = "相关记忆",
+    max_count: int = 10,
+    include_metadata: bool = False,
+) -> str:
+    """
+    格式化多个记忆为提示词块
+
+    Args:
+        memories: 记忆列表
+        title: 块标题
+        max_count: 最多显示的记忆数量
+        include_metadata: 是否包含元数据
+
+    Returns:
+        格式化后的记忆块
+    """
+    if not memories:
+        return ""
+
+    lines = [f"### 🧠 {title}", ""]
+
+    for mem in memories[:max_count]:
+        formatted = format_memory_for_prompt(mem, include_metadata=include_metadata)
+        if formatted:
+            lines.append(f"- {formatted}")
+
+    return "\n".join(lines)
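What the "主体-主题(属性)" format produces for a structured short-term memory; the field values are invented for illustration, but the output follows _format_short_term_memory above:

from src.memory_graph.models import ShortTermMemory
from src.memory_graph.utils.memory_formatter import format_memory_for_prompt

mem = ShortTermMemory(
    id="stm_demo",
    content="张三是MoFox的程序员",
    subject="张三",
    topic="职业",
    object="程序员",
    attributes={"公司": "MoFox"},
    memory_type="事实",
    importance=0.8,
)
print(format_memory_for_prompt(mem, include_metadata=False))
# 张三-职业(程序员, 公司=MoFox)
print(format_memory_for_prompt(mem, include_metadata=True))
# 张三-职业(程序员, 公司=MoFox) [类型:事实, 重要性:0.80]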
diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml
index 3d6c82c3d..80cb7476e 100644
--- a/template/bot_config_template.toml
+++ b/template/bot_config_template.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "7.8.0"
+version = "7.8.1"
 
 #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读----
 #如果你想要修改配置文件,请递增version的值
@@ -314,35 +314,32 @@ max_related_memories = 5 # 激活传播时最多影响的相关记忆数
 
 # ==================== 三层记忆系统配置 (Three-Tier Memory System) ====================
 # 受人脑记忆机制启发的分层记忆架构:
-# 1. 感知记忆层 (Perceptual Memory) - 消息块的短期缓存
-# 2. 短期记忆层 (Short-term Memory) - 结构化的活跃记忆
-# 3. 长期记忆层 (Long-term Memory) - 持久化的图结构记忆
+# 1. 感知记忆层 (Perceptual Memory) - 消息块的短期缓存,自动收集
+# 2. 短期记忆层 (Short-term Memory) - 结构化的活跃记忆,模型格式化
+# 3. 长期记忆层 (Long-term Memory) - 持久化的图结构记忆,批量转移
 [three_tier_memory]
-enable = false # 是否启用三层记忆系统(实验性功能,建议在测试环境先试用)
+enable = false # 是否启用三层记忆系统(已经是核心系统,建议启用)
 data_dir = "data/memory_graph/three_tier" # 数据存储目录
 
 # --- 感知记忆层配置 ---
-perceptual_max_blocks = 50 # 记忆堆最大容量(全局,不区分聊天流)
+perceptual_max_blocks = 50 # 记忆堆最大容量(全局)
 perceptual_block_size = 5 # 每个记忆块包含的消息数量
 perceptual_similarity_threshold = 0.55 # 相似度阈值(0-1)
-perceptual_topk = 3 # TopK召回数量
-activation_threshold = 3 # 激活阈值(召回次数→短期)
+perceptual_topk = 5 # TopK召回数量
+perceptual_activation_threshold = 3 # 激活阈值(召回次数达到此值→转为短期记忆)
 
 # --- 短期记忆层配置 ---
-short_term_max_memories = 30 # 短期记忆最大数量
+short_term_max_memories = 30 # 短期记忆最大数量(达到上限触发批量转移)
 short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值(0.0-1.0)
 short_term_search_top_k = 5 # 搜索时返回的最大数量
-short_term_decay_factor = 0.98 # 衰减因子
 
 # --- 长期记忆层配置 ---
-long_term_batch_size = 10 # 批量转移大小
-long_term_decay_factor = 0.95 # 衰减因子(比短期记忆慢)
-long_term_auto_transfer_interval = 600 # 自动转移间隔(秒)
+long_term_batch_size = 10 # 批量转移大小(每次转移多少条短期记忆)
+long_term_search_top_k = 5 # 长期记忆二次检索返回数量
 
-# --- Judge模型配置 ---
-judge_model_name = "utils_small" # 用于决策的LLM模型
-judge_temperature = 0.1 # Judge模型的温度参数
-enable_judge_retrieval = true # 启用智能检索判断
+# --- 记忆检索配置 ---
+enable_judge_retrieval = true # 启用智能检索判断(裁判模型评估是否需要二次检索)
+judge_confidence_threshold = 0.7 # 裁判模型置信度阈值
 
 [voice]
 enable_asr = true # 是否启用语音识别,启用后MoFox-Bot可以识别语音消息,启用该功能需要配置语音识别模型[model.voice]
diff --git a/template/model_config_template.toml b/template/model_config_template.toml
index 527ede4a3..18c4baef6 100644
--- a/template/model_config_template.toml
+++ b/template/model_config_template.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "1.3.9"
+version = "1.4.0"
 
 # 配置文件版本号迭代规则同bot_config.toml
@@ -214,3 +214,25 @@ max_tokens = 800
 model_list = ["deepseek-r1-distill-qwen-32b"]
 temperature = 0.7
 max_tokens = 800
+
+#------------记忆系统专用模型------------
+
+[model_task_config.memory_short_term_builder] # 短期记忆构建模型(感知→短期格式化)
+model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.2-Exp"]
+temperature = 0.2
+max_tokens = 800
+
+[model_task_config.memory_short_term_decider] # 短期记忆决策模型(决定合并/更新/新建/丢弃)
+model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.2-Exp"]
+temperature = 0.2
+max_tokens = 1000
+
+[model_task_config.memory_long_term_builder] # 长期记忆构建模型(短期→长期图结构)
+model_list = ["siliconflow-Qwen/Qwen3-Next-80B-A3B-Instruct"]
+temperature = 0.2
+max_tokens = 1500
+
+[model_task_config.memory_judge] # 记忆检索裁判模型(判断检索是否充足)
+model_list = ["siliconflow-THUDM/GLM-4-9B-0414"]
+temperature = 0.1
+max_tokens = 600
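The four new [model_task_config.*] sections map one-to-one onto the LLMRequest call sites changed in this patch. For example, the judge section is consumed like this (mirroring the unified manager hunk above; shown for illustration):

from src.config.config import model_config
from src.llm_models.utils_model import LLMRequest

# [model_task_config.memory_judge] supplies the model set; temperature and
# max_tokens are passed per call at the call site (0.1 / 600 in this patch).
judge_llm = LLMRequest(
    model_set=model_config.model_task_config.memory_judge,
    request_type="unified_memory.judge",
)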