feat: 采用三层内存系统实现统一内存管理器

- 添加了UnifiedMemoryManager,以整合感知层、短期记忆层和长期记忆层。
- 实现了初始化、消息添加和内存搜索功能。
- 引入了记忆从短期存储到长期存储的自动转移机制。
- 开发了用于结构化内存表示的内存格式化工具。
- 增强日志记录功能,以便在内存操作过程中更好地进行追踪。
This commit is contained in:
Windpicker-owo
2025-11-18 16:17:25 +08:00
parent b5cfa41d36
commit dc3ad19809
19 changed files with 1544 additions and 1686 deletions

View File

@@ -31,7 +31,7 @@ def _get_unified_memory_manager():
global _unified_memory_manager
if _unified_memory_manager is None:
try:
from src.memory_graph.three_tier.manager_singleton import get_unified_memory_manager
from src.memory_graph.manager_singleton import get_unified_memory_manager
_unified_memory_manager = get_unified_memory_manager()
except Exception as e:

View File

@@ -564,7 +564,7 @@ class DefaultReplyer:
return f"{expression_habits_title}\n{expression_habits_block}"
async def build_memory_block(self, chat_history: str, target: str) -> str:
"""构建记忆块
"""构建记忆块(使用三层记忆系统)
Args:
chat_history: 聊天历史记录
@@ -573,149 +573,13 @@ class DefaultReplyer:
Returns:
str: 记忆信息字符串
"""
# 使用新的记忆图系统检索记忆(带智能查询优化)
all_memories = []
try:
from src.memory_graph.manager_singleton import get_memory_manager, is_initialized
if is_initialized():
manager = get_memory_manager()
if manager:
# 构建查询上下文
stream = self.chat_stream
user_info_obj = getattr(stream, "user_info", None)
sender_name = ""
if user_info_obj:
sender_name = getattr(user_info_obj, "user_nickname", "") or getattr(user_info_obj, "user_cardname", "")
# 格式化聊天历史为更友好的格式
formatted_history = ""
if chat_history:
# 移除过长的历史记录,只保留最近部分
lines = chat_history.strip().split("\n")
recent_lines = lines[-10:] if len(lines) > 10 else lines
formatted_history = "\n".join(recent_lines)
query_context = {
"chat_history": formatted_history,
"sender": sender_name,
}
# 使用记忆管理器的智能检索(多查询策略)
memories = []
if global_config.memory:
memories = []
if global_config.memory:
top_k = global_config.memory.search_top_k
min_importance = global_config.memory.search_min_importance
memories = await manager.search_memories(
query=target,
top_k=top_k,
min_importance=min_importance,
include_forgotten=False,
use_multi_query=True,
context=query_context,
)
if memories:
logger.info(f"[记忆图] 检索到 {len(memories)} 条相关记忆")
# 使用新的格式化工具构建完整的记忆描述
from src.memory_graph.utils.memory_formatter import (
format_memory_for_prompt,
get_memory_type_label,
)
for memory in memories:
# 使用格式化工具生成完整的主谓宾描述
content = format_memory_for_prompt(memory, include_metadata=False)
# 获取记忆类型
mem_type = memory.memory_type.value if memory.memory_type else "未知"
if content:
all_memories.append({
"content": content,
"memory_type": mem_type,
"importance": memory.importance,
"relevance": 0.7,
"source": "memory_graph",
})
logger.debug(f"[记忆构建] 格式化记忆: [{mem_type}] {content[:50]}...")
else:
logger.debug("[记忆图] 未找到相关记忆")
except Exception as e:
logger.debug(f"[记忆图] 检索失败: {e}")
all_memories = []
# 构建记忆字符串,使用方括号格式
memory_str = ""
has_any_memory = False
# 添加长期记忆(来自记忆图系统)
if all_memories:
# 使用方括号格式
memory_parts = ["### 🧠 相关记忆 (Relevant Memories)", ""]
# 按相关度排序,并记录相关度信息用于调试
sorted_memories = sorted(all_memories, key=lambda x: x.get("relevance", 0.0), reverse=True)
# 调试相关度信息
relevance_info = [(m.get("memory_type", "unknown"), m.get("relevance", 0.0)) for m in sorted_memories]
logger.debug(f"记忆相关度信息: {relevance_info}")
logger.debug(f"[记忆构建] 准备将 {len(sorted_memories)} 条记忆添加到提示词")
for idx, running_memory in enumerate(sorted_memories, 1):
content = running_memory.get("content", "")
memory_type = running_memory.get("memory_type", "unknown")
# 跳过空内容
if not content or not content.strip():
logger.warning(f"[记忆构建] 跳过第 {idx} 条记忆:内容为空 (type={memory_type})")
logger.debug(f"[记忆构建] 空记忆详情: {running_memory}")
continue
# 使用记忆图的类型映射(优先)或全局映射
try:
from src.memory_graph.utils.memory_formatter import get_memory_type_label
chinese_type = get_memory_type_label(memory_type)
except ImportError:
# 回退到全局映射
chinese_type = get_memory_type_chinese_label(memory_type)
# 提取纯净内容(如果包含旧格式的元数据)
clean_content = content
if "(类型:" in content and "" in content:
clean_content = content.split("(类型:")[0].strip()
logger.debug(f"[记忆构建] 添加第 {idx} 条记忆: [{chinese_type}] {clean_content[:50]}...")
memory_parts.append(f"- **[{chinese_type}]** {clean_content}")
memory_str = "\n".join(memory_parts) + "\n"
has_any_memory = True
logger.debug(f"[记忆构建] 成功构建记忆字符串,包含 {len(memory_parts) - 2} 条记忆")
# 瞬时记忆由另一套系统处理,这里不再添加
# 只有当完全没有任何记忆时才返回空字符串
return memory_str if has_any_memory else ""
async def build_three_tier_memory_block(self, chat_history: str, target: str) -> str:
"""构建三层记忆块(感知记忆 + 短期记忆 + 长期记忆)
Args:
chat_history: 聊天历史记录
target: 目标消息内容
Returns:
str: 三层记忆信息字符串
"""
# 检查是否启用三层记忆系统
if not (global_config.three_tier_memory and global_config.three_tier_memory.enable):
return ""
try:
from src.memory_graph.three_tier.manager_singleton import get_unified_memory_manager
from src.memory_graph.manager_singleton import get_unified_memory_manager
from src.memory_graph.utils.memory_formatter import format_memory_for_prompt
unified_manager = get_unified_memory_manager()
if not unified_manager:
@@ -737,45 +601,45 @@ class DefaultReplyer:
short_term_memories = search_result.get("short_term_memories", [])
long_term_memories = search_result.get("long_term_memories", [])
memory_parts = ["### 🔮 三层记忆系统 (Three-Tier Memory)", ""]
memory_parts = ["### 🧠 相关记忆 (Relevant Memories)", ""]
# 添加感知记忆(最近的消息块)
if perceptual_blocks:
memory_parts.append("#### 🌊 感知记忆 (Perceptual Memory)")
memory_parts.append("#### 🌊 感知记忆")
for block in perceptual_blocks[:2]: # 最多显示2个块
# MemoryBlock 对象有 messages 属性(列表)
messages = block.messages if hasattr(block, 'messages') else []
if messages:
block_content = "".join([f"{msg.get('sender_name', msg.get('sender_id', ''))}: {msg.get('content', '')[:30]}" for msg in messages[:3]])
block_content = "".join([
f"{msg.get('sender_name', msg.get('sender_id', ''))}: {msg.get('content', '')[:30]}"
for msg in messages[:3]
])
memory_parts.append(f"- {block_content}")
memory_parts.append("")
# 添加短期记忆(结构化活跃记忆)
if short_term_memories:
memory_parts.append("#### 💭 短期记忆 (Short-Term Memory)")
memory_parts.append("#### 💭 短期记忆")
for mem in short_term_memories[:3]: # 最多显示3条
# ShortTermMemory 对象有属性而非字典
if hasattr(mem, 'subject') and hasattr(mem, 'topic') and hasattr(mem, 'object'):
subject = mem.subject or ""
topic = mem.topic or ""
obj = mem.object or ""
content = f"{subject} {topic} {obj}" if all([subject, topic, obj]) else (mem.content if hasattr(mem, 'content') else str(mem))
else:
content = mem.content if hasattr(mem, 'content') else str(mem)
memory_parts.append(f"- {content}")
content = format_memory_for_prompt(mem, include_metadata=False)
if content:
memory_parts.append(f"- {content}")
memory_parts.append("")
# 添加长期记忆(图谱记忆)
if long_term_memories:
memory_parts.append("#### 🧠 长期记忆 (Long-Term Memory)")
memory_parts.append("#### 🗄️ 长期记忆")
for mem in long_term_memories[:3]: # 最多显示3条
# Memory 对象有 content 属性
content = mem.content if hasattr(mem, 'content') else str(mem)
memory_parts.append(f"- {content}")
content = format_memory_for_prompt(mem, include_metadata=False)
if content:
memory_parts.append(f"- {content}")
memory_parts.append("")
total_count = len(perceptual_blocks) + len(short_term_memories) + len(long_term_memories)
logger.info(f"[三层记忆] 检索到 {total_count} 条记忆 (感知:{len(perceptual_blocks)}, 短期:{len(short_term_memories)}, 长期:{len(long_term_memories)})")
if total_count > 0:
logger.info(
f"[三层记忆] 检索到 {total_count} 条记忆 "
f"(感知:{len(perceptual_blocks)}, 短期:{len(short_term_memories)}, 长期:{len(long_term_memories)})"
)
return "\n".join(memory_parts) if len(memory_parts) > 2 else ""
@@ -783,6 +647,8 @@ class DefaultReplyer:
logger.error(f"[三层记忆] 检索失败: {e}", exc_info=True)
return ""
async def build_tool_info(self, chat_history: str, sender: str, target: str, enable_tool: bool = True) -> str:
"""构建工具信息块
@@ -1405,9 +1271,6 @@ class DefaultReplyer:
"memory_block": asyncio.create_task(
self._time_and_run_task(self.build_memory_block(chat_talking_prompt_short, target), "memory_block")
),
"three_tier_memory": asyncio.create_task(
self._time_and_run_task(self.build_three_tier_memory_block(chat_talking_prompt_short, target), "three_tier_memory")
),
"tool_info": asyncio.create_task(
self._time_and_run_task(
self.build_tool_info(chat_talking_prompt_short, sender, target, enable_tool=enable_tool),

View File

@@ -149,6 +149,12 @@ class ModelTaskConfig(ValidatedConfigBase):
# 处理配置文件中命名不一致的问题
utils_video: TaskConfig = Field(..., description="视频分析模型配置(兼容配置文件中的命名)")
# 记忆系统专用模型配置
memory_short_term_builder: TaskConfig = Field(..., description="短期记忆构建模型配置(感知→短期格式化)")
memory_short_term_decider: TaskConfig = Field(..., description="短期记忆决策模型配置(合并/更新/新建/丢弃)")
memory_long_term_builder: TaskConfig = Field(..., description="长期记忆构建模型配置(短期→长期图结构)")
memory_judge: TaskConfig = Field(..., description="记忆检索裁判模型配置(判断检索是否充足)")
@property
def video_analysis(self) -> TaskConfig:
"""视频分析模型配置(提供向后兼容的属性访问)"""

View File

@@ -249,7 +249,7 @@ class MainSystem:
# 停止增强记忆系统
# 停止三层记忆系统
try:
from src.memory_graph.three_tier.manager_singleton import get_unified_memory_manager, shutdown_unified_memory_manager
from src.memory_graph.manager_singleton import get_unified_memory_manager, shutdown_unified_memory_manager
if get_unified_memory_manager():
cleanup_tasks.append(("三层记忆系统", shutdown_unified_memory_manager()))
@@ -480,7 +480,7 @@ MoFox_Bot(第三方修改版)
# 初始化三层记忆系统(如果启用)
try:
if global_config.three_tier_memory and global_config.three_tier_memory.enable:
from src.memory_graph.three_tier.manager_singleton import initialize_unified_memory_manager
from src.memory_graph.manager_singleton import initialize_unified_memory_manager
logger.info("三层记忆系统已启用,正在初始化...")
await initialize_unified_memory_manager()
logger.info("三层记忆系统初始化成功")

View File

@@ -17,7 +17,7 @@ from typing import Any
from src.common.logger import get_logger
from src.memory_graph.manager import MemoryManager
from src.memory_graph.models import Memory, MemoryType, NodeType
from src.memory_graph.three_tier.models import GraphOperation, GraphOperationType, ShortTermMemory
from src.memory_graph.models import GraphOperation, GraphOperationType, ShortTermMemory
logger = get_logger(__name__)
@@ -249,9 +249,9 @@ class LongTermMemoryManager:
# 构建提示词
prompt = self._build_graph_operation_prompt(stm, similar_memories)
# 调用 LLM
# 调用长期记忆构建模型
llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
model_set=model_config.model_task_config.memory_long_term_builder,
request_type="long_term_memory.graph_operations",
)
@@ -509,6 +509,10 @@ class LongTermMemoryManager:
async def _execute_update_memory(self, op: GraphOperation) -> None:
"""执行更新记忆操作"""
memory_id = op.target_id
if not memory_id:
logger.error("更新操作缺少目标记忆ID")
return
updates = op.parameters.get("updated_fields", {})
success = await self.memory_manager.update_memory(memory_id, **updates)

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
"""
记忆系统管理单例
提供全局访问的 MemoryManager 实例
提供全局访问的 MemoryManager 和 UnifiedMemoryManager 实例
"""
from __future__ import annotations
@@ -13,10 +13,18 @@ from src.memory_graph.manager import MemoryManager
logger = get_logger(__name__)
# 全局 MemoryManager 实例
# 全局 MemoryManager 实例(旧的单层记忆系统,已弃用)
_memory_manager: MemoryManager | None = None
_initialized: bool = False
# 全局 UnifiedMemoryManager 实例(新的三层记忆系统)
_unified_memory_manager = None
# ============================================================================
# 旧的单层记忆系统 API(已弃用,保留用于向后兼容)
# ============================================================================
async def initialize_memory_manager(
data_dir: Path | str | None = None,
@@ -104,3 +112,97 @@ async def shutdown_memory_manager():
def is_initialized() -> bool:
"""检查 MemoryManager 是否已初始化"""
return _initialized and _memory_manager is not None
# ============================================================================
# 新的三层记忆系统 API(推荐使用)
# ============================================================================
async def initialize_unified_memory_manager():
    """Initialize the unified memory manager (three-tier memory system).

    All tuning parameters are read from the global configuration.

    Returns:
        The initialized manager instance, or None when the three-tier
        memory system is disabled in configuration.

    Raises:
        Exception: re-raised when manager construction or initialization fails.
    """
    global _unified_memory_manager

    if _unified_memory_manager is not None:
        logger.warning("统一记忆管理器已经初始化")
        return _unified_memory_manager

    try:
        from src.config.config import global_config
        from src.memory_graph.unified_manager import UnifiedMemoryManager

        # Skip (with a warning) unless the feature is explicitly enabled.
        config = getattr(global_config, "three_tier_memory", None)
        if config is None or not getattr(config, "enable", False):
            logger.warning("三层记忆系统未启用,跳过初始化")
            return None

        manager = UnifiedMemoryManager(
            data_dir=Path(getattr(config, "data_dir", "data/memory_graph/three_tier")),
            # perceptual memory layer
            perceptual_max_blocks=getattr(config, "perceptual_max_blocks", 50),
            perceptual_block_size=getattr(config, "perceptual_block_size", 5),
            perceptual_activation_threshold=getattr(config, "perceptual_activation_threshold", 3),
            perceptual_recall_top_k=getattr(config, "perceptual_recall_top_k", 5),
            perceptual_recall_threshold=getattr(config, "perceptual_recall_threshold", 0.55),
            # short-term memory layer
            short_term_max_memories=getattr(config, "short_term_max_memories", 30),
            short_term_transfer_threshold=getattr(config, "short_term_transfer_threshold", 0.6),
            # long-term memory layer
            long_term_batch_size=getattr(config, "long_term_batch_size", 10),
            long_term_search_top_k=getattr(config, "long_term_search_top_k", 5),
            long_term_decay_factor=getattr(config, "long_term_decay_factor", 0.95),
            # intelligent retrieval
            judge_confidence_threshold=getattr(config, "judge_confidence_threshold", 0.7),
        )

        # BUGFIX: publish the singleton only after initialize() succeeds.
        # Previously the global was assigned before awaiting initialize(), so a
        # failed initialization left a half-initialized manager visible to
        # get_unified_memory_manager() and made a retry impossible (the early
        # "already initialized" guard would return the broken instance).
        await manager.initialize()
        _unified_memory_manager = manager

        logger.info("✅ 统一记忆管理器单例已初始化")
        return _unified_memory_manager
    except Exception as e:
        logger.error(f"初始化统一记忆管理器失败: {e}", exc_info=True)
        raise
def get_unified_memory_manager():
    """Return the unified memory manager singleton (three-tier system).

    Returns:
        The manager instance, or None when it has not been initialized yet
        (a warning is logged in that case).
    """
    manager = _unified_memory_manager
    if manager is None:
        logger.warning("统一记忆管理器尚未初始化,请先调用 initialize_unified_memory_manager()")
    return manager
async def shutdown_unified_memory_manager() -> None:
    """Shut down the unified memory manager and clear the singleton.

    Best-effort: failures are logged, not raised.
    """
    global _unified_memory_manager

    manager = _unified_memory_manager
    if manager is None:
        logger.warning("统一记忆管理器未初始化,无需关闭")
        return

    try:
        await manager.shutdown()
        _unified_memory_manager = None
        logger.info("✅ 统一记忆管理器已关闭")
    except Exception as e:
        logger.error(f"关闭统一记忆管理器失败: {e}", exc_info=True)

View File

@@ -1,7 +1,7 @@
"""
记忆图系统核心数据模型
定义节点、边、记忆等核心数据结构
定义节点、边、记忆等核心数据结构(包含三层记忆系统)
"""
from __future__ import annotations
@@ -15,6 +15,50 @@ from typing import Any
import numpy as np
# ============================================================================
# 三层记忆系统枚举
# ============================================================================
class MemoryTier(Enum):
    """Enumeration of the three memory tiers."""

    PERCEPTUAL = "perceptual"  # perceptual memory layer (recent message blocks)
    SHORT_TERM = "short_term"  # short-term memory layer (structured active memories)
    LONG_TERM = "long_term"  # long-term memory layer (persistent graph memories)
class GraphOperationType(Enum):
    """Kinds of atomic operations that can be applied to the memory graph."""

    CREATE_NODE = "create_node"  # create a node
    UPDATE_NODE = "update_node"  # update a node
    DELETE_NODE = "delete_node"  # delete a node
    MERGE_NODES = "merge_nodes"  # merge several nodes into one
    CREATE_EDGE = "create_edge"  # create an edge
    UPDATE_EDGE = "update_edge"  # update an edge
    DELETE_EDGE = "delete_edge"  # delete an edge
    CREATE_MEMORY = "create_memory"  # create a memory
    UPDATE_MEMORY = "update_memory"  # update a memory
    DELETE_MEMORY = "delete_memory"  # delete a memory
    MERGE_MEMORIES = "merge_memories"  # merge several memories into one
class ShortTermOperation(Enum):
    """Decisions the LLM can make about a newly extracted short-term memory."""

    MERGE = "merge"  # merge into an existing memory
    UPDATE = "update"  # update an existing memory
    CREATE_NEW = "create_new"  # create a brand-new memory
    DISCARD = "discard"  # drop it (low value)
    KEEP_SEPARATE = "keep_separate"  # keep it independent (do not merge yet)
# ============================================================================
# 图谱系统枚举
# ============================================================================
class NodeType(Enum):
"""节点类型枚举"""
@@ -305,3 +349,329 @@ class StagedMemory:
consolidated_at=datetime.fromisoformat(data["consolidated_at"]) if data.get("consolidated_at") else None,
merge_history=data.get("merge_history", []),
)
# ============================================================================
# 三层记忆系统数据模型
# ============================================================================
@dataclass
class MemoryBlock:
    """A perceptual memory block: one semantic unit built from n raw messages.

    This is the basic storage unit of the perceptual memory layer.
    """

    id: str  # unique block id
    messages: list[dict[str, Any]]  # raw messages (content, sender, time, ...)
    combined_text: str  # concatenated text used to generate the embedding
    embedding: np.ndarray | None = None  # vector for the whole block
    created_at: datetime = field(default_factory=datetime.now)
    recall_count: int = 0  # times recalled (drives activation)
    last_recalled: datetime | None = None  # timestamp of the most recent recall
    position_in_stack: int = 0  # position in the memory stack (0 = top)
    metadata: dict[str, Any] = field(default_factory=dict)  # extra metadata

    def __post_init__(self):
        """Assign a generated id when none was provided."""
        if not self.id:
            self.id = f"block_{uuid.uuid4().hex[:12]}"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict (the embedding is persisted out of band)."""
        recalled_at = self.last_recalled.isoformat() if self.last_recalled else None
        return {
            "id": self.id,
            "messages": self.messages,
            "combined_text": self.combined_text,
            "created_at": self.created_at.isoformat(),
            "recall_count": self.recall_count,
            "last_recalled": recalled_at,
            "position_in_stack": self.position_in_stack,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> MemoryBlock:
        """Rebuild a block from its serialized form; load the embedding separately."""
        raw_recalled = data.get("last_recalled")
        return cls(
            id=data["id"],
            messages=data["messages"],
            combined_text=data["combined_text"],
            embedding=None,  # vectors are stored out of band
            created_at=datetime.fromisoformat(data["created_at"]),
            recall_count=data.get("recall_count", 0),
            last_recalled=datetime.fromisoformat(raw_recalled) if raw_recalled else None,
            position_in_stack=data.get("position_in_stack", 0),
            metadata=data.get("metadata", {}),
        )

    def increment_recall(self) -> None:
        """Record one more recall of this block."""
        self.recall_count += 1
        self.last_recalled = datetime.now()

    def __str__(self) -> str:
        return f"MemoryBlock({self.id[:8]}, messages={len(self.messages)}, recalls={self.recall_count})"
@dataclass
class PerceptualMemory:
    """Full state of the perceptual memory stack.

    Global singleton holding every perceptual memory block.
    """

    blocks: list[MemoryBlock] = field(default_factory=list)  # ordered, newest first
    max_blocks: int = 50  # stack capacity
    block_size: int = 5  # messages per block
    pending_messages: list[dict[str, Any]] = field(default_factory=list)  # not yet chunked
    created_at: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)  # global metadata

    def to_dict(self) -> dict[str, Any]:
        """Serialize the stack (block embeddings are persisted out of band)."""
        serialized_blocks = [blk.to_dict() for blk in self.blocks]
        return {
            "blocks": serialized_blocks,
            "max_blocks": self.max_blocks,
            "block_size": self.block_size,
            "pending_messages": self.pending_messages,
            "created_at": self.created_at.isoformat(),
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> PerceptualMemory:
        """Rebuild the stack from its serialized form."""
        restored_blocks = [MemoryBlock.from_dict(raw) for raw in data.get("blocks", [])]
        return cls(
            blocks=restored_blocks,
            max_blocks=data.get("max_blocks", 50),
            block_size=data.get("block_size", 5),
            pending_messages=data.get("pending_messages", []),
            created_at=datetime.fromisoformat(data["created_at"]),
            metadata=data.get("metadata", {}),
        )
@dataclass
class ShortTermMemory:
    """Structured active memory sitting between the perceptual and long-term tiers.

    Uses the same structure as the long-term Memory model but carries no
    graph relations.
    """

    id: str  # unique short-term memory id
    content: str  # LLM-structured textual description
    embedding: np.ndarray | None = None  # vector representation
    importance: float = 0.5  # importance score, clamped to [0, 1]
    source_block_ids: list[str] = field(default_factory=list)  # origin perceptual block ids
    created_at: datetime = field(default_factory=datetime.now)
    last_accessed: datetime = field(default_factory=datetime.now)
    access_count: int = 0  # number of accesses
    metadata: dict[str, Any] = field(default_factory=dict)  # extra metadata

    # structured fields, kept compatible with the long-term Memory model
    subject: str | None = None  # subject
    topic: str | None = None  # topic / predicate
    object: str | None = None  # object
    memory_type: str | None = None  # memory type label
    attributes: dict[str, str] = field(default_factory=dict)  # attributes

    def __post_init__(self):
        """Generate an id when missing and clamp importance into [0, 1]."""
        if not self.id:
            self.id = f"stm_{uuid.uuid4().hex[:12]}"
        self.importance = max(0.0, min(1.0, self.importance))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict (the embedding is persisted out of band)."""
        return {
            "id": self.id,
            "content": self.content,
            "importance": self.importance,
            "source_block_ids": self.source_block_ids,
            "created_at": self.created_at.isoformat(),
            "last_accessed": self.last_accessed.isoformat(),
            "access_count": self.access_count,
            "metadata": self.metadata,
            "subject": self.subject,
            "topic": self.topic,
            "object": self.object,
            "memory_type": self.memory_type,
            "attributes": self.attributes,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ShortTermMemory:
        """Rebuild from a serialized dict; load the embedding separately."""
        created_raw = data["created_at"]
        accessed_raw = data.get("last_accessed", created_raw)
        return cls(
            id=data["id"],
            content=data["content"],
            embedding=None,  # vectors are stored out of band
            importance=data.get("importance", 0.5),
            source_block_ids=data.get("source_block_ids", []),
            created_at=datetime.fromisoformat(created_raw),
            last_accessed=datetime.fromisoformat(accessed_raw),
            access_count=data.get("access_count", 0),
            metadata=data.get("metadata", {}),
            subject=data.get("subject"),
            topic=data.get("topic"),
            object=data.get("object"),
            memory_type=data.get("memory_type"),
            attributes=data.get("attributes", {}),
        )

    def update_access(self) -> None:
        """Record one access: bump the counter and refresh the timestamp."""
        self.access_count += 1
        self.last_accessed = datetime.now()

    def __str__(self) -> str:
        return f"ShortTermMemory({self.id[:8]}, content={self.content[:30]}..., importance={self.importance:.2f})"
@dataclass
class GraphOperation:
    """A single atomic, LLM-generated operation on the long-term memory graph."""

    operation_type: GraphOperationType  # kind of mutation to perform
    target_id: str | None = None  # id of the targeted node/edge/memory
    target_ids: list[str] = field(default_factory=list)  # multiple targets (merge ops)
    parameters: dict[str, Any] = field(default_factory=dict)  # operation arguments
    reason: str = ""  # the LLM's reasoning for proposing this operation
    confidence: float = 1.0  # confidence score, clamped to [0, 1]

    def __post_init__(self):
        """Clamp confidence into the valid [0, 1] range."""
        self.confidence = max(0.0, min(1.0, self.confidence))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict (enum stored by value)."""
        return {
            "operation_type": self.operation_type.value,
            "target_id": self.target_id,
            "target_ids": self.target_ids,
            "parameters": self.parameters,
            "reason": self.reason,
            "confidence": self.confidence,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> GraphOperation:
        """Rebuild an operation from its serialized dict."""
        op_kind = GraphOperationType(data["operation_type"])
        return cls(
            operation_type=op_kind,
            target_id=data.get("target_id"),
            target_ids=data.get("target_ids", []),
            parameters=data.get("parameters", {}),
            reason=data.get("reason", ""),
            confidence=data.get("confidence", 1.0),
        )

    def __str__(self) -> str:
        return f"GraphOperation({self.operation_type.value}, target={self.target_id}, confidence={self.confidence:.2f})"
@dataclass
class JudgeDecision:
    """Verdict of the judge model on whether retrieved memories are sufficient."""

    is_sufficient: bool  # whether the retrieved memories suffice
    confidence: float = 0.5  # confidence score, clamped to [0, 1]
    reasoning: str = ""  # the judge's reasoning
    additional_queries: list[str] = field(default_factory=list)  # extra queries to run
    missing_aspects: list[str] = field(default_factory=list)  # information still missing

    def __post_init__(self):
        """Clamp confidence into the valid [0, 1] range."""
        self.confidence = max(0.0, min(1.0, self.confidence))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict."""
        return {
            "is_sufficient": self.is_sufficient,
            "confidence": self.confidence,
            "reasoning": self.reasoning,
            "additional_queries": self.additional_queries,
            "missing_aspects": self.missing_aspects,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> JudgeDecision:
        """Rebuild a decision from its serialized dict."""
        return cls(
            is_sufficient=data["is_sufficient"],
            confidence=data.get("confidence", 0.5),
            reasoning=data.get("reasoning", ""),
            additional_queries=data.get("additional_queries", []),
            missing_aspects=data.get("missing_aspects", []),
        )

    def __str__(self) -> str:
        status = "充足" if self.is_sufficient else "不足"
        return f"JudgeDecision({status}, confidence={self.confidence:.2f}, extra_queries={len(self.additional_queries)})"
@dataclass
class ShortTermDecision:
    """The LLM's decision on how to handle a new short-term memory."""

    operation: ShortTermOperation  # what to do with the memory
    target_memory_id: str | None = None  # existing memory id (for MERGE / UPDATE)
    merged_content: str | None = None  # content after a merge
    reasoning: str = ""  # the LLM's reasoning
    confidence: float = 1.0  # confidence score, clamped to [0, 1]
    updated_importance: float | None = None  # new importance, clamped when present
    updated_metadata: dict[str, Any] = field(default_factory=dict)  # new metadata

    def __post_init__(self):
        """Clamp confidence (and, when given, updated_importance) into [0, 1]."""
        self.confidence = max(0.0, min(1.0, self.confidence))
        if self.updated_importance is not None:
            self.updated_importance = max(0.0, min(1.0, self.updated_importance))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict (enum stored by value)."""
        return {
            "operation": self.operation.value,
            "target_memory_id": self.target_memory_id,
            "merged_content": self.merged_content,
            "reasoning": self.reasoning,
            "confidence": self.confidence,
            "updated_importance": self.updated_importance,
            "updated_metadata": self.updated_metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ShortTermDecision:
        """Rebuild a decision from its serialized dict."""
        decided_op = ShortTermOperation(data["operation"])
        return cls(
            operation=decided_op,
            target_memory_id=data.get("target_memory_id"),
            merged_content=data.get("merged_content"),
            reasoning=data.get("reasoning", ""),
            confidence=data.get("confidence", 1.0),
            updated_importance=data.get("updated_importance"),
            updated_metadata=data.get("updated_metadata", {}),
        )

    def __str__(self) -> str:
        return f"ShortTermDecision({self.operation.value}, target={self.target_memory_id}, confidence={self.confidence:.2f})"

View File

@@ -18,7 +18,7 @@ from typing import Any
import numpy as np
from src.common.logger import get_logger
from src.memory_graph.three_tier.models import MemoryBlock, PerceptualMemory
from src.memory_graph.models import MemoryBlock, PerceptualMemory
from src.memory_graph.utils.embeddings import EmbeddingGenerator
from src.memory_graph.utils.similarity import cosine_similarity
@@ -75,6 +75,13 @@ class PerceptualMemoryManager:
f"block_size={block_size}, activation_threshold={activation_threshold})"
)
    @property
    def memory(self) -> PerceptualMemory:
        """Return the perceptual memory state, guaranteed to be non-None.

        Raises:
            RuntimeError: if the manager has not been initialized yet.
        """
        if self.perceptual_memory is None:
            raise RuntimeError("感知记忆管理器未初始化")
        return self.perceptual_memory
async def initialize(self) -> None:
"""初始化管理器"""
if self._initialized:

View File

@@ -1,7 +1,7 @@
"""
记忆系统插件工具
记忆系统插件工具(已废弃)
将 MemoryTools 适配为 BaseTool 格式,供 LLM 使用
警告:记忆创建不再由工具负责,而是通过三级记忆系统自动处理
"""
from __future__ import annotations
@@ -15,7 +15,15 @@ from src.plugin_system.base.component_types import ToolParamType
logger = get_logger(__name__)
class CreateMemoryTool(BaseTool):
# ========== 以下工具类已废弃 ==========
# 记忆系统现在采用三级记忆架构:
# 1. 感知记忆:自动收集消息块
# 2. 短期记忆:激活后由模型格式化
# 3. 长期记忆:定期转移到图结构
#
# 不再需要LLM手动调用工具创建记忆
class _DeprecatedCreateMemoryTool(BaseTool):
"""创建记忆工具"""
name = "create_memory"
@@ -129,8 +137,8 @@ class CreateMemoryTool(BaseTool):
}
class LinkMemoriesTool(BaseTool):
"""关联记忆工具"""
class _DeprecatedLinkMemoriesTool(BaseTool):
"""关联记忆工具(已废弃)"""
name = "link_memories"
description = "在两个记忆之间建立关联关系。用于连接相关的记忆,形成知识网络。"
@@ -189,8 +197,8 @@ class LinkMemoriesTool(BaseTool):
}
class SearchMemoriesTool(BaseTool):
"""搜索记忆工具"""
class _DeprecatedSearchMemoriesTool(BaseTool):
"""搜索记忆工具(已废弃)"""
name = "search_memories"
description = "搜索相关的记忆。根据查询词搜索记忆库,返回最相关的记忆。"

View File

@@ -18,7 +18,7 @@ from typing import Any
import numpy as np
from src.common.logger import get_logger
from src.memory_graph.three_tier.models import (
from src.memory_graph.models import (
MemoryBlock,
ShortTermDecision,
ShortTermMemory,
@@ -194,9 +194,9 @@ class ShortTermMemoryManager:
请输出JSON"""
# 调用 LLM
# 调用短期记忆构建模型
llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
model_set=model_config.model_task_config.memory_short_term_builder,
request_type="short_term_memory.extract",
)
@@ -299,9 +299,9 @@ class ShortTermMemoryManager:
请输出JSON"""
# 调用 LLM
# 调用短期记忆决策模型
llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
model_set=model_config.model_task_config.memory_short_term_decider,
request_type="short_term_memory.decide",
)

View File

@@ -1,38 +0,0 @@
"""
三层记忆系统 (Three-Tier Memory System)
分层架构:
1. 感知记忆层 (Perceptual Memory Layer) - 消息块的短期缓存
2. 短期记忆层 (Short-term Memory Layer) - 结构化的活跃记忆
3. 长期记忆层 (Long-term Memory Layer) - 持久化的图结构记忆
设计灵感来源于人脑的记忆机制和 Mem0 项目。
"""
from .models import (
MemoryBlock,
PerceptualMemory,
ShortTermMemory,
GraphOperation,
GraphOperationType,
JudgeDecision,
)
from .perceptual_manager import PerceptualMemoryManager
from .short_term_manager import ShortTermMemoryManager
from .long_term_manager import LongTermMemoryManager
from .unified_manager import UnifiedMemoryManager
__all__ = [
# 数据模型
"MemoryBlock",
"PerceptualMemory",
"ShortTermMemory",
"GraphOperation",
"GraphOperationType",
"JudgeDecision",
# 管理器
"PerceptualMemoryManager",
"ShortTermMemoryManager",
"LongTermMemoryManager",
"UnifiedMemoryManager",
]

View File

@@ -1,101 +0,0 @@
"""
三层记忆系统单例管理器
提供全局访问点
"""
from pathlib import Path
from src.common.logger import get_logger
from src.config.config import global_config
from src.memory_graph.three_tier.unified_manager import UnifiedMemoryManager
logger = get_logger(__name__)
# 全局单例
_unified_memory_manager: UnifiedMemoryManager | None = None
async def initialize_unified_memory_manager() -> UnifiedMemoryManager:
"""
初始化统一记忆管理器
从全局配置读取参数
Returns:
初始化后的管理器实例
"""
global _unified_memory_manager
if _unified_memory_manager is not None:
logger.warning("统一记忆管理器已经初始化")
return _unified_memory_manager
try:
# 检查是否启用三层记忆系统
if not hasattr(global_config, "three_tier_memory") or not getattr(
global_config.three_tier_memory, "enable", False
):
logger.warning("三层记忆系统未启用,跳过初始化")
return None
config = global_config.three_tier_memory
# 创建管理器实例
_unified_memory_manager = UnifiedMemoryManager(
data_dir=Path(getattr(config, "data_dir", "data/memory_graph/three_tier")),
# 感知记忆配置
perceptual_max_blocks=getattr(config, "perceptual_max_blocks", 50),
perceptual_block_size=getattr(config, "perceptual_block_size", 5),
perceptual_activation_threshold=getattr(config, "perceptual_activation_threshold", 3),
perceptual_recall_top_k=getattr(config, "perceptual_recall_top_k", 5),
perceptual_recall_threshold=getattr(config, "perceptual_recall_threshold", 0.55),
# 短期记忆配置
short_term_max_memories=getattr(config, "short_term_max_memories", 30),
short_term_transfer_threshold=getattr(config, "short_term_transfer_threshold", 0.6),
# 长期记忆配置
long_term_batch_size=getattr(config, "long_term_batch_size", 10),
long_term_search_top_k=getattr(config, "long_term_search_top_k", 5),
long_term_decay_factor=getattr(config, "long_term_decay_factor", 0.95),
# 智能检索配置
judge_confidence_threshold=getattr(config, "judge_confidence_threshold", 0.7),
)
# 初始化
await _unified_memory_manager.initialize()
logger.info("✅ 统一记忆管理器单例已初始化")
return _unified_memory_manager
except Exception as e:
logger.error(f"初始化统一记忆管理器失败: {e}", exc_info=True)
raise
def get_unified_memory_manager() -> UnifiedMemoryManager | None:
"""
获取统一记忆管理器实例
Returns:
管理器实例,未初始化返回 None
"""
if _unified_memory_manager is None:
logger.warning("统一记忆管理器尚未初始化,请先调用 initialize_unified_memory_manager()")
return _unified_memory_manager
async def shutdown_unified_memory_manager() -> None:
"""关闭统一记忆管理器"""
global _unified_memory_manager
if _unified_memory_manager is None:
logger.warning("统一记忆管理器未初始化,无需关闭")
return
try:
await _unified_memory_manager.shutdown()
_unified_memory_manager = None
logger.info("✅ 统一记忆管理器已关闭")
except Exception as e:
logger.error(f"关闭统一记忆管理器失败: {e}", exc_info=True)

View File

@@ -1,369 +0,0 @@
"""
三层记忆系统的核心数据模型
定义感知记忆块、短期记忆、图操作语言等数据结构
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
import numpy as np
class MemoryTier(Enum):
"""记忆层级枚举"""
PERCEPTUAL = "perceptual" # 感知记忆层
SHORT_TERM = "short_term" # 短期记忆层
LONG_TERM = "long_term" # 长期记忆层
class GraphOperationType(Enum):
"""图操作类型枚举"""
CREATE_NODE = "create_node" # 创建节点
UPDATE_NODE = "update_node" # 更新节点
DELETE_NODE = "delete_node" # 删除节点
MERGE_NODES = "merge_nodes" # 合并节点
CREATE_EDGE = "create_edge" # 创建边
UPDATE_EDGE = "update_edge" # 更新边
DELETE_EDGE = "delete_edge" # 删除边
CREATE_MEMORY = "create_memory" # 创建记忆
UPDATE_MEMORY = "update_memory" # 更新记忆
DELETE_MEMORY = "delete_memory" # 删除记忆
MERGE_MEMORIES = "merge_memories" # 合并记忆
class ShortTermOperation(Enum):
"""短期记忆操作类型枚举"""
MERGE = "merge" # 合并到现有记忆
UPDATE = "update" # 更新现有记忆
CREATE_NEW = "create_new" # 创建新记忆
DISCARD = "discard" # 丢弃(低价值)
KEEP_SEPARATE = "keep_separate" # 保持独立(暂不合并)
@dataclass
class MemoryBlock:
    """Perceptual memory block.

    Bundles n raw messages into one semantic unit; this is the basic
    element of the perceptual memory tier.
    """

    id: str  # unique block id ("block_..." is auto-assigned when empty)
    messages: list[dict[str, Any]]  # raw messages (content, sender, time, ...)
    combined_text: str  # concatenated text used to build the embedding
    embedding: np.ndarray | None = None  # vector for the whole block
    created_at: datetime = field(default_factory=datetime.now)
    recall_count: int = 0  # how many times this block has been recalled
    last_recalled: datetime | None = None  # timestamp of the latest recall
    position_in_stack: int = 0  # position in the memory stack (0 = top)
    metadata: dict[str, Any] = field(default_factory=dict)  # extra metadata

    def __post_init__(self):
        """Auto-generate an id when the caller passed an empty one."""
        if not self.id:
            self.id = f"block_{uuid.uuid4().hex[:12]}"

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict; the embedding is persisted separately."""
        recalled = self.last_recalled
        return {
            "id": self.id,
            "messages": self.messages,
            "combined_text": self.combined_text,
            "created_at": self.created_at.isoformat(),
            "recall_count": self.recall_count,
            "last_recalled": recalled.isoformat() if recalled else None,
            "position_in_stack": self.position_in_stack,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> MemoryBlock:
        """Rebuild a block from its dict form (vector data loads separately)."""
        recalled_raw = data.get("last_recalled")
        return cls(
            id=data["id"],
            messages=data["messages"],
            combined_text=data["combined_text"],
            embedding=None,  # embeddings are stored outside the dict form
            created_at=datetime.fromisoformat(data["created_at"]),
            recall_count=data.get("recall_count", 0),
            last_recalled=datetime.fromisoformat(recalled_raw) if recalled_raw else None,
            position_in_stack=data.get("position_in_stack", 0),
            metadata=data.get("metadata", {}),
        )

    def increment_recall(self) -> None:
        """Record one more recall of this block."""
        self.last_recalled = datetime.now()
        self.recall_count += 1

    def __str__(self) -> str:
        return f"MemoryBlock({self.id[:8]}, messages={len(self.messages)}, recalls={self.recall_count})"
@dataclass
class PerceptualMemory:
    """Full state of the perceptual memory stack.

    Global singleton that owns every perceptual MemoryBlock plus the
    messages still waiting to be chunked into a block.
    """

    blocks: list[MemoryBlock] = field(default_factory=list)  # ordered, newest first
    max_blocks: int = 50  # stack capacity
    block_size: int = 5  # messages per block
    pending_messages: list[dict[str, Any]] = field(default_factory=list)  # not yet chunked
    created_at: datetime = field(default_factory=datetime.now)
    metadata: dict[str, Any] = field(default_factory=dict)  # global metadata

    def to_dict(self) -> dict[str, Any]:
        """Serialize the whole stack for persistence."""
        return {
            "blocks": [blk.to_dict() for blk in self.blocks],
            "max_blocks": self.max_blocks,
            "block_size": self.block_size,
            "pending_messages": self.pending_messages,
            "created_at": self.created_at.isoformat(),
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> PerceptualMemory:
        """Restore a stack from its serialized form."""
        raw_blocks = data.get("blocks", [])
        return cls(
            blocks=[MemoryBlock.from_dict(raw) for raw in raw_blocks],
            max_blocks=data.get("max_blocks", 50),
            block_size=data.get("block_size", 5),
            pending_messages=data.get("pending_messages", []),
            created_at=datetime.fromisoformat(data["created_at"]),
            metadata=data.get("metadata", {}),
        )
@dataclass
class ShortTermMemory:
    """Short-term memory entry.

    A structured, active memory sitting between the perceptual and
    long-term tiers. Uses the same structured fields as the long-term
    Memory type, but carries no graph relations.
    """

    id: str  # unique id ("stm_..." is auto-assigned when empty)
    content: str  # LLM-structured textual description
    embedding: np.ndarray | None = None  # vector representation
    importance: float = 0.5  # importance score, clamped to [0, 1]
    source_block_ids: list[str] = field(default_factory=list)  # originating block ids
    created_at: datetime = field(default_factory=datetime.now)
    last_accessed: datetime = field(default_factory=datetime.now)
    access_count: int = 0  # number of accesses
    metadata: dict[str, Any] = field(default_factory=dict)  # extra metadata
    # Structured fields (compatible with the long-term Memory type):
    subject: str | None = None  # subject
    topic: str | None = None  # topic
    object: str | None = None  # object
    memory_type: str | None = None  # memory type
    attributes: dict[str, str] = field(default_factory=dict)  # attributes

    def __post_init__(self):
        """Assign an id when missing and clamp importance into [0, 1]."""
        if not self.id:
            self.id = f"stm_{uuid.uuid4().hex[:12]}"
        self.importance = max(0.0, min(1.0, self.importance))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict; the embedding is persisted separately."""
        return {
            "id": self.id,
            "content": self.content,
            "importance": self.importance,
            "source_block_ids": self.source_block_ids,
            "created_at": self.created_at.isoformat(),
            "last_accessed": self.last_accessed.isoformat(),
            "access_count": self.access_count,
            "metadata": self.metadata,
            "subject": self.subject,
            "topic": self.topic,
            "object": self.object,
            "memory_type": self.memory_type,
            "attributes": self.attributes,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ShortTermMemory:
        """Rebuild a short-term memory; last_accessed falls back to created_at."""
        created_raw = data["created_at"]
        return cls(
            id=data["id"],
            content=data["content"],
            embedding=None,  # vector data is loaded separately
            importance=data.get("importance", 0.5),
            source_block_ids=data.get("source_block_ids", []),
            created_at=datetime.fromisoformat(created_raw),
            last_accessed=datetime.fromisoformat(data.get("last_accessed", created_raw)),
            access_count=data.get("access_count", 0),
            metadata=data.get("metadata", {}),
            subject=data.get("subject"),
            topic=data.get("topic"),
            object=data.get("object"),
            memory_type=data.get("memory_type"),
            attributes=data.get("attributes", {}),
        )

    def update_access(self) -> None:
        """Record one access: bump the counter and refresh the timestamp."""
        self.access_count += 1
        self.last_accessed = datetime.now()

    def __str__(self) -> str:
        return f"ShortTermMemory({self.id[:8]}, content={self.content[:30]}..., importance={self.importance:.2f})"
@dataclass
class GraphOperation:
    """One atomic, LLM-generated operation against the long-term memory graph."""

    operation_type: GraphOperationType  # kind of operation
    target_id: str | None = None  # target object id (node / edge / memory)
    target_ids: list[str] = field(default_factory=list)  # multiple targets (merges)
    parameters: dict[str, Any] = field(default_factory=dict)  # operation parameters
    reason: str = ""  # LLM's reasoning for the operation
    confidence: float = 1.0  # confidence, clamped to [0, 1]

    def __post_init__(self):
        """Clamp confidence into [0, 1]."""
        self.confidence = max(0.0, min(1.0, self.confidence))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict (enum stored by value)."""
        return {
            "operation_type": self.operation_type.value,
            "target_id": self.target_id,
            "target_ids": self.target_ids,
            "parameters": self.parameters,
            "reason": self.reason,
            "confidence": self.confidence,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> GraphOperation:
        """Rebuild an operation from its dict form."""
        return cls(
            operation_type=GraphOperationType(data["operation_type"]),
            target_id=data.get("target_id"),
            target_ids=data.get("target_ids", []),
            parameters=data.get("parameters", {}),
            reason=data.get("reason", ""),
            confidence=data.get("confidence", 1.0),
        )

    def __str__(self) -> str:
        return f"GraphOperation({self.operation_type.value}, target={self.target_id}, confidence={self.confidence:.2f})"
@dataclass
class JudgeDecision:
    """Verdict of the judge model on whether retrieved memories suffice."""

    is_sufficient: bool  # whether the retrieved memories are enough
    confidence: float = 0.5  # confidence, clamped to [0, 1]
    reasoning: str = ""  # judge's reasoning
    additional_queries: list[str] = field(default_factory=list)  # follow-up queries
    missing_aspects: list[str] = field(default_factory=list)  # missing information

    def __post_init__(self):
        """Clamp confidence into [0, 1]."""
        self.confidence = max(0.0, min(1.0, self.confidence))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict."""
        return {
            "is_sufficient": self.is_sufficient,
            "confidence": self.confidence,
            "reasoning": self.reasoning,
            "additional_queries": self.additional_queries,
            "missing_aspects": self.missing_aspects,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> JudgeDecision:
        """Rebuild a decision from its dict form."""
        return cls(
            is_sufficient=data["is_sufficient"],
            confidence=data.get("confidence", 0.5),
            reasoning=data.get("reasoning", ""),
            additional_queries=data.get("additional_queries", []),
            missing_aspects=data.get("missing_aspects", []),
        )

    def __str__(self) -> str:
        status = "充足" if self.is_sufficient else "不足"
        return f"JudgeDecision({status}, confidence={self.confidence:.2f}, extra_queries={len(self.additional_queries)})"
@dataclass
class ShortTermDecision:
    """LLM decision on how to handle a newly produced short-term memory."""

    operation: ShortTermOperation  # chosen operation
    target_memory_id: str | None = None  # target memory id (for MERGE/UPDATE)
    merged_content: str | None = None  # content after merging
    reasoning: str = ""  # LLM's reasoning
    confidence: float = 1.0  # confidence, clamped to [0, 1]
    updated_importance: float | None = None  # new importance (clamped when set)
    updated_metadata: dict[str, Any] = field(default_factory=dict)  # new metadata

    def __post_init__(self):
        """Clamp confidence (and importance, when provided) into [0, 1]."""
        self.confidence = max(0.0, min(1.0, self.confidence))
        if self.updated_importance is not None:
            self.updated_importance = max(0.0, min(1.0, self.updated_importance))

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a plain dict (enum stored by value)."""
        return {
            "operation": self.operation.value,
            "target_memory_id": self.target_memory_id,
            "merged_content": self.merged_content,
            "reasoning": self.reasoning,
            "confidence": self.confidence,
            "updated_importance": self.updated_importance,
            "updated_metadata": self.updated_metadata,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ShortTermDecision:
        """Rebuild a decision from its dict form."""
        return cls(
            operation=ShortTermOperation(data["operation"]),
            target_memory_id=data.get("target_memory_id"),
            merged_content=data.get("merged_content"),
            reasoning=data.get("reasoning", ""),
            confidence=data.get("confidence", 1.0),
            updated_importance=data.get("updated_importance"),
            updated_metadata=data.get("updated_metadata", {}),
        )

    def __str__(self) -> str:
        return f"ShortTermDecision({self.operation.value}, target={self.target_memory_id}, confidence={self.confidence:.2f})"

View File

@@ -74,12 +74,12 @@ class UnifiedMemoryManager:
self.judge_confidence_threshold = judge_confidence_threshold
# 三层管理器
self.perceptual_manager: PerceptualMemoryManager | None = None
self.short_term_manager: ShortTermMemoryManager | None = None
self.long_term_manager: LongTermMemoryManager | None = None
self.perceptual_manager: PerceptualMemoryManager
self.short_term_manager: ShortTermMemoryManager
self.long_term_manager: LongTermMemoryManager
# 底层 MemoryManager长期记忆
self.memory_manager: MemoryManager | None = None
self.memory_manager: MemoryManager
# 配置参数存储(用于初始化)
self._config = {
@@ -313,15 +313,29 @@ class UnifiedMemoryManager:
try:
from src.config.config import model_config
from src.llm_models.utils_model import LLMRequest
from src.memory_graph.utils.memory_formatter import format_memory_for_prompt
# 构建提示词
perceptual_desc = "\n\n".join(
[f"记忆块{i+1}:\n{block.combined_text}" for i, block in enumerate(perceptual_blocks)]
)
# 构建提示词 - 使用优化的格式
# 防御性处理:确保 combined_text 是字符串
perceptual_texts = []
for i, block in enumerate(perceptual_blocks):
text = block.combined_text
if isinstance(text, list):
text = " ".join(str(item) for item in text)
elif not isinstance(text, str):
text = str(text)
perceptual_texts.append(f"记忆块{i+1}:\n{text}")
short_term_desc = "\n\n".join(
[f"记忆{i+1}:\n{mem.content}" for i, mem in enumerate(short_term_memories)]
)
perceptual_desc = "\n\n".join(perceptual_texts)
# 短期记忆使用 "主体-主题(属性)" 格式
short_term_texts = []
for mem in short_term_memories:
formatted = format_memory_for_prompt(mem, include_metadata=False)
if formatted: # 只添加非空的格式化结果
short_term_texts.append(f"- {formatted}")
short_term_desc = "\n".join(short_term_texts)
prompt = f"""你是一个记忆检索评估专家。请判断检索到的记忆是否足以回答用户的问题。
@@ -331,7 +345,7 @@ class UnifiedMemoryManager:
**检索到的感知记忆块:**
{perceptual_desc or '(无)'}
**检索到的短期记忆:**
**检索到的短期记忆(结构化记忆,格式:主体-主题(属性)**
{short_term_desc or '(无)'}
**任务要求:**
@@ -352,16 +366,16 @@ class UnifiedMemoryManager:
请输出JSON"""
# 调用 LLM
# 调用记忆裁判模型
llm = LLMRequest(
model_set=model_config.model_task_config.utils_small,
model_set=model_config.model_task_config.memory_judge,
request_type="unified_memory.judge",
)
response, _ = await llm.generate_response_async(
prompt,
temperature=0.2,
max_tokens=800,
temperature=0.1,
max_tokens=600,
)
# 解析响应
@@ -407,32 +421,53 @@ class UnifiedMemoryManager:
logger.info("自动转移任务已启动")
async def _auto_transfer_loop(self) -> None:
"""自动转移循环"""
"""自动转移循环(批量缓存模式)"""
transfer_cache = [] # 缓存待转移的短期记忆
cache_size_threshold = self._config["long_term"]["batch_size"] # 使用配置的批量大小
while True:
try:
# 每 10 分钟检查一次
await asyncio.sleep(600)
# 检查短期记忆是否达到上限
if len(self.short_term_manager.memories) >= self.short_term_manager.max_memories:
logger.info("短期记忆已达上限,开始转移到长期记忆")
# 检查短期记忆是否有需要转移的
memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
# 获取待转移的记忆
memories_to_transfer = self.short_term_manager.get_memories_for_transfer()
if memories_to_transfer:
# 添加到缓存
transfer_cache.extend(memories_to_transfer)
logger.info(
f"缓存待转移记忆: 新增{len(memories_to_transfer)}条, "
f"缓存总数{len(transfer_cache)}/{cache_size_threshold}"
)
if memories_to_transfer:
# 执行转移
result = await self.long_term_manager.transfer_from_short_term(
memories_to_transfer
# 检查是否达到批量转移阈值或短期记忆已满
should_transfer = (
len(transfer_cache) >= cache_size_threshold or
len(self.short_term_manager.memories) >= self.short_term_manager.max_memories
)
if should_transfer and transfer_cache:
logger.info(f"触发批量转移: {len(transfer_cache)}条短期记忆→长期记忆")
# 执行批量转移
result = await self.long_term_manager.transfer_from_short_term(
transfer_cache
)
# 清除已转移的记忆
if result.get("transferred_memory_ids"):
await self.short_term_manager.clear_transferred_memories(
result["transferred_memory_ids"]
)
# 从缓存中移除已转移的
transferred_ids = set(result["transferred_memory_ids"])
transfer_cache = [
m for m in transfer_cache
if m.id not in transferred_ids
]
# 清除已转移的记忆
if result.get("transferred_memory_ids"):
await self.short_term_manager.clear_transferred_memories(
result["transferred_memory_ids"]
)
logger.info(f"自动转移完成: {result}")
logger.info(f"✅ 批量转移完成: {result}")
except asyncio.CancelledError:
logger.info("自动转移任务已取消")

View File

@@ -0,0 +1,561 @@
"""
统一记忆管理器 (Unified Memory Manager)
整合三层记忆系统:
- 感知记忆层
- 短期记忆层
- 长期记忆层
提供统一的接口供外部调用
"""
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Any
from src.common.logger import get_logger
from src.memory_graph.manager import MemoryManager
from src.memory_graph.long_term_manager import LongTermMemoryManager
from src.memory_graph.models import JudgeDecision, MemoryBlock, ShortTermMemory
from src.memory_graph.perceptual_manager import PerceptualMemoryManager
from src.memory_graph.short_term_manager import ShortTermMemoryManager
logger = get_logger(__name__)
class UnifiedMemoryManager:
    """Unified facade over the three-tier memory system.

    Integrates the perceptual tier (raw message blocks), the short-term
    tier (structured active memories) and the long-term tier (persistent
    graph memories), and exposes a single interface for adding messages,
    searching, and transferring memories between tiers.
    """

    def __init__(
        self,
        data_dir: Path | None = None,
        # perceptual tier
        perceptual_max_blocks: int = 50,
        perceptual_block_size: int = 5,
        perceptual_activation_threshold: int = 3,
        perceptual_recall_top_k: int = 5,
        perceptual_recall_threshold: float = 0.55,
        # short-term tier
        short_term_max_memories: int = 30,
        short_term_transfer_threshold: float = 0.6,
        # long-term tier
        long_term_batch_size: int = 10,
        long_term_search_top_k: int = 5,
        long_term_decay_factor: float = 0.95,
        # smart retrieval
        judge_confidence_threshold: float = 0.7,
    ):
        """Create the manager (tiers are built lazily in ``initialize()``).

        Args:
            data_dir: storage directory (default: data/memory_graph/three_tier).
            perceptual_max_blocks: capacity of the perceptual memory stack.
            perceptual_block_size: messages per perceptual block.
            perceptual_activation_threshold: recall count that promotes a block.
            perceptual_recall_top_k: max blocks returned by a recall.
            perceptual_recall_threshold: similarity threshold for recall.
            short_term_max_memories: max number of short-term memories.
            short_term_transfer_threshold: importance threshold for promotion.
            long_term_batch_size: batch size for short→long transfers.
            long_term_search_top_k: number of similar memories to retrieve.
            long_term_decay_factor: decay factor of long-term memories.
            judge_confidence_threshold: confidence threshold of the judge model.
        """
        self.data_dir = data_dir or Path("data/memory_graph/three_tier")
        self.data_dir.mkdir(parents=True, exist_ok=True)

        self.judge_confidence_threshold = judge_confidence_threshold

        # Tier managers: declared here, assigned in initialize().
        self.perceptual_manager: PerceptualMemoryManager
        self.short_term_manager: ShortTermMemoryManager
        self.long_term_manager: LongTermMemoryManager
        # Underlying MemoryManager (long-term graph storage).
        self.memory_manager: MemoryManager

        # Stored constructor parameters, consumed by initialize().
        self._config = {
            "perceptual": {
                "max_blocks": perceptual_max_blocks,
                "block_size": perceptual_block_size,
                "activation_threshold": perceptual_activation_threshold,
                "recall_top_k": perceptual_recall_top_k,
                "recall_similarity_threshold": perceptual_recall_threshold,
            },
            "short_term": {
                "max_memories": short_term_max_memories,
                "transfer_importance_threshold": short_term_transfer_threshold,
            },
            "long_term": {
                "batch_size": long_term_batch_size,
                "search_top_k": long_term_search_top_k,
                "long_term_decay_factor": long_term_decay_factor,
            },
        }

        # State
        self._initialized = False
        self._auto_transfer_task: asyncio.Task | None = None

        logger.info("统一记忆管理器已创建")

    async def initialize(self) -> None:
        """Initialize all tiers and start the background transfer task.

        Idempotent: a second call only logs a warning. Raises on failure.
        """
        if self._initialized:
            logger.warning("统一记忆管理器已经初始化")
            return

        try:
            logger.info("开始初始化统一记忆管理器...")

            # Underlying MemoryManager (long-term graph storage).
            self.memory_manager = MemoryManager(data_dir=self.data_dir.parent)
            await self.memory_manager.initialize()

            # Perceptual tier.
            self.perceptual_manager = PerceptualMemoryManager(
                data_dir=self.data_dir,
                **self._config["perceptual"],
            )
            await self.perceptual_manager.initialize()

            # Short-term tier.
            self.short_term_manager = ShortTermMemoryManager(
                data_dir=self.data_dir,
                **self._config["short_term"],
            )
            await self.short_term_manager.initialize()

            # Long-term tier (wraps memory_manager).
            self.long_term_manager = LongTermMemoryManager(
                memory_manager=self.memory_manager,
                **self._config["long_term"],
            )
            await self.long_term_manager.initialize()

            self._initialized = True
            logger.info("✅ 统一记忆管理器初始化完成")

            # Background short→long transfer loop.
            self._start_auto_transfer_task()

        except Exception as e:
            logger.error(f"统一记忆管理器初始化失败: {e}", exc_info=True)
            raise

    async def add_message(self, message: dict[str, Any]) -> MemoryBlock | None:
        """Add a message to the perceptual tier.

        Args:
            message: the message dict.

        Returns:
            The new MemoryBlock when one was created, else None.

        Note:
            Promotion from perceptual to short-term memory is triggered by
            recall (see ``search_memories``), not by adding messages.
        """
        if not self._initialized:
            await self.initialize()

        new_block = await self.perceptual_manager.add_message(message)
        return new_block

    async def search_memories(
        self, query_text: str, use_judge: bool = True
    ) -> dict[str, Any]:
        """Smart retrieval across all three tiers.

        Flow:
          1. Search perceptual and short-term memory first.
          2. Optionally ask the judge model whether the results suffice.
          3. If insufficient, search long-term memory with extra queries.

        Args:
            query_text: the query text.
            use_judge: whether to run the judge model.

        Returns:
            Dict with keys "perceptual_blocks", "short_term_memories",
            "long_term_memories" and "judge_decision" (None when unused).
            On failure the lists are empty and an "error" key is added.
        """
        if not self._initialized:
            await self.initialize()

        try:
            result = {
                "perceptual_blocks": [],
                "short_term_memories": [],
                "long_term_memories": [],
                "judge_decision": None,
            }

            # Step 1: perceptual + short-term retrieval.
            perceptual_blocks = await self.perceptual_manager.recall_blocks(query_text)
            short_term_memories = await self.short_term_manager.search_memories(query_text)

            # Step 1.5: blocks whose recall count reached the activation
            # threshold are promoted to short-term memory immediately.
            blocks_to_transfer = [
                block for block in perceptual_blocks
                if block.metadata.get("needs_transfer", False)
            ]
            if blocks_to_transfer:
                logger.info(f"检测到 {len(blocks_to_transfer)} 个记忆块需要转移到短期记忆")
                for block in blocks_to_transfer:
                    stm = await self.short_term_manager.add_from_block(block)
                    if stm:
                        # Remove the promoted block from the perceptual stack.
                        await self.perceptual_manager.remove_block(block.id)
                        logger.info(f"✅ 记忆块 {block.id} 已转为短期记忆 {stm.id}")
                        # Include the fresh short-term memory in the result.
                        short_term_memories.append(stm)

            result["perceptual_blocks"] = perceptual_blocks
            result["short_term_memories"] = short_term_memories

            logger.info(
                f"初步检索: 感知记忆 {len(perceptual_blocks)} 块, "
                f"短期记忆 {len(short_term_memories)} 条"
            )

            # Step 2: judge evaluation.
            if use_judge:
                judge_decision = await self._judge_retrieval_sufficiency(
                    query_text, perceptual_blocks, short_term_memories
                )
                result["judge_decision"] = judge_decision

                # Step 3: fall back to long-term memory when insufficient.
                if not judge_decision.is_sufficient:
                    logger.info("裁判判定记忆不充足,启动长期记忆检索")

                    long_term_memories = []
                    queries = [query_text] + judge_decision.additional_queries
                    for q in queries:
                        memories = await self.memory_manager.search_memories(
                            query=q,
                            top_k=5,
                            use_multi_query=False,
                        )
                        long_term_memories.extend(memories)

                    # De-duplicate by memory id, preserving order.
                    seen_ids = set()
                    unique_memories = []
                    for mem in long_term_memories:
                        if mem.id not in seen_ids:
                            unique_memories.append(mem)
                            seen_ids.add(mem.id)

                    result["long_term_memories"] = unique_memories
                    logger.info(f"长期记忆检索: {len(unique_memories)} 条")
            else:
                # No judge: always search long-term memory directly.
                long_term_memories = await self.memory_manager.search_memories(
                    query=query_text,
                    top_k=5,
                    use_multi_query=False,
                )
                result["long_term_memories"] = long_term_memories

            return result

        except Exception as e:
            logger.error(f"智能检索失败: {e}", exc_info=True)
            return {
                "perceptual_blocks": [],
                "short_term_memories": [],
                "long_term_memories": [],
                "error": str(e),
            }

    async def _judge_retrieval_sufficiency(
        self,
        query: str,
        perceptual_blocks: list[MemoryBlock],
        short_term_memories: list[ShortTermMemory],
    ) -> JudgeDecision:
        """Ask the judge model whether the retrieved memories suffice.

        Args:
            query: the original query.
            perceptual_blocks: retrieved perceptual blocks.
            short_term_memories: retrieved short-term memories.

        Returns:
            The judge's decision; on any failure a low-confidence
            "insufficient" decision is returned so long-term retrieval runs.
        """
        try:
            from src.config.config import model_config
            from src.llm_models.utils_model import LLMRequest
            from src.memory_graph.utils.memory_formatter import format_memory_for_prompt

            # Build the prompt. Defensive: combined_text may arrive as a list.
            perceptual_texts = []
            for i, block in enumerate(perceptual_blocks):
                text = block.combined_text
                if isinstance(text, list):
                    text = " ".join(str(item) for item in text)
                elif not isinstance(text, str):
                    text = str(text)
                perceptual_texts.append(f"记忆块{i+1}:\n{text}")

            perceptual_desc = "\n\n".join(perceptual_texts)

            # Short-term memories use the "subject-topic(attributes)" format.
            short_term_texts = []
            for mem in short_term_memories:
                formatted = format_memory_for_prompt(mem, include_metadata=False)
                if formatted:  # keep only non-empty formatted entries
                    short_term_texts.append(f"- {formatted}")
            short_term_desc = "\n".join(short_term_texts)

            prompt = f"""你是一个记忆检索评估专家。请判断检索到的记忆是否足以回答用户的问题。

**用户查询:**
{query}

**检索到的感知记忆块:**
{perceptual_desc or '(无)'}

**检索到的短期记忆(结构化记忆,格式:主体-主题(属性)**
{short_term_desc or '(无)'}

**任务要求:**
1. 判断这些记忆是否足以回答用户的问题
2. 如果不充足,分析缺少哪些方面的信息
3. 生成额外需要检索的 query用于在长期记忆中检索

**输出格式JSON**
```json
{{
"is_sufficient": true/false,
"confidence": 0.85,
"reasoning": "判断理由",
"missing_aspects": ["缺失的信息1", "缺失的信息2"],
"additional_queries": ["补充query1", "补充query2"]
}}
```

请输出JSON"""

            # Call the dedicated memory-judge model.
            llm = LLMRequest(
                model_set=model_config.model_task_config.memory_judge,
                request_type="unified_memory.judge",
            )

            response, _ = await llm.generate_response_async(
                prompt,
                temperature=0.1,
                max_tokens=600,
            )

            # Parse the response: prefer a fenced JSON block, else raw text.
            import json
            import re

            json_match = re.search(r"```json\s*(.*?)\s*```", response, re.DOTALL)
            if json_match:
                json_str = json_match.group(1)
            else:
                json_str = response.strip()

            data = json.loads(json_str)

            decision = JudgeDecision(
                is_sufficient=data.get("is_sufficient", False),
                confidence=data.get("confidence", 0.5),
                reasoning=data.get("reasoning", ""),
                additional_queries=data.get("additional_queries", []),
                missing_aspects=data.get("missing_aspects", []),
            )

            logger.info(f"裁判决策: {decision}")
            return decision

        except Exception as e:
            logger.error(f"裁判模型评估失败: {e}", exc_info=True)
            # Fail open: treat the result as insufficient so the caller
            # falls back to long-term retrieval with the original query.
            return JudgeDecision(
                is_sufficient=False,
                confidence=0.3,
                reasoning=f"裁判模型失败: {e}",
                additional_queries=[query],
            )

    def _start_auto_transfer_task(self) -> None:
        """Start the background auto-transfer task (no-op when running)."""
        if self._auto_transfer_task and not self._auto_transfer_task.done():
            logger.warning("自动转移任务已在运行")
            return

        self._auto_transfer_task = asyncio.create_task(self._auto_transfer_loop())
        logger.info("自动转移任务已启动")

    async def _auto_transfer_loop(self) -> None:
        """Auto-transfer loop (batched caching mode).

        Every 10 minutes it collects transfer-ready short-term memories
        into a cache and flushes the cache to long-term memory when the
        configured batch size is reached or short-term memory is full.
        """
        transfer_cache = []  # short-term memories waiting for batch transfer
        cache_size_threshold = self._config["long_term"]["batch_size"]  # configured batch size

        while True:
            try:
                # Check every 10 minutes.
                await asyncio.sleep(600)

                # Collect transfer-ready short-term memories.
                memories_to_transfer = self.short_term_manager.get_memories_for_transfer()

                if memories_to_transfer:
                    # BUGFIX: get_memories_for_transfer() keeps returning the
                    # same memories until they are cleared, so dedupe by id to
                    # avoid accumulating duplicates across loop iterations.
                    cached_ids = {m.id for m in transfer_cache}
                    fresh = [m for m in memories_to_transfer if m.id not in cached_ids]
                    if fresh:
                        transfer_cache.extend(fresh)
                        logger.info(
                            f"缓存待转移记忆: 新增{len(fresh)}条, "
                            f"缓存总数{len(transfer_cache)}/{cache_size_threshold}"
                        )

                # Flush when the batch is full or short-term memory is full.
                should_transfer = (
                    len(transfer_cache) >= cache_size_threshold or
                    len(self.short_term_manager.memories) >= self.short_term_manager.max_memories
                )

                if should_transfer and transfer_cache:
                    logger.info(f"触发批量转移: {len(transfer_cache)}条短期记忆→长期记忆")

                    # Execute the batch transfer.
                    result = await self.long_term_manager.transfer_from_short_term(
                        transfer_cache
                    )

                    # Clear transferred memories from the short-term tier
                    # and drop them from the cache.
                    if result.get("transferred_memory_ids"):
                        await self.short_term_manager.clear_transferred_memories(
                            result["transferred_memory_ids"]
                        )
                        transferred_ids = set(result["transferred_memory_ids"])
                        transfer_cache = [
                            m for m in transfer_cache
                            if m.id not in transferred_ids
                        ]

                    logger.info(f"✅ 批量转移完成: {result}")

            except asyncio.CancelledError:
                logger.info("自动转移任务已取消")
                break
            except Exception as e:
                logger.error(f"自动转移任务错误: {e}", exc_info=True)
                # Keep the loop alive on unexpected errors.

    async def manual_transfer(self) -> dict[str, Any]:
        """Manually trigger a short→long transfer.

        Returns:
            The transfer result dict; contains "transferred_count" and,
            on failure, an "error" key.
        """
        if not self._initialized:
            await self.initialize()

        try:
            memories_to_transfer = self.short_term_manager.get_memories_for_transfer()

            if not memories_to_transfer:
                logger.info("没有需要转移的短期记忆")
                return {"message": "没有需要转移的记忆", "transferred_count": 0}

            # Execute the transfer.
            result = await self.long_term_manager.transfer_from_short_term(memories_to_transfer)

            # Clear transferred memories from the short-term tier.
            if result.get("transferred_memory_ids"):
                await self.short_term_manager.clear_transferred_memories(
                    result["transferred_memory_ids"]
                )

            logger.info(f"手动转移完成: {result}")
            return result

        except Exception as e:
            logger.error(f"手动转移失败: {e}", exc_info=True)
            return {"error": str(e), "transferred_count": 0}

    def get_statistics(self) -> dict[str, Any]:
        """Return per-tier statistics plus an aggregate total.

        Returns an empty dict when the manager is not initialized.
        """
        if not self._initialized:
            return {}

        # Fetch each tier's stats once instead of calling twice per tier.
        perceptual_stats = self.perceptual_manager.get_statistics()
        short_term_stats = self.short_term_manager.get_statistics()
        long_term_stats = self.long_term_manager.get_statistics()

        return {
            "perceptual": perceptual_stats,
            "short_term": short_term_stats,
            "long_term": long_term_stats,
            "total_system_memories": (
                perceptual_stats.get("total_messages", 0)
                + short_term_stats.get("total_memories", 0)
                + long_term_stats.get("total_memories", 0)
            ),
        }

    async def shutdown(self) -> None:
        """Cancel the background task and shut every tier down."""
        if not self._initialized:
            return

        try:
            logger.info("正在关闭统一记忆管理器...")

            # Cancel the auto-transfer task and wait for it to finish.
            if self._auto_transfer_task and not self._auto_transfer_task.done():
                self._auto_transfer_task.cancel()
                try:
                    await self._auto_transfer_task
                except asyncio.CancelledError:
                    pass

            # Shut down each tier. getattr guards against a partially
            # initialized instance (attributes are only annotated in __init__).
            if getattr(self, "perceptual_manager", None):
                await self.perceptual_manager.shutdown()
            if getattr(self, "short_term_manager", None):
                await self.short_term_manager.shutdown()
            if getattr(self, "long_term_manager", None):
                await self.long_term_manager.shutdown()
            if getattr(self, "memory_manager", None):
                await self.memory_manager.shutdown()

            self._initialized = False
            logger.info("✅ 统一记忆管理器已关闭")

        except Exception as e:
            logger.error(f"关闭统一记忆管理器失败: {e}", exc_info=True)

View File

@@ -0,0 +1,234 @@
"""
记忆格式化工具
提供将记忆对象格式化为提示词的功能,使用 "主体-主题(属性)" 格式。
"""
from src.memory_graph.models import Memory, MemoryNode, NodeType
from src.memory_graph.models import ShortTermMemory
def get_memory_type_label(memory_type: str) -> str:
    """Return the Chinese display label for a memory type.

    English type names are matched case-insensitively; Chinese names map
    to themselves; anything unrecognized is returned unchanged.

    Args:
        memory_type: the memory type (English or Chinese).

    Returns:
        The Chinese label, or the input itself when unknown.
    """
    english_labels = {
        "fact": "事实",
        "event": "事件",
        "opinion": "观点",
        "relation": "关系",
        "goal": "目标",
        "plan": "计划",
        "unknown": "未知",
    }
    chinese_labels = {label: label for label in ("事实", "事件", "观点", "关系", "目标", "计划")}
    mapping = {**chinese_labels, **english_labels}
    return mapping.get(memory_type.lower(), memory_type)
def format_memory_for_prompt(memory: Memory | ShortTermMemory, include_metadata: bool = True) -> str:
    """Render a memory as prompt text in "subject-topic(attributes)" form.

    Examples:
        - "张三-职业(程序员, 公司=MoFox)"
        - "小明-喜欢(Python, 原因=简洁优雅)"
        - "拾风-地址(https://mofox.com)"

    Args:
        memory: a Memory or ShortTermMemory instance.
        include_metadata: whether to append importance/type metadata.

    Returns:
        The formatted text; falls back to str(memory) for unknown types.
    """
    if isinstance(memory, ShortTermMemory):
        return _format_short_term_memory(memory, include_metadata)
    if isinstance(memory, Memory):
        return _format_long_term_memory(memory, include_metadata)
    return str(memory)
def _format_short_term_memory(mem: ShortTermMemory, include_metadata: bool) -> str:
"""
格式化短期记忆
Args:
mem: ShortTermMemory 对象
include_metadata: 是否包含元数据
Returns:
格式化后的文本
"""
parts = []
# 主体
subject = mem.subject or ""
# 主题
topic = mem.topic or ""
# 客体
obj = mem.object or ""
# 构建基础格式:主体-主题
if subject and topic:
base = f"{subject}-{topic}"
elif subject:
base = subject
elif topic:
base = topic
else:
# 如果没有结构化字段,使用 content
# 防御性编程:确保 content 是字符串
if isinstance(mem.content, list):
return " ".join(str(item) for item in mem.content)
return str(mem.content) if mem.content else ""
# 添加客体和属性
attr_parts = []
if obj:
attr_parts.append(obj)
# 添加属性
if mem.attributes:
for key, value in mem.attributes.items():
if value:
attr_parts.append(f"{key}={value}")
# 组合
if attr_parts:
result = f"{base}({', '.join(attr_parts)})"
else:
result = base
# 添加元数据(可选)
if include_metadata:
metadata_parts = []
if mem.memory_type:
metadata_parts.append(f"类型:{get_memory_type_label(mem.memory_type)}")
if mem.importance > 0:
metadata_parts.append(f"重要性:{mem.importance:.2f}")
if metadata_parts:
result = f"{result} [{', '.join(metadata_parts)}]"
return result
def _format_long_term_memory(mem: Memory, include_metadata: bool) -> str:
"""
格式化长期记忆Memory 对象)
Args:
mem: Memory 对象
include_metadata: 是否包含元数据
Returns:
格式化后的文本
"""
from src.memory_graph.models import EdgeType
# 获取主体节点
subject_node = mem.get_subject_node()
if not subject_node:
return mem.to_text()
subject = subject_node.content
# 查找主题节点
topic_node = None
for edge in mem.edges:
edge_type = edge.edge_type.value if hasattr(edge.edge_type, 'value') else str(edge.edge_type)
if edge_type == "记忆类型" and edge.source_id == mem.subject_id:
topic_node = mem.get_node_by_id(edge.target_id)
break
if not topic_node:
return subject
topic = topic_node.content
# 基础格式:主体-主题
base = f"{subject}-{topic}"
# 收集客体和属性
attr_parts = []
# 查找客体节点(通过核心关系边)
for edge in mem.edges:
edge_type = edge.edge_type.value if hasattr(edge.edge_type, 'value') else str(edge.edge_type)
if edge_type == "核心关系" and edge.source_id == topic_node.id:
obj_node = mem.get_node_by_id(edge.target_id)
if obj_node:
# 如果有关系名称,使用关系名称
if edge.relation and edge.relation != "未知":
attr_parts.append(f"{edge.relation}={obj_node.content}")
else:
attr_parts.append(obj_node.content)
# 查找属性节点
for node in mem.nodes:
if node.node_type == NodeType.ATTRIBUTE:
# 属性节点的 content 格式可能是 "key=value" 或 "value"
attr_parts.append(node.content)
# 组合
if attr_parts:
result = f"{base}({', '.join(attr_parts)})"
else:
result = base
# 添加元数据(可选)
if include_metadata:
metadata_parts = []
if mem.memory_type:
type_value = mem.memory_type.value if hasattr(mem.memory_type, 'value') else str(mem.memory_type)
metadata_parts.append(f"类型:{get_memory_type_label(type_value)}")
if mem.importance > 0:
metadata_parts.append(f"重要性:{mem.importance:.2f}")
if metadata_parts:
result = f"{result} [{', '.join(metadata_parts)}]"
return result
def format_memories_block(
    memories: list[Memory | ShortTermMemory],
    title: str = "相关记忆",
    max_count: int = 10,
    include_metadata: bool = False,
) -> str:
    """Render up to max_count memories as a titled markdown bullet block.

    Args:
        memories: the memories to render.
        title: heading text of the block.
        max_count: maximum number of memories to show.
        include_metadata: whether each entry includes metadata.

    Returns:
        The formatted block, or an empty string when there is nothing
        to show.
    """
    if not memories:
        return ""

    bullets = [
        f"- {text}"
        for text in (
            format_memory_for_prompt(mem, include_metadata=include_metadata)
            for mem in memories[:max_count]
        )
        if text  # drop entries that formatted to an empty string
    ]
    return "\n".join([f"### 🧠 {title}", "", *bullets])

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.8.0"
version = "7.8.1"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -314,35 +314,32 @@ max_related_memories = 5 # 激活传播时最多影响的相关记忆数
# ==================== 三层记忆系统配置 (Three-Tier Memory System) ====================
# 受人脑记忆机制启发的分层记忆架构:
# 1. 感知记忆层 (Perceptual Memory) - 消息块的短期缓存
# 2. 短期记忆层 (Short-term Memory) - 结构化的活跃记忆
# 3. 长期记忆层 (Long-term Memory) - 持久化的图结构记忆
# 1. 感知记忆层 (Perceptual Memory) - 消息块的短期缓存,自动收集
# 2. 短期记忆层 (Short-term Memory) - 结构化的活跃记忆,模型格式化
# 3. 长期记忆层 (Long-term Memory) - 持久化的图结构记忆,批量转移
[three_tier_memory]
enable = false # 是否启用三层记忆系统(实验性功能,建议在测试环境先试用)
enable = false # 是否启用三层记忆系统(已经是核心系统,建议启用)
data_dir = "data/memory_graph/three_tier" # 数据存储目录
# --- 感知记忆层配置 ---
perceptual_max_blocks = 50 # 记忆堆最大容量(全局,不区分聊天流
perceptual_max_blocks = 50 # 记忆堆最大容量(全局)
perceptual_block_size = 5 # 每个记忆块包含的消息数量
perceptual_similarity_threshold = 0.55 # 相似度阈值0-1
perceptual_topk = 3 # TopK召回数量
activation_threshold = 3 # 激活阈值(召回次数→短期
perceptual_topk = 5 # TopK召回数量
perceptual_activation_threshold = 3 # 激活阈值(召回次数达到此值→转为短期记忆
# --- 短期记忆层配置 ---
short_term_max_memories = 30 # 短期记忆最大数量
short_term_max_memories = 30 # 短期记忆最大数量(达到上限触发批量转移)
short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值0.0-1.0
short_term_search_top_k = 5 # 搜索时返回的最大数量
short_term_decay_factor = 0.98 # 衰减因子
# --- 长期记忆层配置 ---
long_term_batch_size = 10 # 批量转移大小
long_term_decay_factor = 0.95 # 衰减因子(比短期记忆慢)
long_term_auto_transfer_interval = 600 # 自动转移间隔(秒)
long_term_batch_size = 10 # 批量转移大小(每次转移多少条短期记忆)
long_term_search_top_k = 5 # 长期记忆二次检索返回数量
# --- Judge模型配置 ---
judge_model_name = "utils_small" # 用于决策的LLM模型
judge_temperature = 0.1 # Judge模型的温度参数
enable_judge_retrieval = true # 启用智能检索判断
# --- 记忆检索配置 ---
enable_judge_retrieval = true # 启用智能检索判断(裁判模型评估是否需要二次检索)
judge_confidence_threshold = 0.7 # 裁判模型置信度阈值
[voice]
enable_asr = true # 是否启用语音识别启用后MoFox-Bot可以识别语音消息启用该功能需要配置语音识别模型[model.voice]

View File

@@ -1,5 +1,5 @@
[inner]
version = "1.3.9"
version = "1.4.0"
# 配置文件版本号迭代规则同bot_config.toml
@@ -214,3 +214,25 @@ max_tokens = 800
model_list = ["deepseek-r1-distill-qwen-32b"]
temperature = 0.7
max_tokens = 800
#------------记忆系统专用模型------------
[model_task_config.memory_short_term_builder] # 短期记忆构建模型(感知→短期格式化)
model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.2-Exp"]
temperature = 0.2
max_tokens = 800
[model_task_config.memory_short_term_decider] # 短期记忆决策模型(决定合并/更新/新建/丢弃)
model_list = ["siliconflow-deepseek-ai/DeepSeek-V3.2-Exp"]
temperature = 0.2
max_tokens = 1000
[model_task_config.memory_long_term_builder] # 长期记忆构建模型(短期→长期图结构)
model_list = ["siliconflow-Qwen/Qwen3-Next-80B-A3B-Instruct"]
temperature = 0.2
max_tokens = 1500
[model_task_config.memory_judge] # 记忆检索裁判模型(判断检索是否充足)
model_list = ["siliconflow-THUDM/GLM-4-9B-0414"]
temperature = 0.1
max_tokens = 600