This commit is contained in:
tt-P607
2025-12-16 15:26:41 +08:00
7 changed files with 758 additions and 87 deletions

View File

@@ -343,8 +343,17 @@ class StatisticOutputTask(AsyncTask):
stats[period_key][REQ_CNT_BY_MODULE][module_name] += 1
stats[period_key][REQ_CNT_BY_PROVIDER][provider_name] += 1
prompt_tokens = record.get("prompt_tokens") or 0
completion_tokens = record.get("completion_tokens") or 0
# 确保 tokens 是 int 类型
try:
prompt_tokens = int(record.get("prompt_tokens") or 0)
except (ValueError, TypeError):
prompt_tokens = 0
try:
completion_tokens = int(record.get("completion_tokens") or 0)
except (ValueError, TypeError):
completion_tokens = 0
total_tokens = prompt_tokens + completion_tokens
stats[period_key][IN_TOK_BY_TYPE][request_type] += prompt_tokens
@@ -363,7 +372,13 @@ class StatisticOutputTask(AsyncTask):
stats[period_key][TOTAL_TOK_BY_MODULE][module_name] += total_tokens
stats[period_key][TOTAL_TOK_BY_PROVIDER][provider_name] += total_tokens
# 确保 cost 是 float 类型
cost = record.get("cost") or 0.0
try:
cost = float(cost) if cost else 0.0
except (ValueError, TypeError):
cost = 0.0
stats[period_key][TOTAL_COST] += cost
stats[period_key][COST_BY_TYPE][request_type] += cost
stats[period_key][COST_BY_USER][user_id] += cost
@@ -371,8 +386,12 @@ class StatisticOutputTask(AsyncTask):
stats[period_key][COST_BY_MODULE][module_name] += cost
stats[period_key][COST_BY_PROVIDER][provider_name] += cost
# 收集time_cost数据
# 收集time_cost数据,确保 time_cost 是 float 类型
time_cost = record.get("time_cost") or 0.0
try:
time_cost = float(time_cost) if time_cost else 0.0
except (ValueError, TypeError):
time_cost = 0.0
if time_cost > 0: # 只记录有效的time_cost
stats[period_key][TIME_COST_BY_TYPE][request_type].append(time_cost)
stats[period_key][TIME_COST_BY_USER][user_id].append(time_cost)

580
src/memory_graph/README.md Normal file
View File

@@ -0,0 +1,580 @@
# 🧠 MoFox 记忆系统
MoFox-Core 采用**三层分级记忆架构**,模拟人类记忆的生物特性,实现了高效、可扩展的记忆管理系统。本文档介绍系统架构、使用方法和最佳实践。
---
## 📐 系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ 用户交互 (Chat Input) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 第1层感知记忆 (Perceptual Memory) - 即时对话流 (50块) │
│ ├─ 消息分块存储每块5条消息
│ ├─ 实时激活与召回 │
│ ├─ 相似度阈值触发转移 │
│ └─ 低开销,高频率访问 │
└─────────────────────────────────────────────────────────────────┘
↓ 激活转移
┌─────────────────────────────────────────────────────────────────┐
│ 第2层短期记忆 (Short-term Memory) - 结构化信息 (30条) │
│ ├─ LLM 驱动的决策(创建/合并/更新/丢弃) │
│ ├─ 重要性评分0.0-1.0
│ ├─ 自动转移与泄压机制 │
│ └─ 平衡灵活性与容量 │
└─────────────────────────────────────────────────────────────────┘
↓ 批量转移
┌─────────────────────────────────────────────────────────────────┐
│ 第3层长期记忆 (Long-term Memory) - 知识图谱 │
│ ├─ 图数据库存储(人物、事件、关系) │
│ ├─ 向量检索与相似度匹配 │
│ ├─ 动态节点合并与边生成 │
│ └─ 无容量限制,检索精确 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ LLM 回复生成(带完整上下文) │
└─────────────────────────────────────────────────────────────────┘
```
---
## 🎯 三层记忆详解
### 第1层感知记忆 (Perceptual Memory)
**特点**
- 📍 **位置**:即时对话窗口
- 💾 **容量**50 块250 条消息)
- ⏱️ **生命周期**:短暂,激活后可转移
- 🔍 **检索**:相似度匹配
**功能**
```python
# 添加消息到感知记忆
await perceptual_manager.add_message(
user_id="user123",
message="最近在学习Python",
timestamp=datetime.now()
)
# 召回相关块
blocks = await perceptual_manager.recall_blocks(
query="你在学什么编程语言",
top_k=3
)
```
**转移触发条件**
- 块被多次激活(激活次数 ≥ 3
- 块满足转移条件后提交到短期层
### 第2层短期记忆 (Short-term Memory)
**特点**
- 📍 **位置**:结构化数据存储
- 💾 **容量**30 条记忆
- ⏱️ **生命周期**:中等,根据重要性动态转移
- 🧠 **处理**LLM 驱动决策
**功能**
```python
# LLM 提取结构化记忆
extracted = await short_term_manager.add_from_block(block)
# 检索类似记忆
similar = await short_term_manager.search_memories(
query="Python 学习进度",
top_k=5
)
# 获取待转移记忆
to_transfer = short_term_manager.get_memories_for_transfer()
```
**决策类型**
| 决策 | 说明 | 场景 |
|------|------|------|
| `CREATE_NEW` | 创建新记忆 | 全新信息 |
| `MERGE` | 合并到现有 | 补充细节 |
| `UPDATE` | 更新现有 | 信息演变 |
| `DISCARD` | 丢弃 | 冗余/过时 |
**重要性评分**
```
高重要性 (≥0.6) → 优先转移到长期层
低重要性 (<0.6) → 保留或在容量溢出时删除
```
**容量管理**
-**自动转移**:占用率 ≥ 50% 时开始批量转移
- 🛡️ **泄压机制**:容量 100% 时删除低优先级记忆
- ⚙️ **配置**`short_term_max_memories = 30`
### 第3层长期记忆 (Long-term Memory)
**特点**
- 📍 **位置**图数据库NetworkX + Chroma
- 💾 **容量**:无限
- ⏱️ **生命周期**:持久,可检索
- 📊 **结构**:知识图谱
**功能**
```python
# 转移短期记忆到长期图
result = await long_term_manager.transfer_from_short_term(
short_term_memories
)
# 图检索
results = await memory_manager.search_memories(
query="用户的编程经验",
top_k=5
)
```
**知识图谱节点类型**
- 👤 **PERSON**:人物、角色
- 📅 **EVENT**:发生过的事件
- 💡 **CONCEPT**:概念、想法
- 🎯 **GOAL**:目标、计划
**节点关系**
- `participated_in`:参与了某事件
- `mentioned`:提及了某人/物
- `similar_to`:相似
- `related_to`:相关
- `caused_by`:由...导致
---
## 🔧 配置说明
### 基础配置
**文件**`config/bot_config.toml`
```toml
[memory]
# 启用/禁用记忆系统
enable = true
# 数据存储
data_dir = "data/memory_graph"
vector_collection_name = "memory_nodes"
vector_db_path = "data/memory_graph/chroma_db"
# 感知记忆
perceptual_max_blocks = 50 # 最大块数
perceptual_block_size = 5 # 每块消息数
perceptual_similarity_threshold = 0.55 # 召回阈值
perceptual_activation_threshold = 3 # 转移激活阈值
# 短期记忆
short_term_max_memories = 30 # 容量上限
short_term_transfer_threshold = 0.6 # 转移重要性阈值
short_term_enable_force_cleanup = true # 启用泄压
short_term_cleanup_keep_ratio = 0.9 # 泄压保留比例
# 长期记忆
long_term_batch_size = 10 # 批量转移大小
long_term_decay_factor = 0.95 # 激活衰减因子
long_term_auto_transfer_interval = 180 # 转移检查间隔(秒)
# 检索配置
search_top_k = 10 # 默认返回数量
search_min_importance = 0.3 # 最小重要性过滤
search_similarity_threshold = 0.6 # 相似度阈值
```
### 高级配置
```toml
[memory]
# 路径评分扩展(更精确的图检索)
enable_path_expansion = false # 启用算法
path_expansion_max_hops = 2 # 最大跳数
path_expansion_damping_factor = 0.85 # 衰减因子
path_expansion_max_branches = 10 # 分支限制
# 记忆激活
activation_decay_rate = 0.9 # 每天衰减10%
activation_propagation_strength = 0.5 # 传播强度
activation_propagation_depth = 1 # 传播深度
# 遗忘机制
forgetting_enabled = true # 启用遗忘
forgetting_activation_threshold = 0.1 # 遗忘激活度阈值
forgetting_min_importance = 0.8 # 保护重要性阈值
```
---
## 📚 使用示例
### 1. 初始化记忆系统
```python
from src.memory_graph.manager_singleton import (
initialize_unified_memory_manager,
get_unified_memory_manager
)
# 初始化系统
await initialize_unified_memory_manager()
# 获取管理器
manager = get_unified_memory_manager()
```
### 2. 添加感知记忆
```python
from src.memory_graph.models import MemoryBlock
# 模拟一个消息块
block = MemoryBlock(
id="msg_001",
content="用户提到在做一个Python爬虫项目",
timestamp=datetime.now(),
source="chat"
)
# 添加到感知层
await manager.add_memory(block, source="perceptual")
```
### 3. 智能检索记忆
```python
# 统一检索(从感知→短期→长期)
result = await manager.retrieve_memories(
query="最近在做什么项目",
use_judge=True # 使用裁判模型评估是否需要检索长期
)
# 访问不同层的结果
perceptual = result["perceptual_blocks"]
short_term = result["short_term_memories"]
long_term = result["long_term_memories"]
```
### 4. 手动触发转移
```python
# 立即转移短期→长期
result = await manager.manual_transfer()
print(f"转移了 {result['transferred_memory_ids']} 条记忆到长期层")
```
### 5. 获取统计信息
```python
stats = manager.get_statistics()
print(f"感知记忆块数:{stats['perceptual_blocks']}")
print(f"短期记忆数:{stats['short_term_memories']}")
print(f"长期记忆节点数:{stats['long_term_nodes']}")
print(f"图边数:{stats['long_term_edges']}")
```
---
## 🔄 转移流程
### 自动转移循环
系统在后台持续运行自动转移循环,确保记忆及时流转:
```
每 N 秒(可配置):
1. 检查短期记忆容量
2. 获取待转移的高重要性记忆
3. 如果缓存满或容量高,触发转移
4. 发送到长期管理器处理
5. 从短期层清除已转移记忆
```
**触发条件**(任一满足):
- 短期记忆占用率 ≥ 50%
- 缓存记忆数 ≥ 批量大小
- 距上次转移超过最大延迟
- 短期记忆达到容量上限
**代码位置**`src/memory_graph/unified_manager.py` 第 576-650 行
### 转移决策
长期记忆管理器对每条短期记忆做出决策:
```python
# LLM 决策过程
for short_term_memory in batch:
# 1. 检索相似的长期记忆
similar = await search_long_term(short_term_memory)
# 2. LLM 做出决策
decision = await llm_decide({
'short_term': short_term_memory,
'similar_long_term': similar
})
# 3. 执行决策
if decision == 'CREATE_NEW':
create_new_node()
elif decision == 'MERGE':
merge_into_existing()
elif decision == 'UPDATE':
update_existing()
```
---
## 🛡️ 容量管理策略
### 正常流程
```
短期记忆累积 → 达到 50% → 自动转移 → 长期记忆保存
```
### 压力场景
```
高频消息流 → 短期快速堆积
达到 100% → 转移来不及
启用泄压机制 → 删除低优先级记忆
保护核心数据,防止阻塞
```
**泄压参数**
```toml
short_term_enable_force_cleanup = true # 启用泄压
short_term_cleanup_keep_ratio = 0.9 # 保留 90% 容量
```
**删除策略**
- 优先删除:**重要性低 AND 创建时间早**
- 保留:高重要性记忆永不删除
---
## 📊 性能特性
### 时间复杂度
| 操作 | 复杂度 | 说明 |
|------|--------|------|
| 感知记忆添加 | O(1) | 直接追加 |
| 感知记忆召回 | O(n) | 相似度匹配 |
| 短期记忆添加 | O(1) | 直接追加 |
| 短期记忆搜索 | O(n) | 向量相似度 |
| 长期记忆检索 | O(log n) | 向量数据库 + 图遍历 |
| 转移操作 | O(n) | 批量处理 |
### 空间复杂度
| 层级 | 估计空间 | 配置 |
|------|---------|------|
| 感知层 | ~5-10 MB | 50 块 × 5 消息 |
| 短期层 | ~1-2 MB | 30 条记忆 |
| 长期层 | ~50-200 MB | 根据对话历史 |
### 优化技巧
1. **缓存去重**:避免同一记忆被转移多次
2. **批量转移**:减少 LLM 调用次数
3. **异步操作**:后台转移,不阻塞主流程
4. **自适应轮询**:根据容量压力调整检查间隔
---
## 🔍 检索策略
### 三层联合检索
```python
result = await manager.retrieve_memories(query, use_judge=True)
```
**流程**
1. 检索感知层(即时对话)
2. 检索短期层(结构化信息)
3. 使用裁判模型判断是否充足
4. 如不充足,检索长期层(知识图)
**裁判模型**
- 评估现有记忆是否满足查询
- 生成补充查询词
- 决策是否需要长期检索
### 路径评分扩展(可选)
启用后使用 PageRank 风格算法在图中传播分数:
```toml
enable_path_expansion = true
path_expansion_max_hops = 2
path_expansion_damping_factor = 0.85
```
**优势**
- 发现间接关联信息
- 上下文更丰富
- 精确度提高 15-25%
---
## 🐛 故障排查
### 问题1短期记忆快速堆积
**症状**:短期层记忆数快速增长,转移缓慢
**排查**
```python
# 查看统计信息
stats = manager.get_statistics()
print(f"短期记忆占用率: {stats['short_term_occupancy']:.0%}")
print(f"待转移记忆: {len(manager.short_term_manager.get_memories_for_transfer())}")
```
**解决**
- 减小 `long_term_auto_transfer_interval`(加快转移频率)
- 增加 `long_term_batch_size`(一次转移更多)
- 提高 `short_term_transfer_threshold`(更多记忆被转移)
### 问题2长期记忆检索结果不相关
**症状**:搜索返回的记忆与查询不匹配
**排查**
```python
# 启用调试日志
import logging
logging.getLogger("src.memory_graph").setLevel(logging.DEBUG)
# 重试检索
result = await manager.retrieve_memories(query, use_judge=True)
# 检查日志中的相似度评分
```
**解决**
- 增加 `search_top_k`(返回更多候选)
- 降低 `search_similarity_threshold`(放宽相似度要求)
- 检查向量模型是否加载正确
### 问题3转移失败导致记忆丢失
**症状**:短期记忆无故消失,长期层未出现
**排查**
```python
# 检查日志中的转移错误
# 查看长期管理器的错误日志
```
**解决**
- 检查 LLM 模型配置
- 确保长期图存储正常运行
- 增加转移超时时间
---
## 🎓 最佳实践
### 1. 合理配置容量
```toml
# 低频场景(私聊)
perceptual_max_blocks = 20
short_term_max_memories = 15
# 中等频率(小群)
perceptual_max_blocks = 50
short_term_max_memories = 30
# 高频场景(大群/客服)
perceptual_max_blocks = 100
short_term_max_memories = 50
short_term_enable_force_cleanup = true
```
### 2. 启用泄压保护
```toml
# 对于 24/7 运行的机器人
short_term_enable_force_cleanup = true
short_term_cleanup_keep_ratio = 0.85 # 更激进的清理
```
### 3. 定期监控
```python
# 在定时任务中检查
async def monitor_memory():
stats = manager.get_statistics()
if stats['short_term_occupancy'] > 0.8:
logger.warning("短期记忆压力高,考虑扩容")
if stats['long_term_nodes'] > 10000:
logger.warning("长期图规模大,检索可能变慢")
```
### 4. 使用裁判模型
```python
# 启用以提高检索质量
result = await manager.retrieve_memories(
query=user_query,
use_judge=True # 自动判断是否需要长期检索
)
```
---
## 📖 相关文档
- [三层记忆系统用户指南](../../docs/three_tier_memory_user_guide.md)
- [记忆图谱架构](../../docs/memory_graph_guide.md)
- [短期记忆压力泄压补丁](./short_term_pressure_patch.md)
- [转移算法分析](../../docs/memory_transfer_algorithm_analysis.md)
- [统一调度器指南](../../docs/unified_scheduler_guide.md)
---
## 🎯 快速导航
### 核心模块
| 模块 | 功能 | 文件 |
|------|------|------|
| 感知管理 | 消息分块、激活、转移 | `perceptual_manager.py` |
| 短期管理 | LLM 决策、合并、转移 | `short_term_manager.py` |
| 长期管理 | 图操作、节点合并 | `long_term_manager.py` |
| 统一接口 | 自动转移循环、检索 | `unified_manager.py` |
| 单例访问 | 全局管理器获取 | `manager_singleton.py` |
### 辅助工具
| 工具 | 功能 | 文件 |
|------|------|------|
| 向量生成 | 文本嵌入 | `utils/embeddings.py` |
| 相似度计算 | 余弦相似度 | `utils/similarity.py` |
| 格式化器 | 三层数据格式化 | `utils/three_tier_formatter.py` |
| 存储系统 | 磁盘持久化 | `storage/` |
---
## 📝 版本信息
- **架构**:三层分级记忆系统
- **存储**SQLAlchemy 2.0 + Chroma 向量库
- **图数据库**NetworkX
- **最后更新**2025 年 12 月 16 日

View File

@@ -956,14 +956,30 @@ class LongTermMemoryManager:
logger.warning(f"创建边失败: 缺少节点ID ({source_id} -> {target_id})")
return
# 检查节点是否存在
if not self.memory_manager.graph_store or not self.memory_manager.graph_store.graph.has_node(source_id):
logger.warning(f"创建边失败: 源节点不存在 ({source_id})")
return
if not self.memory_manager.graph_store or not self.memory_manager.graph_store.graph.has_node(target_id):
logger.warning(f"创建边失败: 目标节点不存在 ({target_id})")
if not self.memory_manager.graph_store:
logger.warning("创建边失败: 图存储未初始化")
return
# 检查和创建节点(如果不存在则创建占位符)
if not self.memory_manager.graph_store.graph.has_node(source_id):
logger.debug(f"源节点不存在,创建占位符节点: {source_id}")
self.memory_manager.graph_store.add_node(
node_id=source_id,
node_type="event",
content=f"临时节点 - {source_id}",
metadata={"placeholder": True, "created_by": "long_term_manager_edge_creation"}
)
if not self.memory_manager.graph_store.graph.has_node(target_id):
logger.debug(f"目标节点不存在,创建占位符节点: {target_id}")
self.memory_manager.graph_store.add_node(
node_id=target_id,
node_type="event",
content=f"临时节点 - {target_id}",
metadata={"placeholder": True, "created_by": "long_term_manager_edge_creation"}
)
# 现在两个节点都存在,可以创建边
edge_id = self.memory_manager.graph_store.add_edge(
source_id=source_id,
target_id=target_id,

View File

@@ -166,6 +166,8 @@ async def initialize_unified_memory_manager():
# 短期记忆配置
short_term_max_memories=getattr(config, "short_term_max_memories", 30),
short_term_transfer_threshold=getattr(config, "short_term_transfer_threshold", 0.6),
short_term_enable_force_cleanup=getattr(config, "short_term_enable_force_cleanup", True),
short_term_cleanup_keep_ratio=getattr(config, "short_term_cleanup_keep_ratio", 0.9),
# 长期记忆配置
long_term_batch_size=getattr(config, "long_term_batch_size", 10),
long_term_search_top_k=getattr(config, "search_top_k", 5),

View File

@@ -44,6 +44,7 @@ class ShortTermMemoryManager:
transfer_importance_threshold: float = 0.6,
llm_temperature: float = 0.2,
enable_force_cleanup: bool = False,
cleanup_keep_ratio: float = 0.9,
):
"""
初始化短期记忆层管理器
@@ -53,6 +54,8 @@ class ShortTermMemoryManager:
max_memories: 最大短期记忆数量
transfer_importance_threshold: 转移到长期记忆的重要性阈值
llm_temperature: LLM 决策的温度参数
enable_force_cleanup: 是否启用泄压功能
cleanup_keep_ratio: 泄压时保留容量的比例默认0.9表示保留90%
"""
self.data_dir = data_dir or Path("data/memory_graph")
self.data_dir.mkdir(parents=True, exist_ok=True)
@@ -62,6 +65,7 @@ class ShortTermMemoryManager:
self.transfer_importance_threshold = transfer_importance_threshold
self.llm_temperature = llm_temperature
self.enable_force_cleanup = enable_force_cleanup
self.cleanup_keep_ratio = cleanup_keep_ratio
# 核心数据
self.memories: list[ShortTermMemory] = []
@@ -635,69 +639,76 @@ class ShortTermMemoryManager:
def get_memories_for_transfer(self) -> list[ShortTermMemory]:
"""
获取需要转移到长期记忆的记忆(优化版:单次遍历
获取需要转移到长期记忆的记忆(改进版:转移优先于删除
逻辑
1. 优先选择重要性 >= 阈值的记忆
2. 如果剩余记忆数量仍超过 max_memories直接清理最早的低重要性记忆直到低于上限
优化的转移策略
1. 优先选择重要性 >= 阈值的记忆进行转移
2. 如果高重要性记忆已清空但仍超过容量,则考虑转移低重要性记忆
3. 仅当转移不能解决容量问题时,才进行强制删除(由 force_cleanup_overflow 处理)
返回:
需要转移的记忆列表(优先返回高重要性,次选低重要性)
"""
# 单次遍历:同时分类高重要性和低重要性记忆
candidates = []
high_importance_memories = []
low_importance_memories = []
for mem in self.memories:
if mem.importance >= self.transfer_importance_threshold:
candidates.append(mem)
high_importance_memories.append(mem)
else:
low_importance_memories.append(mem)
# 如果总体记忆数量超过了上限,优先清理低重要性最早创建的记忆
# 策略1优先返回高重要性记忆进行转移
if high_importance_memories:
logger.debug(
f"转移候选: 发现 {len(high_importance_memories)} 条高重要性记忆待转移"
)
return high_importance_memories
# 策略2如果没有高重要性记忆但总体超过容量上限
# 返回一部分低重要性记忆用于转移(而非删除)
if len(self.memories) > self.max_memories:
# 目标保留数量(降至上限的 90%
target_keep_count = int(self.max_memories * 0.9)
# 需要删除的数量(从当前总数降到 target_keep_count
num_to_remove = len(self.memories) - target_keep_count
if num_to_remove > 0 and low_importance_memories:
# 按创建时间排序,删除最早的低重要性记忆
low_importance_memories.sort(key=lambda x: x.created_at)
to_remove = low_importance_memories[:num_to_remove]
# 批量删除并更新索引
remove_ids = {mem.id for mem in to_remove}
self.memories = [mem for mem in self.memories if mem.id not in remove_ids]
for mem_id in remove_ids:
self._memory_id_index.pop(mem_id, None)
self._similarity_cache.pop(mem_id, None)
logger.info(
f"短期记忆清理: 移除了 {len(to_remove)} 条低重要性记忆 "
f"(保留 {len(self.memories)} 条)"
)
# 触发保存
asyncio.create_task(self._save_to_disk())
# 优先返回高重要性候选
if candidates:
return candidates
# 如果没有高重要性候选但总体超过上限,返回按创建时间最早的低重要性记忆作为后备转移候选
if len(self.memories) > self.max_memories:
needed = len(self.memories) - self.max_memories + 1
# 计算需要转移的数量(目标:降到上限
num_to_transfer = len(self.memories) - self.max_memories
# 按创建时间排序低重要性记忆,优先转移最早的(可能包含过时信息)
low_importance_memories.sort(key=lambda x: x.created_at)
return low_importance_memories[:needed]
to_transfer = low_importance_memories[:num_to_transfer]
if to_transfer:
logger.debug(
f"转移候选: 发现 {len(to_transfer)} 条低重要性记忆待转移 "
f"(当前容量 {len(self.memories)}/{self.max_memories})"
)
return to_transfer
return candidates
# 策略3容量充足无需转移
logger.debug(
f"转移检查: 无需转移 (当前容量 {len(self.memories)}/{self.max_memories})"
)
return []
def force_cleanup_overflow(self, keep_ratio: float = 0.9) -> int:
"""当短期记忆超过容量时,强制删除低重要性且最早的记忆以泄压"""
def force_cleanup_overflow(self, keep_ratio: float | None = None) -> int:
"""
当短期记忆超过容量时,强制删除低重要性且最早的记忆以泄压
Args:
keep_ratio: 保留容量的比例(默认使用配置中的 cleanup_keep_ratio
Returns:
删除的记忆数量
"""
if not self.enable_force_cleanup:
return 0
if self.max_memories <= 0:
return 0
# 使用实例配置或传入参数
if keep_ratio is None:
keep_ratio = self.cleanup_keep_ratio
current = len(self.memories)
limit = int(self.max_memories * keep_ratio)
if current <= self.max_memories:

View File

@@ -5,10 +5,11 @@
在高频消息场景下,短期记忆层(`ShortTermMemoryManager`)可能在自动转移机制触发前快速堆积大量记忆,当达到容量上限(`max_memories`)时可能阻塞后续写入。本功能提供一个**可选的泄压开关**,在容量溢出时自动删除低优先级记忆,防止系统阻塞。
**关键特性**
- ✅ 默认关闭保持向后兼容
- ✅ 默认开启(在高频场景中保护系统),可关闭保持向后兼容
- ✅ 基于重要性和时间的智能删除策略
- ✅ 异步持久化,不阻塞主流程
- ✅ 可通过配置文件或代码控制
- ✅ 可通过配置文件或代码灵活控制
- ✅ 支持自定义保留比例
---
@@ -22,36 +23,38 @@
from src.memory_graph.unified_manager import UnifiedMemoryManager
manager = UnifiedMemoryManager(
short_term_enable_force_cleanup=True, # 开启泄压功能
short_term_max_memories=30, # 短期记忆容量上限
short_term_enable_force_cleanup=True, # 开启泄压功能
short_term_cleanup_keep_ratio=0.9, # 泄压时保留容量的比例90%
short_term_max_memories=30, # 短期记忆容量上限
# ... 其他参数
)
```
### 方法 2配置文件通过单例获取
**推荐方式**:如果您使用 `get_unified_memory_manager()` 单例,需修改配置文件。
**推荐方式**:如果您使用 `get_unified_memory_manager()` 单例,通过配置文件控制
#### ❌ 目前的问题
配置文件 `config/bot_config.toml``[memory]`**尚未包含**此开关参数。
#### ✅ 已实现
配置文件 `config/bot_config.toml``[memory]`已包含此参数。
#### ✅ 解决方案
`config/bot_config.toml``[memory]` 节添加:
`config/bot_config.toml``[memory]` 节配置:
```toml
[memory]
# ... 其他配置 ...
short_term_max_memories = 30 # 短期记忆容量上限
short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值
short_term_enable_force_cleanup = true # 开启压力泄压(建议高频场景开启)
short_term_max_memories = 30 # 短期记忆容量上限
short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值
short_term_enable_force_cleanup = true # 开启压力泄压(建议高频场景开启)
short_term_cleanup_keep_ratio = 0.9 # 泄压时保留容量的比例保留90%
```
然后在 `src/memory_graph/manager_singleton.py` 第 157-175 行的 `get_unified_memory_manager()` 函数中添加读取逻辑
配置自动由 `src/memory_graph/manager_singleton.py` 读取并传递给管理器
```python
_unified_memory_manager = UnifiedMemoryManager(
# ... 其他参数 ...
short_term_enable_force_cleanup=getattr(config, "short_term_enable_force_cleanup", False), # 添加此行
short_term_enable_force_cleanup=getattr(config, "short_term_enable_force_cleanup", True),
short_term_cleanup_keep_ratio=getattr(config, "short_term_cleanup_keep_ratio", 0.9),
)
```
@@ -60,41 +63,68 @@ _unified_memory_manager = UnifiedMemoryManager(
## ⚙️ 核心实现位置
### 1. 参数定义
**文件**`src/memory_graph/unified_manager.py` 第 47
**文件**`src/memory_graph/unified_manager.py`35-54 行
```python
class UnifiedMemoryManager:
def __init__(
self,
# ... 其他参数 ...
short_term_enable_force_cleanup: bool = False, # 开关参数
short_term_cleanup_keep_ratio: float = 0.9, # 保留比例参数
# ... 其他参数
):
```
### 2. 传递到短期层
**文件**`src/memory_graph/unified_manager.py` 第 100
**文件**`src/memory_graph/unified_manager.py`94-106
```python
"short_term": {
"enable_force_cleanup": short_term_enable_force_cleanup, # 传递给 ShortTermMemoryManager
self._config = {
"short_term": {
"max_memories": short_term_max_memories,
"transfer_importance_threshold": short_term_transfer_threshold,
"enable_force_cleanup": short_term_enable_force_cleanup, # 传递给 ShortTermMemoryManager
"cleanup_keep_ratio": short_term_cleanup_keep_ratio, # 传递保留比例
},
# ... 其他配置
}
```
### 3. 泄压逻辑实现
**文件**`src/memory_graph/short_term_manager.py` 第 693-726 行
**文件**`src/memory_graph/short_term_manager.py` 40-76 行(初始化)和第 697-745 行(执行)
初始化参数:
```python
def force_cleanup_overflow(self, keep_ratio: float = 0.9) -> int:
class ShortTermMemoryManager:
def __init__(
self,
max_memories: int = 30,
enable_force_cleanup: bool = False,
cleanup_keep_ratio: float = 0.9, # 新参数
):
self.enable_force_cleanup = enable_force_cleanup
self.cleanup_keep_ratio = cleanup_keep_ratio
```
执行泄压:
```python
def force_cleanup_overflow(self, keep_ratio: float | None = None) -> int:
"""当短期记忆超过容量时,强制删除低重要性且最早的记忆以泄压"""
if not self.enable_force_cleanup: # 检查开关
return 0
if keep_ratio is None:
keep_ratio = self.cleanup_keep_ratio # 使用实例配置
# ... 删除逻辑
```
### 4. 触发条件
**文件**`src/memory_graph/unified_manager.py` 第 618-621 行
**文件**`src/memory_graph/unified_manager.py` 自动转移循环中
```python
# 在自动转移循环中检测
# 在自动转移循环中检测容量溢出
if occupancy_ratio >= 1.0 and not transfer_cache:
removed = self.short_term_manager.force_cleanup_overflow()
if removed > 0:
logger.warning(f"短期记忆占用率 {occupancy_ratio:.0%},已强制删除 {removed} 条低重要性记忆泄压")
logger.warning(f"短期记忆压力泄压: 移除 {removed} 条 (当前 {len}/30)")
```
---
@@ -112,17 +142,18 @@ if occupancy_ratio >= 1.0 and not transfer_cache:
sorted_memories = sorted(self.memories, key=lambda m: (m.importance, m.created_at))
```
**删除数量**:删除到容量的 90%
**删除数量**根据 `cleanup_keep_ratio` 删除
```python
current = len(self.memories) # 当前记忆数
limit = int(self.max_memories * 0.9) # 目标保留数
remove_count = current - limit # 需要删除的数量
current = len(self.memories) # 当前记忆数
limit = int(self.max_memories * keep_ratio) # 目标保留数
remove_count = current - limit # 需要删除的数量
```
**示例**
- 容量上限 `max_memories=30`
- 当前记忆数 `35` 删除 `35 - 27 = 8` 条最低优先级记忆
- 优先删除:重要性 0.1 且创建于 10 分钟前的记忆
**示例**`max_memories=30, keep_ratio=0.9`
- 当前记忆数 `35` → 删除到 `27` 条(保留 90%
- 删除 `35 - 27 = 8` 条最低优先级记忆
- 优先删除:重要性最低且创建时间最早的记忆
- 删除后异步保存,不阻塞主流程
### 持久化
- 使用 `asyncio.create_task(self._save_to_disk())` 异步保存
@@ -149,8 +180,8 @@ remove_count = current - limit # 需要删除的数量
## 🚨 注意事项
### ⚠️ 何时开启
-**推荐开启**高频群聊、客服机器人、24/7 运行场景
- **不建议开启**:需要完整保留所有短期记忆调试阶段
-**默认开启**高频群聊、客服机器人、24/7 运行场景
- ⚠️ **可选关闭**:需要完整保留所有短期记忆调试阶段
### ⚠️ 潜在影响
- 低重要性记忆可能被删除,**不会转移到长期记忆**
@@ -172,11 +203,12 @@ remove_count = current - limit # 需要删除的数量
unified_manager.short_term_manager.enable_force_cleanup = False
```
### 永久禁用
### 永久关闭
**配置文件方式**
```toml
[memory]
short_term_enable_force_cleanup = false # 或直接删除此行
short_term_enable_force_cleanup = false # 关闭泄压
short_term_cleanup_keep_ratio = 0.9 # 此时该参数被忽略
```
**代码方式**
@@ -196,4 +228,13 @@ manager = UnifiedMemoryManager(
---
## 📝 实现状态
**已完成**2025年12月16日
- 配置文件已添加 `short_term_enable_force_cleanup``short_term_cleanup_keep_ratio` 参数
- `UnifiedMemoryManager` 支持新参数并正确传递配置
- `ShortTermMemoryManager` 实现完整的泄压逻辑
- `manager_singleton.py` 读取并应用配置
- 日志系统正确记录泄压事件
**最后更新**2025年12月16日

View File

@@ -45,6 +45,7 @@ class UnifiedMemoryManager:
short_term_max_memories: int = 30,
short_term_transfer_threshold: float = 0.6,
short_term_enable_force_cleanup: bool = False,
short_term_cleanup_keep_ratio: float = 0.9,
# 长期记忆配置
long_term_batch_size: int = 10,
long_term_search_top_k: int = 5,
@@ -98,6 +99,7 @@ class UnifiedMemoryManager:
"max_memories": short_term_max_memories,
"transfer_importance_threshold": short_term_transfer_threshold,
"enable_force_cleanup": short_term_enable_force_cleanup,
"cleanup_keep_ratio": short_term_cleanup_keep_ratio,
},
"long_term": {
"batch_size": long_term_batch_size,