From 82bb2df3697f2042ec7760e2091bad559fbc3963 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Thu, 2 Oct 2025 11:03:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(memory):=20=E5=A2=9E=E5=BC=BA=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E8=A7=84=E5=88=92=E4=BB=A5=E6=94=AF=E6=8C=81=E6=9C=AA?= =?UTF-8?q?=E8=AF=BB=E6=B6=88=E6=81=AF=E4=B8=8A=E4=B8=8B=E6=96=87=E5=88=86?= =?UTF-8?q?=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增未读消息上下文集成功能,提升记忆检索的精准度和相关性。 通过分析当前对话流中的未读消息内容、关键词和参与者信息, 生成更贴合实际对话场景的记忆检索策略。 主要改进: - 查询规划器现在能够结合未读消息生成语义查询 - 新增未读消息上下文收集和摘要构建机制 - 优化向量存储的兜底逻辑以兼容新的记忆结构 - 改进记忆转换过程中的调试和空内容处理 这些功能使记忆系统能够更好地理解当前对话的上下文, 提供更相关的历史记忆来支持生成更准确的回复。 --- .../memory_system/memory_query_planner.py | 64 ++++++- src/chat/memory_system/memory_system.py | 169 ++++++++++++++++-- .../memory_system/vector_memory_storage_v2.py | 45 +++-- src/chat/replyer/default_generator.py | 31 +++- 4 files changed, 265 insertions(+), 44 deletions(-) diff --git a/src/chat/memory_system/memory_query_planner.py b/src/chat/memory_system/memory_query_planner.py index e6f64c97b..690e26627 100644 --- a/src/chat/memory_system/memory_query_planner.py +++ b/src/chat/memory_system/memory_query_planner.py @@ -135,22 +135,76 @@ class MemoryQueryPlanner: persona = context.get("bot_personality") or context.get("bot_identity") or "未知" + # 构建未读消息上下文信息 + context_section = "" + if context.get("has_unread_context") and context.get("unread_messages_context"): + unread_context = context["unread_messages_context"] + unread_messages = unread_context.get("messages", []) + unread_keywords = unread_context.get("keywords", []) + unread_participants = unread_context.get("participants", []) + context_summary = unread_context.get("context_summary", "") + + if unread_messages: + # 构建未读消息摘要 + message_previews = [] + for msg in unread_messages[:5]: # 最多显示5条 + sender = msg.get("sender", "未知") + content = msg.get("content", "")[:100] # 限制每条消息长度 + message_previews.append(f"{sender}: {content}") + + context_section = f""" + +## 📋 未读消息上下文 (共{unread_context.get('total_count', 0)}条未读消息) +### 最近消息预览: +{chr(10).join(message_previews)} + +### 上下文关键词: +{', '.join(unread_keywords[:15]) if unread_keywords else '无'} + +### 对话参与者: +{', '.join(unread_participants) if unread_participants else '无'} + +### 上下文摘要: +{context_summary[:300] if context_summary else '无'} +""" + else: + context_section = """ + +## 📋 未读消息上下文: +无未读消息或上下文信息不可用 +""" + return f""" 你是一名记忆检索规划助手,请基于输入生成一个简洁的 JSON 检索计划。 +你的任务是分析当前查询并结合未读消息的上下文,生成更精准的记忆检索策略。 + 仅需提供以下字段: -- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询; +- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询和上下文; - memory_types: 建议检索的记忆类型列表,取值范围来自 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual); - subject_includes: 建议出现在记忆主语中的人物或角色; - object_includes: 建议关注的对象、主题或关键信息; +- required_keywords: 建议必须包含的关键词(从上下文中提取); - recency: 推荐的时间偏好,可选 recent/any/historical; - limit: 推荐的最大返回数量 (1-15); -- notes: 额外补充说明(可选)。 +- emphasis: 检索重点,可选 balanced/contextual/recent/comprehensive。 请不要生成谓语字段,也不要额外补充其它参数。 -当前查询: "{query_text}" -已知的对话参与者: {participant_preview} -机器人设定: {persona} +## 当前查询: +"{query_text}" + +## 已知对话参与者: +{participant_preview} + +## 机器人设定: +{persona}{context_section} + +## 🎯 指导原则: +1. **上下文关联**: 优先分析与当前查询相关的未读消息内容和关键词 +2. **语义理解**: 结合上下文理解查询的真实意图,而非字面意思 +3. **参与者感知**: 考虑未读消息中的参与者,检索与他们相关的记忆 +4. **主题延续**: 关注未读消息中讨论的主题,检索相关的历史记忆 +5. **时间相关性**: 如果未读消息讨论最近的事件,偏向检索相关时期的记忆 请直接输出符合要求的 JSON 对象,禁止添加额外文本或 Markdown 代码块。 """ diff --git a/src/chat/memory_system/memory_system.py b/src/chat/memory_system/memory_system.py index 3d55ae5bf..6f82ef184 100644 --- a/src/chat/memory_system/memory_system.py +++ b/src/chat/memory_system/memory_system.py @@ -628,35 +628,22 @@ class MemorySystem: try: normalized_context = self._normalize_context(context, GLOBAL_MEMORY_SCOPE, None) - effective_limit = limit or self.config.final_recall_limit - + effective_limit = self.config.final_recall_limit + # === 阶段一:元数据粗筛(软性过滤) === coarse_filters = { "user_id": GLOBAL_MEMORY_SCOPE, # 必选:确保作用域正确 } - - # 可选:添加重要性阈值(过滤低价值记忆) - if hasattr(self.config, 'memory_importance_threshold'): - importance_threshold = self.config.memory_importance_threshold - if importance_threshold > 0: - coarse_filters["importance"] = {"$gte": importance_threshold} - logger.debug(f"[阶段一] 启用重要性过滤: >= {importance_threshold}") - - # 可选:添加时间范围(只搜索最近N天) - if hasattr(self.config, 'memory_recency_days'): - recency_days = self.config.memory_recency_days - if recency_days > 0: - cutoff_time = time.time() - (recency_days * 24 * 3600) - coarse_filters["created_at"] = {"$gte": cutoff_time} - logger.debug(f"[阶段一] 启用时间过滤: 最近 {recency_days} 天") # 应用查询规划(优化查询语句并构建元数据过滤) optimized_query = raw_query metadata_filters = {} - + if self.query_planner: try: - query_plan = await self.query_planner.plan_query(raw_query, normalized_context) + # 构建包含未读消息的增强上下文 + enhanced_context = await self._build_enhanced_query_context(raw_query, normalized_context) + query_plan = await self.query_planner.plan_query(raw_query, enhanced_context) # 使用LLM优化后的查询语句(更精确的语义表达) if getattr(query_plan, "semantic_query", None): @@ -877,6 +864,150 @@ class MemorySystem: return context + async def _build_enhanced_query_context(self, raw_query: str, normalized_context: Dict[str, Any]) -> Dict[str, Any]: + """构建包含未读消息综合上下文的增强查询上下文 + + Args: + raw_query: 原始查询文本 + normalized_context: 标准化后的基础上下文 + + Returns: + Dict[str, Any]: 包含未读消息综合信息的增强上下文 + """ + enhanced_context = dict(normalized_context) # 复制基础上下文 + + try: + # 获取stream_id以查找未读消息 + stream_id = normalized_context.get("stream_id") + if not stream_id: + logger.debug("未找到stream_id,使用基础上下文进行查询规划") + return enhanced_context + + # 获取未读消息作为上下文 + unread_messages_summary = await self._collect_unread_messages_context(stream_id) + + if unread_messages_summary: + enhanced_context["unread_messages_context"] = unread_messages_summary + enhanced_context["has_unread_context"] = True + + logger.debug(f"为查询规划构建了增强上下文,包含 {len(unread_messages_summary.get('messages', []))} 条未读消息") + else: + enhanced_context["has_unread_context"] = False + logger.debug("未找到未读消息,使用基础上下文进行查询规划") + + except Exception as e: + logger.warning(f"构建增强查询上下文失败: {e}", exc_info=True) + enhanced_context["has_unread_context"] = False + + return enhanced_context + + async def _collect_unread_messages_context(self, stream_id: str) -> Optional[Dict[str, Any]]: + """收集未读消息的综合上下文信息 + + Args: + stream_id: 流ID + + Returns: + Optional[Dict[str, Any]]: 未读消息的综合信息,包含消息列表、关键词、主题等 + """ + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream = chat_manager.get_stream(stream_id) + + if not chat_stream or not hasattr(chat_stream, "context_manager"): + logger.debug(f"未找到stream_id={stream_id}的聊天流或上下文管理器") + return None + + # 获取未读消息 + context_manager = chat_stream.context_manager + unread_messages = context_manager.get_unread_messages() + + if not unread_messages: + logger.debug(f"stream_id={stream_id}没有未读消息") + return None + + # 构建未读消息摘要 + messages_summary = [] + all_keywords = set() + participant_names = set() + + for msg in unread_messages[:10]: # 限制处理最近10条未读消息 + try: + # 提取消息内容 + content = (getattr(msg, "processed_plain_text", None) or + getattr(msg, "display_message", None) or "") + if not content: + continue + + # 提取发送者信息 + sender_name = "未知用户" + if hasattr(msg, "user_info") and msg.user_info: + sender_name = (getattr(msg.user_info, "user_nickname", None) or + getattr(msg.user_info, "user_cardname", None) or + getattr(msg.user_info, "user_id", None) or "未知用户") + + participant_names.add(sender_name) + + # 添加到消息摘要 + messages_summary.append({ + "sender": sender_name, + "content": content[:200], # 限制长度避免过长 + "timestamp": getattr(msg, "time", None) + }) + + # 提取关键词(简单实现) + content_lower = content.lower() + # 这里可以添加更复杂的关键词提取逻辑 + words = [w.strip() for w in content_lower.split() if len(w.strip()) > 1] + all_keywords.update(words[:5]) # 每条消息最多取5个词 + + except Exception as msg_e: + logger.debug(f"处理未读消息时出错: {msg_e}") + continue + + if not messages_summary: + return None + + # 构建综合上下文信息 + unread_context = { + "messages": messages_summary, + "total_count": len(unread_messages), + "processed_count": len(messages_summary), + "keywords": list(all_keywords)[:20], # 最多20个关键词 + "participants": list(participant_names), + "context_summary": self._build_unread_context_summary(messages_summary) + } + + logger.debug(f"收集到未读消息上下文: {len(messages_summary)}条消息,{len(all_keywords)}个关键词,{len(participant_names)}个参与者") + return unread_context + + except Exception as e: + logger.warning(f"收集未读消息上下文失败: {e}", exc_info=True) + return None + + def _build_unread_context_summary(self, messages_summary: List[Dict[str, Any]]) -> str: + """构建未读消息的文本摘要 + + Args: + messages_summary: 未读消息摘要列表 + + Returns: + str: 未读消息的文本摘要 + """ + if not messages_summary: + return "" + + summary_parts = [] + for msg_info in messages_summary: + sender = msg_info.get("sender", "未知") + content = msg_info.get("content", "") + if content: + summary_parts.append(f"{sender}: {content}") + + return " | ".join(summary_parts) + async def _resolve_conversation_context(self, fallback_text: str, context: Optional[Dict[str, Any]]) -> str: """使用 stream_id 历史消息和相关记忆充实对话文本,默认回退到传入文本""" if not context: diff --git a/src/chat/memory_system/vector_memory_storage_v2.py b/src/chat/memory_system/vector_memory_storage_v2.py index fcb189065..51b37acae 100644 --- a/src/chat/memory_system/vector_memory_storage_v2.py +++ b/src/chat/memory_system/vector_memory_storage_v2.py @@ -281,26 +281,43 @@ class VectorMemoryStorage: memory_dict = orjson.loads(metadata["memory_data"]) return MemoryChunk.from_dict(memory_dict) - # 兜底:从基础字段重建 + # 兜底:从基础字段重建(使用新的结构化格式) + logger.warning(f"未找到memory_data,使用兜底逻辑重建记忆 (id={metadata.get('memory_id', 'unknown')})") + + # 构建符合MemoryChunk.from_dict期望的结构 memory_dict = { - "memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"), - "user_id": metadata.get("user_id", "unknown"), - "text_content": document, - "display": document, - "memory_type": metadata.get("memory_type", "general"), - "keywords": orjson.loads(metadata.get("keywords", "[]")), - "importance": metadata.get("importance", 0.5), - "timestamp": metadata.get("timestamp", time.time()), - "access_count": metadata.get("access_count", 0), - "last_access_time": metadata.get("last_access_time", 0), - "confidence": metadata.get("confidence", 0.8), - "metadata": {} + "metadata": { + "memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"), + "user_id": metadata.get("user_id", "unknown"), + "created_at": metadata.get("timestamp", time.time()), + "last_accessed": metadata.get("last_access_time", time.time()), + "last_modified": metadata.get("timestamp", time.time()), + "access_count": metadata.get("access_count", 0), + "relevance_score": 0.0, + "confidence": int(metadata.get("confidence", 2)), # MEDIUM + "importance": int(metadata.get("importance", 2)), # NORMAL + "source_context": None, + }, + "content": { + "subject": "", + "predicate": "", + "object": "", + "display": document # 使用document作为显示文本 + }, + "memory_type": metadata.get("memory_type", "contextual"), + "keywords": orjson.loads(metadata.get("keywords", "[]")) if isinstance(metadata.get("keywords"), str) else metadata.get("keywords", []), + "tags": [], + "categories": [], + "embedding": None, + "semantic_hash": None, + "related_memories": [], + "temporal_context": None } return MemoryChunk.from_dict(memory_dict) except Exception as e: - logger.warning(f"转换Vector结果到MemoryChunk失败: {e}") + logger.error(f"转换Vector结果到MemoryChunk失败: {e}", exc_info=True) return None def _get_from_cache(self, memory_id: str) -> Optional[MemoryChunk]: diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index d44e0a354..de9e8f798 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -587,15 +587,25 @@ class DefaultReplyer: # 转换格式以兼容现有代码 running_memories = [] if enhanced_memories: - for memory_chunk in enhanced_memories: + logger.debug(f"[记忆转换] 收到 {len(enhanced_memories)} 条原始记忆") + for idx, memory_chunk in enumerate(enhanced_memories, 1): + # 获取结构化内容的字符串表示 + structure_display = str(memory_chunk.content) if hasattr(memory_chunk, 'content') else "unknown" + + # 获取记忆内容,优先使用display + content = memory_chunk.display or memory_chunk.text_content or "" + + # 调试:记录每条记忆的内容获取情况 + logger.debug(f"[记忆转换] 第{idx}条: display={repr(memory_chunk.display)[:80]}, text_content={repr(memory_chunk.text_content)[:80]}, final_content={repr(content)[:80]}") + running_memories.append({ - "content": memory_chunk.display or memory_chunk.text_content or "", + "content": content, "memory_type": memory_chunk.memory_type.value, "confidence": memory_chunk.metadata.confidence.value, "importance": memory_chunk.metadata.importance.value, - "relevance": getattr(memory_chunk, 'relevance_score', 0.5), + "relevance": getattr(memory_chunk.metadata, 'relevance_score', 0.5), "source": memory_chunk.metadata.source, - "structure": memory_chunk.content_structure.value if memory_chunk.content_structure else "unknown", + "structure": structure_display, }) # 构建瞬时记忆字符串 @@ -604,7 +614,7 @@ class DefaultReplyer: if top_memory: instant_memory = top_memory[0].get("content", "") - logger.info(f"增强记忆系统检索到 {len(running_memories)} 条记忆") + logger.info(f"增强记忆系统检索到 {len(enhanced_memories)} 条原始记忆,转换为 {len(running_memories)} 条可用记忆") except Exception as e: logger.warning(f"增强记忆系统检索失败: {e}") @@ -640,10 +650,17 @@ class DefaultReplyer: # 调试相关度信息 relevance_info = [(m.get('memory_type', 'unknown'), m.get('relevance', 0.0)) for m in sorted_memories] logger.debug(f"记忆相关度信息: {relevance_info}") + logger.debug(f"[记忆构建] 准备将 {len(sorted_memories)} 条记忆添加到提示词") - for running_memory in sorted_memories: + for idx, running_memory in enumerate(sorted_memories, 1): content = running_memory.get('content', '') memory_type = running_memory.get('memory_type', 'unknown') + + # 跳过空内容 + if not content or not content.strip(): + logger.warning(f"[记忆构建] 跳过第 {idx} 条记忆:内容为空 (type={memory_type})") + logger.debug(f"[记忆构建] 空记忆详情: {running_memory}") + continue # 映射记忆类型到中文标签 type_mapping = { @@ -661,10 +678,12 @@ class DefaultReplyer: if "(类型:" in content and ")" in content: clean_content = content.split("(类型:")[0].strip() + logger.debug(f"[记忆构建] 添加第 {idx} 条记忆: [{chinese_type}] {clean_content[:50]}...") memory_parts.append(f"- **[{chinese_type}]** {clean_content}") memory_str = "\n".join(memory_parts) + "\n" has_any_memory = True + logger.debug(f"[记忆构建] 成功构建记忆字符串,包含 {len(memory_parts) - 2} 条记忆") # 添加瞬时记忆 if instant_memory: