feat(memory): enhance query planning with unread-message context analysis

Add unread-message context integration to improve the precision and
relevance of memory retrieval. By analyzing the content, keywords, and
participants of the unread messages in the current conversation stream,
the planner generates retrieval strategies that better fit the actual
conversation.

Key changes:
- The query planner now incorporates unread messages when generating semantic queries
- Add a mechanism for collecting unread-message context and building summaries
- Update the vector store's fallback logic to match the new memory structure
- Improve debugging and empty-content handling during memory conversion

Together these changes let the memory system better understand the current
conversational context and supply more relevant historical memories for
generating accurate replies.
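
For illustration only (not part of this commit), the new retrieval flow can be sketched as follows, using the method names that appear in the diff below:

```python
# Illustrative sketch: the planner now receives an enhanced context that
# carries unread-message information before producing its JSON plan.
async def plan_with_unread_context(memory_system, raw_query, normalized_context):
    # Merge unread-message info (messages, keywords, participants) into the context.
    enhanced = await memory_system._build_enhanced_query_context(raw_query, normalized_context)
    # The planner embeds that context in its prompt and returns a targeted plan.
    return await memory_system.query_planner.plan_query(raw_query, enhanced)
```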
Windpicker-owo committed 2025-10-02 11:03:47 +08:00
parent 59bda71f29
commit 82bb2df369
4 changed files with 265 additions and 44 deletions


@@ -135,22 +135,76 @@ class MemoryQueryPlanner:
        persona = context.get("bot_personality") or context.get("bot_identity") or "unknown"
        # Build the unread-message context section
        context_section = ""
        if context.get("has_unread_context") and context.get("unread_messages_context"):
            unread_context = context["unread_messages_context"]
            unread_messages = unread_context.get("messages", [])
            unread_keywords = unread_context.get("keywords", [])
            unread_participants = unread_context.get("participants", [])
            context_summary = unread_context.get("context_summary", "")
            if unread_messages:
                # Build a preview of the unread messages
                message_previews = []
                for msg in unread_messages[:5]:  # show at most 5 messages
                    sender = msg.get("sender", "unknown")
                    content = msg.get("content", "")[:100]  # cap the length of each message
                    message_previews.append(f"{sender}: {content}")
                context_section = f"""
## 📋 Unread-message context ({unread_context.get('total_count', 0)} unread messages in total)
### Recent message preview:
{chr(10).join(message_previews)}
### Context keywords:
{', '.join(unread_keywords[:15]) if unread_keywords else ''}
### Conversation participants:
{', '.join(unread_participants) if unread_participants else ''}
### Context summary:
{context_summary[:300] if context_summary else ''}
"""
            else:
                context_section = """
## 📋 Unread-message context:
No unread messages, or the context information is unavailable
"""
        return f"""
You are a memory-retrieval planning assistant. Based on the input, produce a concise JSON retrieval plan.
Your task is to analyze the current query, combine it with the unread-message context, and produce a more precise memory-retrieval strategy.
Provide only the following fields:
- semantic_query: a natural-language description for vector recall; it must be specific and fit the current query;
- semantic_query: a natural-language description for vector recall; it must be specific and fit the current query and context;
- memory_types: the suggested memory types to retrieve, drawn from the MemoryType enum (personal_fact, event, preference, opinion, relationship, emotion, knowledge, skill, goal, experience, contextual);
- subject_includes: people or roles that should appear as the subject of the memories;
- object_includes: objects, topics, or key information to focus on;
- required_keywords: keywords that must be included (extracted from the context);
- recency: the recommended time preference, one of recent/any/historical;
- limit: the recommended maximum number of results (1-15);
- notes: additional remarks (optional);
- emphasis: the retrieval focus, one of balanced/contextual/recent/comprehensive.
Do not generate a predicate field, and do not add any other parameters.
Current query: "{query_text}"
Known conversation participants: {participant_preview}
Bot persona: {persona}
## Current query:
"{query_text}"
## Known conversation participants:
{participant_preview}
## Bot persona:
{persona}{context_section}
## 🎯 Guidelines:
1. **Context linkage**: prioritize the unread-message content and keywords related to the current query
2. **Semantic understanding**: use the context to understand the query's real intent rather than its literal wording
3. **Participant awareness**: consider the participants in the unread messages and retrieve memories related to them
4. **Topic continuity**: follow the topics discussed in the unread messages and retrieve the related historical memories
5. **Temporal relevance**: if the unread messages discuss recent events, prefer memories from the relevant period
Output the JSON object directly; do not add extra text or Markdown code fences.
"""


@@ -628,35 +628,22 @@ class MemorySystem:
        try:
            normalized_context = self._normalize_context(context, GLOBAL_MEMORY_SCOPE, None)
            effective_limit = limit or self.config.final_recall_limit
            effective_limit = self.config.final_recall_limit
            # === Stage 1: coarse metadata screening (soft filters) ===
            coarse_filters = {
                "user_id": GLOBAL_MEMORY_SCOPE,  # required: keeps the scope correct
            }
            # Optional: importance threshold (filters out low-value memories)
            if hasattr(self.config, 'memory_importance_threshold'):
                importance_threshold = self.config.memory_importance_threshold
                if importance_threshold > 0:
                    coarse_filters["importance"] = {"$gte": importance_threshold}
                    logger.debug(f"[Stage 1] importance filter enabled: >= {importance_threshold}")
            # Optional: time range (search only the last N days)
            if hasattr(self.config, 'memory_recency_days'):
                recency_days = self.config.memory_recency_days
                if recency_days > 0:
                    cutoff_time = time.time() - (recency_days * 24 * 3600)
                    coarse_filters["created_at"] = {"$gte": cutoff_time}
                    logger.debug(f"[Stage 1] recency filter enabled: last {recency_days} days")
            # Apply query planning (optimize the query text and build metadata filters)
            optimized_query = raw_query
            metadata_filters = {}
            if self.query_planner:
                try:
                    query_plan = await self.query_planner.plan_query(raw_query, normalized_context)
                    # Build an enhanced context that includes the unread messages
                    enhanced_context = await self._build_enhanced_query_context(raw_query, normalized_context)
                    query_plan = await self.query_planner.plan_query(raw_query, enhanced_context)
                    # Use the LLM-optimized query text (a more precise semantic expression)
                    if getattr(query_plan, "semantic_query", None):
@@ -877,6 +864,150 @@ class MemorySystem:
        return context

    async def _build_enhanced_query_context(self, raw_query: str, normalized_context: Dict[str, Any]) -> Dict[str, Any]:
        """Build an enhanced query context that aggregates the unread messages.

        Args:
            raw_query: the original query text
            normalized_context: the normalized base context

        Returns:
            Dict[str, Any]: the enhanced context, including aggregated unread-message information
        """
        enhanced_context = dict(normalized_context)  # copy the base context
        try:
            # Get the stream_id used to look up unread messages
            stream_id = normalized_context.get("stream_id")
            if not stream_id:
                logger.debug("No stream_id found; using the base context for query planning")
                return enhanced_context
            # Collect the unread messages as context
            unread_messages_summary = await self._collect_unread_messages_context(stream_id)
            if unread_messages_summary:
                enhanced_context["unread_messages_context"] = unread_messages_summary
                enhanced_context["has_unread_context"] = True
                logger.debug(f"Built an enhanced context for query planning with {len(unread_messages_summary.get('messages', []))} unread messages")
            else:
                enhanced_context["has_unread_context"] = False
                logger.debug("No unread messages found; using the base context for query planning")
        except Exception as e:
            logger.warning(f"Failed to build the enhanced query context: {e}", exc_info=True)
            enhanced_context["has_unread_context"] = False
        return enhanced_context

    async def _collect_unread_messages_context(self, stream_id: str) -> Optional[Dict[str, Any]]:
        """Collect aggregate context information from the unread messages.

        Args:
            stream_id: the stream ID

        Returns:
            Optional[Dict[str, Any]]: aggregated unread-message information, including the message list, keywords, topics, etc.
        """
        try:
            from src.chat.message_receive.chat_stream import get_chat_manager

            chat_manager = get_chat_manager()
            chat_stream = chat_manager.get_stream(stream_id)
            if not chat_stream or not hasattr(chat_stream, "context_manager"):
                logger.debug(f"No chat stream or context manager found for stream_id={stream_id}")
                return None
            # Fetch the unread messages
            context_manager = chat_stream.context_manager
            unread_messages = context_manager.get_unread_messages()
            if not unread_messages:
                logger.debug(f"No unread messages for stream_id={stream_id}")
                return None
            # Build the unread-message summaries
            messages_summary = []
            all_keywords = set()
            participant_names = set()
            for msg in unread_messages[:10]:  # process at most the 10 most recent unread messages
                try:
                    # Extract the message content
                    content = (getattr(msg, "processed_plain_text", None) or
                               getattr(msg, "display_message", None) or "")
                    if not content:
                        continue
                    # Extract the sender information
                    sender_name = "unknown user"
                    if hasattr(msg, "user_info") and msg.user_info:
                        sender_name = (getattr(msg.user_info, "user_nickname", None) or
                                       getattr(msg.user_info, "user_cardname", None) or
                                       getattr(msg.user_info, "user_id", None) or "unknown user")
                    participant_names.add(sender_name)
                    # Add to the message summary
                    messages_summary.append({
                        "sender": sender_name,
                        "content": content[:200],  # cap the length to avoid overly long entries
                        "timestamp": getattr(msg, "time", None)
                    })
                    # Extract keywords (naive implementation)
                    content_lower = content.lower()
                    # More sophisticated keyword extraction could be plugged in here
                    words = [w.strip() for w in content_lower.split() if len(w.strip()) > 1]
                    all_keywords.update(words[:5])  # take at most 5 words per message
                except Exception as msg_e:
                    logger.debug(f"Error while processing an unread message: {msg_e}")
                    continue
            if not messages_summary:
                return None
            # Assemble the aggregate context information
            unread_context = {
                "messages": messages_summary,
                "total_count": len(unread_messages),
                "processed_count": len(messages_summary),
                "keywords": list(all_keywords)[:20],  # at most 20 keywords
                "participants": list(participant_names),
                "context_summary": self._build_unread_context_summary(messages_summary)
            }
            logger.debug(f"Collected unread-message context: {len(messages_summary)} messages, {len(all_keywords)} keywords, {len(participant_names)} participants")
            return unread_context
        except Exception as e:
            logger.warning(f"Failed to collect the unread-message context: {e}", exc_info=True)
            return None

    def _build_unread_context_summary(self, messages_summary: List[Dict[str, Any]]) -> str:
        """Build a text summary of the unread messages.

        Args:
            messages_summary: the list of unread-message summaries

        Returns:
            str: a text summary of the unread messages
        """
        if not messages_summary:
            return ""
        summary_parts = []
        for msg_info in messages_summary:
            sender = msg_info.get("sender", "unknown")
            content = msg_info.get("content", "")
            if content:
                summary_parts.append(f"{sender}: {content}")
        return " | ".join(summary_parts)
    async def _resolve_conversation_context(self, fallback_text: str, context: Optional[Dict[str, Any]]) -> str:
        """Enrich the conversation text with the stream_id, message history, and related memories; fall back to the provided text by default."""
        if not context:


@@ -281,26 +281,43 @@ class VectorMemoryStorage:
                memory_dict = orjson.loads(metadata["memory_data"])
                return MemoryChunk.from_dict(memory_dict)
            # Fallback: rebuild from the basic fields
            # Fallback: rebuild from the basic fields (using the new structured format)
            logger.warning(f"memory_data not found; rebuilding the memory via the fallback path (id={metadata.get('memory_id', 'unknown')})")
            # Build a structure matching what MemoryChunk.from_dict expects
            memory_dict = {
                "memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"),
                "user_id": metadata.get("user_id", "unknown"),
                "text_content": document,
                "display": document,
                "memory_type": metadata.get("memory_type", "general"),
                "keywords": orjson.loads(metadata.get("keywords", "[]")),
                "importance": metadata.get("importance", 0.5),
                "timestamp": metadata.get("timestamp", time.time()),
                "access_count": metadata.get("access_count", 0),
                "last_access_time": metadata.get("last_access_time", 0),
                "confidence": metadata.get("confidence", 0.8),
                "metadata": {}
                "metadata": {
                    "memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"),
                    "user_id": metadata.get("user_id", "unknown"),
                    "created_at": metadata.get("timestamp", time.time()),
                    "last_accessed": metadata.get("last_access_time", time.time()),
                    "last_modified": metadata.get("timestamp", time.time()),
                    "access_count": metadata.get("access_count", 0),
                    "relevance_score": 0.0,
                    "confidence": int(metadata.get("confidence", 2)),  # MEDIUM
                    "importance": int(metadata.get("importance", 2)),  # NORMAL
                    "source_context": None,
                },
                "content": {
                    "subject": "",
                    "predicate": "",
                    "object": "",
                    "display": document  # use the document as the display text
                },
                "memory_type": metadata.get("memory_type", "contextual"),
                "keywords": orjson.loads(metadata.get("keywords", "[]")) if isinstance(metadata.get("keywords"), str) else metadata.get("keywords", []),
                "tags": [],
                "categories": [],
                "embedding": None,
                "semantic_hash": None,
                "related_memories": [],
                "temporal_context": None
            }
            return MemoryChunk.from_dict(memory_dict)
        except Exception as e:
            logger.warning(f"Failed to convert a vector result into a MemoryChunk: {e}")
            logger.error(f"Failed to convert a vector result into a MemoryChunk: {e}", exc_info=True)
            return None

    def _get_from_cache(self, memory_id: str) -> Optional[MemoryChunk]:
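
The inline `keywords` handling above accepts both storage formats. The same logic could be factored out; a minimal sketch, with a hypothetical helper name:

```python
import orjson

def _parse_keywords(raw) -> list:
    # Hypothetical helper mirroring the inline fallback above: stored keywords
    # may be a JSON-encoded string or an already-deserialized list.
    if isinstance(raw, str):
        try:
            return orjson.loads(raw)
        except orjson.JSONDecodeError:
            return []
    return raw or []
```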


@@ -587,15 +587,25 @@ class DefaultReplyer:
            # Convert the format for compatibility with existing code
            running_memories = []
            if enhanced_memories:
                for memory_chunk in enhanced_memories:
                logger.debug(f"[memory conversion] received {len(enhanced_memories)} raw memories")
                for idx, memory_chunk in enumerate(enhanced_memories, 1):
                    # Get a string representation of the structured content
                    structure_display = str(memory_chunk.content) if hasattr(memory_chunk, 'content') else "unknown"
                    # Get the memory content, preferring display
                    content = memory_chunk.display or memory_chunk.text_content or ""
                    # Debug: log how each memory's content was resolved
                    logger.debug(f"[memory conversion] item {idx}: display={repr(memory_chunk.display)[:80]}, text_content={repr(memory_chunk.text_content)[:80]}, final_content={repr(content)[:80]}")
                    running_memories.append({
                        "content": memory_chunk.display or memory_chunk.text_content or "",
                        "content": content,
                        "memory_type": memory_chunk.memory_type.value,
                        "confidence": memory_chunk.metadata.confidence.value,
                        "importance": memory_chunk.metadata.importance.value,
                        "relevance": getattr(memory_chunk, 'relevance_score', 0.5),
                        "relevance": getattr(memory_chunk.metadata, 'relevance_score', 0.5),
                        "source": memory_chunk.metadata.source,
                        "structure": memory_chunk.content_structure.value if memory_chunk.content_structure else "unknown",
                        "structure": structure_display,
                    })
            # Build the instant-memory string
@@ -604,7 +614,7 @@ class DefaultReplyer:
            if top_memory:
                instant_memory = top_memory[0].get("content", "")
            logger.info(f"Enhanced memory system retrieved {len(running_memories)} memories")
            logger.info(f"Enhanced memory system retrieved {len(enhanced_memories)} raw memories, converted into {len(running_memories)} usable memories")
        except Exception as e:
            logger.warning(f"Enhanced memory system retrieval failed: {e}")
@@ -640,10 +650,17 @@ class DefaultReplyer:
            # Debug the relevance information
            relevance_info = [(m.get('memory_type', 'unknown'), m.get('relevance', 0.0)) for m in sorted_memories]
            logger.debug(f"Memory relevance info: {relevance_info}")
            logger.debug(f"[memory assembly] about to add {len(sorted_memories)} memories to the prompt")
            for running_memory in sorted_memories:
            for idx, running_memory in enumerate(sorted_memories, 1):
                content = running_memory.get('content', '')
                memory_type = running_memory.get('memory_type', 'unknown')
                # Skip empty content
                if not content or not content.strip():
                    logger.warning(f"[memory assembly] skipping memory {idx}: empty content (type={memory_type})")
                    logger.debug(f"[memory assembly] empty-memory details: {running_memory}")
                    continue
                # Map memory types to Chinese labels
                type_mapping = {
@@ -661,10 +678,12 @@ class DefaultReplyer:
if "(类型:" in content and "" in content:
clean_content = content.split("(类型:")[0].strip()
logger.debug(f"[记忆构建] 添加第 {idx} 条记忆: [{chinese_type}] {clean_content[:50]}...")
memory_parts.append(f"- **[{chinese_type}]** {clean_content}")
memory_str = "\n".join(memory_parts) + "\n"
has_any_memory = True
logger.debug(f"[记忆构建] 成功构建记忆字符串,包含 {len(memory_parts) - 2} 条记忆")
# 添加瞬时记忆
if instant_memory: