feat(memory): 重构记忆系统检索机制并优化召回率

- 实现五阶段检索流程:元数据过滤→向量搜索→语义重排序→上下文过滤→增强重排序
- 添加回退机制保障检索健壮性,当主检索失败时自动降级到文本匹配
- 优化向量相似度阈值配置,提升记忆召回率
- 新增记忆融合候选收集机制,避免重复记忆存储
- 改进记忆格式化器,支持多种展示格式
- 增强向量存储加载和重建逻辑,确保数据持久化
- 优化记忆存储时机,移至回复生成完成后异步执行
- 添加详细的检索调试日志,便于问题排查
- 简化查询规划器提示模板,提升生成效率
This commit is contained in:
Windpicker-owo
2025-10-01 15:02:38 +08:00
parent 3d59024c3c
commit 99f77135c1
16 changed files with 2058 additions and 203 deletions

View File

@@ -351,7 +351,7 @@ class EnergyManager:
/ total_calculations
)
logger.info(f"聊天流 {stream_id} 最终能量: {final_energy:.3f} (原始: {total_energy:.3f}, 耗时: {calculation_time:.3f}s)")
logger.debug(f"聊天流 {stream_id} 最终能量: {final_energy:.3f} (原始: {total_energy:.3f}, 耗时: {calculation_time:.3f}s)")
return final_energy
def _apply_threshold_adjustment(self, energy: float) -> float:

View File

@@ -104,7 +104,6 @@ class EnhancedMemoryActivator:
response, (reasoning_content, model_name, _) = await self.key_words_model.generate_response_async(
prompt, temperature=0.5
)
keywords = list(get_keywords_from_json(response))
# 更新关键词缓存

View File

@@ -12,6 +12,7 @@ from dataclasses import dataclass
from src.common.logger import get_logger
from src.chat.memory_system.integration_layer import MemoryIntegrationLayer, IntegrationConfig, IntegrationMode
from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
from src.chat.memory_system.memory_formatter import MemoryFormatter, FormatterConfig, format_memories_for_llm
from src.llm_models.utils_model import LLMRequest
logger = get_logger(__name__)
@@ -189,15 +190,21 @@ class EnhancedMemoryAdapter:
if not memories:
return ""
# 格式化记忆为提示词友好的Markdown结构
lines: List[str] = ["### 🧠 相关记忆 (Relevant Memories)", ""]
# 使用新的记忆格式化器
formatter_config = FormatterConfig(
include_timestamps=True,
include_memory_types=True,
include_confidence=False,
use_emoji_icons=True,
group_by_type=False,
max_display_length=150
)
for memory in memories:
type_label = MEMORY_TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
display_text = memory.display or memory.text_content
lines.append(f"- **[{type_label}]** {display_text}")
return "\n".join(lines)
return format_memories_for_llm(
memories=memories,
query_context=query,
config=formatter_config
)
async def get_enhanced_memory_summary(self, user_id: str) -> Dict[str, Any]:
"""获取增强记忆系统摘要"""

View File

@@ -22,7 +22,7 @@ from src.chat.memory_system.memory_chunk import MemoryChunk
from src.chat.memory_system.memory_builder import MemoryBuilder, MemoryExtractionError
from src.chat.memory_system.memory_fusion import MemoryFusionEngine
from src.chat.memory_system.vector_storage import VectorStorageManager, VectorStorageConfig
from src.chat.memory_system.metadata_index import MetadataIndexManager
from src.chat.memory_system.metadata_index import MetadataIndexManager, IndexType
from src.chat.memory_system.multi_stage_retrieval import MultiStageRetrieval, RetrievalConfig
from src.chat.memory_system.memory_query_planner import MemoryQueryPlanner
@@ -199,6 +199,22 @@ class EnhancedMemorySystem:
similarity_threshold=self.config.similarity_threshold
)
self.vector_storage = VectorStorageManager(vector_config)
# 尝试加载现有的向量数据
try:
await self.vector_storage.load_storage()
loaded_count = self.vector_storage.storage_stats.get("total_vectors", 0)
logger.info(f"✅ 向量存储数据加载完成,向量数量: {loaded_count}")
# 如果没有加载到向量,尝试重建索引
if loaded_count == 0:
logger.info("向量存储为空,尝试从缓存重建...")
await self._rebuild_vector_storage_if_needed()
except Exception as e:
logger.warning(f"向量存储数据加载失败: {e},将使用空索引")
await self._rebuild_vector_storage_if_needed()
self.metadata_index = MetadataIndexManager()
# 创建检索配置
retrieval_config = RetrievalConfig(
@@ -354,8 +370,12 @@ class EnhancedMemorySystem:
self.status = original_status
return []
# 3. 记忆融合与去重
fused_chunks = await self.fusion_engine.fuse_memories(memory_chunks)
# 3. 记忆融合与去重(包含与历史记忆的融合)
existing_candidates = await self._collect_fusion_candidates(memory_chunks)
fused_chunks = await self.fusion_engine.fuse_memories(
memory_chunks,
existing_candidates
)
# 4. 存储记忆
stored_count = await self._store_memories(fused_chunks)
@@ -375,6 +395,11 @@ class EnhancedMemorySystem:
len(fused_chunks),
stored_count,
build_time,
extra={
"generated_count": len(fused_chunks),
"stored_count": stored_count,
"build_duration_seconds": round(build_time, 4),
},
)
self.status = original_status
@@ -415,6 +440,101 @@ class EnhancedMemorySystem:
f"置信度={memory.metadata.confidence.name} | 内容={text}"
)
async def _collect_fusion_candidates(self, new_memories: List[MemoryChunk]) -> List[MemoryChunk]:
"""收集与新记忆相似的现有记忆,便于融合去重"""
if not new_memories:
return []
candidate_ids: Set[str] = set()
new_memory_ids = {
memory.memory_id
for memory in new_memories
if memory and getattr(memory, "memory_id", None)
}
# 基于指纹的直接匹配
for memory in new_memories:
try:
fingerprint = self._build_memory_fingerprint(memory)
fingerprint_key = self._fingerprint_key(memory.user_id, fingerprint)
existing_id = self._memory_fingerprints.get(fingerprint_key)
if existing_id and existing_id not in new_memory_ids:
candidate_ids.add(existing_id)
except Exception as exc:
logger.debug("构建记忆指纹失败,跳过候选收集: %s", exc)
# 基于主体索引的候选
subject_index = None
if self.metadata_index and hasattr(self.metadata_index, "indices"):
subject_index = self.metadata_index.indices.get(IndexType.SUBJECT)
if subject_index:
for memory in new_memories:
for subject in memory.subjects:
normalized = subject.strip().lower() if isinstance(subject, str) else ""
if not normalized:
continue
subject_candidates = subject_index.get(normalized)
if subject_candidates:
candidate_ids.update(subject_candidates)
# 基于向量搜索的候选
total_vectors = 0
if self.vector_storage and hasattr(self.vector_storage, "storage_stats"):
total_vectors = self.vector_storage.storage_stats.get("total_vectors", 0) or 0
if self.vector_storage and total_vectors > 0:
search_tasks = []
for memory in new_memories:
display_text = (memory.display or "").strip()
if not display_text:
continue
search_tasks.append(
self.vector_storage.search_similar_memories(
query_text=display_text,
limit=8,
scope_id=GLOBAL_MEMORY_SCOPE
)
)
if search_tasks:
search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
similarity_threshold = getattr(
self.fusion_engine,
"similarity_threshold",
self.config.similarity_threshold,
)
min_threshold = max(0.0, min(1.0, similarity_threshold * 0.8))
for result in search_results:
if isinstance(result, Exception):
logger.warning("融合候选向量搜索失败: %s", result)
continue
for memory_id, similarity in result:
if memory_id in new_memory_ids:
continue
if similarity is None or similarity < min_threshold:
continue
candidate_ids.add(memory_id)
existing_candidates: List[MemoryChunk] = []
cache = self.vector_storage.memory_cache if self.vector_storage else {}
for candidate_id in candidate_ids:
if candidate_id in new_memory_ids:
continue
candidate_memory = cache.get(candidate_id)
if candidate_memory:
existing_candidates.append(candidate_memory)
if existing_candidates:
logger.debug(
"融合候选收集完成,新记忆=%d,候选=%d",
len(new_memories),
len(existing_candidates),
)
return existing_candidates
async def process_conversation_memory(
self,
context: Dict[str, Any]
@@ -527,6 +647,29 @@ class EnhancedMemorySystem:
effective_limit = max(1, min(effective_limit, self.config.final_recall_limit))
normalized_context["resolved_query_text"] = resolved_query_text
query_debug_payload = {
"raw_query": raw_query,
"semantic_query": resolved_query_text,
"limit": effective_limit,
"planner_used": planner_ran,
"memory_types": [mt.value for mt in (query_plan.memory_types if query_plan else [])],
"subjects": getattr(query_plan, "subject_includes", []) if query_plan else [],
"objects": getattr(query_plan, "object_includes", []) if query_plan else [],
"recency": getattr(query_plan, "recency_preference", None) if query_plan else None,
"optional_keywords": getattr(query_plan, "optional_keywords", []) if query_plan else [],
}
try:
logger.info(
f"🔍 记忆检索指令 | raw='{raw_query}' | semantic='{resolved_query_text}' | limit={effective_limit}",
extra={"memory_query": query_debug_payload},
)
except Exception:
logger.info(
"🔍 记忆检索指令: %s",
orjson.dumps(query_debug_payload, ensure_ascii=False).decode("utf-8"),
)
if normalized_context.get("__memory_building__"):
logger.debug("当前处于记忆构建流程,跳过查询规划并进行降级检索")
self.status = MemorySystemStatus.BUILDING
@@ -1108,6 +1251,57 @@ class EnhancedMemorySystem:
except Exception as e:
logger.error(f"❌ 记忆系统关闭失败: {e}", exc_info=True)
async def _rebuild_vector_storage_if_needed(self):
"""重建向量存储(如果需要)"""
try:
# 检查是否有记忆缓存数据
if not hasattr(self.vector_storage, 'memory_cache') or not self.vector_storage.memory_cache:
logger.info("无记忆缓存数据,跳过向量存储重建")
return
logger.info(f"开始重建向量存储,记忆数量: {len(self.vector_storage.memory_cache)}")
# 收集需要重建向量的记忆
memories_to_rebuild = []
for memory_id, memory in self.vector_storage.memory_cache.items():
# 检查记忆是否有有效的 display 文本
if memory.display and memory.display.strip():
memories_to_rebuild.append(memory)
elif memory.text_content and memory.text_content.strip():
memories_to_rebuild.append(memory)
if not memories_to_rebuild:
logger.warning("没有找到可重建向量的记忆")
return
logger.info(f"准备为 {len(memories_to_rebuild)} 条记忆重建向量")
# 批量重建向量
batch_size = 10
rebuild_count = 0
for i in range(0, len(memories_to_rebuild), batch_size):
batch = memories_to_rebuild[i:i + batch_size]
try:
await self.vector_storage.store_memories(batch)
rebuild_count += len(batch)
if rebuild_count % 50 == 0:
logger.info(f"已重建向量: {rebuild_count}/{len(memories_to_rebuild)}")
except Exception as e:
logger.error(f"批量重建向量失败: {e}")
continue
# 保存重建的向量存储
await self.vector_storage.save_storage()
final_count = self.vector_storage.storage_stats.get("total_vectors", 0)
logger.info(f"✅ 向量存储重建完成,最终向量数量: {final_count}")
except Exception as e:
logger.error(f"❌ 向量存储重建失败: {e}", exc_info=True)
# 全局记忆系统实例
enhanced_memory_system: EnhancedMemorySystem = None

View File

@@ -0,0 +1,307 @@
# -*- coding: utf-8 -*-
"""
增强重排序器
实现文档设计的多维度评分模型
"""
import math
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
from src.common.logger import get_logger
from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
logger = get_logger(__name__)
class IntentType(Enum):
"""对话意图类型"""
FACT_QUERY = "fact_query" # 事实查询
EVENT_RECALL = "event_recall" # 事件回忆
PREFERENCE_CHECK = "preference_check" # 偏好检查
GENERAL_CHAT = "general_chat" # 一般对话
UNKNOWN = "unknown" # 未知意图
@dataclass
class ReRankingConfig:
"""重排序配置"""
# 权重配置 (w1 + w2 + w3 + w4 = 1.0)
semantic_weight: float = 0.5 # 语义相似度权重
recency_weight: float = 0.2 # 时效性权重
usage_freq_weight: float = 0.2 # 使用频率权重
type_match_weight: float = 0.1 # 类型匹配权重
# 时效性衰减参数
recency_decay_rate: float = 0.1 # 时效性衰减率 (天)
# 使用频率计算参数
freq_log_base: float = 2.0 # 对数底数
freq_max_score: float = 5.0 # 最大频率得分
# 类型匹配权重映射
type_match_weights: Dict[str, Dict[str, float]] = None
def __post_init__(self):
"""初始化类型匹配权重"""
if self.type_match_weights is None:
self.type_match_weights = {
IntentType.FACT_QUERY.value: {
MemoryType.PERSONAL_FACT.value: 1.0,
MemoryType.KNOWLEDGE.value: 0.8,
MemoryType.PREFERENCE.value: 0.5,
MemoryType.EVENT.value: 0.3,
"default": 0.3
},
IntentType.EVENT_RECALL.value: {
MemoryType.EVENT.value: 1.0,
MemoryType.EXPERIENCE.value: 0.8,
MemoryType.EMOTION.value: 0.6,
MemoryType.PERSONAL_FACT.value: 0.5,
"default": 0.5
},
IntentType.PREFERENCE_CHECK.value: {
MemoryType.PREFERENCE.value: 1.0,
MemoryType.OPINION.value: 0.8,
MemoryType.GOAL.value: 0.6,
MemoryType.PERSONAL_FACT.value: 0.4,
"default": 0.4
},
IntentType.GENERAL_CHAT.value: {
"default": 0.8
},
IntentType.UNKNOWN.value: {
"default": 0.8
}
}
class IntentClassifier:
"""轻量级意图识别器"""
def __init__(self):
# 关键词模式匹配规则
self.patterns = {
IntentType.FACT_QUERY: [
# 中文模式
"我是", "我的", "我叫", "我在", "我住在", "我的职业", "我的工作",
"什么时候", "在哪里", "是什么", "多少", "几岁", "年龄",
# 英文模式
"what is", "where is", "when is", "how old", "my name", "i am", "i live"
],
IntentType.EVENT_RECALL: [
# 中文模式
"记得", "想起", "还记得", "那次", "上次", "之前", "以前", "曾经",
"发生过", "经历", "做过", "去过", "见过",
# 英文模式
"remember", "recall", "last time", "before", "previously", "happened", "experience"
],
IntentType.PREFERENCE_CHECK: [
# 中文模式
"喜欢", "不喜欢", "偏好", "爱好", "兴趣", "讨厌", "最爱", "最喜欢",
"习惯", "通常", "一般", "倾向于", "更喜欢",
# 英文模式
"like", "love", "hate", "prefer", "favorite", "usually", "tend to", "interest"
]
}
def classify_intent(self, query: str, context: Dict[str, Any]) -> IntentType:
"""识别对话意图"""
if not query:
return IntentType.UNKNOWN
query_lower = query.lower()
# 统计各意图的匹配分数
intent_scores = {intent: 0 for intent in IntentType}
for intent, patterns in self.patterns.items():
for pattern in patterns:
if pattern in query_lower:
intent_scores[intent] += 1
# 返回得分最高的意图
max_score = max(intent_scores.values())
if max_score == 0:
return IntentType.GENERAL_CHAT
for intent, score in intent_scores.items():
if score == max_score:
return intent
return IntentType.GENERAL_CHAT
class EnhancedReRanker:
"""增强重排序器 - 实现文档设计的多维度评分模型"""
def __init__(self, config: Optional[ReRankingConfig] = None):
self.config = config or ReRankingConfig()
self.intent_classifier = IntentClassifier()
# 验证权重和为1.0
total_weight = (
self.config.semantic_weight +
self.config.recency_weight +
self.config.usage_freq_weight +
self.config.type_match_weight
)
if abs(total_weight - 1.0) > 0.01:
logger.warning(f"重排序权重和不为1.0: {total_weight}, 将进行归一化")
# 归一化权重
self.config.semantic_weight /= total_weight
self.config.recency_weight /= total_weight
self.config.usage_freq_weight /= total_weight
self.config.type_match_weight /= total_weight
def rerank_memories(
self,
query: str,
candidate_memories: List[Tuple[str, MemoryChunk, float]], # (memory_id, memory, vector_similarity)
context: Dict[str, Any],
limit: int = 10
) -> List[Tuple[str, MemoryChunk, float]]:
"""
对候选记忆进行重排序
Args:
query: 查询文本
candidate_memories: 候选记忆列表 [(memory_id, memory, vector_similarity)]
context: 上下文信息
limit: 返回数量限制
Returns:
重排序后的记忆列表 [(memory_id, memory, final_score)]
"""
if not candidate_memories:
return []
# 识别查询意图
intent = self.intent_classifier.classify_intent(query, context)
logger.debug(f"识别到查询意图: {intent.value}")
# 计算每个候选记忆的最终得分
scored_memories = []
current_time = time.time()
for memory_id, memory, vector_sim in candidate_memories:
try:
# 1. 语义相似度得分 (已归一化到[0,1])
semantic_score = self._normalize_similarity(vector_sim)
# 2. 时效性得分
recency_score = self._calculate_recency_score(memory, current_time)
# 3. 使用频率得分
usage_freq_score = self._calculate_usage_frequency_score(memory)
# 4. 类型匹配得分
type_match_score = self._calculate_type_match_score(memory, intent)
# 计算最终得分
final_score = (
self.config.semantic_weight * semantic_score +
self.config.recency_weight * recency_score +
self.config.usage_freq_weight * usage_freq_score +
self.config.type_match_weight * type_match_score
)
scored_memories.append((memory_id, memory, final_score))
# 记录调试信息
logger.debug(
f"记忆评分 {memory_id[:8]}: semantic={semantic_score:.3f}, "
f"recency={recency_score:.3f}, freq={usage_freq_score:.3f}, "
f"type={type_match_score:.3f}, final={final_score:.3f}"
)
except Exception as e:
logger.error(f"计算记忆 {memory_id} 得分时出错: {e}")
# 使用向量相似度作为后备得分
scored_memories.append((memory_id, memory, vector_sim))
# 按最终得分降序排序
scored_memories.sort(key=lambda x: x[2], reverse=True)
# 返回前N个结果
result = scored_memories[:limit]
highest_score = result[0][2] if result else 0.0
logger.info(
f"重排序完成: 候选={len(candidate_memories)}, 返回={len(result)}, "
f"意图={intent.value}, 最高分={highest_score:.3f}"
)
return result
def _normalize_similarity(self, raw_similarity: float) -> float:
"""归一化相似度到[0,1]区间"""
# 假设原始相似度已经在[-1,1]或[0,1]区间
if raw_similarity < 0:
return (raw_similarity + 1) / 2 # 从[-1,1]映射到[0,1]
return min(1.0, max(0.0, raw_similarity)) # 确保在[0,1]区间
def _calculate_recency_score(self, memory: MemoryChunk, current_time: float) -> float:
"""
计算时效性得分
公式: Recency = 1 / (1 + decay_rate * days_old)
"""
last_accessed = memory.metadata.last_accessed or memory.metadata.created_at
days_old = (current_time - last_accessed) / (24 * 3600) # 转换为天数
if days_old < 0:
days_old = 0 # 处理时间异常
score = 1 / (1 + self.config.recency_decay_rate * days_old)
return min(1.0, max(0.0, score))
def _calculate_usage_frequency_score(self, memory: MemoryChunk) -> float:
"""
计算使用频率得分
公式: Usage_Freq = min(1.0, log2(access_count + 1) / max_score)
"""
access_count = memory.metadata.access_count
if access_count <= 0:
return 0.0
log_count = math.log2(access_count + 1)
score = log_count / self.config.freq_max_score
return min(1.0, max(0.0, score))
def _calculate_type_match_score(self, memory: MemoryChunk, intent: IntentType) -> float:
"""计算类型匹配得分"""
memory_type = memory.memory_type.value
intent_value = intent.value
# 获取对应意图的类型权重映射
type_weights = self.config.type_match_weights.get(intent_value, {})
# 查找具体类型的权重,如果没有则使用默认权重
score = type_weights.get(memory_type, type_weights.get("default", 0.8))
return min(1.0, max(0.0, score))
# 创建默认的重排序器实例
default_reranker = EnhancedReRanker()
def rerank_candidate_memories(
query: str,
candidate_memories: List[Tuple[str, MemoryChunk, float]],
context: Dict[str, Any],
limit: int = 10,
config: Optional[ReRankingConfig] = None
) -> List[Tuple[str, MemoryChunk, float]]:
"""
便捷函数:对候选记忆进行重排序
"""
if config:
reranker = EnhancedReRanker(config)
else:
reranker = default_reranker
return reranker.rerank_memories(query, candidate_memories, context, limit)

View File

@@ -3,11 +3,11 @@
记忆构建模块
从对话流中提取高质量、结构化记忆单元
输出格式要求:
{{
{
"memories": [
{{
{
"type": "记忆类型",
"display": "用于直接展示和检索的自然语言描述",
"display": "一句优雅自然的中文描述,用于直接展示及提示词拼接",
"subject": ["主体1", "主体2"],
"predicate": "谓语(动作/状态)",
"object": "宾语(对象/属性或结构体)",
@@ -15,16 +15,17 @@
"importance": "重要性等级(1-4)",
"confidence": "置信度(1-4)",
"reasoning": "提取理由"
}}
}
]
}}
}
注意:
1. `subject` 可包含多个主体,请用数组表示;若主体不明确,请根据上下文给出最合理的称呼
2. `display` 必须是一句完整畅的中文描述,可直接用于用户展示和向量搜索
3. 只提取确实值得记忆的信息,不要提取琐碎内容
4. 确保信息准确、具体、有价值
5. 重要性: 1=低, 2=一般, 3=高, 4=关键;置信度: 1=低, 2=中等, 3=高, 4=已验证
2. `display` 字段必填,必须是完整畅的自然语言,禁止依赖字符串拼接
3. 主谓宾用于索引和检索结构化信息,提示词构建仅使用 `display`
4. 只提取确实值得记忆的信息,不要提取琐碎内容
5. 确保信息准确、具体、有价值
6. 重要性: 1=低, 2=一般, 3=高, 4=关键;置信度: 1=低, 2=中等, 3=高, 4=已验证
"""
import re
@@ -397,6 +398,7 @@ class MemoryBuilder:
"memories": [
{{
"type": "记忆类型",
"display": "一句自然流畅的中文描述,用于直接展示和提示词构建",
"subject": "主语(通常是用户)",
"predicate": "谓语(动作/状态)",
"object": "宾语(对象/属性)",
@@ -409,11 +411,16 @@ class MemoryBuilder:
}}
注意:
1. 只提取确实值得记忆的信息,不要提取琐碎内容
2. 确保提取的信息准确、具体、有价值
3. 使用主谓宾结构确保信息清晰
4. 重要性等级: 1=低, 2=一般, 3=高, 4=关键
5. 置信度: 1=低, 2=中等, 3=高, 4=已验证
1. `display` 字段必填,必须是完整顺畅的自然语言,禁止依赖字符串拼接
2. **display 字段格式要求**: 使用自然流畅的中文描述,格式示例:
- 用户养了一只名叫Whiskers的猫。
- 用户特别喜欢拿铁咖啡。
- 在2024年5月15日用户提到对新项目感到很有压力。
- 用户认为这个电影很有趣。
3. 主谓宾用于索引和检索,提示词构建仅使用 `display` 的自然语言描述
4. 只提取确实值得记忆的信息,不要提取琐碎内容
5. 确保提取的信息准确、具体、有价值
6. 重要性等级: 1=低, 2=一般, 3=高, 4=关键;置信度: 1=低, 2=中等, 3=高, 4=已验证
## 🚨 时间处理要求(强制):
- **绝对时间优先**:任何涉及时间的记忆都必须使用绝对日期格式
@@ -532,19 +539,38 @@ class MemoryBuilder:
"confidence"
)
predicate_value = mem_data.get("predicate", "")
object_value = mem_data.get("object", "")
display_text = self._sanitize_display_text(mem_data.get("display"))
used_fallback_display = False
if not display_text:
display_text = self._compose_display_text(normalized_subject, predicate_value, object_value)
used_fallback_display = True
memory = create_memory_chunk(
user_id=user_id,
subject=normalized_subject,
predicate=mem_data.get("predicate", ""),
obj=mem_data.get("object", ""),
predicate=predicate_value,
obj=object_value,
memory_type=MemoryType(mem_data.get("type", "contextual")),
chat_id=context.get("chat_id"),
source_context=mem_data.get("reasoning", ""),
importance=importance_level,
confidence=confidence_level,
display=mem_data.get("display")
display=display_text
)
if used_fallback_display:
logger.warning(
"LLM 记忆缺少自然语言 display 字段,已基于主谓宾临时生成描述",
fallback_generated=True,
memory_type=memory.memory_type.value,
subjects=memory.content.to_subject_list(),
predicate=predicate_value,
object_payload=object_value,
)
# 添加关键词
keywords = mem_data.get("keywords", [])
for keyword in keywords:
@@ -755,6 +781,23 @@ class MemoryBuilder:
cleaned = re.sub(r"[、,,;]+$", "", cleaned)
return cleaned
def _sanitize_display_text(self, value: Any) -> str:
if value is None:
return ""
if isinstance(value, (list, dict)):
try:
value = orjson.dumps(value, ensure_ascii=False).decode("utf-8")
except Exception:
value = str(value)
text = str(value).strip()
if not text or text.lower() in {"null", "none", "undefined"}:
return ""
text = re.sub(r"[\s\u3000]+", " ", text)
return text.strip("\n ")
def _looks_like_system_identifier(self, value: str) -> bool:
if not value:
return False

View File

@@ -0,0 +1,337 @@
# -*- coding: utf-8 -*-
"""
记忆格式化器
将召回的记忆转化为LLM友好的Markdown格式
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from dataclasses import dataclass
from src.common.logger import get_logger
from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
logger = get_logger(__name__)
@dataclass
class FormatterConfig:
"""格式化器配置"""
include_timestamps: bool = True # 是否包含时间信息
include_memory_types: bool = True # 是否包含记忆类型
include_confidence: bool = False # 是否包含置信度信息
max_display_length: int = 200 # 单条记忆最大显示长度
datetime_format: str = "%Y年%m月%d" # 时间格式
use_emoji_icons: bool = True # 是否使用emoji图标
group_by_type: bool = False # 是否按类型分组
use_bracket_format: bool = False # 是否使用方括号格式 [类型] 内容
compact_format: bool = False # 是否使用紧凑格式
class MemoryFormatter:
"""记忆格式化器 - 将记忆转化为提示词友好的格式"""
# 记忆类型对应的emoji图标
TYPE_EMOJI_MAP = {
MemoryType.PERSONAL_FACT: "👤",
MemoryType.EVENT: "📅",
MemoryType.PREFERENCE: "❤️",
MemoryType.OPINION: "💭",
MemoryType.RELATIONSHIP: "👥",
MemoryType.EMOTION: "😊",
MemoryType.KNOWLEDGE: "📚",
MemoryType.SKILL: "🛠️",
MemoryType.GOAL: "🎯",
MemoryType.EXPERIENCE: "🌟",
MemoryType.CONTEXTUAL: "💬"
}
# 记忆类型的中文标签 - 优化格式
TYPE_LABELS = {
MemoryType.PERSONAL_FACT: "个人事实",
MemoryType.EVENT: "事件",
MemoryType.PREFERENCE: "偏好",
MemoryType.OPINION: "观点",
MemoryType.RELATIONSHIP: "关系",
MemoryType.EMOTION: "情感",
MemoryType.KNOWLEDGE: "知识",
MemoryType.SKILL: "技能",
MemoryType.GOAL: "目标",
MemoryType.EXPERIENCE: "经验",
MemoryType.CONTEXTUAL: "上下文"
}
def __init__(self, config: Optional[FormatterConfig] = None):
self.config = config or FormatterConfig()
def format_memories_for_prompt(
self,
memories: List[MemoryChunk],
query_context: Optional[str] = None
) -> str:
"""
将记忆列表格式化为LLM提示词
Args:
memories: 记忆列表
query_context: 查询上下文(可选)
Returns:
格式化的Markdown文本
"""
if not memories:
return ""
lines = ["## 🧠 相关记忆回顾", ""]
if query_context:
lines.extend([
f"*查询上下文: {query_context}*",
""
])
if self.config.group_by_type:
lines.extend(self._format_memories_by_type(memories))
else:
lines.extend(self._format_memories_chronologically(memories))
return "\n".join(lines)
def _format_memories_by_type(self, memories: List[MemoryChunk]) -> List[str]:
"""按类型分组格式化记忆"""
# 按类型分组
grouped_memories = {}
for memory in memories:
memory_type = memory.memory_type
if memory_type not in grouped_memories:
grouped_memories[memory_type] = []
grouped_memories[memory_type].append(memory)
lines = []
# 为每个类型生成格式化文本
for memory_type, type_memories in grouped_memories.items():
emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝")
label = self.TYPE_LABELS.get(memory_type, memory_type.value)
lines.extend([
f"### {emoji} {label}",
""
])
for memory in type_memories:
formatted_item = self._format_single_memory(memory, include_type=False)
lines.append(formatted_item)
lines.append("") # 类型间空行
return lines
def _format_memories_chronologically(self, memories: List[MemoryChunk]) -> List[str]:
"""按时间顺序格式化记忆"""
lines = []
for i, memory in enumerate(memories, 1):
formatted_item = self._format_single_memory(memory, include_type=True, index=i)
lines.append(formatted_item)
return lines
def _format_single_memory(
self,
memory: MemoryChunk,
include_type: bool = True,
index: Optional[int] = None
) -> str:
"""格式化单条记忆"""
# 如果启用方括号格式,使用新格式
if self.config.use_bracket_format:
return self._format_single_memory_bracket(memory)
# 获取显示文本
display_text = memory.display or memory.text_content
if len(display_text) > self.config.max_display_length:
display_text = display_text[:self.config.max_display_length - 3] + "..."
# 构建前缀
prefix_parts = []
# 添加序号
if index is not None:
prefix_parts.append(f"{index}.")
# 添加类型标签
if include_type and self.config.include_memory_types:
if self.config.use_emoji_icons:
emoji = self.TYPE_EMOJI_MAP.get(memory.memory_type, "📝")
prefix_parts.append(f"**{emoji}")
else:
label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
prefix_parts.append(f"**[{label}]")
# 添加时间信息
if self.config.include_timestamps:
timestamp = memory.metadata.created_at
if timestamp > 0:
dt = datetime.fromtimestamp(timestamp)
time_str = dt.strftime(self.config.datetime_format)
if self.config.use_emoji_icons:
prefix_parts.append(f"{time_str}")
else:
prefix_parts.append(f"({time_str})")
# 添加置信度信息
if self.config.include_confidence:
confidence = memory.metadata.confidence.value
confidence_stars = "" * confidence + "" * (4 - confidence)
prefix_parts.append(f"信度:{confidence_stars}")
# 构建完整格式
if prefix_parts:
if self.config.include_memory_types and self.config.use_emoji_icons:
prefix = " ".join(prefix_parts) + "** "
else:
prefix = " ".join(prefix_parts) + " "
return f"- {prefix}{display_text}"
else:
return f"- {display_text}"
def _format_single_memory_bracket(self, memory: MemoryChunk) -> str:
"""格式化单条记忆 - 使用方括号格式 [类型] 内容"""
# 获取显示文本
display_text = memory.display or memory.text_content
# 如果启用紧凑格式,只显示核心内容
if self.config.compact_format:
if len(display_text) > self.config.max_display_length:
display_text = display_text[:self.config.max_display_length - 3] + "..."
else:
# 非紧凑格式可以包含时间信息
if self.config.include_timestamps:
timestamp = memory.metadata.created_at
if timestamp > 0:
dt = datetime.fromtimestamp(timestamp)
time_str = dt.strftime("%Y年%m月%d")
# 将时间信息自然地整合到内容中
if "" not in display_text and "" not in display_text:
display_text = f"{time_str}{display_text}"
# 获取类型标签
label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
# 构建方括号格式: **[类型]** 内容
return f"- **[{label}]** {display_text}"
def format_memory_summary(self, memories: List[MemoryChunk]) -> str:
"""生成记忆摘要统计"""
if not memories:
return "暂无相关记忆。"
# 统计信息
total_count = len(memories)
type_counts = {}
for memory in memories:
memory_type = memory.memory_type
type_counts[memory_type] = type_counts.get(memory_type, 0) + 1
# 生成摘要
lines = [f"**记忆摘要**: 共找到 {total_count} 条相关记忆"]
if len(type_counts) > 1:
type_summaries = []
for memory_type, count in type_counts.items():
emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝")
label = self.TYPE_LABELS.get(memory_type, memory_type.value)
type_summaries.append(f"{emoji}{label} {count}")
lines.append(f"包括: {', '.join(type_summaries)}")
return " | ".join(lines)
def format_for_debug(self, memories: List[MemoryChunk]) -> str:
"""生成调试格式的记忆列表"""
if not memories:
return "无记忆数据"
lines = ["### 记忆调试信息", ""]
for i, memory in enumerate(memories, 1):
lines.extend([
f"**记忆 {i}** (ID: {memory.memory_id[:8]})",
f"- 类型: {memory.memory_type.value}",
f"- 内容: {memory.display[:100]}{'...' if len(memory.display) > 100 else ''}",
f"- 访问次数: {memory.metadata.access_count}",
f"- 置信度: {memory.metadata.confidence.value}/4",
f"- 重要性: {memory.metadata.importance.value}/4",
f"- 创建时间: {datetime.fromtimestamp(memory.metadata.created_at).strftime('%Y-%m-%d %H:%M')}",
""
])
return "\n".join(lines)
# 创建默认格式化器实例
default_formatter = MemoryFormatter()
def format_memories_for_llm(
memories: List[MemoryChunk],
query_context: Optional[str] = None,
config: Optional[FormatterConfig] = None
) -> str:
"""
便捷函数将记忆格式化为LLM提示词
"""
if config:
formatter = MemoryFormatter(config)
else:
formatter = default_formatter
return formatter.format_memories_for_prompt(memories, query_context)
def format_memory_summary(
memories: List[MemoryChunk],
config: Optional[FormatterConfig] = None
) -> str:
"""
便捷函数:生成记忆摘要
"""
if config:
formatter = MemoryFormatter(config)
else:
formatter = default_formatter
return formatter.format_memory_summary(memories)
def format_memories_bracket_style(
memories: List[MemoryChunk],
query_context: Optional[str] = None,
compact: bool = True,
include_timestamps: bool = True
) -> str:
"""
便捷函数:使用方括号格式格式化记忆
Args:
memories: 记忆列表
query_context: 查询上下文
compact: 是否使用紧凑格式
include_timestamps: 是否包含时间信息
Returns:
格式化的Markdown文本
"""
config = FormatterConfig(
use_bracket_format=True,
compact_format=compact,
include_timestamps=include_timestamps,
include_memory_types=True,
use_emoji_icons=False,
group_by_type=False
)
formatter = MemoryFormatter(config)
return formatter.format_memories_for_prompt(memories, query_context)

View File

@@ -136,25 +136,23 @@ class MemoryQueryPlanner:
persona = context.get("bot_personality") or context.get("bot_identity") or "未知"
return f"""
你是一名记忆检索分析师,将根据对话查询生成结构化的检索计划。
请结合提供的上下文输出一个JSON对象字段含义如下
- semantic_query: 提供给向量检索的自然语言查询,要求清晰具体;
- memory_types: 建议检索的记忆类型数组,取值范围参见 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual)
- subject_includes: 需要出现在记忆主语中的人或角色列表
- object_includes: 记忆中需要提到的重要对象主题关键词列表
- required_keywords: 检索时必须包含的关键词
- optional_keywords: 可以提升相关性的附加关键词
- owner_filters: 如果需要限制检索所属主体请列出用户ID或其它标识
- recency: 建议的时间偏好,可选 recent/any/historical
- emphasis: 检索策略倾向,可选 precision/recall/balanced
- limit: 推荐的最大返回数量(1-15之间)
- notes: 额外说明,可选。
你是一名记忆检索规划助手,请基于输入生成一个简洁的 JSON 检索计划。
仅需提供以下字段
- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询
- memory_types: 建议检索的记忆类型列表,取值范围来自 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual)
- subject_includes: 建议出现在记忆主语中的人或角色;
- object_includes: 建议关注的对象主题关键信息
- recency: 推荐的时间偏好,可选 recent/any/historical
- limit: 推荐的最大返回数量 (1-15)
- notes: 额外补充说明(可选)。
请不要生成谓语字段,也不要额外补充其它参数。
当前查询: "{query_text}"
已知的对话参与者: {participant_preview}
机器人设定: {persona}
请输出符合要求的JSON禁止添加额外说明或Markdown代码块。
直接输出符合要求的 JSON 对象,禁止添加额外文本或 Markdown 代码块。
"""
def _extract_json_payload(self, response: str) -> Optional[str]:

File diff suppressed because it is too large Load Diff

View File

@@ -130,24 +130,32 @@ class VectorStorageManager:
async def generate_query_embedding(self, query_text: str) -> Optional[List[float]]:
"""生成查询向量,用于记忆召回"""
if not query_text:
logger.warning("查询文本为空,无法生成向量")
return None
try:
await self.initialize_embedding_model()
logger.debug(f"开始生成查询向量,文本: '{query_text[:50]}{'...' if len(query_text) > 50 else ''}'")
embedding, _ = await self.embedding_model.get_embedding(query_text)
if not embedding:
logger.warning("嵌入模型返回空向量")
return None
logger.debug(f"生成的向量维度: {len(embedding)}, 期望维度: {self.config.dimension}")
if len(embedding) != self.config.dimension:
logger.warning(
logger.error(
"查询向量维度不匹配: 期望 %d, 实际 %d",
self.config.dimension,
len(embedding)
)
return None
return self._normalize_vector(embedding)
normalized_vector = self._normalize_vector(embedding)
logger.debug(f"查询向量生成成功,向量范围: [{min(normalized_vector):.4f}, {max(normalized_vector):.4f}]")
return normalized_vector
except Exception as exc:
logger.error(f"❌ 生成查询向量失败: {exc}", exc_info=True)
@@ -195,19 +203,39 @@ class VectorStorageManager:
logger.error(f"❌ 向量存储失败: {e}", exc_info=True)
def _prepare_embedding_text(self, memory: MemoryChunk) -> str:
"""准备用于嵌入的文本"""
# 构建包含丰富信息的文本
text_parts = [
memory.text_content,
f"类型: {memory.memory_type.value}",
f"关键词: {', '.join(memory.keywords)}",
f"标签: {', '.join(memory.tags)}"
]
"""准备用于嵌入的文本,仅使用自然语言展示内容"""
display_text = (memory.display or "").strip()
if display_text:
return display_text
if memory.metadata.emotional_context:
text_parts.append(f"情感: {memory.metadata.emotional_context}")
fallback_text = (memory.text_content or "").strip()
if fallback_text:
return fallback_text
return " | ".join(text_parts)
subjects = "".join(s.strip() for s in memory.subjects if s and isinstance(s, str))
predicate = (memory.content.predicate or "").strip()
obj = memory.content.object
if isinstance(obj, dict):
object_parts = []
for key, value in obj.items():
if value is None:
continue
if isinstance(value, (list, tuple)):
preview = "".join(str(item) for item in value[:3])
object_parts.append(f"{key}:{preview}")
else:
object_parts.append(f"{key}:{value}")
object_text = ", ".join(object_parts)
else:
object_text = str(obj or "").strip()
composite_parts = [part for part in [subjects, predicate, object_text] if part]
if composite_parts:
return " ".join(composite_parts)
logger.debug("记忆 %s 缺少可用展示文本,使用占位符生成嵌入输入", memory.memory_id)
return memory.memory_id
async def _batch_generate_and_store_embeddings(self, memory_texts: List[Tuple[str, str]]):
"""批量生成和存储嵌入向量"""
@@ -345,12 +373,16 @@ class VectorStorageManager:
start_time = time.time()
try:
logger.debug(f"开始向量搜索: query_text='{query_text[:30] if query_text else 'None'}', limit={limit}")
if query_vector is None:
if not query_text:
logger.warning("查询向量和查询文本都为空")
return []
query_vector = await self.generate_query_embedding(query_text)
if not query_vector:
logger.warning("查询向量生成失败")
return []
scope_filter: Optional[str] = None
@@ -364,6 +396,25 @@ class VectorStorageManager:
# 规范化查询向量
query_vector = self._normalize_vector(query_vector)
logger.debug(f"查询向量维度: {len(query_vector)}, 存储总向量数: {self.storage_stats['total_vectors']}")
# 检查向量索引状态
if not self.vector_index:
logger.error("向量索引未初始化")
return []
total_vectors = 0
if hasattr(self.vector_index, 'ntotal'):
total_vectors = self.vector_index.ntotal
elif hasattr(self.vector_index, 'vectors'):
total_vectors = len(self.vector_index.vectors)
logger.debug(f"向量索引中实际向量数: {total_vectors}")
if total_vectors == 0:
logger.warning("向量索引为空,无法执行搜索")
return []
# 执行向量搜索
with self._lock:
if hasattr(self.vector_index, 'search'):
@@ -377,31 +428,55 @@ class VectorStorageManager:
# 设置IVF搜索参数
nprobe = min(self.vector_index.nlist, 10)
self.vector_index.nprobe = nprobe
logger.debug(f"IVF搜索参数: nprobe={nprobe}")
distances, indices = self.vector_index.search(query_array, min(limit, self.storage_stats["total_vectors"]))
search_limit = min(limit, total_vectors)
logger.debug(f"执行FAISS搜索搜索限制: {search_limit}")
distances, indices = self.vector_index.search(query_array, search_limit)
distances = distances.flatten().tolist()
indices = indices.flatten().tolist()
logger.debug(f"FAISS搜索结果: {len(distances)} 个距离值, {len(indices)} 个索引")
else:
# 简单索引
logger.debug("使用简单向量索引执行搜索")
results = self.vector_index.search(query_vector, limit)
distances = [score for _, score in results]
indices = [idx for idx, _ in results]
logger.debug(f"简单索引搜索结果: {len(results)} 个结果")
# 处理搜索结果
results = []
valid_results = 0
invalid_indices = 0
filtered_by_scope = 0
for distance, index in zip(distances, indices):
if index == -1: # FAISS的无效索引标记
invalid_indices += 1
continue
memory_id = self.index_to_memory_id.get(index)
if memory_id:
if scope_filter:
memory = self.memory_cache.get(memory_id)
if memory and str(memory.user_id) != scope_filter:
continue
if not memory_id:
logger.debug(f"索引 {index} 没有对应的记忆ID")
invalid_indices += 1
continue
similarity = max(0.0, min(1.0, distance)) # 确保在0-1范围内
results.append((memory_id, similarity))
if scope_filter:
memory = self.memory_cache.get(memory_id)
if memory and str(memory.user_id) != scope_filter:
filtered_by_scope += 1
continue
similarity = max(0.0, min(1.0, distance)) # 确保在0-1范围内
results.append((memory_id, similarity))
valid_results += 1
logger.debug(
f"搜索结果处理: 总距离={len(distances)}, 有效结果={valid_results}, "
f"无效索引={invalid_indices}, 作用域过滤={filtered_by_scope}"
)
# 更新统计
search_time = time.time() - start_time
@@ -411,10 +486,16 @@ class VectorStorageManager:
self.storage_stats["total_searches"]
)
return results[:limit]
final_results = results[:limit]
logger.info(
f"向量搜索完成: 查询='{query_text[:20] if query_text else 'vector'}' "
f"耗时={search_time:.3f}s, 返回={len(final_results)}个结果"
)
return final_results
except Exception as e:
logger.error(f"❌ 向量搜索失败: {e}")
logger.error(f"❌ 向量搜索失败: {e}", exc_info=True)
return []
async def get_memory_by_id(self, memory_id: str) -> Optional[MemoryChunk]:
@@ -601,21 +682,28 @@ class VectorStorageManager:
}
# 加载FAISS索引如果可用
index_loaded = False
if FAISS_AVAILABLE:
index_file = self.storage_path / "vector_index.faiss"
if index_file.exists() and hasattr(self.vector_index, 'load'):
if index_file.exists():
try:
loaded_index = faiss.read_index(str(index_file))
# 如果索引类型匹配,则替换
if type(loaded_index) == type(self.vector_index):
self.vector_index = loaded_index
logger.info("✅ FAISS索引加载完成")
index_loaded = True
logger.info("✅ FAISS索引文件加载完成")
else:
logger.warning("索引类型不匹配,重新构建索引")
await self._rebuild_index()
except Exception as e:
logger.warning(f"加载FAISS索引失败: {e},重新构建")
await self._rebuild_index()
else:
logger.info("FAISS索引文件不存在将重新构建")
# 如果索引没有成功加载且有向量数据,则重建索引
if not index_loaded and self.vector_cache:
logger.info(f"检测到 {len(self.vector_cache)} 个向量缓存,重建索引")
await self._rebuild_index()
# 加载统计信息
stats_file = self.storage_path / "storage_stats.json"
@@ -634,22 +722,69 @@ class VectorStorageManager:
async def _rebuild_index(self):
"""重建向量索引"""
try:
logger.info("正在重建向量索引...")
logger.info(f"正在重建向量索引...向量数量: {len(self.vector_cache)}")
# 重新初始化索引
self._initialize_index()
# 重新添加所有向量
for memory_id, embedding in self.vector_cache.items():
if embedding:
memory = self.memory_cache.get(memory_id)
if memory:
await self._add_single_memory(memory, embedding)
# 清空映射关系
self.memory_id_to_index.clear()
self.index_to_memory_id.clear()
logger.info("✅ 向量索引重建完成")
if not self.vector_cache:
logger.warning("没有向量缓存数据,跳过重建")
return
# 准备向量数据
memory_ids = []
vectors = []
for memory_id, embedding in self.vector_cache.items():
if embedding and len(embedding) == self.config.dimension:
memory_ids.append(memory_id)
vectors.append(self._normalize_vector(embedding))
else:
logger.debug(f"跳过无效向量: {memory_id}, 维度: {len(embedding) if embedding else 0}")
if not vectors:
logger.warning("没有有效的向量数据")
return
logger.info(f"准备重建 {len(vectors)} 个向量到索引")
# 批量添加向量到FAISS索引
if hasattr(self.vector_index, 'add'):
# FAISS索引
vector_array = np.array(vectors, dtype='float32')
# 特殊处理IVF索引
if self.config.index_type == "ivf" and hasattr(self.vector_index, 'train'):
logger.info("训练IVF索引...")
self.vector_index.train(vector_array)
# 添加向量
self.vector_index.add(vector_array)
# 重建映射关系
for i, memory_id in enumerate(memory_ids):
self.memory_id_to_index[memory_id] = i
self.index_to_memory_id[i] = memory_id
else:
# 简单索引
for i, (memory_id, vector) in enumerate(zip(memory_ids, vectors)):
index_id = self.vector_index.add_vector(vector)
self.memory_id_to_index[memory_id] = index_id
self.index_to_memory_id[index_id] = memory_id
# 更新统计
self.storage_stats["total_vectors"] = len(self.memory_id_to_index)
final_count = getattr(self.vector_index, 'ntotal', len(self.memory_id_to_index))
logger.info(f"✅ 向量索引重建完成,索引中向量数: {final_count}")
except Exception as e:
logger.error(f"❌ 重建向量索引失败: {e}")
logger.error(f"❌ 重建向量索引失败: {e}", exc_info=True)
async def optimize_storage(self):
"""优化存储"""

View File

@@ -60,7 +60,7 @@ class SingleStreamContextManager:
self.last_access_time = time.time()
# 启动流的循环任务(如果还未启动)
await stream_loop_manager.start_stream_loop(self.stream_id)
logger.info(f"添加消息到单流上下文: {self.stream_id} (兴趣度待计算)")
logger.info(f"添加消息{message.processed_plain_text}到单流上下文: {self.stream_id}")
return True
except Exception as e:
logger.error(f"添加消息到单流上下文失败 {self.stream_id}: {e}", exc_info=True)

View File

@@ -329,6 +329,13 @@ class DefaultReplyer:
logger.error(f"LLM 生成失败: {llm_e}")
return False, None, prompt # LLM 调用失败则无法生成回复
# 回复生成成功后,异步存储聊天记忆(不阻塞返回)
try:
await self._store_chat_memory_async(reply_to, reply_message)
except Exception as memory_e:
# 记忆存储失败不应该影响回复生成的成功返回
logger.warning(f"记忆存储失败,但不影响回复生成: {memory_e}")
return True, llm_response, prompt
except UserWarning as uw:
@@ -571,15 +578,7 @@ class DefaultReplyer:
context=memory_context
)
# 异步存储聊天历史(非阻塞)
asyncio.create_task(
remember_message(
message=chat_history,
user_id=memory_user_id,
chat_id=stream.stream_id,
context=memory_context
)
)
# 注意:记忆存储已迁移到回复生成完成后进行,不在查询阶段执行
# 转换格式以兼容现有代码
running_memories = []
@@ -1676,6 +1675,143 @@ class DefaultReplyer:
logger.error(f"获取AFC关系信息失败: {e}")
return f"你与{sender}是普通朋友关系。"
async def _store_chat_memory_async(self, reply_to: str, reply_message: Optional[Dict[str, Any]] = None):
"""
异步存储聊天记忆从build_memory_block迁移而来
Args:
reply_to: 回复对象
reply_message: 回复的原始消息
"""
try:
if not global_config.memory.enable_memory or not global_config.memory.enable_instant_memory:
return
# 使用增强记忆系统存储记忆
from src.chat.memory_system.enhanced_memory_integration import remember_message
stream = self.chat_stream
user_info_obj = getattr(stream, "user_info", None)
group_info_obj = getattr(stream, "group_info", None)
memory_user_id = str(stream.stream_id)
memory_user_display = None
memory_aliases = []
user_info_dict = {}
if user_info_obj is not None:
raw_user_id = getattr(user_info_obj, "user_id", None)
if raw_user_id:
memory_user_id = str(raw_user_id)
if hasattr(user_info_obj, "to_dict"):
try:
user_info_dict = user_info_obj.to_dict() # type: ignore[attr-defined]
except Exception:
user_info_dict = {}
candidate_keys = [
"user_cardname",
"user_nickname",
"nickname",
"remark",
"display_name",
"user_name",
]
for key in candidate_keys:
value = user_info_dict.get(key)
if isinstance(value, str) and value.strip():
stripped = value.strip()
if memory_user_display is None:
memory_user_display = stripped
elif stripped not in memory_aliases:
memory_aliases.append(stripped)
attr_keys = [
"user_cardname",
"user_nickname",
"nickname",
"remark",
"display_name",
"name",
]
for attr in attr_keys:
value = getattr(user_info_obj, attr, None)
if isinstance(value, str) and value.strip():
stripped = value.strip()
if memory_user_display is None:
memory_user_display = stripped
elif stripped not in memory_aliases:
memory_aliases.append(stripped)
alias_values = (
user_info_dict.get("aliases")
or user_info_dict.get("alias_names")
or user_info_dict.get("alias")
)
if isinstance(alias_values, (list, tuple, set)):
for alias in alias_values:
if isinstance(alias, str) and alias.strip():
stripped = alias.strip()
if stripped not in memory_aliases and stripped != memory_user_display:
memory_aliases.append(stripped)
memory_context = {
"user_id": memory_user_id,
"user_display_name": memory_user_display or "",
"user_name": memory_user_display or "",
"nickname": memory_user_display or "",
"sender_name": memory_user_display or "",
"platform": getattr(stream, "platform", None),
"chat_id": stream.stream_id,
"stream_id": stream.stream_id,
}
if memory_aliases:
memory_context["user_aliases"] = memory_aliases
if group_info_obj is not None:
group_name = getattr(group_info_obj, "group_name", None) or getattr(group_info_obj, "group_nickname", None)
if group_name:
memory_context["group_name"] = str(group_name)
group_id = getattr(group_info_obj, "group_id", None)
if group_id:
memory_context["group_id"] = str(group_id)
memory_context = {key: value for key, value in memory_context.items() if value}
# 构建聊天历史用于存储
message_list_before_short = await get_raw_msg_before_timestamp_with_chat(
chat_id=stream.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.33),
)
chat_history = await build_readable_messages(
message_list_before_short,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
show_actions=True,
)
# 异步存储聊天历史(完全非阻塞)
asyncio.create_task(
remember_message(
message=chat_history,
user_id=memory_user_id,
chat_id=stream.stream_id,
context=memory_context
)
)
logger.debug(f"已启动记忆存储任务,用户: {memory_user_display or memory_user_id}")
except Exception as e:
logger.error(f"存储聊天记忆失败: {e}")
def weighted_sample_no_replacement(items, weights, k) -> list:
"""

View File

@@ -339,6 +339,7 @@ class Prompt:
pre_built_params["relation_info_block"] = self.parameters.relation_info_block
if self.parameters.memory_block:
pre_built_params["memory_block"] = self.parameters.memory_block
logger.debug("使用预构建的memory_block跳过实时构建")
if self.parameters.tool_info_block:
pre_built_params["tool_info_block"] = self.parameters.tool_info_block
if self.parameters.knowledge_prompt:
@@ -351,8 +352,11 @@ class Prompt:
tasks.append(self._build_expression_habits())
task_names.append("expression_habits")
# 记忆块应该在回复前预构建,这里优先使用预构建的结果
if self.parameters.enable_memory and not pre_built_params.get("memory_block"):
tasks.append(self._build_memory_block())
# 如果没有预构建的记忆块,则快速构建一个简化版本
logger.debug("memory_block未预构建执行快速构建作为后备方案")
tasks.append(self._build_memory_block_fast())
task_names.append("memory_block")
if self.parameters.enable_relation and not pre_built_params.get("relation_info_block"):
@@ -373,7 +377,7 @@ class Prompt:
# 性能优化 - 为不同任务设置不同的超时时间
task_timeouts = {
"memory_block": 15.0, # 记忆系统
"memory_block": 15.0, # 记忆系统 - 降低超时时间,鼓励预构建
"tool_info": 15.0, # 工具信息
"relation_info": 10.0, # 关系信息
"knowledge_info": 10.0, # 知识库查询
@@ -575,21 +579,42 @@ class Prompt:
instant_memory = None
# 构建记忆块
memory_parts = []
existing_contents = set()
if running_memories:
memory_parts.append("以下是当前在聊天中,你回忆起的记忆:")
for memory in running_memories:
content = memory["content"]
memory_parts.append(f"- {content}")
existing_contents.add(content)
try:
# 使用记忆格式化器进行格式化
from src.chat.memory_system.memory_formatter import format_memories_bracket_style
# 转换记忆数据格式
formatted_memories = []
for memory in running_memories:
formatted_memories.append({
"display": memory.get("display", memory.get("content", "")),
"memory_type": memory.get("memory_type", "personal_fact"),
"metadata": memory.get("metadata", {})
})
# 使用方括号格式格式化记忆
memory_block = format_memories_bracket_style(
formatted_memories,
query_context=self.parameters.target
)
except Exception as e:
logger.warning(f"记忆格式化失败,使用简化格式: {e}")
# 备用简化格式
memory_parts = ["以下是当前在聊天中,你回忆起的记忆:"]
for memory in running_memories:
content = memory["content"]
memory_parts.append(f"- {content}")
memory_block = "\n".join(memory_parts)
else:
memory_block = ""
# 添加即时记忆
if instant_memory:
if instant_memory not in existing_contents:
memory_parts.append(f"- 最相关记忆:{instant_memory}")
memory_block = "\n".join(memory_parts) if memory_parts else ""
if memory_block:
memory_block += f"\n- 最相关记忆:{instant_memory}"
else:
memory_block = f"- 最相关记忆:{instant_memory}"
return {"memory_block": memory_block}
@@ -597,6 +622,30 @@ class Prompt:
logger.error(f"构建记忆块失败: {e}")
return {"memory_block": ""}
async def _build_memory_block_fast(self) -> Dict[str, Any]:
"""快速构建记忆块(简化版本,用于未预构建时的后备方案)"""
if not global_config.memory.enable_memory:
return {"memory_block": ""}
try:
from src.chat.memory_system.enhanced_memory_activator import enhanced_memory_activator
# 简化的快速查询,只获取即时记忆
instant_memory = await enhanced_memory_activator.get_instant_memory(
target_message=self.parameters.target, chat_id=self.parameters.chat_id
)
if instant_memory:
memory_block = f"- 相关记忆:{instant_memory}"
else:
memory_block = ""
return {"memory_block": memory_block}
except Exception as e:
logger.warning(f"快速构建记忆块失败: {e}")
return {"memory_block": ""}
async def _build_relation_info(self) -> Dict[str, Any]:
"""构建关系信息"""
try:

View File

@@ -76,59 +76,78 @@ class MainSystem:
def signal_handler(signum, frame):
logger.info("收到退出信号,正在优雅关闭系统...")
self._cleanup()
sys.exit(0)
import asyncio
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果事件循环正在运行,创建任务并设置回调
async def cleanup_and_exit():
await self._async_cleanup()
sys.exit(0)
task = asyncio.create_task(cleanup_and_exit())
# 添加任务完成回调,确保程序退出
task.add_done_callback(lambda t: None)
else:
# 如果事件循环未运行,使用同步清理
self._cleanup()
sys.exit(0)
except Exception as e:
logger.error(f"信号处理失败: {e}")
sys.exit(1)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def _cleanup(self):
"""清理资源"""
async def _async_cleanup(self):
"""异步清理资源"""
try:
# 停止消息管理器
from src.chat.message_manager import message_manager
import asyncio
try:
from src.chat.message_manager import message_manager
await message_manager.stop()
logger.info("🛑 消息管理器已停止")
except Exception as e:
logger.error(f"停止消息管理器时出错: {e}")
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(message_manager.stop())
else:
loop.run_until_complete(message_manager.stop())
logger.info("🛑 消息管理器已停止")
except Exception as e:
logger.error(f"停止消息管理器时出错: {e}")
try:
# 停止消息重组器
from src.plugin_system.core.event_manager import event_manager
from src.plugin_system import EventType
asyncio.run(event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM"))
try:
from src.plugin_system.core.event_manager import event_manager
from src.plugin_system import EventType
from src.utils.message_chunker import reassembler
from src.utils.message_chunker import reassembler
await event_manager.trigger_event(EventType.ON_STOP, permission_group="SYSTEM")
await reassembler.stop_cleanup_task()
logger.info("🛑 消息重组器已停止")
except Exception as e:
logger.error(f"停止消息重组器时出错: {e}")
# 停止增强记忆系统
try:
if global_config.memory.enable_memory:
await self.enhanced_memory_manager.shutdown()
logger.info("🛑 增强记忆系统已停止")
except Exception as e:
logger.error(f"停止增强记忆系统时出错: {e}")
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(reassembler.stop_cleanup_task())
else:
loop.run_until_complete(reassembler.stop_cleanup_task())
logger.info("🛑 消息重组器已停止")
except Exception as e:
logger.error(f"停止消息重组器时出错: {e}")
logger.error(f"异步清理资源时出错: {e}")
def _cleanup(self):
"""同步清理资源(向后兼容)"""
import asyncio
try:
# 停止增强记忆系统
if global_config.memory.enable_memory:
import asyncio
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(self.enhanced_memory_manager.shutdown())
else:
loop.run_until_complete(self.enhanced_memory_manager.shutdown())
logger.info("🛑 增强记忆系统已停止")
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果循环正在运行,创建异步清理任务
asyncio.create_task(self._async_cleanup())
else:
# 如果循环未运行,直接运行异步清理
loop.run_until_complete(self._async_cleanup())
except Exception as e:
logger.error(f"停止增强记忆系统时出错: {e}")
logger.error(f"同步清理资源时出错: {e}")
async def _message_process_wrapper(self, message_data: Dict[str, Any]):
"""并行处理消息的包装器"""

View File

@@ -77,7 +77,7 @@ class AffinityChatter(BaseChatter):
# 执行动作(如果规划器返回了动作)
execution_result = {"executed_count": len(actions) if actions else 0}
if actions:
logger.info(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
logger.debug(f"聊天流 {self.stream_id} 生成了 {len(actions)} 个动作")
# 更新统计
self.stats["messages_processed"] += 1

View File

@@ -263,9 +263,9 @@ enhanced_memory_auto_save = true # 是否自动保存增强记忆
min_memory_length = 10 # 最小记忆长度
max_memory_length = 500 # 最大记忆长度
memory_value_threshold = 0.7 # 记忆价值阈值,低于该值的记忆会被丢弃
vector_similarity_threshold = 0.8 # 向量相似度阈值
semantic_similarity_threshold = 0.6 # 语义重排阶段的最低匹配阈值
memory_value_threshold = 0.5 # 记忆价值阈值,低于该值的记忆会被丢弃
vector_similarity_threshold = 0.4 # 向量相似度阈值
semantic_similarity_threshold = 0.4 # 语义重排阶段的最低匹配阈值
metadata_filter_limit = 100 # 元数据过滤阶段返回数量上限
vector_search_limit = 50 # 向量搜索阶段返回数量上限
@@ -277,7 +277,7 @@ semantic_weight = 0.3 # 综合评分中语义匹配的权重
context_weight = 0.2 # 综合评分中上下文关联的权重
recency_weight = 0.1 # 综合评分中时效性的权重
fusion_similarity_threshold = 0.85 # 记忆融合时的相似度阈值
fusion_similarity_threshold = 0.6 # 记忆融合时的相似度阈值
deduplication_window_hours = 24 # 记忆去重窗口(小时)
enable_memory_cache = true # 是否启用本地记忆缓存