Merge branch 'feature/memory-graph-system' of https://github.com/MoFox-Studio/MoFox_Bot into feature/memory-graph-system

This commit is contained in:
tt-P607
2025-11-06 23:56:59 +08:00
9 changed files with 1454 additions and 389 deletions

View File

@@ -182,12 +182,14 @@ class MessageStorageBatcher:
is_command = message.is_command or False
is_public_notice = message.is_public_notice or False
notice_type = message.notice_type
actions = message.actions
# 序列化actions列表为JSON字符串
actions = orjson.dumps(message.actions).decode("utf-8") if message.actions else None
should_reply = message.should_reply
should_act = message.should_act
additional_config = message.additional_config
key_words = ""
key_words_lite = ""
# 确保关键词字段是字符串格式(如果不是,则序列化)
key_words = MessageStorage._serialize_keywords(message.key_words)
key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite)
memorized_times = 0
user_platform = message.user_info.platform if message.user_info else ""
@@ -254,7 +256,8 @@ class MessageStorageBatcher:
is_command = message.is_command
is_public_notice = getattr(message, "is_public_notice", False)
notice_type = getattr(message, "notice_type", None)
actions = getattr(message, "actions", None)
# 序列化actions列表为JSON字符串
actions = orjson.dumps(getattr(message, "actions", None)).decode("utf-8") if getattr(message, "actions", None) else None
should_reply = getattr(message, "should_reply", None)
should_act = getattr(message, "should_act", None)
additional_config = getattr(message, "additional_config", None)
@@ -580,6 +583,11 @@ class MessageStorage:
is_picid = False
is_notify = False
is_command = False
is_public_notice = False
notice_type = None
actions = None
should_reply = False
should_act = False
key_words = ""
key_words_lite = ""
else:
@@ -593,6 +601,12 @@ class MessageStorage:
is_picid = message.is_picid
is_notify = message.is_notify
is_command = message.is_command
is_public_notice = getattr(message, "is_public_notice", False)
notice_type = getattr(message, "notice_type", None)
# 序列化actions列表为JSON字符串
actions = orjson.dumps(getattr(message, "actions", None)).decode("utf-8") if getattr(message, "actions", None) else None
should_reply = getattr(message, "should_reply", False)
should_act = getattr(message, "should_act", False)
# 序列化关键词列表为JSON字符串
key_words = MessageStorage._serialize_keywords(message.key_words)
key_words_lite = MessageStorage._serialize_keywords(message.key_words_lite)
@@ -666,6 +680,11 @@ class MessageStorage:
is_picid=is_picid,
is_notify=is_notify,
is_command=is_command,
is_public_notice=is_public_notice,
notice_type=notice_type,
actions=actions,
should_reply=should_reply,
should_act=should_act,
key_words=key_words,
key_words_lite=key_words_lite,
)
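
The hunks above switch `actions`, `key_words`, and `key_words_lite` from raw Python lists to JSON strings before they are written to the database. `MessageStorage._serialize_keywords` is referenced but its body is not part of this diff; below is a minimal sketch of what such a helper might do, assuming orjson and that keywords can arrive as a list, a string, or None (hypothetical, not the project's actual implementation):

import orjson
from typing import Any

def serialize_keywords(key_words: Any) -> str:
    """Hypothetical sketch of MessageStorage._serialize_keywords: normalize keywords to a JSON string."""
    if not key_words:
        return ""
    if isinstance(key_words, str):
        return key_words                      # assume strings are already serialized
    return orjson.dumps(list(key_words)).decode("utf-8")
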

View File

@@ -401,6 +401,66 @@ class MemoryConfig(ValidatedConfigBase):
memory_system_load_balancing: bool = Field(default=True, description="启用记忆系统负载均衡")
memory_build_throttling: bool = Field(default=True, description="启用记忆构建节流")
memory_priority_queue_enabled: bool = Field(default=True, description="启用记忆优先级队列")
# === 记忆图系统配置 (Memory Graph System) ===
# 新一代记忆系统的配置项
enable: bool = Field(default=True, description="启用记忆图系统")
data_dir: str = Field(default="data/memory_graph", description="记忆数据存储目录")
# 向量存储配置
vector_collection_name: str = Field(default="memory_nodes", description="向量集合名称")
vector_db_path: str = Field(default="data/memory_graph/chroma_db", description="向量数据库路径")
# 检索配置
search_top_k: int = Field(default=10, description="默认检索返回数量")
search_min_importance: float = Field(default=0.3, description="最小重要性阈值")
search_similarity_threshold: float = Field(default=0.5, description="向量相似度阈值")
search_max_expand_depth: int = Field(default=2, description="检索时图扩展深度(0-3)")
search_expand_semantic_threshold: float = Field(default=0.3, description="图扩展时语义相似度阈值(建议0.3-0.5,过低可能引入无关记忆,过高无法扩展)")
enable_query_optimization: bool = Field(default=True, description="启用查询优化")
# 检索权重配置 (记忆图系统)
search_vector_weight: float = Field(default=0.4, description="向量相似度权重")
search_graph_distance_weight: float = Field(default=0.2, description="图距离权重")
search_importance_weight: float = Field(default=0.2, description="重要性权重")
search_recency_weight: float = Field(default=0.2, description="时效性权重")
# 记忆整合配置
consolidation_enabled: bool = Field(default=False, description="是否启用记忆整合")
consolidation_interval_hours: float = Field(default=2.0, description="整合任务执行间隔(小时)")
consolidation_deduplication_threshold: float = Field(default=0.93, description="相似记忆去重阈值")
consolidation_time_window_hours: float = Field(default=2.0, description="整合时间窗口(小时)- 统一用于去重和关联")
consolidation_max_batch_size: int = Field(default=30, description="单次最多处理的记忆数量")
# 记忆关联配置(整合功能的子模块)
consolidation_linking_enabled: bool = Field(default=True, description="是否启用记忆关联建立")
consolidation_linking_max_candidates: int = Field(default=10, description="每个记忆最多关联的候选数")
consolidation_linking_max_memories: int = Field(default=20, description="单次最多处理的记忆总数")
consolidation_linking_min_importance: float = Field(default=0.5, description="最低重要性阈值")
consolidation_linking_pre_filter_threshold: float = Field(default=0.7, description="向量相似度预筛选阈值")
consolidation_linking_max_pairs_for_llm: int = Field(default=5, description="最多发送给LLM分析的候选对数")
consolidation_linking_min_confidence: float = Field(default=0.7, description="LLM分析最低置信度阈值")
consolidation_linking_llm_temperature: float = Field(default=0.2, description="LLM分析温度参数")
consolidation_linking_llm_max_tokens: int = Field(default=1500, description="LLM分析最大输出长度")
# 遗忘配置 (记忆图系统)
forgetting_enabled: bool = Field(default=True, description="是否启用自动遗忘")
forgetting_activation_threshold: float = Field(default=0.1, description="激活度阈值")
forgetting_min_importance: float = Field(default=0.8, description="最小保护重要性")
# 激活配置
activation_decay_rate: float = Field(default=0.9, description="激活度衰减率")
activation_propagation_strength: float = Field(default=0.5, description="激活传播强度")
activation_propagation_depth: int = Field(default=2, description="激活传播深度")
# 性能配置
max_memory_nodes_per_memory: int = Field(default=10, description="每个记忆最多包含的节点数")
max_related_memories: int = Field(default=5, description="相关记忆最大数量")
# 节点去重合并配置
node_merger_similarity_threshold: float = Field(default=0.85, description="节点去重相似度阈值")
node_merger_context_match_required: bool = Field(default=True, description="节点合并是否要求上下文匹配")
node_merger_merge_batch_size: int = Field(default=50, description="节点合并批量处理大小")
class MoodConfig(ValidatedConfigBase):
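
The four `search_*_weight` fields added above are combined into one ranking score at retrieval time (the removed legacy `RetrievalConfig` further down enforces that they sum to 1.0). A small sketch of how such a weighted score could be computed with the defaults 0.4/0.2/0.2/0.2; the function name, the distance normalization, and the exponential recency term are assumptions, not taken from this commit:

def combined_score(vector_sim: float, graph_distance: int, importance: float,
                   age_days: float, max_depth: int = 2, half_life_days: float = 7.0) -> float:
    """Hypothetical ranking score; every component is assumed to be normalized to [0, 1]."""
    graph_score = 1.0 - min(graph_distance, max_depth) / max_depth   # distance 0 -> 1.0, >= max_depth -> 0.0
    recency = 0.5 ** (age_days / half_life_days)                     # assumed exponential recency decay
    return 0.4 * vector_sim + 0.2 * graph_score + 0.2 * importance + 0.2 * recency
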

View File

@@ -1,272 +0,0 @@
"""
记忆图系统配置管理
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional
@dataclass
class ConsolidationConfig:
"""记忆整理配置"""
interval_hours: int = 6 # 整理间隔(小时)
batch_size: int = 100 # 每次处理记忆数量
enable_auto_discovery: bool = True # 是否启用自动关联发现
enable_conflict_detection: bool = True # 是否启用冲突检测
@dataclass
class RetrievalConfig:
"""记忆检索配置"""
default_mode: str = "auto" # auto/fast/deep
max_expand_depth: int = 2 # 最大图扩展深度
vector_weight: float = 0.4 # 向量相似度权重
graph_distance_weight: float = 0.2 # 图距离权重
importance_weight: float = 0.2 # 重要性权重
recency_weight: float = 0.2 # 时效性权重
def __post_init__(self):
"""验证权重总和"""
total = self.vector_weight + self.graph_distance_weight + self.importance_weight + self.recency_weight
if abs(total - 1.0) > 0.01:
raise ValueError(f"权重总和必须为1.0,当前为 {total}")
@dataclass
class NodeMergerConfig:
"""节点去重配置"""
similarity_threshold: float = 0.85 # 相似度阈值
context_match_required: bool = True # 是否要求上下文匹配
merge_batch_size: int = 50 # 批量处理大小
def __post_init__(self):
"""验证阈值范围"""
if not 0.0 <= self.similarity_threshold <= 1.0:
raise ValueError(f"相似度阈值必须在 [0, 1] 范围内,当前为 {self.similarity_threshold}")
@dataclass
class StorageConfig:
"""存储配置"""
data_dir: Path = field(default_factory=lambda: Path("data/memory_graph"))
vector_collection_name: str = "memory_nodes"
graph_file_name: str = "memory_graph.json"
enable_persistence: bool = True # 是否启用持久化
auto_save_interval: int = 300 # 自动保存间隔(秒)
@dataclass
class MemoryGraphConfig:
"""记忆图系统总配置"""
# 基础配置
enable: bool = True # 是否启用记忆图系统
data_dir: Path = field(default_factory=lambda: Path("data/memory_graph"))
# 向量存储配置
vector_collection_name: str = "memory_nodes"
vector_db_path: Path = field(default_factory=lambda: Path("data/memory_graph/chroma_db"))
# 检索配置
search_top_k: int = 10
search_min_importance: float = 0.3
search_similarity_threshold: float = 0.5
enable_query_optimization: bool = True
# 整合配置
consolidation_enabled: bool = True
consolidation_interval_hours: float = 1.0
consolidation_similarity_threshold: float = 0.85
consolidation_time_window_hours: int = 24
# 自动关联配置
auto_link_enabled: bool = True # 是否启用自动关联
auto_link_max_candidates: int = 5 # 每个记忆最多关联候选数
auto_link_min_confidence: float = 0.7 # 最低置信度阈值
# 遗忘配置
forgetting_enabled: bool = True
forgetting_activation_threshold: float = 0.1
forgetting_min_importance: float = 0.8
# 激活配置
activation_decay_rate: float = 0.9
activation_propagation_strength: float = 0.5
activation_propagation_depth: int = 1
# 性能配置
max_memory_nodes_per_memory: int = 10
max_related_memories: int = 5
# 旧配置(向后兼容)
consolidation: ConsolidationConfig = field(default_factory=ConsolidationConfig)
retrieval: RetrievalConfig = field(default_factory=RetrievalConfig)
node_merger: NodeMergerConfig = field(default_factory=NodeMergerConfig)
storage: StorageConfig = field(default_factory=StorageConfig)
# 时间衰减配置
decay_rates: Dict[str, float] = field(
default_factory=lambda: {
"EVENT": 0.05, # 事件衰减较快
"FACT": 0.01, # 事实衰减慢
"RELATION": 0.005, # 关系衰减很慢
"OPINION": 0.03, # 观点中等衰减
}
)
# 嵌入模型配置
embedding_model: Optional[str] = None # 如果为None则使用系统默认
embedding_dimension: int = 384 # 默认使用 sentence-transformers 的维度
# 调试和日志
enable_debug_logging: bool = False
enable_visualization: bool = False # 是否启用记忆可视化
@classmethod
def from_bot_config(cls, bot_config) -> MemoryGraphConfig:
"""从bot_config加载配置"""
try:
# 尝试获取配置优先使用memory兼容memory_graph
if hasattr(bot_config, 'memory') and bot_config.memory is not None:
mg_config = bot_config.memory
elif hasattr(bot_config, 'memory_graph'):
mg_config = bot_config.memory_graph
config = cls(
enable=getattr(mg_config, 'enable', True),
data_dir=Path(getattr(mg_config, 'data_dir', 'data/memory_graph')),
vector_collection_name=getattr(mg_config, 'vector_collection_name', 'memory_nodes'),
vector_db_path=Path(getattr(mg_config, 'vector_db_path', 'data/memory_graph/chroma_db')),
search_top_k=getattr(mg_config, 'search_top_k', 10),
search_min_importance=getattr(mg_config, 'search_min_importance', 0.3),
search_similarity_threshold=getattr(mg_config, 'search_similarity_threshold', 0.5),
enable_query_optimization=getattr(mg_config, 'enable_query_optimization', True),
consolidation_enabled=getattr(mg_config, 'consolidation_enabled', True),
consolidation_interval_hours=getattr(mg_config, 'consolidation_interval_hours', 1.0),
consolidation_similarity_threshold=getattr(mg_config, 'consolidation_similarity_threshold', 0.85),
consolidation_time_window_hours=getattr(mg_config, 'consolidation_time_window_hours', 24),
auto_link_enabled=getattr(mg_config, 'auto_link_enabled', True),
auto_link_max_candidates=getattr(mg_config, 'auto_link_max_candidates', 5),
auto_link_min_confidence=getattr(mg_config, 'auto_link_min_confidence', 0.7),
forgetting_enabled=getattr(mg_config, 'forgetting_enabled', True),
forgetting_activation_threshold=getattr(mg_config, 'forgetting_activation_threshold', 0.1),
forgetting_min_importance=getattr(mg_config, 'forgetting_min_importance', 0.8),
activation_decay_rate=getattr(mg_config, 'activation_decay_rate', 0.9),
activation_propagation_strength=getattr(mg_config, 'activation_propagation_strength', 0.5),
activation_propagation_depth=getattr(mg_config, 'activation_propagation_depth', 1),
max_memory_nodes_per_memory=getattr(mg_config, 'max_memory_nodes_per_memory', 10),
max_related_memories=getattr(mg_config, 'max_related_memories', 5),
# 检索配置
retrieval=RetrievalConfig(
max_expand_depth=getattr(mg_config, 'search_max_expand_depth', 2),
vector_weight=getattr(mg_config, 'search_vector_weight', 0.4),
graph_distance_weight=getattr(mg_config, 'search_graph_distance_weight', 0.2),
importance_weight=getattr(mg_config, 'search_importance_weight', 0.2),
recency_weight=getattr(mg_config, 'search_recency_weight', 0.2),
),
)
return config
else:
# 没有找到memory_graph配置使用默认值
return cls()
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.warning(f"从bot_config加载memory_graph配置失败使用默认配置: {e}")
return cls()
@classmethod
def from_dict(cls, config_dict: Dict) -> MemoryGraphConfig:
"""从字典创建配置"""
return cls(
# 新配置字段
enable=config_dict.get("enable", True),
data_dir=Path(config_dict.get("data_dir", "data/memory_graph")),
vector_collection_name=config_dict.get("vector_collection_name", "memory_nodes"),
vector_db_path=Path(config_dict.get("vector_db_path", "data/memory_graph/chroma_db")),
search_top_k=config_dict.get("search_top_k", 10),
search_min_importance=config_dict.get("search_min_importance", 0.3),
search_similarity_threshold=config_dict.get("search_similarity_threshold", 0.5),
enable_query_optimization=config_dict.get("enable_query_optimization", True),
consolidation_enabled=config_dict.get("consolidation_enabled", True),
consolidation_interval_hours=config_dict.get("consolidation_interval_hours", 1.0),
consolidation_similarity_threshold=config_dict.get("consolidation_similarity_threshold", 0.85),
consolidation_time_window_hours=config_dict.get("consolidation_time_window_hours", 24),
auto_link_enabled=config_dict.get("auto_link_enabled", True),
auto_link_max_candidates=config_dict.get("auto_link_max_candidates", 5),
auto_link_min_confidence=config_dict.get("auto_link_min_confidence", 0.7),
forgetting_enabled=config_dict.get("forgetting_enabled", True),
forgetting_activation_threshold=config_dict.get("forgetting_activation_threshold", 0.1),
forgetting_min_importance=config_dict.get("forgetting_min_importance", 0.8),
activation_decay_rate=config_dict.get("activation_decay_rate", 0.9),
activation_propagation_strength=config_dict.get("activation_propagation_strength", 0.5),
activation_propagation_depth=config_dict.get("activation_propagation_depth", 1),
max_memory_nodes_per_memory=config_dict.get("max_memory_nodes_per_memory", 10),
max_related_memories=config_dict.get("max_related_memories", 5),
# 旧配置字段(向后兼容)
consolidation=ConsolidationConfig(**config_dict.get("consolidation", {})),
retrieval=RetrievalConfig(
max_expand_depth=config_dict.get("search_max_expand_depth", 2),
vector_weight=config_dict.get("search_vector_weight", 0.4),
graph_distance_weight=config_dict.get("search_graph_distance_weight", 0.2),
importance_weight=config_dict.get("search_importance_weight", 0.2),
recency_weight=config_dict.get("search_recency_weight", 0.2),
**config_dict.get("retrieval", {})
),
node_merger=NodeMergerConfig(**config_dict.get("node_merger", {})),
storage=StorageConfig(**config_dict.get("storage", {})),
decay_rates=config_dict.get("decay_rates", cls().decay_rates),
embedding_model=config_dict.get("embedding_model"),
embedding_dimension=config_dict.get("embedding_dimension", 384),
enable_debug_logging=config_dict.get("enable_debug_logging", False),
enable_visualization=config_dict.get("enable_visualization", False),
)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"consolidation": {
"interval_hours": self.consolidation.interval_hours,
"batch_size": self.consolidation.batch_size,
"enable_auto_discovery": self.consolidation.enable_auto_discovery,
"enable_conflict_detection": self.consolidation.enable_conflict_detection,
},
"retrieval": {
"default_mode": self.retrieval.default_mode,
"max_expand_depth": self.retrieval.max_expand_depth,
"vector_weight": self.retrieval.vector_weight,
"graph_distance_weight": self.retrieval.graph_distance_weight,
"importance_weight": self.retrieval.importance_weight,
"recency_weight": self.retrieval.recency_weight,
},
"node_merger": {
"similarity_threshold": self.node_merger.similarity_threshold,
"context_match_required": self.node_merger.context_match_required,
"merge_batch_size": self.node_merger.merge_batch_size,
},
"storage": {
"data_dir": str(self.storage.data_dir),
"vector_collection_name": self.storage.vector_collection_name,
"graph_file_name": self.storage.graph_file_name,
"enable_persistence": self.storage.enable_persistence,
"auto_save_interval": self.storage.auto_save_interval,
},
"decay_rates": self.decay_rates,
"embedding_model": self.embedding_model,
"embedding_dimension": self.embedding_dimension,
"enable_debug_logging": self.enable_debug_logging,
"enable_visualization": self.enable_visualization,
}
# 默认配置实例
DEFAULT_CONFIG = MemoryGraphConfig()
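
This legacy config module is deleted by the commit (its knobs move onto `MemoryConfig` above). For reference, the `__post_init__` checks it carried are easy to exercise; a short usage sketch of the weight validation in the `RetrievalConfig` dataclass defined above:

# Assumes the RetrievalConfig dataclass from this (removed) module is in scope.
valid = RetrievalConfig(vector_weight=0.4, graph_distance_weight=0.2,
                        importance_weight=0.2, recency_weight=0.2)    # sums to 1.0 -> accepted

try:
    RetrievalConfig(vector_weight=0.5, graph_distance_weight=0.3,
                    importance_weight=0.3, recency_weight=0.2)        # sums to ~1.3 -> ValueError
except ValueError as exc:
    print(exc)
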

View File

@@ -4,12 +4,13 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple
import numpy as np
from src.common.logger import get_logger
from src.memory_graph.config import NodeMergerConfig
from src.config.official_configs import MemoryConfig
from src.memory_graph.models import MemoryNode, NodeType
from src.memory_graph.storage.graph_store import GraphStore
from src.memory_graph.storage.vector_store import VectorStore
@@ -31,7 +32,7 @@ class NodeMerger:
self,
vector_store: VectorStore,
graph_store: GraphStore,
config: Optional[NodeMergerConfig] = None,
config: MemoryConfig,
):
"""
初始化节点合并器
@@ -39,15 +40,15 @@ class NodeMerger:
Args:
vector_store: 向量存储
graph_store: 图存储
config: 配置对象
config: 记忆配置对象
"""
self.vector_store = vector_store
self.graph_store = graph_store
self.config = config or NodeMergerConfig()
self.config = config
logger.info(
f"初始化节点合并器: threshold={self.config.similarity_threshold}, "
f"context_match={self.config.context_match_required}"
f"初始化节点合并器: threshold={self.config.node_merger_similarity_threshold}, "
f"context_match={self.config.node_merger_context_match_required}"
)
async def find_similar_nodes(
@@ -71,7 +72,7 @@ class NodeMerger:
logger.warning(f"节点 {node.id} 没有 embedding无法查找相似节点")
return []
threshold = threshold or self.config.similarity_threshold
threshold = threshold or self.config.node_merger_similarity_threshold
try:
# 在向量存储中搜索相似节点
@@ -121,7 +122,7 @@ class NodeMerger:
是否应该合并
"""
# 1. 检查相似度阈值
if similarity < self.config.similarity_threshold:
if similarity < self.config.node_merger_similarity_threshold:
return False
# 2. 非常高的相似度(>0.95)直接合并
@@ -130,7 +131,7 @@ class NodeMerger:
return True
# 3. 如果不要求上下文匹配,则通过相似度判断
if not self.config.context_match_required:
if not self.config.node_merger_context_match_required:
return True
# 4. 检查上下文匹配

View File

@@ -15,6 +15,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from src.config.config import global_config
from src.config.official_configs import MemoryConfig
from src.memory_graph.core.builder import MemoryBuilder
from src.memory_graph.core.extractor import MemoryExtractor
from src.memory_graph.models import Memory, MemoryEdge, MemoryNode, MemoryType, NodeType, EdgeType
@@ -24,6 +25,10 @@ from src.memory_graph.storage.vector_store import VectorStore
from src.memory_graph.tools.memory_tools import MemoryTools
from src.memory_graph.utils.embeddings import EmbeddingGenerator
import uuid
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import numpy as np
logger = logging.getLogger(__name__)
@@ -53,7 +58,7 @@ class MemoryManager:
if not global_config.memory or not getattr(global_config.memory, 'enable', False):
raise ValueError("记忆系统未启用,请在配置文件中启用 [memory] enable = true")
self.config = global_config.memory
self.config: MemoryConfig = global_config.memory
self.data_dir = data_dir or Path(getattr(self.config, 'data_dir', 'data/memory_graph'))
# 存储组件
@@ -132,12 +137,18 @@ class MemoryManager:
embedding_generator=self.embedding_generator,
)
# 检查配置值
expand_depth = self.config.search_max_expand_depth
expand_semantic_threshold = self.config.search_expand_semantic_threshold
logger.info(f"📊 配置检查: search_max_expand_depth={expand_depth}, search_expand_semantic_threshold={expand_semantic_threshold}")
self.tools = MemoryTools(
vector_store=self.vector_store,
graph_store=self.graph_store,
persistence_manager=self.persistence,
embedding_generator=self.embedding_generator,
max_expand_depth=getattr(self.config, 'search_max_expand_depth', 1), # 从配置读取默认深度
max_expand_depth=expand_depth, # 从配置读取图扩展深度
expand_semantic_threshold=expand_semantic_threshold, # 从配置读取图扩展语义阈值
)
self._initialized = True
@@ -433,7 +444,7 @@ class MemoryManager:
min_importance: float = 0.0,
include_forgotten: bool = False,
use_multi_query: bool = True,
expand_depth: int = 1,
expand_depth: int | None = None,
context: Optional[Dict[str, Any]] = None,
) -> List[Memory]:
"""
@@ -468,7 +479,7 @@ class MemoryManager:
"query": query,
"top_k": top_k,
"use_multi_query": use_multi_query,
"expand_depth": expand_depth, # 传递图扩展深度
"expand_depth": expand_depth or global_config.memory.search_max_expand_depth, # 传递图扩展深度
"context": context,
}
@@ -967,108 +978,232 @@ class MemoryManager:
async def consolidate_memories(
self,
similarity_threshold: float = 0.85,
time_window_hours: int = 24,
time_window_hours: float = 24.0,
max_batch_size: int = 50,
) -> Dict[str, Any]:
"""
整理记忆:合并相似记忆
整理记忆:直接合并去重相似记忆(不创建新边)
性能优化版本:
1. 使用 asyncio.create_task 在后台执行,避免阻塞主流程
2. 向量计算批量处理,减少重复计算
3. 延迟保存,批量写入数据库
4. 更频繁的协作式多任务让出
Args:
similarity_threshold: 相似度阈值
similarity_threshold: 相似度阈值默认0.85建议提高到0.9减少误判)
time_window_hours: 时间窗口(小时)
max_batch_size: 单次最多处理的记忆数量
Returns:
整理结果
整理结果(如果是异步执行,返回启动状态)
"""
if not self._initialized:
await self.initialize()
try:
logger.info(f"开始记忆整理 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h)...")
logger.info(f"🚀 启动记忆整理任务 (similarity_threshold={similarity_threshold}, time_window={time_window_hours}h, max_batch={max_batch_size})...")
# 创建后台任务执行整理
task = asyncio.create_task(
self._consolidate_memories_background(
similarity_threshold=similarity_threshold,
time_window_hours=time_window_hours,
max_batch_size=max_batch_size
)
)
# 返回任务启动状态,不等待完成
return {
"task_started": True,
"task_id": id(task),
"message": "记忆整理任务已在后台启动"
}
except Exception as e:
logger.error(f"启动记忆整理任务失败: {e}", exc_info=True)
return {"error": str(e), "task_started": False}
async def _consolidate_memories_background(
self,
similarity_threshold: float,
time_window_hours: float,
max_batch_size: int,
) -> None:
"""
后台执行记忆整理的具体实现
这个方法会在独立任务中运行,不阻塞主流程
"""
try:
result = {
"merged_count": 0,
"checked_count": 0,
"skipped_count": 0,
}
# 获取最近创建的记忆
cutoff_time = datetime.now() - timedelta(hours=time_window_hours)
all_memories = self.graph_store.get_all_memories()
recent_memories = [
mem for mem in all_memories
if mem.created_at >= cutoff_time and not mem.metadata.get("forgotten", False)
]
if not recent_memories:
logger.info("没有需要整理的记忆")
return result
logger.info(f"找到 {len(recent_memories)} 条待整理记忆")
logger.info("✅ 记忆整理完成: 没有需要整理的记忆")
return
# 限制批量处理数量
if len(recent_memories) > max_batch_size:
logger.info(f"📊 记忆数量 {len(recent_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size}")
result["skipped_count"] = len(recent_memories) - max_batch_size
recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size]
logger.info(f"📋 找到 {len(recent_memories)} 条待整理记忆")
result["checked_count"] = len(recent_memories)
# 按记忆类型分组
# 按记忆类型分组,减少跨类型比较
memories_by_type: Dict[str, List[Memory]] = {}
for mem in recent_memories:
mem_type = mem.metadata.get("memory_type", "")
if mem_type not in memories_by_type:
memories_by_type[mem_type] = []
memories_by_type[mem_type].append(mem)
# 记录需要删除的记忆,延迟批量删除
to_delete: List[Tuple[Memory, str]] = [] # (memory, reason)
deleted_ids = set()
# 对每个类型的记忆进行相似度检测
for mem_type, memories in memories_by_type.items():
if len(memories) < 2:
continue
logger.debug(f"检查类型 '{mem_type}'{len(memories)} 条记忆")
# 使用向量相似度检测
for i in range(len(memories)):
for j in range(i + 1, len(memories)):
mem_i = memories[i]
mem_j = memories[j]
# 获取主题节点的向量
topic_i = next((n for n in mem_i.nodes if n.node_type == NodeType.TOPIC), None)
topic_j = next((n for n in mem_j.nodes if n.node_type == NodeType.TOPIC), None)
if not topic_i or not topic_j:
logger.debug(f"🔍 检查类型 '{mem_type}'{len(memories)} 条记忆")
# 预提取所有主题节点的嵌入向量
embeddings_map: Dict[str, "np.ndarray"] = {}
valid_memories = []
for mem in memories:
topic_node = next((n for n in mem.nodes if n.node_type == NodeType.TOPIC), None)
if topic_node and topic_node.embedding is not None:
embeddings_map[mem.id] = topic_node.embedding
valid_memories.append(mem)
# 逐对计算余弦相似度(嵌入已预提取,避免重复查找)
import numpy as np
for i in range(len(valid_memories)):
# 更频繁的协作式多任务让出
if i % 5 == 0:
await asyncio.sleep(0.001) # 1ms让出
mem_i = valid_memories[i]
if mem_i.id in deleted_ids:
continue
for j in range(i + 1, len(valid_memories)):
if valid_memories[j].id in deleted_ids:
continue
if topic_i.embedding is None or topic_j.embedding is None:
continue
# 计算余弦相似度
import numpy as np
similarity = np.dot(topic_i.embedding, topic_j.embedding) / (
np.linalg.norm(topic_i.embedding) * np.linalg.norm(topic_j.embedding)
)
mem_j = valid_memories[j]
# 快速向量相似度计算
embedding_i = embeddings_map[mem_i.id]
embedding_j = embeddings_map[mem_j.id]
# 优化的余弦相似度计算
similarity = self._fast_cosine_similarity(embedding_i, embedding_j)
if similarity >= similarity_threshold:
# 合并记忆:保留重要性高的,删除另一个
# 决定保留哪个记忆
if mem_i.importance >= mem_j.importance:
keep_mem, remove_mem = mem_i, mem_j
else:
keep_mem, remove_mem = mem_j, mem_i
logger.info(
f"合并相似记忆 (similarity={similarity:.3f}): "
f"保留 {keep_mem.id}, 删除 {remove_mem.id}"
logger.debug(
f"🔄 标记相似记忆 (similarity={similarity:.3f}): "
f"保留 {keep_mem.id[:8]}, 删除 {remove_mem.id[:8]}"
)
# 增加保留记忆的重要性
keep_mem.importance = min(1.0, keep_mem.importance + 0.1)
keep_mem.activation = min(1.0, keep_mem.activation + 0.1)
# 删除相似记忆
await self.delete_memory(remove_mem.id)
# 增加保留记忆的重要性
keep_mem.importance = min(1.0, keep_mem.importance + 0.05)
# 累加访问次数
if hasattr(keep_mem, 'access_count') and hasattr(remove_mem, 'access_count'):
keep_mem.access_count += remove_mem.access_count
# 标记为待删除(不立即删除)
to_delete.append((remove_mem, f"与记忆 {keep_mem.id[:8]} 相似度 {similarity:.3f}"))
deleted_ids.add(remove_mem.id)
result["merged_count"] += 1
logger.info(f"记忆整理完成: {result}")
return result
# 每处理完一个类型就让出控制权
await asyncio.sleep(0.005) # 5ms让出
# 批量删除标记的记忆
if to_delete:
logger.info(f"🗑️ 开始批量删除 {len(to_delete)} 条相似记忆")
for memory, reason in to_delete:
try:
# 从向量存储删除节点
for node in memory.nodes:
if node.embedding is not None:
await self.vector_store.delete_node(node.id)
# 从图存储删除记忆
self.graph_store.remove_memory(memory.id)
except Exception as e:
logger.warning(f"删除记忆 {memory.id[:8]} 失败: {e}")
# 批量保存一次性写入减少I/O
await self.persistence.save_graph_store(self.graph_store)
logger.info(f"💾 批量保存完成")
logger.info(f"✅ 记忆整理完成: {result}")
except Exception as e:
logger.error(f"记忆整理失败: {e}", exc_info=True)
return {"error": str(e), "merged_count": 0, "checked_count": 0}
logger.error(f"记忆整理失败: {e}", exc_info=True)
def _fast_cosine_similarity(self, vec1: "np.ndarray", vec2: "np.ndarray") -> float:
"""
快速余弦相似度计算(优化版本)
Args:
vec1, vec2: 向量
Returns:
余弦相似度
"""
try:
import numpy as np
# 避免重复的类型检查和转换
# 向量应该是numpy数组如果不是转换一次
if not isinstance(vec1, np.ndarray):
vec1 = np.asarray(vec1, dtype=np.float32)
if not isinstance(vec2, np.ndarray):
vec2 = np.asarray(vec2, dtype=np.float32)
# 使用更高效的范数计算
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
# 直接计算点积和除法
return float(np.dot(vec1, vec2) / (norm1 * norm2))
except Exception as e:
logger.warning(f"计算余弦相似度失败: {e}")
return 0.0
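
The dedup loop above pre-extracts topic embeddings and then compares pairs one at a time through `_fast_cosine_similarity`. If a genuinely batched version is wanted, a full similarity matrix can be computed in one shot; a hedged sketch (helper names and shapes are assumptions, not part of this commit):

import numpy as np

def pairwise_cosine(embeddings: np.ndarray) -> np.ndarray:
    """Hypothetical batched alternative: embeddings is (n, d); returns the (n, n) cosine matrix."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    norms[norms == 0] = 1.0                       # guard against zero vectors
    unit = embeddings / norms
    return unit @ unit.T

def similar_pairs(embeddings: np.ndarray, threshold: float) -> list[tuple[int, int, float]]:
    """Index pairs (i, j) with i < j whose cosine similarity reaches the threshold."""
    sim = pairwise_cosine(embeddings)
    i_idx, j_idx = np.triu_indices(len(embeddings), k=1)
    keep = sim[i_idx, j_idx] >= threshold
    return list(zip(i_idx[keep].tolist(), j_idx[keep].tolist(), sim[i_idx, j_idx][keep].tolist()))
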
async def auto_link_memories(
self,
@@ -1441,14 +1576,14 @@ class MemoryManager:
async def maintenance(self) -> Dict[str, Any]:
"""
执行维护任务
执行维护任务(优化版本)
包括:
- 记忆整理(合并相似记忆
- 清理过期记忆
- 记忆整理(异步后台执行
- 自动关联记忆(轻量级执行)
- 自动遗忘低激活度记忆
- 保存数据
Returns:
维护结果
"""
@@ -1456,49 +1591,355 @@ class MemoryManager:
await self.initialize()
try:
logger.info("开始执行记忆系统维护...")
logger.info("🔧 开始执行记忆系统维护(优化版)...")
result = {
"consolidated": 0,
"consolidation_task": "none",
"linked": 0,
"forgotten": 0,
"deleted": 0,
"saved": False,
"total_time": 0,
}
# 1. 记忆整理(合并相似记忆)
start_time = datetime.now()
# 1. 记忆整理(异步后台执行,不阻塞主流程)
if getattr(self.config, 'consolidation_enabled', False):
logger.info("🚀 启动异步记忆整理任务...")
consolidate_result = await self.consolidate_memories(
similarity_threshold=getattr(self.config, 'consolidation_similarity_threshold', 0.9),
time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 24.0)
similarity_threshold=getattr(self.config, 'consolidation_deduplication_threshold', 0.93),
time_window_hours=getattr(self.config, 'consolidation_time_window_hours', 2.0), # 统一时间窗口
max_batch_size=getattr(self.config, 'consolidation_max_batch_size', 30)
)
result["consolidated"] = consolidate_result.get("merged_count", 0)
# 2. 自动关联记忆(发现和建立关系)
if getattr(self.config, 'auto_link_enabled', True):
link_result = await self.auto_link_memories()
if consolidate_result.get("task_started"):
result["consolidation_task"] = f"background_task_{consolidate_result.get('task_id', 'unknown')}"
logger.info("✅ 记忆整理任务已启动到后台执行")
else:
result["consolidation_task"] = "failed"
logger.warning("❌ 记忆整理任务启动失败")
# 2. 自动关联记忆(使用统一的时间窗口)
if getattr(self.config, 'consolidation_linking_enabled', True):
logger.info("🔗 执行轻量级自动关联...")
link_result = await self._lightweight_auto_link_memories()
result["linked"] = link_result.get("linked_count", 0)
# 3. 自动遗忘
# 3. 自动遗忘(快速执行)
if getattr(self.config, 'forgetting_enabled', True):
logger.info("🗑️ 执行自动遗忘...")
forgotten_count = await self.auto_forget_memories(
threshold=getattr(self.config, 'forgetting_activation_threshold', 0.1)
)
result["forgotten"] = forgotten_count
# 4. 清理非常旧的已遗忘记忆(可选
# TODO: 实现清理逻辑
# 5. 保存数据
await self.persistence.save_graph_store(self.graph_store)
result["saved"] = True
# 4. 保存数据(如果记忆整理不在后台执行
if result["consolidation_task"] == "none":
await self.persistence.save_graph_store(self.graph_store)
result["saved"] = True
logger.info("💾 数据保存完成")
self._last_maintenance = datetime.now()
logger.info(f"维护完成: {result}")
# 计算维护耗时
total_time = (datetime.now() - start_time).total_seconds()
result["total_time"] = total_time
logger.info(f"✅ 维护完成 (耗时 {total_time:.2f}s): {result}")
return result
except Exception as e:
logger.error(f"维护失败: {e}", exc_info=True)
return {"error": str(e)}
logger.error(f"维护失败: {e}", exc_info=True)
return {"error": str(e), "total_time": 0}
async def _lightweight_auto_link_memories(
self,
time_window_hours: float | None = None,  # 从配置读取
max_candidates: int | None = None,  # 从配置读取
max_memories: int | None = None,  # 从配置读取
) -> Dict[str, Any]:
"""
智能轻量级自动关联记忆保留LLM判断优化性能
优化策略:
1. 从配置读取处理参数,尊重用户设置
2. 使用向量相似度预筛选仅对高相似度记忆调用LLM
3. 批量LLM调用减少网络开销
4. 异步执行,避免阻塞
"""
try:
result = {
"checked_count": 0,
"linked_count": 0,
"llm_calls": 0,
}
# 从配置读取参数,使用统一的时间窗口
if time_window_hours is None:
time_window_hours = getattr(self.config, 'consolidation_time_window_hours', 2.0)
if max_candidates is None:
max_candidates = getattr(self.config, 'consolidation_linking_max_candidates', 10)
if max_memories is None:
max_memories = getattr(self.config, 'consolidation_linking_max_memories', 20)
# 获取用户配置时间窗口内的记忆
time_threshold = datetime.now() - timedelta(hours=time_window_hours)
all_memories = self.graph_store.get_all_memories()
recent_memories = [
mem for mem in all_memories
if mem.created_at >= time_threshold
and not mem.metadata.get("forgotten", False)
and mem.importance >= getattr(self.config, 'consolidation_linking_min_importance', 0.5) # 从配置读取重要性阈值
]
if len(recent_memories) > max_memories:
recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_memories]
if len(recent_memories) < 2:
logger.debug("记忆数量不足,跳过智能关联")
return result
logger.debug(f"🧠 智能关联: 检查 {len(recent_memories)} 条重要记忆")
# 第一步:向量相似度预筛选,找到潜在关联对
candidate_pairs = []
import numpy as np
for i, memory in enumerate(recent_memories):
# 获取主题节点
topic_node = next(
(n for n in memory.nodes if n.node_type == NodeType.TOPIC),
None
)
if not topic_node or topic_node.embedding is None:
continue
# 与其他记忆计算相似度
for j, other_memory in enumerate(recent_memories[i+1:], i+1):
other_topic = next(
(n for n in other_memory.nodes if n.node_type == NodeType.TOPIC),
None
)
if not other_topic or other_topic.embedding is None:
continue
# 快速相似度计算
similarity = self._fast_cosine_similarity(
topic_node.embedding,
other_topic.embedding
)
# 使用配置的预筛选阈值
pre_filter_threshold = getattr(self.config, 'consolidation_linking_pre_filter_threshold', 0.7)
if similarity >= pre_filter_threshold:
candidate_pairs.append((memory, other_memory, similarity))
# 让出控制权
if i % 3 == 0:
await asyncio.sleep(0.001)
logger.debug(f"🔍 预筛选找到 {len(candidate_pairs)} 个候选关联对")
if not candidate_pairs:
return result
# 第二步批量LLM分析使用配置的最大候选对数
max_pairs_for_llm = getattr(self.config, 'consolidation_linking_max_pairs_for_llm', 5)
if len(candidate_pairs) <= max_pairs_for_llm:
link_relations = await self._batch_analyze_memory_relations(candidate_pairs)
result["llm_calls"] = 1
# 第三步建立LLM确认的关联
for relation_info in link_relations:
try:
memory_a, memory_b = relation_info["memory_pair"]
relation_type = relation_info["relation_type"]
confidence = relation_info["confidence"]
# 创建关联边
edge = MemoryEdge(
id=f"smart_edge_{uuid.uuid4().hex[:12]}",
source_id=memory_a.subject_id,
target_id=memory_b.subject_id,
relation=relation_type,
edge_type=EdgeType.RELATION,
importance=confidence,
metadata={
"auto_linked": True,
"method": "llm_analyzed",
"vector_similarity": relation_info.get("vector_similarity", 0.0),
"confidence": confidence,
"reasoning": relation_info.get("reasoning", ""),
"created_at": datetime.now().isoformat(),
}
)
# 添加到图
self.graph_store.graph.add_edge(
edge.source_id,
edge.target_id,
edge_id=edge.id,
relation=edge.relation,
edge_type=edge.edge_type.value,
importance=edge.importance,
metadata=edge.metadata,
)
memory_a.edges.append(edge)
result["linked_count"] += 1
logger.debug(f"🧠 智能关联: {memory_a.id[:8]} --[{relation_type}]--> {memory_b.id[:8]} (置信度={confidence:.2f})")
except Exception as e:
logger.warning(f"建立智能关联失败: {e}")
continue
# 保存关联结果
if result["linked_count"] > 0:
await self.persistence.save_graph_store(self.graph_store)
logger.debug(f"✅ 智能关联完成: 建立了 {result['linked_count']} 个关联LLM调用 {result['llm_calls']}")
return result
except Exception as e:
logger.error(f"智能关联失败: {e}", exc_info=True)
return {"error": str(e), "checked_count": 0, "linked_count": 0}
async def _batch_analyze_memory_relations(
self,
candidate_pairs: List[Tuple[Memory, Memory, float]]
) -> List[Dict[str, Any]]:
"""
批量分析记忆关系优化LLM调用
Args:
candidate_pairs: 候选记忆对列表,每项包含 (memory_a, memory_b, vector_similarity)
Returns:
关系分析结果列表
"""
try:
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="memory.batch_relation_analysis"
)
# 格式化所有候选记忆对
candidates_text = ""
for i, (mem_a, mem_b, similarity) in enumerate(candidate_pairs):
desc_a = self._format_memory_for_llm(mem_a)
desc_b = self._format_memory_for_llm(mem_b)
candidates_text += f"""
候选对 {i+1}:
记忆A: {desc_a}
记忆B: {desc_b}
向量相似度: {similarity:.3f}
"""
# 构建批量分析提示词(使用配置的置信度阈值)
min_confidence = getattr(self.config, 'consolidation_linking_min_confidence', 0.7)
prompt = f"""你是记忆关系分析专家。请批量分析以下候选记忆对之间的关系。
**关系类型说明:**
- 导致: A的发生导致了B的发生因果关系
- 引用: A提到或涉及B引用关系
- 相似: A和B描述相似的内容相似关系
- 相反: A和B表达相反的观点对立关系
- 关联: A和B存在某种关联但不属于以上类型一般关联
**候选记忆对:**
{candidates_text}
**任务要求:**
1. 对每个候选对,判断是否存在有意义的关系
2. 如果存在关系,指定关系类型和置信度(0.0-1.0)
3. 简要说明判断理由
4. 只返回置信度 >= {min_confidence} 的关系
5. 优先考虑因果、引用等强关系,谨慎建立相似关系
**输出格式JSON**
```json
[
{{
"candidate_id": 1,
"has_relation": true,
"relation_type": "导致",
"confidence": 0.85,
"reasoning": "记忆A描述的原因导致记忆B的结果"
}},
{{
"candidate_id": 2,
"has_relation": false,
"reasoning": "两者无明显关联"
}}
]
```
请分析并输出JSON结果"""
# 调用LLM使用配置的参数
llm_temperature = getattr(self.config, 'consolidation_linking_llm_temperature', 0.2)
llm_max_tokens = getattr(self.config, 'consolidation_linking_llm_max_tokens', 1500)
response, _ = await llm.generate_response_async(
prompt,
temperature=llm_temperature,
max_tokens=llm_max_tokens,
)
# 解析响应
import json
import re
# 提取JSON
json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
json_str = response.strip()
try:
analysis_results = json.loads(json_str)
except json.JSONDecodeError:
logger.warning(f"LLM返回格式错误尝试修复: {response[:200]}")
# 尝试简单修复
json_str = re.sub(r'[\r\n\t]', '', json_str)
analysis_results = json.loads(json_str)
# 转换为结果格式
relations = []
for result in analysis_results:
if not result.get("has_relation", False):
continue
confidence = result.get("confidence", 0.0)
if confidence < min_confidence: # 使用配置的置信度阈值
continue
candidate_id = result.get("candidate_id", 0) - 1
if 0 <= candidate_id < len(candidate_pairs):
mem_a, mem_b, vector_similarity = candidate_pairs[candidate_id]
relations.append({
"memory_pair": (mem_a, mem_b),
"relation_type": result.get("relation_type", "关联"),
"confidence": confidence,
"reasoning": result.get("reasoning", ""),
"vector_similarity": vector_similarity,
})
logger.debug(f"🧠 LLM批量分析完成: 发现 {len(relations)} 个关系")
return relations
except Exception as e:
logger.error(f"LLM批量关系分析失败: {e}", exc_info=True)
return []
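
`start_maintenance_scheduler` is truncated in this diff. As a rough sketch of the shape such a scheduler could take (an assumption, not the actual implementation), driven by `consolidation_interval_hours` from the config:

import asyncio

async def _maintenance_loop(manager, interval_hours: float) -> None:
    """Hypothetical periodic driver: run maintenance, then sleep for the configured interval."""
    while True:
        try:
            await manager.maintenance()
        except Exception as e:          # keep the loop alive on failures
            print(f"maintenance error: {e}")
        await asyncio.sleep(interval_hours * 3600)

# e.g. asyncio.create_task(_maintenance_loop(manager, config.consolidation_interval_hours))
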
async def start_maintenance_scheduler(self) -> None:
"""

View File

@@ -35,22 +35,27 @@ class MemoryTools:
persistence_manager: PersistenceManager,
embedding_generator: Optional[EmbeddingGenerator] = None,
max_expand_depth: int = 1,
expand_semantic_threshold: float = 0.3,
):
"""
初始化工具集
Args:
vector_store: 向量存储
graph_store: 图存储
persistence_manager: 持久化管理器
embedding_generator: 嵌入生成器(可选)
max_expand_depth: 图扩展深度的默认值(从配置读取)
expand_semantic_threshold: 图扩展时语义相似度阈值(从配置读取)
"""
self.vector_store = vector_store
self.graph_store = graph_store
self.persistence_manager = persistence_manager
self._initialized = False
self.max_expand_depth = max_expand_depth # 保存配置的默认值
self.expand_semantic_threshold = expand_semantic_threshold # 保存配置的语义阈值
logger.info(f"MemoryTools 初始化: max_expand_depth={max_expand_depth}, expand_semantic_threshold={expand_semantic_threshold}")
# 初始化组件
self.extractor = MemoryExtractor()
@@ -505,7 +510,7 @@ class MemoryTools:
initial_memory_ids=list(initial_memory_ids),
query_embedding=query_embedding,
max_depth=expand_depth,
semantic_threshold=0.5,
semantic_threshold=self.expand_semantic_threshold, # 使用配置的阈值
max_expanded=top_k * 2
)