This commit is contained in:
Windpicker-owo
2025-12-16 15:56:40 +08:00
18 changed files with 1935 additions and 209 deletions

View File

@@ -0,0 +1,283 @@
# Napcat 视频处理配置指南
## 概述
本指南说明如何在 MoFox-Bot 中配置和控制 Napcat 适配器的视频消息处理功能。
**相关 Issue**: [#10 - 强烈请求有个开关选择是否下载视频](https://github.com/MoFox-Studio/MoFox-Core/issues/10)
---
## 快速开始
### 关闭视频下载(推荐用于低配机器或有限带宽)
编辑 `config/bot_config.toml`,找到 `[napcat_adapter.features]` 段落,修改:
```toml
[napcat_adapter.features]
enable_video_processing = false # 改为 false 关闭视频处理
```
**效果**:视频消息会显示为 `[视频消息]`,不会进行下载。
---
## 配置选项详解
### 主开关:`enable_video_processing`
| 属性 | 值 |
|------|-----|
| **类型** | 布尔值 (`true` / `false`) |
| **默认值** | `true` |
| **说明** | 是否启用视频消息的下载和处理 |
**启用 (`true`)**
- ✅ 自动下载视频
- ✅ 将视频转换为 base64 并发送给 AI
- ⚠️ 消耗网络带宽和 CPU 资源
**禁用 (`false`)**
- ✅ 跳过视频下载
- ✅ 显示 `[视频消息]` 占位符
- ✅ 显著降低带宽和 CPU 占用
### 高级选项
#### `video_max_size_mb`
| 属性 | 值 |
|------|-----|
| **类型** | 整数 |
| **默认值** | `100` (MB) |
| **建议范围** | 10 - 500 MB |
| **说明** | 允许下载的最大视频文件大小 |
**用途**:防止下载过大的视频文件。
**建议**
- **低配机器** (2GB RAM): 设置为 10-20 MB
- **中等配置** (8GB RAM): 设置为 50-100 MB
- **高配机器** (16GB+ RAM): 设置为 100-500 MB
```toml
# 只允许下载 50MB 以下的视频
video_max_size_mb = 50
```
#### `video_download_timeout`
| 属性 | 值 |
|------|-----|
| **类型** | 整数 |
| **默认值** | `60` (秒) |
| **建议范围** | 30 - 180 秒 |
| **说明** | 视频下载超时时间 |
**用途**:防止卡住等待无法下载的视频。
**建议**
- **网络较差** (2-5 Mbps): 设置为 120-180 秒
- **网络一般** (5-20 Mbps): 设置为 60-120 秒
- **网络较好** (20+ Mbps): 设置为 30-60 秒
```toml
# 下载超时时间改为 120 秒
video_download_timeout = 120
```
---
## 常见配置场景
### 场景 1服务器带宽有限
**症状**:群聊消息中经常出现大量视频,导致网络流量爆满。
**解决方案**
```toml
[napcat_adapter.features]
enable_video_processing = false # 完全关闭
```
### 场景 2机器性能较低
**症状**:处理视频消息时 CPU 占用率高,其他功能响应变慢。
**解决方案**
```toml
[napcat_adapter.features]
enable_video_processing = true
video_max_size_mb = 20 # 限制小视频
video_download_timeout = 30 # 快速超时
```
### 场景 3特定时间段关闭视频处理
如果需要在特定时间段内关闭视频处理,可以:
1. 修改配置文件
2. 调用 API 重新加载配置(如果支持)
例如:在工作时间关闭,下班后打开。
### 场景 4保留所有视频处理默认行为
```toml
[napcat_adapter.features]
enable_video_processing = true
video_max_size_mb = 100
video_download_timeout = 60
```
---
## 工作原理
### 启用视频处理的流程
```
消息到达
检查 enable_video_processing
├─ false → 返回 [视频消息] 占位符 ✓
└─ true ↓
检查文件大小
├─ > video_max_size_mb → 返回错误信息 ✓
└─ ≤ video_max_size_mb ↓
开始下载(最多等待 video_download_timeout 秒)
├─ 成功 → 返回视频数据 ✓
├─ 超时 → 返回超时错误 ✓
└─ 失败 → 返回错误信息 ✓
```
### 禁用视频处理的流程
```
消息到达
检查 enable_video_processing
└─ false → 立即返回 [视频消息] 占位符 ✓
(节省带宽和 CPU
```
---
## 错误处理
当视频处理出现问题时,用户会看到以下占位符消息:
| 消息 | 含义 |
|------|------|
| `[视频消息]` | 视频处理已禁用或信息不完整 |
| `[视频消息] (文件过大)` | 视频大小超过限制 |
| `[视频消息] (下载失败)` | 网络错误或服务不可用 |
| `[视频消息处理出错]` | 其他异常错误 |
这些占位符确保消息不会因为视频处理失败而导致程序崩溃。
---
## 性能对比
| 配置 | 带宽消耗 | CPU 占用 | 内存占用 | 响应速度 |
|------|----------|---------|---------|----------|
| **禁用** (`false`) | 🟢 极低 | 🟢 极低 | 🟢 极低 | 🟢 极快 |
| **启用,小视频** (≤20MB) | 🟡 中等 | 🟡 中等 | 🟡 中等 | 🟡 一般 |
| **启用,大视频** (≤100MB) | 🔴 较高 | 🔴 较高 | 🔴 较高 | 🔴 较慢 |
---
## 监控和调试
### 检查配置是否生效
启动 bot 后,查看日志中是否有类似信息:
```
[napcat_adapter] 视频下载器已初始化: max_size=100MB, timeout=60s
```
如果看到这条信息,说明配置已成功加载。
### 监控视频处理
当处理视频消息时,日志中会记录:
```
[video_handler] 开始下载视频: https://...
[video_handler] 视频下载成功,大小: 25.50 MB
```
或者:
```
[napcat_adapter] 视频消息处理已禁用,跳过
```
---
## 常见问题
### Q1: 关闭视频处理会影响 AI 的回复吗?
**A**: 不会。AI 仍然能看到 `[视频消息]` 占位符,可以根据上下文判断是否涉及视频内容。
### Q2: 可以为不同群组设置不同的视频处理策略吗?
**A**: 当前版本不支持。所有群组使用相同的配置。如需支持,请在 Issue 或讨论中提出。
### Q3: 视频下载会影响消息处理延迟吗?
**A**: 会。下载大视频可能需要几秒钟。建议:
- 设置合理的 `video_download_timeout`
- 或禁用视频处理以获得最快响应
### Q4: 修改配置后需要重启吗?
**A**: 是的。需要重启 bot 才能应用新配置。
### Q5: 如何快速诊断视频下载问题?
**A**:
1. 检查日志中的错误信息
2. 验证网络连接
3. 检查 `video_max_size_mb` 是否设置过小
4. 尝试增加 `video_download_timeout`
---
## 最佳实践
1. **新用户建议**:先启用视频处理,如果出现性能问题再调整参数或关闭。
2. **生产环境建议**
- 定期监控日志中的视频处理错误
- 根据实际网络和 CPU 情况调整参数
- 在高峰期可考虑关闭视频处理
3. **开发调试**
- 启用日志中的 DEBUG 级别输出
- 测试各个 `video_max_size_mb` 值的实际表现
- 检查超时时间是否符合网络条件
---
## 相关链接
- **GitHub Issue #10**: [强烈请求有个开关选择是否下载视频](https://github.com/MoFox-Studio/MoFox-Core/issues/10)
- **配置文件**: `config/bot_config.toml`
- **实现代码**:
- `src/plugins/built_in/napcat_adapter/plugin.py`
- `src/plugins/built_in/napcat_adapter/src/handlers/to_core/message_handler.py`
- `src/plugins/built_in/napcat_adapter/src/handlers/video_handler.py`
---
## 反馈和建议
如有其他问题或建议,欢迎在 GitHub Issue 中提出。
**版本**: v2.1.0
**最后更新**: 2025-12-16

View File

@@ -0,0 +1,134 @@
# Napcat 适配器视频处理配置完成总结
## 修改内容
### 1. **增强配置定义** (`plugin.py`)
- 添加 `video_max_size_mb`: 视频最大大小限制(默认 100MB
- 添加 `video_download_timeout`: 下载超时时间(默认 60秒
- 改进 `enable_video_processing` 的描述文字
- **位置**: `src/plugins/built_in/napcat_adapter/plugin.py` L417-430
### 2. **改进消息处理器** (`message_handler.py`)
- 添加 `_video_downloader` 成员变量存储下载器实例
- 改进 `set_plugin_config()` 方法,根据配置初始化视频下载器
- 改进视频下载调用,使用初始化时的配置
- **位置**: `src/plugins/built_in/napcat_adapter/src/handlers/to_core/message_handler.py` L32-54, L327-334
### 3. **添加配置示例** (`bot_config.toml`)
- 添加 `[napcat_adapter]` 配置段
- 添加完整的 Napcat 服务器配置示例
- 添加详细的特性配置(消息过滤、视频处理等)
- 包含详尽的中文注释和使用建议
- **位置**: `config/bot_config.toml` L680-724
### 4. **编写使用文档** (新文件)
- 创建 `docs/napcat_video_configuration_guide.md`
- 详细说明所有配置选项的含义和用法
- 提供常见场景的配置模板
- 包含故障排查和性能对比
---
## 功能清单
### 核心功能
- ✅ 全局开关控制视频处理 (`enable_video_processing`)
- ✅ 视频大小限制 (`video_max_size_mb`)
- ✅ 下载超时控制 (`video_download_timeout`)
- ✅ 根据配置初始化下载器
- ✅ 友好的错误提示信息
### 用户体验
- ✅ 详细的配置说明文档
- ✅ 代码中的中文注释
- ✅ 启动日志反馈
- ✅ 配置示例可直接使用
---
## 如何使用
### 快速关闭视频下载(解决 Issue #10
编辑 `config/bot_config.toml`
```toml
[napcat_adapter.features]
enable_video_processing = false # 改为 false
```
重启 bot 后生效。
### 调整视频大小限制
```toml
[napcat_adapter.features]
video_max_size_mb = 50 # 只允许下载 50MB 以下的视频
```
### 调整下载超时
```toml
[napcat_adapter.features]
video_download_timeout = 120 # 增加到 120 秒
```
---
## 向下兼容性
- ✅ 旧配置文件无需修改(使用默认值)
- ✅ 现有视频处理流程完全兼容
- ✅ 所有功能都带有合理的默认值
---
## 测试场景
已验证的工作场景:
| 场景 | 行为 | 状态 |
|------|------|------|
| 视频处理启用 | 正常下载视频 | ✅ |
| 视频处理禁用 | 返回占位符 | ✅ |
| 视频超过大小限制 | 返回错误信息 | ✅ |
| 下载超时 | 返回超时错误 | ✅ |
| 网络错误 | 返回友好错误 | ✅ |
| 启动时初始化 | 日志输出配置 | ✅ |
---
## 文件修改清单
```
修改文件:
- src/plugins/built_in/napcat_adapter/plugin.py
- src/plugins/built_in/napcat_adapter/src/handlers/to_core/message_handler.py
- config/bot_config.toml
新增文件:
- docs/napcat_video_configuration_guide.md
```
---
## 关联信息
- **GitHub Issue**: #10 - 强烈请求有个开关选择是否下载视频
- **修复时间**: 2025-12-16
- **相关文档**: [Napcat 视频处理配置指南](./napcat_video_configuration_guide.md)
---
## 后续改进建议
1. **分组配置** - 为不同群组设置不同的视频处理策略
2. **动态开关** - 提供运行时 API 动态开启/关闭视频处理
3. **性能监控** - 添加视频处理的性能统计指标
4. **队列管理** - 实现视频下载队列,限制并发下载数
5. **缓存机制** - 缓存已下载的视频避免重复下载
---
**版本**: v2.1.0
**状态**: ✅ 完成

View File

@@ -343,8 +343,17 @@ class StatisticOutputTask(AsyncTask):
stats[period_key][REQ_CNT_BY_MODULE][module_name] += 1
stats[period_key][REQ_CNT_BY_PROVIDER][provider_name] += 1
prompt_tokens = record.get("prompt_tokens") or 0
completion_tokens = record.get("completion_tokens") or 0
# 确保 tokens 是 int 类型
try:
prompt_tokens = int(record.get("prompt_tokens") or 0)
except (ValueError, TypeError):
prompt_tokens = 0
try:
completion_tokens = int(record.get("completion_tokens") or 0)
except (ValueError, TypeError):
completion_tokens = 0
total_tokens = prompt_tokens + completion_tokens
stats[period_key][IN_TOK_BY_TYPE][request_type] += prompt_tokens
@@ -363,7 +372,13 @@ class StatisticOutputTask(AsyncTask):
stats[period_key][TOTAL_TOK_BY_MODULE][module_name] += total_tokens
stats[period_key][TOTAL_TOK_BY_PROVIDER][provider_name] += total_tokens
# 确保 cost 是 float 类型
cost = record.get("cost") or 0.0
try:
cost = float(cost) if cost else 0.0
except (ValueError, TypeError):
cost = 0.0
stats[period_key][TOTAL_COST] += cost
stats[period_key][COST_BY_TYPE][request_type] += cost
stats[period_key][COST_BY_USER][user_id] += cost
@@ -371,8 +386,12 @@ class StatisticOutputTask(AsyncTask):
stats[period_key][COST_BY_MODULE][module_name] += cost
stats[period_key][COST_BY_PROVIDER][provider_name] += cost
# 收集time_cost数据
# 收集time_cost数据,确保 time_cost 是 float 类型
time_cost = record.get("time_cost") or 0.0
try:
time_cost = float(time_cost) if time_cost else 0.0
except (ValueError, TypeError):
time_cost = 0.0
if time_cost > 0: # 只记录有效的time_cost
stats[period_key][TIME_COST_BY_TYPE][request_type].append(time_cost)
stats[period_key][TIME_COST_BY_USER][user_id].append(time_cost)

580
src/memory_graph/README.md Normal file
View File

@@ -0,0 +1,580 @@
# 🧠 MoFox 记忆系统
MoFox-Core 采用**三层分级记忆架构**,模拟人类记忆的生物特性,实现了高效、可扩展的记忆管理系统。本文档介绍系统架构、使用方法和最佳实践。
---
## 📐 系统架构
```
┌─────────────────────────────────────────────────────────────────┐
│ 用户交互 (Chat Input) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 第1层感知记忆 (Perceptual Memory) - 即时对话流 (50块) │
│ ├─ 消息分块存储每块5条消息
│ ├─ 实时激活与召回 │
│ ├─ 相似度阈值触发转移 │
│ └─ 低开销,高频率访问 │
└─────────────────────────────────────────────────────────────────┘
↓ 激活转移
┌─────────────────────────────────────────────────────────────────┐
│ 第2层短期记忆 (Short-term Memory) - 结构化信息 (30条) │
│ ├─ LLM 驱动的决策(创建/合并/更新/丢弃) │
│ ├─ 重要性评分0.0-1.0
│ ├─ 自动转移与泄压机制 │
│ └─ 平衡灵活性与容量 │
└─────────────────────────────────────────────────────────────────┘
↓ 批量转移
┌─────────────────────────────────────────────────────────────────┐
│ 第3层长期记忆 (Long-term Memory) - 知识图谱 │
│ ├─ 图数据库存储(人物、事件、关系) │
│ ├─ 向量检索与相似度匹配 │
│ ├─ 动态节点合并与边生成 │
│ └─ 无容量限制,检索精确 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ LLM 回复生成(带完整上下文) │
└─────────────────────────────────────────────────────────────────┘
```
---
## 🎯 三层记忆详解
### 第1层感知记忆 (Perceptual Memory)
**特点**
- 📍 **位置**:即时对话窗口
- 💾 **容量**50 块250 条消息)
- ⏱️ **生命周期**:短暂,激活后可转移
- 🔍 **检索**:相似度匹配
**功能**
```python
# 添加消息到感知记忆
await perceptual_manager.add_message(
user_id="user123",
message="最近在学习Python",
timestamp=datetime.now()
)
# 召回相关块
blocks = await perceptual_manager.recall_blocks(
query="你在学什么编程语言",
top_k=3
)
```
**转移触发条件**
- 块被多次激活(激活次数 ≥ 3
- 块满足转移条件后提交到短期层
### 第2层短期记忆 (Short-term Memory)
**特点**
- 📍 **位置**:结构化数据存储
- 💾 **容量**30 条记忆
- ⏱️ **生命周期**:中等,根据重要性动态转移
- 🧠 **处理**LLM 驱动决策
**功能**
```python
# LLM 提取结构化记忆
extracted = await short_term_manager.add_from_block(block)
# 检索类似记忆
similar = await short_term_manager.search_memories(
query="Python 学习进度",
top_k=5
)
# 获取待转移记忆
to_transfer = short_term_manager.get_memories_for_transfer()
```
**决策类型**
| 决策 | 说明 | 场景 |
|------|------|------|
| `CREATE_NEW` | 创建新记忆 | 全新信息 |
| `MERGE` | 合并到现有 | 补充细节 |
| `UPDATE` | 更新现有 | 信息演变 |
| `DISCARD` | 丢弃 | 冗余/过时 |
**重要性评分**
```
高重要性 (≥0.6) → 优先转移到长期层
低重要性 (<0.6) → 保留或在容量溢出时删除
```
**容量管理**
-**自动转移**:占用率 ≥ 50% 时开始批量转移
- 🛡️ **泄压机制**:容量 100% 时删除低优先级记忆
- ⚙️ **配置**`short_term_max_memories = 30`
### 第3层长期记忆 (Long-term Memory)
**特点**
- 📍 **位置**图数据库NetworkX + Chroma
- 💾 **容量**:无限
- ⏱️ **生命周期**:持久,可检索
- 📊 **结构**:知识图谱
**功能**
```python
# 转移短期记忆到长期图
result = await long_term_manager.transfer_from_short_term(
short_term_memories
)
# 图检索
results = await memory_manager.search_memories(
query="用户的编程经验",
top_k=5
)
```
**知识图谱节点类型**
- 👤 **PERSON**:人物、角色
- 📅 **EVENT**:发生过的事件
- 💡 **CONCEPT**:概念、想法
- 🎯 **GOAL**:目标、计划
**节点关系**
- `participated_in`:参与了某事件
- `mentioned`:提及了某人/物
- `similar_to`:相似
- `related_to`:相关
- `caused_by`:由...导致
---
## 🔧 配置说明
### 基础配置
**文件**`config/bot_config.toml`
```toml
[memory]
# 启用/禁用记忆系统
enable = true
# 数据存储
data_dir = "data/memory_graph"
vector_collection_name = "memory_nodes"
vector_db_path = "data/memory_graph/chroma_db"
# 感知记忆
perceptual_max_blocks = 50 # 最大块数
perceptual_block_size = 5 # 每块消息数
perceptual_similarity_threshold = 0.55 # 召回阈值
perceptual_activation_threshold = 3 # 转移激活阈值
# 短期记忆
short_term_max_memories = 30 # 容量上限
short_term_transfer_threshold = 0.6 # 转移重要性阈值
short_term_enable_force_cleanup = true # 启用泄压
short_term_cleanup_keep_ratio = 0.9 # 泄压保留比例
# 长期记忆
long_term_batch_size = 10 # 批量转移大小
long_term_decay_factor = 0.95 # 激活衰减因子
long_term_auto_transfer_interval = 180 # 转移检查间隔(秒)
# 检索配置
search_top_k = 10 # 默认返回数量
search_min_importance = 0.3 # 最小重要性过滤
search_similarity_threshold = 0.6 # 相似度阈值
```
### 高级配置
```toml
[memory]
# 路径评分扩展(更精确的图检索)
enable_path_expansion = false # 启用算法
path_expansion_max_hops = 2 # 最大跳数
path_expansion_damping_factor = 0.85 # 衰减因子
path_expansion_max_branches = 10 # 分支限制
# 记忆激活
activation_decay_rate = 0.9 # 每天衰减10%
activation_propagation_strength = 0.5 # 传播强度
activation_propagation_depth = 1 # 传播深度
# 遗忘机制
forgetting_enabled = true # 启用遗忘
forgetting_activation_threshold = 0.1 # 遗忘激活度阈值
forgetting_min_importance = 0.8 # 保护重要性阈值
```
---
## 📚 使用示例
### 1. 初始化记忆系统
```python
from src.memory_graph.manager_singleton import (
initialize_unified_memory_manager,
get_unified_memory_manager
)
# 初始化系统
await initialize_unified_memory_manager()
# 获取管理器
manager = get_unified_memory_manager()
```
### 2. 添加感知记忆
```python
from src.memory_graph.models import MemoryBlock
# 模拟一个消息块
block = MemoryBlock(
id="msg_001",
content="用户提到在做一个Python爬虫项目",
timestamp=datetime.now(),
source="chat"
)
# 添加到感知层
await manager.add_memory(block, source="perceptual")
```
### 3. 智能检索记忆
```python
# 统一检索(从感知→短期→长期)
result = await manager.retrieve_memories(
query="最近在做什么项目",
use_judge=True # 使用裁判模型评估是否需要检索长期
)
# 访问不同层的结果
perceptual = result["perceptual_blocks"]
short_term = result["short_term_memories"]
long_term = result["long_term_memories"]
```
### 4. 手动触发转移
```python
# 立即转移短期→长期
result = await manager.manual_transfer()
print(f"转移了 {result['transferred_memory_ids']} 条记忆到长期层")
```
### 5. 获取统计信息
```python
stats = manager.get_statistics()
print(f"感知记忆块数:{stats['perceptual_blocks']}")
print(f"短期记忆数:{stats['short_term_memories']}")
print(f"长期记忆节点数:{stats['long_term_nodes']}")
print(f"图边数:{stats['long_term_edges']}")
```
---
## 🔄 转移流程
### 自动转移循环
系统在后台持续运行自动转移循环,确保记忆及时流转:
```
每 N 秒(可配置):
1. 检查短期记忆容量
2. 获取待转移的高重要性记忆
3. 如果缓存满或容量高,触发转移
4. 发送到长期管理器处理
5. 从短期层清除已转移记忆
```
**触发条件**(任一满足):
- 短期记忆占用率 ≥ 50%
- 缓存记忆数 ≥ 批量大小
- 距上次转移超过最大延迟
- 短期记忆达到容量上限
**代码位置**`src/memory_graph/unified_manager.py` 第 576-650 行
### 转移决策
长期记忆管理器对每条短期记忆做出决策:
```python
# LLM 决策过程
for short_term_memory in batch:
# 1. 检索相似的长期记忆
similar = await search_long_term(short_term_memory)
# 2. LLM 做出决策
decision = await llm_decide({
'short_term': short_term_memory,
'similar_long_term': similar
})
# 3. 执行决策
if decision == 'CREATE_NEW':
create_new_node()
elif decision == 'MERGE':
merge_into_existing()
elif decision == 'UPDATE':
update_existing()
```
---
## 🛡️ 容量管理策略
### 正常流程
```
短期记忆累积 → 达到 50% → 自动转移 → 长期记忆保存
```
### 压力场景
```
高频消息流 → 短期快速堆积
达到 100% → 转移来不及
启用泄压机制 → 删除低优先级记忆
保护核心数据,防止阻塞
```
**泄压参数**
```toml
short_term_enable_force_cleanup = true # 启用泄压
short_term_cleanup_keep_ratio = 0.9 # 保留 90% 容量
```
**删除策略**
- 优先删除:**重要性低 AND 创建时间早**
- 保留:高重要性记忆永不删除
---
## 📊 性能特性
### 时间复杂度
| 操作 | 复杂度 | 说明 |
|------|--------|------|
| 感知记忆添加 | O(1) | 直接追加 |
| 感知记忆召回 | O(n) | 相似度匹配 |
| 短期记忆添加 | O(1) | 直接追加 |
| 短期记忆搜索 | O(n) | 向量相似度 |
| 长期记忆检索 | O(log n) | 向量数据库 + 图遍历 |
| 转移操作 | O(n) | 批量处理 |
### 空间复杂度
| 层级 | 估计空间 | 配置 |
|------|---------|------|
| 感知层 | ~5-10 MB | 50 块 × 5 消息 |
| 短期层 | ~1-2 MB | 30 条记忆 |
| 长期层 | ~50-200 MB | 根据对话历史 |
### 优化技巧
1. **缓存去重**:避免同一记忆被转移多次
2. **批量转移**:减少 LLM 调用次数
3. **异步操作**:后台转移,不阻塞主流程
4. **自适应轮询**:根据容量压力调整检查间隔
---
## 🔍 检索策略
### 三层联合检索
```python
result = await manager.retrieve_memories(query, use_judge=True)
```
**流程**
1. 检索感知层(即时对话)
2. 检索短期层(结构化信息)
3. 使用裁判模型判断是否充足
4. 如不充足,检索长期层(知识图)
**裁判模型**
- 评估现有记忆是否满足查询
- 生成补充查询词
- 决策是否需要长期检索
### 路径评分扩展(可选)
启用后使用 PageRank 风格算法在图中传播分数:
```toml
enable_path_expansion = true
path_expansion_max_hops = 2
path_expansion_damping_factor = 0.85
```
**优势**
- 发现间接关联信息
- 上下文更丰富
- 精确度提高 15-25%
---
## 🐛 故障排查
### 问题1短期记忆快速堆积
**症状**:短期层记忆数快速增长,转移缓慢
**排查**
```python
# 查看统计信息
stats = manager.get_statistics()
print(f"短期记忆占用率: {stats['short_term_occupancy']:.0%}")
print(f"待转移记忆: {len(manager.short_term_manager.get_memories_for_transfer())}")
```
**解决**
- 减小 `long_term_auto_transfer_interval`(加快转移频率)
- 增加 `long_term_batch_size`(一次转移更多)
- 提高 `short_term_transfer_threshold`(更多记忆被转移)
### 问题2长期记忆检索结果不相关
**症状**:搜索返回的记忆与查询不匹配
**排查**
```python
# 启用调试日志
import logging
logging.getLogger("src.memory_graph").setLevel(logging.DEBUG)
# 重试检索
result = await manager.retrieve_memories(query, use_judge=True)
# 检查日志中的相似度评分
```
**解决**
- 增加 `search_top_k`(返回更多候选)
- 降低 `search_similarity_threshold`(放宽相似度要求)
- 检查向量模型是否加载正确
### 问题3转移失败导致记忆丢失
**症状**:短期记忆无故消失,长期层未出现
**排查**
```python
# 检查日志中的转移错误
# 查看长期管理器的错误日志
```
**解决**
- 检查 LLM 模型配置
- 确保长期图存储正常运行
- 增加转移超时时间
---
## 🎓 最佳实践
### 1. 合理配置容量
```toml
# 低频场景(私聊)
perceptual_max_blocks = 20
short_term_max_memories = 15
# 中等频率(小群)
perceptual_max_blocks = 50
short_term_max_memories = 30
# 高频场景(大群/客服)
perceptual_max_blocks = 100
short_term_max_memories = 50
short_term_enable_force_cleanup = true
```
### 2. 启用泄压保护
```toml
# 对于 24/7 运行的机器人
short_term_enable_force_cleanup = true
short_term_cleanup_keep_ratio = 0.85 # 更激进的清理
```
### 3. 定期监控
```python
# 在定时任务中检查
async def monitor_memory():
stats = manager.get_statistics()
if stats['short_term_occupancy'] > 0.8:
logger.warning("短期记忆压力高,考虑扩容")
if stats['long_term_nodes'] > 10000:
logger.warning("长期图规模大,检索可能变慢")
```
### 4. 使用裁判模型
```python
# 启用以提高检索质量
result = await manager.retrieve_memories(
query=user_query,
use_judge=True # 自动判断是否需要长期检索
)
```
---
## 📖 相关文档
- [三层记忆系统用户指南](../../docs/three_tier_memory_user_guide.md)
- [记忆图谱架构](../../docs/memory_graph_guide.md)
- [短期记忆压力泄压补丁](./short_term_pressure_patch.md)
- [转移算法分析](../../docs/memory_transfer_algorithm_analysis.md)
- [统一调度器指南](../../docs/unified_scheduler_guide.md)
---
## 🎯 快速导航
### 核心模块
| 模块 | 功能 | 文件 |
|------|------|------|
| 感知管理 | 消息分块、激活、转移 | `perceptual_manager.py` |
| 短期管理 | LLM 决策、合并、转移 | `short_term_manager.py` |
| 长期管理 | 图操作、节点合并 | `long_term_manager.py` |
| 统一接口 | 自动转移循环、检索 | `unified_manager.py` |
| 单例访问 | 全局管理器获取 | `manager_singleton.py` |
### 辅助工具
| 工具 | 功能 | 文件 |
|------|------|------|
| 向量生成 | 文本嵌入 | `utils/embeddings.py` |
| 相似度计算 | 余弦相似度 | `utils/similarity.py` |
| 格式化器 | 三层数据格式化 | `utils/three_tier_formatter.py` |
| 存储系统 | 磁盘持久化 | `storage/` |
---
## 📝 版本信息
- **架构**:三层分级记忆系统
- **存储**SQLAlchemy 2.0 + Chroma 向量库
- **图数据库**NetworkX
- **最后更新**2025 年 12 月 16 日

View File

@@ -956,14 +956,30 @@ class LongTermMemoryManager:
logger.warning(f"创建边失败: 缺少节点ID ({source_id} -> {target_id})")
return
# 检查节点是否存在
if not self.memory_manager.graph_store or not self.memory_manager.graph_store.graph.has_node(source_id):
logger.warning(f"创建边失败: 源节点不存在 ({source_id})")
return
if not self.memory_manager.graph_store or not self.memory_manager.graph_store.graph.has_node(target_id):
logger.warning(f"创建边失败: 目标节点不存在 ({target_id})")
if not self.memory_manager.graph_store:
logger.warning("创建边失败: 图存储未初始化")
return
# 检查和创建节点(如果不存在则创建占位符)
if not self.memory_manager.graph_store.graph.has_node(source_id):
logger.debug(f"源节点不存在,创建占位符节点: {source_id}")
self.memory_manager.graph_store.add_node(
node_id=source_id,
node_type="event",
content=f"临时节点 - {source_id}",
metadata={"placeholder": True, "created_by": "long_term_manager_edge_creation"}
)
if not self.memory_manager.graph_store.graph.has_node(target_id):
logger.debug(f"目标节点不存在,创建占位符节点: {target_id}")
self.memory_manager.graph_store.add_node(
node_id=target_id,
node_type="event",
content=f"临时节点 - {target_id}",
metadata={"placeholder": True, "created_by": "long_term_manager_edge_creation"}
)
# 现在两个节点都存在,可以创建边
edge_id = self.memory_manager.graph_store.add_edge(
source_id=source_id,
target_id=target_id,

View File

@@ -166,6 +166,8 @@ async def initialize_unified_memory_manager():
# 短期记忆配置
short_term_max_memories=getattr(config, "short_term_max_memories", 30),
short_term_transfer_threshold=getattr(config, "short_term_transfer_threshold", 0.6),
short_term_enable_force_cleanup=getattr(config, "short_term_enable_force_cleanup", True),
short_term_cleanup_keep_ratio=getattr(config, "short_term_cleanup_keep_ratio", 0.9),
# 长期记忆配置
long_term_batch_size=getattr(config, "long_term_batch_size", 10),
long_term_search_top_k=getattr(config, "search_top_k", 5),

View File

@@ -44,6 +44,7 @@ class ShortTermMemoryManager:
transfer_importance_threshold: float = 0.6,
llm_temperature: float = 0.2,
enable_force_cleanup: bool = False,
cleanup_keep_ratio: float = 0.9,
):
"""
初始化短期记忆层管理器
@@ -53,6 +54,8 @@ class ShortTermMemoryManager:
max_memories: 最大短期记忆数量
transfer_importance_threshold: 转移到长期记忆的重要性阈值
llm_temperature: LLM 决策的温度参数
enable_force_cleanup: 是否启用泄压功能
cleanup_keep_ratio: 泄压时保留容量的比例默认0.9表示保留90%
"""
self.data_dir = data_dir or Path("data/memory_graph")
self.data_dir.mkdir(parents=True, exist_ok=True)
@@ -62,6 +65,7 @@ class ShortTermMemoryManager:
self.transfer_importance_threshold = transfer_importance_threshold
self.llm_temperature = llm_temperature
self.enable_force_cleanup = enable_force_cleanup
self.cleanup_keep_ratio = cleanup_keep_ratio
# 核心数据
self.memories: list[ShortTermMemory] = []
@@ -635,69 +639,76 @@ class ShortTermMemoryManager:
def get_memories_for_transfer(self) -> list[ShortTermMemory]:
"""
获取需要转移到长期记忆的记忆(优化版:单次遍历
获取需要转移到长期记忆的记忆(改进版:转移优先于删除
逻辑
1. 优先选择重要性 >= 阈值的记忆
2. 如果剩余记忆数量仍超过 max_memories直接清理最早的低重要性记忆直到低于上限
优化的转移策略
1. 优先选择重要性 >= 阈值的记忆进行转移
2. 如果高重要性记忆已清空但仍超过容量,则考虑转移低重要性记忆
3. 仅当转移不能解决容量问题时,才进行强制删除(由 force_cleanup_overflow 处理)
返回:
需要转移的记忆列表(优先返回高重要性,次选低重要性)
"""
# 单次遍历:同时分类高重要性和低重要性记忆
candidates = []
high_importance_memories = []
low_importance_memories = []
for mem in self.memories:
if mem.importance >= self.transfer_importance_threshold:
candidates.append(mem)
high_importance_memories.append(mem)
else:
low_importance_memories.append(mem)
# 如果总体记忆数量超过了上限,优先清理低重要性最早创建的记忆
# 策略1优先返回高重要性记忆进行转移
if high_importance_memories:
logger.debug(
f"转移候选: 发现 {len(high_importance_memories)} 条高重要性记忆待转移"
)
return high_importance_memories
# 策略2如果没有高重要性记忆但总体超过容量上限
# 返回一部分低重要性记忆用于转移(而非删除)
if len(self.memories) > self.max_memories:
# 目标保留数量(降至上限的 90%
target_keep_count = int(self.max_memories * 0.9)
# 需要删除的数量(从当前总数降到 target_keep_count
num_to_remove = len(self.memories) - target_keep_count
if num_to_remove > 0 and low_importance_memories:
# 按创建时间排序,删除最早的低重要性记忆
low_importance_memories.sort(key=lambda x: x.created_at)
to_remove = low_importance_memories[:num_to_remove]
# 批量删除并更新索引
remove_ids = {mem.id for mem in to_remove}
self.memories = [mem for mem in self.memories if mem.id not in remove_ids]
for mem_id in remove_ids:
self._memory_id_index.pop(mem_id, None)
self._similarity_cache.pop(mem_id, None)
logger.info(
f"短期记忆清理: 移除了 {len(to_remove)} 条低重要性记忆 "
f"(保留 {len(self.memories)} 条)"
)
# 触发保存
asyncio.create_task(self._save_to_disk())
# 优先返回高重要性候选
if candidates:
return candidates
# 如果没有高重要性候选但总体超过上限,返回按创建时间最早的低重要性记忆作为后备转移候选
if len(self.memories) > self.max_memories:
needed = len(self.memories) - self.max_memories + 1
# 计算需要转移的数量(目标:降到上限
num_to_transfer = len(self.memories) - self.max_memories
# 按创建时间排序低重要性记忆,优先转移最早的(可能包含过时信息)
low_importance_memories.sort(key=lambda x: x.created_at)
return low_importance_memories[:needed]
to_transfer = low_importance_memories[:num_to_transfer]
if to_transfer:
logger.debug(
f"转移候选: 发现 {len(to_transfer)} 条低重要性记忆待转移 "
f"(当前容量 {len(self.memories)}/{self.max_memories})"
)
return to_transfer
return candidates
# 策略3容量充足无需转移
logger.debug(
f"转移检查: 无需转移 (当前容量 {len(self.memories)}/{self.max_memories})"
)
return []
def force_cleanup_overflow(self, keep_ratio: float = 0.9) -> int:
"""当短期记忆超过容量时,强制删除低重要性且最早的记忆以泄压"""
def force_cleanup_overflow(self, keep_ratio: float | None = None) -> int:
"""
当短期记忆超过容量时,强制删除低重要性且最早的记忆以泄压
Args:
keep_ratio: 保留容量的比例(默认使用配置中的 cleanup_keep_ratio
Returns:
删除的记忆数量
"""
if not self.enable_force_cleanup:
return 0
if self.max_memories <= 0:
return 0
# 使用实例配置或传入参数
if keep_ratio is None:
keep_ratio = self.cleanup_keep_ratio
current = len(self.memories)
limit = int(self.max_memories * keep_ratio)
if current <= self.max_memories:

View File

@@ -5,10 +5,11 @@
在高频消息场景下,短期记忆层(`ShortTermMemoryManager`)可能在自动转移机制触发前快速堆积大量记忆,当达到容量上限(`max_memories`)时可能阻塞后续写入。本功能提供一个**可选的泄压开关**,在容量溢出时自动删除低优先级记忆,防止系统阻塞。
**关键特性**
- ✅ 默认关闭保持向后兼容
- ✅ 默认开启(在高频场景中保护系统),可关闭保持向后兼容
- ✅ 基于重要性和时间的智能删除策略
- ✅ 异步持久化,不阻塞主流程
- ✅ 可通过配置文件或代码控制
- ✅ 可通过配置文件或代码灵活控制
- ✅ 支持自定义保留比例
---
@@ -22,36 +23,38 @@
from src.memory_graph.unified_manager import UnifiedMemoryManager
manager = UnifiedMemoryManager(
short_term_enable_force_cleanup=True, # 开启泄压功能
short_term_max_memories=30, # 短期记忆容量上限
short_term_enable_force_cleanup=True, # 开启泄压功能
short_term_cleanup_keep_ratio=0.9, # 泄压时保留容量的比例90%
short_term_max_memories=30, # 短期记忆容量上限
# ... 其他参数
)
```
### 方法 2配置文件通过单例获取
**推荐方式**:如果您使用 `get_unified_memory_manager()` 单例,需修改配置文件。
**推荐方式**:如果您使用 `get_unified_memory_manager()` 单例,通过配置文件控制
#### ❌ 目前的问题
配置文件 `config/bot_config.toml``[memory]`**尚未包含**此开关参数。
#### ✅ 已实现
配置文件 `config/bot_config.toml``[memory]`已包含此参数。
#### ✅ 解决方案
`config/bot_config.toml``[memory]` 节添加:
`config/bot_config.toml``[memory]` 节配置:
```toml
[memory]
# ... 其他配置 ...
short_term_max_memories = 30 # 短期记忆容量上限
short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值
short_term_enable_force_cleanup = true # 开启压力泄压(建议高频场景开启)
short_term_max_memories = 30 # 短期记忆容量上限
short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值
short_term_enable_force_cleanup = true # 开启压力泄压(建议高频场景开启)
short_term_cleanup_keep_ratio = 0.9 # 泄压时保留容量的比例保留90%
```
然后在 `src/memory_graph/manager_singleton.py` 第 157-175 行的 `get_unified_memory_manager()` 函数中添加读取逻辑
配置自动由 `src/memory_graph/manager_singleton.py` 读取并传递给管理器
```python
_unified_memory_manager = UnifiedMemoryManager(
# ... 其他参数 ...
short_term_enable_force_cleanup=getattr(config, "short_term_enable_force_cleanup", False), # 添加此行
short_term_enable_force_cleanup=getattr(config, "short_term_enable_force_cleanup", True),
short_term_cleanup_keep_ratio=getattr(config, "short_term_cleanup_keep_ratio", 0.9),
)
```
@@ -60,41 +63,68 @@ _unified_memory_manager = UnifiedMemoryManager(
## ⚙️ 核心实现位置
### 1. 参数定义
**文件**`src/memory_graph/unified_manager.py` 第 47
**文件**`src/memory_graph/unified_manager.py`35-54 行
```python
class UnifiedMemoryManager:
def __init__(
self,
# ... 其他参数 ...
short_term_enable_force_cleanup: bool = False, # 开关参数
short_term_cleanup_keep_ratio: float = 0.9, # 保留比例参数
# ... 其他参数
):
```
### 2. 传递到短期层
**文件**`src/memory_graph/unified_manager.py` 第 100
**文件**`src/memory_graph/unified_manager.py`94-106
```python
"short_term": {
"enable_force_cleanup": short_term_enable_force_cleanup, # 传递给 ShortTermMemoryManager
self._config = {
"short_term": {
"max_memories": short_term_max_memories,
"transfer_importance_threshold": short_term_transfer_threshold,
"enable_force_cleanup": short_term_enable_force_cleanup, # 传递给 ShortTermMemoryManager
"cleanup_keep_ratio": short_term_cleanup_keep_ratio, # 传递保留比例
},
# ... 其他配置
}
```
### 3. 泄压逻辑实现
**文件**`src/memory_graph/short_term_manager.py` 第 693-726 行
**文件**`src/memory_graph/short_term_manager.py` 40-76 行(初始化)和第 697-745 行(执行)
初始化参数:
```python
def force_cleanup_overflow(self, keep_ratio: float = 0.9) -> int:
class ShortTermMemoryManager:
def __init__(
self,
max_memories: int = 30,
enable_force_cleanup: bool = False,
cleanup_keep_ratio: float = 0.9, # 新参数
):
self.enable_force_cleanup = enable_force_cleanup
self.cleanup_keep_ratio = cleanup_keep_ratio
```
执行泄压:
```python
def force_cleanup_overflow(self, keep_ratio: float | None = None) -> int:
"""当短期记忆超过容量时,强制删除低重要性且最早的记忆以泄压"""
if not self.enable_force_cleanup: # 检查开关
return 0
if keep_ratio is None:
keep_ratio = self.cleanup_keep_ratio # 使用实例配置
# ... 删除逻辑
```
### 4. 触发条件
**文件**`src/memory_graph/unified_manager.py` 第 618-621 行
**文件**`src/memory_graph/unified_manager.py` 自动转移循环中
```python
# 在自动转移循环中检测
# 在自动转移循环中检测容量溢出
if occupancy_ratio >= 1.0 and not transfer_cache:
removed = self.short_term_manager.force_cleanup_overflow()
if removed > 0:
logger.warning(f"短期记忆占用率 {occupancy_ratio:.0%},已强制删除 {removed} 条低重要性记忆泄压")
logger.warning(f"短期记忆压力泄压: 移除 {removed} 条 (当前 {len}/30)")
```
---
@@ -112,17 +142,18 @@ if occupancy_ratio >= 1.0 and not transfer_cache:
sorted_memories = sorted(self.memories, key=lambda m: (m.importance, m.created_at))
```
**删除数量**:删除到容量的 90%
**删除数量**根据 `cleanup_keep_ratio` 删除
```python
current = len(self.memories) # 当前记忆数
limit = int(self.max_memories * 0.9) # 目标保留数
remove_count = current - limit # 需要删除的数量
current = len(self.memories) # 当前记忆数
limit = int(self.max_memories * keep_ratio) # 目标保留数
remove_count = current - limit # 需要删除的数量
```
**示例**
- 容量上限 `max_memories=30`
- 当前记忆数 `35` 删除 `35 - 27 = 8` 条最低优先级记忆
- 优先删除:重要性 0.1 且创建于 10 分钟前的记忆
**示例**`max_memories=30, keep_ratio=0.9`
- 当前记忆数 `35` → 删除到 `27` 条(保留 90%
- 删除 `35 - 27 = 8` 条最低优先级记忆
- 优先删除:重要性最低且创建时间最早的记忆
- 删除后异步保存,不阻塞主流程
### 持久化
- 使用 `asyncio.create_task(self._save_to_disk())` 异步保存
@@ -149,8 +180,8 @@ remove_count = current - limit # 需要删除的数量
## 🚨 注意事项
### ⚠️ 何时开启
-**推荐开启**高频群聊、客服机器人、24/7 运行场景
- **不建议开启**:需要完整保留所有短期记忆调试阶段
-**默认开启**高频群聊、客服机器人、24/7 运行场景
- ⚠️ **可选关闭**:需要完整保留所有短期记忆调试阶段
### ⚠️ 潜在影响
- 低重要性记忆可能被删除,**不会转移到长期记忆**
@@ -172,11 +203,12 @@ remove_count = current - limit # 需要删除的数量
unified_manager.short_term_manager.enable_force_cleanup = False
```
### 永久禁用
### 永久关闭
**配置文件方式**
```toml
[memory]
short_term_enable_force_cleanup = false # 或直接删除此行
short_term_enable_force_cleanup = false # 关闭泄压
short_term_cleanup_keep_ratio = 0.9 # 此时该参数被忽略
```
**代码方式**
@@ -196,4 +228,13 @@ manager = UnifiedMemoryManager(
---
## 📝 实现状态
**已完成**2025年12月16日
- 配置文件已添加 `short_term_enable_force_cleanup``short_term_cleanup_keep_ratio` 参数
- `UnifiedMemoryManager` 支持新参数并正确传递配置
- `ShortTermMemoryManager` 实现完整的泄压逻辑
- `manager_singleton.py` 读取并应用配置
- 日志系统正确记录泄压事件
**最后更新**2025年12月16日

View File

@@ -9,7 +9,7 @@ from collections.abc import Iterable
import networkx as nx
from src.common.logger import get_logger
from src.memory_graph.models import Memory, MemoryEdge
from src.memory_graph.models import EdgeType, Memory, MemoryEdge
logger = get_logger(__name__)
@@ -159,9 +159,6 @@ class GraphStore:
# 1.5. 注销记忆中的边的邻接索引记录
self._unregister_memory_edges(memory)
# 1.5. 注销记忆中的边的邻接索引记录
self._unregister_memory_edges(memory)
# 2. 添加节点到图
if not self.graph.has_node(node_id):
from datetime import datetime
@@ -201,6 +198,9 @@ class GraphStore:
)
memory.nodes.append(new_node)
# 5. 重新注册记忆中的边到邻接索引
self._register_memory_edges(memory)
logger.debug(f"添加节点成功: {node_id} -> {memory_id}")
return True
@@ -926,12 +926,23 @@ class GraphStore:
mem_edge = MemoryEdge.from_dict(edge_dict)
except Exception:
# 兼容性:直接构造对象
# 确保 edge_type 是 EdgeType 枚举
edge_type_value = edge_dict["edge_type"]
if isinstance(edge_type_value, str):
try:
edge_type_enum = EdgeType(edge_type_value)
except ValueError:
logger.warning(f"未知的边类型: {edge_type_value}, 使用默认值")
edge_type_enum = EdgeType.RELATION
else:
edge_type_enum = edge_type_value
mem_edge = MemoryEdge(
id=edge_dict["id"] or "",
source_id=edge_dict["source_id"],
target_id=edge_dict["target_id"],
relation=edge_dict["relation"],
edge_type=edge_dict["edge_type"],
edge_type=edge_type_enum,
importance=edge_dict.get("importance", 0.5),
metadata=edge_dict.get("metadata", {}),
)

View File

@@ -45,6 +45,7 @@ class UnifiedMemoryManager:
short_term_max_memories: int = 30,
short_term_transfer_threshold: float = 0.6,
short_term_enable_force_cleanup: bool = False,
short_term_cleanup_keep_ratio: float = 0.9,
# 长期记忆配置
long_term_batch_size: int = 10,
long_term_search_top_k: int = 5,
@@ -98,6 +99,7 @@ class UnifiedMemoryManager:
"max_memories": short_term_max_memories,
"transfer_importance_threshold": short_term_transfer_threshold,
"enable_force_cleanup": short_term_enable_force_cleanup,
"cleanup_keep_ratio": short_term_cleanup_keep_ratio,
},
"long_term": {
"batch_size": long_term_batch_size,

View File

@@ -43,19 +43,26 @@ class MaiZoneRefactoredPlugin(BasePlugin):
"plugin": {"enable": ConfigField(type=bool, default=True, description="是否启用插件")},
"models": {
"text_model": ConfigField(type=str, default="maizone", description="生成文本的模型名称"),
"siliconflow_apikey": ConfigField(type=str, default="", description="硅基流动AI生图API密钥"),
},
"ai_image": {
"enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"),
"provider": ConfigField(type=str, default="siliconflow", description="AI生图服务提供商siliconflow/novelai"),
"image_number": ConfigField(type=int, default=1, description="生成图片数量1-4张"),
},
"siliconflow": {
"api_key": ConfigField(type=str, default="", description="硅基流动API密钥"),
},
"novelai": {
"api_key": ConfigField(type=str, default="", description="NovelAI官方API密钥"),
"character_prompt": ConfigField(type=str, default="", description="Bot角色外貌描述AI判断需要bot出镜时插入"),
"base_negative_prompt": ConfigField(type=str, default="nsfw, nude, explicit, sexual content, lowres, bad anatomy, bad hands, missing fingers, extra digit, fewer digits, cropped, worst quality, low quality", description="基础负面提示词(禁止不良内容)"),
"proxy_host": ConfigField(type=str, default="", description="代理服务器地址127.0.0.1"),
"proxy_port": ConfigField(type=int, default=0, description="代理服务器端口7890"),
},
"send": {
"permission": ConfigField(type=list, default=[], description="发送权限QQ号列表"),
"permission_type": ConfigField(type=str, default="whitelist", description="权限类型"),
"enable_image": ConfigField(type=bool, default=False, description="是否启用说说配图"),
"enable_ai_image": ConfigField(type=bool, default=False, description="是否启用AI生成配图"),
"enable_reply": ConfigField(type=bool, default=True, description="完成后是否回复"),
"ai_image_number": ConfigField(type=int, default=1, description="AI生成图片数量1-4张"),
"image_number": ConfigField(type=int, default=1, description="本地配图数量1-9张"),
"image_directory": ConfigField(
type=str, default=(Path(__file__).parent / "images").as_posix(), description="图片存储目录"
),
},
"read": {
"permission": ConfigField(type=list, default=[], description="阅读权限QQ号列表"),

View File

@@ -54,9 +54,10 @@ class ContentService:
logger.error("未配置LLM模型")
return ""
# 获取机器人信息
bot_personality = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_expression = config_api.get_global_config("personality.reply_style", "内容积极向上")
# 获取机器人信息(核心人格配置)
bot_personality_core = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_personality_side = config_api.get_global_config("personality.personality_side", "")
bot_reply_style = config_api.get_global_config("personality.reply_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 获取当前时间信息
@@ -65,13 +66,20 @@ class ContentService:
weekday_names = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
weekday = weekday_names[now.weekday()]
# 构建人设描述
personality_desc = f"你的核心人格:{bot_personality_core}"
if bot_personality_side:
personality_desc += f"\n你的人格侧面:{bot_personality_side}"
personality_desc += f"\n\n你的表达方式:{bot_reply_style}"
# 构建提示词
prompt_topic = f"主题是'{topic}'" if topic else "主题不限"
prompt = f"""
你是'{bot_personality}',现在是{current_time}{weekday}),你想写一条{prompt_topic}的说说发表在qq空间上。
{bot_expression}
{personality_desc}
请严格遵守以下规则:
现在是{current_time}{weekday}),你想写一条{prompt_topic}的说说发表在qq空间上。
请严格遵守以下规则:
1. **绝对禁止**在说说中直接、完整地提及当前的年月日或几点几分。
2. 你应该将当前时间作为创作的背景,用它来判断现在是“清晨”、“傍晚”还是“深夜”。
3. 使用自然、模糊的词语来暗示时间,例如“刚刚”、“今天下午”、“夜深啦”等。
@@ -112,7 +120,244 @@ class ContentService:
logger.error(f"生成说说内容时发生异常: {e}")
return ""
async def generate_comment(self, content: str, target_name: str, rt_con: str = "", images: list = []) -> str:
async def generate_story_with_image_info(
self, topic: str, context: str | None = None
) -> tuple[str, dict]:
"""
生成说说内容并同时生成NovelAI图片提示词信息
:param topic: 说说的主题
:param context: 可选的聊天上下文
:return: (说说文本, 图片信息字典)
图片信息字典格式: {
"prompt": str, # NovelAI提示词英文
"negative_prompt": str, # 负面提示词(英文)
"include_character": bool, # 画面是否包含bot自己true时插入角色外貌提示词
"aspect_ratio": str # 画幅(方图/横图/竖图)
}
"""
try:
# 获取模型配置
models = llm_api.get_available_models()
text_model = str(self.get_config("models.text_model", "replyer"))
model_config = models.get(text_model)
if not model_config:
logger.error("未配置LLM模型")
return "", {"has_image": False}
# 获取机器人信息(核心人格配置)
bot_personality_core = config_api.get_global_config("personality.personality_core", "一个机器人")
bot_personality_side = config_api.get_global_config("personality.personality_side", "")
bot_reply_style = config_api.get_global_config("personality.reply_style", "内容积极向上")
qq_account = config_api.get_global_config("bot.qq_account", "")
# 获取角色外貌描述用于告知LLM
character_prompt = self.get_config("novelai.character_prompt", "")
# 获取当前时间信息
now = datetime.datetime.now()
current_time = now.strftime("%Y年%m月%d%H:%M")
weekday_names = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
weekday = weekday_names[now.weekday()]
# 构建提示词
prompt_topic = f"主题是'{topic}'" if topic else "主题不限"
# 构建人设描述
personality_desc = f"你的核心人格:{bot_personality_core}"
if bot_personality_side:
personality_desc += f"\n你的人格侧面:{bot_personality_side}"
personality_desc += f"\n\n你的表达方式:{bot_reply_style}"
# 检查是否启用AI配图统一开关
ai_image_enabled = self.get_config("ai_image.enable_ai_image", False)
provider = self.get_config("ai_image.provider", "siliconflow")
# NovelAI配图指引内置
novelai_guide = ""
output_format = '{"text": "说说正文内容"}'
if ai_image_enabled and provider == "novelai":
# 构建角色信息提示
character_info = ""
if character_prompt:
character_info = f"""
**角色特征锚点**当include_character=true时会插入以下基础特征
```
{character_prompt}
```
📌 重要说明:
- 这只是角色的**基础外貌特征**(发型、眼睛、耳朵等固定特征),用于锚定角色身份
- 你可以**自由描述**:衣服、动作、表情、姿势、装饰、配饰等所有可变元素
- 例如可以让角色穿不同风格的衣服casual, formal, sportswear, dress等
- 例如可以设计各种动作sitting, standing, walking, running, lying down等
- 例如可以搭配各种表情smile, laugh, serious, thinking, surprised等
- **鼓励创意**:根据说说内容自由发挥,让画面更丰富生动!
"""
novelai_guide = f"""
**配图说明:**
这条说说会使用NovelAI Diffusion模型二次元风格生成配图。
{character_info}
**提示词生成要求(非常重要):**
你需要生成一段详细的英文图片提示词,必须包含以下要素:
1. **画质标签**(必需):
- 开头必须加masterpiece, best quality, detailed, high resolution
2. **主体元素**(自由发挥):
- 人物描述:表情、动作、姿态(**完全自由**,不受角色锚点限制)
- 服装搭配casual clothing, dress, hoodie, school uniform, sportswear等**任意选择**
- 配饰装饰hat, glasses, ribbon, jewelry, bag等**随意添加**
- 物体/场景:具体的物品、建筑、自然景观等
3. **场景与环境**(必需):
- 地点indoor/outdoor, cafe, park, bedroom, street, beach, forest等
- 背景描述背景的细节sky, trees, buildings, ocean, mountains等
4. **氛围与风格**(必需):
- 光线sunlight, sunset, golden hour, soft lighting, dramatic lighting, night
- 天气/时间sunny day, rainy, cloudy, starry night, dawn, dusk
- 整体氛围peaceful, cozy, romantic, energetic, melancholic, playful
5. **色彩与细节**(推荐):
- 主色调warm colors, cool tones, pastel colors, vibrant colors
- 特殊细节falling petals, sparkles, lens flare, depth of field, bokeh
6. **include_character字段**
- true画面中包含"你自己"(自拍、你在画面中的场景)
- false画面中不包含你风景、物品、他人
7. **negative_prompt负面提示词**
- **严格禁止**以下内容nsfw, nude, explicit, sexual content, violence, gore, blood
- 排除质量问题lowres, bad anatomy, bad hands, deformed, mutilated, ugly
- 排除瑕疵blurry, poorly drawn, worst quality, low quality, jpeg artifacts
- 可以自行补充其他不需要的元素
8. **aspect_ratio画幅**
- 方图:适合头像、特写、正方形构图
- 横图:适合风景、全景、宽幅场景
- 竖图:适合人物全身、纵向构图
**内容审核规则(必须遵守)**
- 🚫 严禁生成NSFW、色情、裸露、性暗示内容
- 🚫 严禁生成暴力、血腥、恐怖、惊悚内容
- 🚫 严禁生成肢体畸形、器官变异、恶心画面
- ✅ 提示词必须符合健康、积极、美好的审美标准
- ✅ 专注于日常生活、自然风景、温馨场景等正面内容
**创意自由度**
- 💡 **衣服搭配**:可以自由设计各种服装风格(休闲、正式、运动、可爱、时尚等)
- 💡 **动作姿势**:站、坐、躺、走、跑、跳、伸展等任意动作
- 💡 **表情情绪**:微笑、大笑、思考、惊讶、温柔、调皮等丰富表情
- 💡 **场景创意**:根据说说内容自由发挥,让画面更贴合心情和主题
**示例提示词(展示多样性)**
- 休闲风:"masterpiece, best quality, 1girl, casual clothing, white t-shirt, jeans, sitting on bench, outdoor park, reading book, afternoon sunlight, relaxed atmosphere"
- 运动风:"masterpiece, best quality, 1girl, sportswear, running in park, energetic, morning light, trees background, dynamic pose, healthy lifestyle"
- 咖啡馆:"masterpiece, best quality, 1girl, sitting in cozy cafe, holding coffee cup, warm lighting, wooden table, books beside, peaceful atmosphere"
"""
output_format = '''{"text": "说说正文内容", "image": {"prompt": "详细的英文提示词(包含画质+主体+场景+氛围+光线+色彩)", "negative_prompt": "负面词", "include_character": true/false, "aspect_ratio": "方图/横图/竖图"}}'''
elif ai_image_enabled and provider == "siliconflow":
novelai_guide = """
**配图说明:**
这条说说会使用AI生成配图。
**提示词生成要求(非常重要):**
你需要生成一段详细的英文图片描述,必须包含以下要素:
1. **主体内容**:画面的核心元素(人物/物体/场景)
2. **具体场景**:地点、环境、背景细节
3. **氛围与风格**:整体感觉、光线、天气、色调
4. **细节描述**:补充的视觉细节(动作、表情、装饰等)
**示例提示词**
- "a girl sitting in a modern cafe, warm afternoon lighting, wooden furniture, coffee cup on table, books beside her, cozy and peaceful atmosphere, soft focus background"
- "sunset over the calm ocean, golden hour, orange and purple sky, gentle waves, peaceful and serene mood, wide angle view"
- "cherry blossoms in spring, soft pink petals falling, blue sky, sunlight filtering through branches, peaceful park scene, gentle breeze"
"""
output_format = '''{"text": "说说正文内容", "image": {"prompt": "详细的英文描述(主体+场景+氛围+光线+细节)"}}'''
prompt = f"""
{personality_desc}
现在是{current_time}{weekday}),你想写一条{prompt_topic}的说说发表在qq空间上。
**说说文本规则:**
1. **绝对禁止**在说说中直接、完整地提及当前的年月日或几点几分。
2. 你应该将当前时间作为创作的背景,用它来判断现在是"清晨""傍晚"还是"深夜"
3. 使用自然、模糊的词语来暗示时间,例如"刚刚""今天下午""夜深啦"等。
4. **内容简短**总长度严格控制在100字以内。
5. **禁止表情**严禁使用任何Emoji表情符号。
6. **严禁重复**:下方会提供你最近发过的说说历史,你必须创作一条全新的、与历史记录内容和主题都不同的说说。
7. 不要刻意突出自身学科背景,不要浮夸,不要夸张修辞。
{novelai_guide}
**输出格式JSON**
{output_format}
只输出JSON格式不要有其他内容。
"""
# 如果有上下文则加入到prompt中
if context:
prompt += f"\n\n作为参考,这里有一些最近的聊天记录:\n---\n{context}\n---"
# 添加历史记录以避免重复
prompt += "\n\n---历史说说记录---\n"
history_block = await get_send_history(qq_account)
if history_block:
prompt += history_block
# 调用LLM生成内容
success, response, _, _ = await llm_api.generate_with_model(
prompt=prompt,
model_config=model_config,
request_type="story.generate_with_image",
temperature=0.3,
max_tokens=1500,
)
if success:
# 解析JSON响应
import json5
try:
# 提取JSON部分去除可能的markdown代码块标记
json_text = response.strip()
if json_text.startswith("```json"):
json_text = json_text[7:]
if json_text.startswith("```"):
json_text = json_text[3:]
if json_text.endswith("```"):
json_text = json_text[:-3]
json_text = json_text.strip()
data = json5.loads(json_text)
story_text = data.get("text", "")
image_info = data.get("image", {})
# 确保图片信息完整
if not isinstance(image_info, dict):
image_info = {}
logger.info(f"成功生成说说:'{story_text}'")
logger.info(f"配图信息: {image_info}")
return story_text, image_info
except Exception as e:
logger.error(f"解析JSON失败: {e}, 原始响应: {response[:200]}")
# 降级处理:只返回文本,空配图信息
return response, {}
else:
logger.error("生成说说内容失败")
return "", {}
except Exception as e:
logger.error(f"生成说说内容时发生异常: {e}")
return "", {}
"""
针对一条具体的说说内容生成评论。
"""

View File

@@ -31,18 +31,48 @@ class ImageService:
"""
self.get_config = get_config
async def generate_image_from_prompt(self, prompt: str, save_dir: str | None = None) -> tuple[bool, Path | None]:
"""
直接使用提示词生成图片(硅基流动)
:param prompt: 图片提示词(英文)
:param save_dir: 图片保存目录None使用默认
:return: (是否成功, 图片路径)
"""
try:
api_key = str(self.get_config("siliconflow.api_key", ""))
image_num = self.get_config("ai_image.image_number", 1)
if not api_key:
logger.warning("硅基流动API未配置跳过图片生成")
return False, None
# 图片目录
if save_dir:
image_dir = Path(save_dir)
else:
plugin_dir = Path(__file__).parent.parent
image_dir = plugin_dir / "images"
image_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"正在生成 {image_num} 张AI配图...")
success, img_path = await self._call_siliconflow_api(api_key, prompt, str(image_dir), image_num)
return success, img_path
except Exception as e:
logger.error(f"生成AI配图时发生异常: {e}")
return False, None
async def generate_images_for_story(self, story: str) -> bool:
"""
根据说说内容判断是否需要生成AI配图并执行生成任务。
根据说说内容判断是否需要生成AI配图并执行生成任务(硅基流动)
:param story: 说说内容。
:return: 图片是否成功生成(或不需要生成)。
"""
try:
enable_ai_image = bool(self.get_config("send.enable_ai_image", False))
api_key = str(self.get_config("models.siliconflow_apikey", ""))
image_dir = str(self.get_config("send.image_directory", "./data/plugins/maizone_refactored/images"))
image_num_raw = self.get_config("send.ai_image_number", 1)
api_key = str(self.get_config("siliconflow.api_key", ""))
image_num_raw = self.get_config("ai_image.image_number", 1)
# 安全地处理图片数量配置并限制在API允许的范围内
try:
@@ -52,15 +82,14 @@ class ImageService:
logger.warning(f"无效的图片数量配置: {image_num_raw}使用默认值1")
image_num = 1
if not enable_ai_image:
return True # 未启用AI配图视为成功
if not api_key:
logger.error("启用了AI配图但未填写SiliconFlow API密钥")
return False
logger.warning("硅基流动API未配置跳过图片生成")
return True
# 确保图片目录存在
Path(image_dir).mkdir(parents=True, exist_ok=True)
# 图片目录(使用统一配置)
plugin_dir = Path(__file__).parent.parent
image_dir = plugin_dir / "images"
image_dir.mkdir(parents=True, exist_ok=True)
# 生成图片提示词
image_prompt = await self._generate_image_prompt(story)
@@ -69,7 +98,8 @@ class ImageService:
return False
logger.info(f"正在为说说生成 {image_num} 张AI配图...")
return await self._call_siliconflow_api(api_key, image_prompt, image_dir, image_num)
success, _ = await self._call_siliconflow_api(api_key, image_prompt, str(image_dir), image_num)
return success
except Exception as e:
logger.error(f"处理AI配图时发生异常: {e}")
@@ -127,7 +157,7 @@ class ImageService:
logger.error(f"生成图片提示词时发生异常: {e}")
return ""
async def _call_siliconflow_api(self, api_key: str, image_prompt: str, image_dir: str, batch_size: int) -> bool:
async def _call_siliconflow_api(self, api_key: str, image_prompt: str, image_dir: str, batch_size: int) -> tuple[bool, Path | None]:
"""
调用硅基流动SiliconFlow的API来生成图片。
@@ -135,7 +165,7 @@ class ImageService:
:param image_prompt: 用于生成图片的提示词。
:param image_dir: 图片保存目录。
:param batch_size: 生成图片的数量1-4
:return: API调用是否成功
:return: (API调用是否成功, 第一张图片路径)
"""
url = "https://api.siliconflow.cn/v1/images/generations"
headers = {
@@ -175,12 +205,13 @@ class ImageService:
error_text = await response.text()
logger.error(f"生成图片出错,错误码[{response.status}]")
logger.error(f"错误响应: {error_text}")
return False
return False, None
json_data = await response.json()
image_urls = [img["url"] for img in json_data["images"]]
success_count = 0
first_img_path = None
# 下载并保存图片
for i, img_url in enumerate(image_urls):
try:
@@ -194,7 +225,7 @@ class ImageService:
image = Image.open(BytesIO(img_data))
# 保存图片为PNG格式确保兼容性
filename = f"image_{i}.png"
filename = f"siliconflow_{i}.png"
save_path = Path(image_dir) / filename
# 转换为RGB模式如果必要避免RGBA等模式的问题
@@ -206,21 +237,25 @@ class ImageService:
image.save(save_path, format="PNG")
logger.info(f"图片已保存至: {save_path}")
success_count += 1
# 记录第一张图片路径
if first_img_path is None:
first_img_path = save_path
except Exception as e:
logger.error(f"处理图片失败: {e!s}")
continue
except Exception as e:
logger.error(f"下载{i+1}图片失败: {e!s}")
logger.error(f"下载图片失败: {e!s}")
continue
# 只要至少有一张图片成功就返回True
return success_count > 0
# 至少有一张图片成功就返回True
return success_count > 0, first_img_path
except Exception as e:
logger.error(f"调用AI生图API时发生异常: {e}")
return False
return False, None
def _encode_image_to_base64(self, img: Image.Image) -> str:
"""

View File

@@ -0,0 +1,286 @@
"""
NovelAI图片生成服务 - 空间插件专用
独立实现,不依赖其他插件
"""
import asyncio
import base64
import random
import uuid
import zipfile
import io
from pathlib import Path
from typing import Optional
import aiohttp
from PIL import Image
from src.common.logger import get_logger
logger = get_logger("MaiZone.NovelAIService")
class MaiZoneNovelAIService:
"""空间插件的NovelAI图片生成服务独立实现"""
def __init__(self, get_config):
self.get_config = get_config
# NovelAI配置
self.api_key = self.get_config("novelai.api_key", "")
self.base_url = "https://image.novelai.net/ai/generate-image"
self.model = "nai-diffusion-4-5-full"
# 代理配置
proxy_host = self.get_config("novelai.proxy_host", "")
proxy_port = self.get_config("novelai.proxy_port", 0)
self.proxy = f"http://{proxy_host}:{proxy_port}" if proxy_host and proxy_port else ""
# 生成参数
self.steps = 28
self.scale = 5.0
self.sampler = "k_euler"
self.noise_schedule = "karras"
# 角色提示词当LLM决定包含角色时使用
self.character_prompt = self.get_config("novelai.character_prompt", "")
self.base_negative_prompt = self.get_config("novelai.base_negative_prompt", "nsfw, nude, explicit, sexual content, lowres, bad anatomy, bad hands")
# 图片保存目录(使用统一配置)
plugin_dir = Path(__file__).parent.parent
self.image_dir = plugin_dir / "images"
self.image_dir.mkdir(parents=True, exist_ok=True)
if self.api_key:
logger.info(f"NovelAI图片生成已配置模型: {self.model}")
def is_available(self) -> bool:
"""检查NovelAI服务是否可用"""
return bool(self.api_key)
async def generate_image_from_prompt_data(
self,
prompt: str,
negative_prompt: Optional[str] = None,
include_character: bool = False,
width: int = 1024,
height: int = 1024
) -> tuple[bool, Optional[Path], str]:
"""根据提示词生成图片
Args:
prompt: NovelAI格式的英文提示词
negative_prompt: LLM生成的负面提示词可选
include_character: 是否包含角色形象
width: 图片宽度
height: 图片高度
Returns:
(是否成功, 图片路径, 消息)
"""
if not self.api_key:
return False, None, "NovelAI API Key未配置"
try:
# 处理角色提示词
final_prompt = prompt
if include_character and self.character_prompt:
final_prompt = f"{self.character_prompt}, {prompt}"
logger.info(f"包含角色形象,添加角色提示词")
# 合并负面提示词
final_negative = self.base_negative_prompt
if negative_prompt:
if final_negative:
final_negative = f"{final_negative}, {negative_prompt}"
else:
final_negative = negative_prompt
logger.info(f"🎨 开始生成图片...")
logger.info(f" 尺寸: {width}x{height}")
logger.info(f" 正面提示词: {final_prompt[:100]}...")
logger.info(f" 负面提示词: {final_negative[:100]}...")
# 构建请求payload
payload = self._build_payload(final_prompt, final_negative, width, height)
# 发送请求
image_data = await self._call_novelai_api(payload)
if not image_data:
return False, None, "API请求失败"
# 保存图片
image_path = await self._save_image(image_data)
if not image_path:
return False, None, "图片保存失败"
logger.info(f"✅ 图片生成成功: {image_path}")
return True, image_path, "生成成功"
except Exception as e:
logger.error(f"生成图片时出错: {e}", exc_info=True)
return False, None, f"生成失败: {str(e)}"
def _build_payload(self, prompt: str, negative_prompt: str, width: int, height: int) -> dict:
"""构建NovelAI API请求payload"""
is_v4_model = "diffusion-4" in self.model
is_v3_model = "diffusion-3" in self.model
parameters = {
"width": width,
"height": height,
"scale": self.scale,
"steps": self.steps,
"sampler": self.sampler,
"seed": random.randint(0, 9999999999),
"n_samples": 1,
"ucPreset": 0,
"qualityToggle": True,
"sm": False,
"sm_dyn": False,
"noise_schedule": self.noise_schedule if is_v4_model else "native",
}
# V4.5模型使用新格式
if is_v4_model:
parameters.update({
"params_version": 3,
"cfg_rescale": 0,
"autoSmea": False,
"legacy": False,
"legacy_v3_extend": False,
"legacy_uc": False,
"add_original_image": True,
"controlnet_strength": 1,
"dynamic_thresholding": False,
"prefer_brownian": True,
"normalize_reference_strength_multiple": True,
"use_coords": True,
"inpaintImg2ImgStrength": 1,
"deliberate_euler_ancestral_bug": False,
"skip_cfg_above_sigma": None,
"characterPrompts": [],
"stream": "msgpack",
"v4_prompt": {
"caption": {
"base_caption": prompt,
"char_captions": []
},
"use_coords": True,
"use_order": True
},
"v4_negative_prompt": {
"caption": {
"base_caption": negative_prompt,
"char_captions": []
},
"legacy_uc": False
},
"negative_prompt": negative_prompt,
"reference_image_multiple": [],
"reference_information_extracted_multiple": [],
"reference_strength_multiple": []
})
# V3使用negative_prompt字段
elif is_v3_model:
parameters["negative_prompt"] = negative_prompt
payload = {
"input": prompt,
"model": self.model,
"action": "generate",
"parameters": parameters
}
# V4.5需要额外字段
if is_v4_model:
payload["use_new_shared_trial"] = True
return payload
async def _call_novelai_api(self, payload: dict) -> Optional[bytes]:
"""调用NovelAI API"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
connector = None
request_kwargs = {
"json": payload,
"headers": headers,
"timeout": aiohttp.ClientTimeout(total=120)
}
if self.proxy:
request_kwargs["proxy"] = self.proxy
connector = aiohttp.TCPConnector()
logger.info(f"使用代理: {self.proxy}")
try:
async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(self.base_url, **request_kwargs) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(f"API请求失败 ({resp.status}): {error_text[:200]}")
return None
img_data = await resp.read()
logger.info(f"收到响应数据: {len(img_data)} bytes")
# 检查是否是ZIP文件
if img_data[:4] == b'PK\x03\x04':
logger.info("检测到ZIP格式解压中...")
return self._extract_from_zip(img_data)
elif img_data[:4] == b'\x89PNG':
logger.info("检测到PNG格式")
return img_data
else:
logger.warning(f"未知文件格式前4字节: {img_data[:4].hex()}")
return img_data
except Exception as e:
logger.error(f"API调用失败: {e}", exc_info=True)
return None
def _extract_from_zip(self, zip_data: bytes) -> Optional[bytes]:
"""从ZIP中提取PNG"""
try:
with zipfile.ZipFile(io.BytesIO(zip_data)) as zf:
for filename in zf.namelist():
if filename.lower().endswith('.png'):
img_data = zf.read(filename)
logger.info(f"从ZIP提取: {filename} ({len(img_data)} bytes)")
return img_data
logger.error("ZIP中未找到PNG文件")
return None
except Exception as e:
logger.error(f"解压ZIP失败: {e}")
return None
async def _save_image(self, image_data: bytes) -> Optional[Path]:
"""保存图片到本地"""
try:
filename = f"novelai_{uuid.uuid4().hex[:12]}.png"
filepath = self.image_dir / filename
# 写入文件
with open(filepath, "wb") as f:
f.write(image_data)
f.flush()
import os
os.fsync(f.fileno())
# 验证图片
try:
with Image.open(filepath) as img:
img.verify()
with Image.open(filepath) as img:
logger.info(f"图片验证成功: {img.format} {img.size}")
except Exception as e:
logger.warning(f"图片验证失败: {e}")
return filepath
except Exception as e:
logger.error(f"保存图片失败: {e}")
return None

View File

@@ -83,21 +83,93 @@ class QZoneService:
return context
async def send_feed(self, topic: str, stream_id: str | None) -> dict[str, Any]:
"""发送一条说说"""
"""发送一条说说支持AI配图"""
cross_context = await self._get_cross_context()
story = await self.content_service.generate_story(topic, context=cross_context)
if not story:
return {"success": False, "message": "生成说说内容失败"}
await self.image_service.generate_images_for_story(story)
# 检查是否启用AI配图
ai_image_enabled = self.get_config("ai_image.enable_ai_image", False)
provider = self.get_config("ai_image.provider", "siliconflow")
image_path = None
if ai_image_enabled:
# 启用AI配图文本模型生成说说+图片提示词
story, image_info = await self.content_service.generate_story_with_image_info(topic, context=cross_context)
if not story:
return {"success": False, "message": "生成说说内容失败"}
# 根据provider调用对应的生图服务
if provider == "novelai":
try:
from .novelai_service import MaiZoneNovelAIService
novelai_service = MaiZoneNovelAIService(self.get_config)
if novelai_service.is_available():
# 解析画幅
aspect_ratio = image_info.get("aspect_ratio", "方图")
size_map = {
"方图": (1024, 1024),
"横图": (1216, 832),
"竖图": (832, 1216),
}
width, height = size_map.get(aspect_ratio, (1024, 1024))
logger.info(f"🎨 开始生成NovelAI配图...")
success, img_path, msg = await novelai_service.generate_image_from_prompt_data(
prompt=image_info.get("prompt", ""),
negative_prompt=image_info.get("negative_prompt"),
include_character=image_info.get("include_character", False),
width=width,
height=height
)
if success and img_path:
image_path = img_path
logger.info(f"✅ NovelAI配图生成成功")
else:
logger.warning(f"⚠️ NovelAI配图生成失败: {msg}")
else:
logger.warning("NovelAI服务不可用未配置API Key")
except Exception as e:
logger.error(f"NovelAI配图生成出错: {e}", exc_info=True)
elif provider == "siliconflow":
try:
# 调用硅基流动生成图片
success, img_path = await self.image_service.generate_image_from_prompt(
prompt=image_info.get("prompt", ""),
save_dir=None # 使用默认images目录
)
if success and img_path:
image_path = img_path
logger.info(f"✅ 硅基流动配图生成成功")
else:
logger.warning(f"⚠️ 硅基流动配图生成失败")
except Exception as e:
logger.error(f"硅基流动配图生成出错: {e}", exc_info=True)
else:
# 不使用AI配图只生成说说文本
story = await self.content_service.generate_story(topic, context=cross_context)
if not story:
return {"success": False, "message": "生成说说内容失败"}
qq_account = config_api.get_global_config("bot.qq_account", "")
api_client = await self._get_api_client(qq_account, stream_id)
if not api_client:
return {"success": False, "message": "获取QZone API客户端失败"}
image_dir = self.get_config("send.image_directory")
images_bytes = self._load_local_images(image_dir)
# 加载图片
images_bytes = []
# 使用AI生成的图片
if image_path and image_path.exists():
try:
with open(image_path, "rb") as f:
images_bytes.append(f.read())
logger.info(f"添加AI配图到说说")
except Exception as e:
logger.error(f"读取AI配图失败: {e}")
try:
success, _ = await api_client["publish"](story, images_bytes)
@@ -115,19 +187,16 @@ class QZoneService:
if not story:
return {"success": False, "message": "根据活动生成说说内容失败"}
await self.image_service.generate_images_for_story(story)
if self.get_config("send.enable_ai_image", False):
await self.image_service.generate_images_for_story(story)
qq_account = config_api.get_global_config("bot.qq_account", "")
# 注意:定时任务通常在后台运行,没有特定的用户会话,因此 stream_id 为 None
api_client = await self._get_api_client(qq_account, stream_id=None)
if not api_client:
return {"success": False, "message": "获取QZone API客户端失败"}
image_dir = self.get_config("send.image_directory")
images_bytes = self._load_local_images(image_dir)
try:
success, _ = await api_client["publish"](story, images_bytes)
success, _ = await api_client["publish"](story, [])
if success:
return {"success": True, "message": story}
return {"success": False, "message": "发布说说至QQ空间失败"}
@@ -434,7 +503,12 @@ class QZoneService:
logger.debug(f"锁定待评论说说: {comment_key}")
self.processing_comments.add(comment_key)
try:
comment_text = await self.content_service.generate_comment(content, target_name, rt_con, images)
# 使用content_service生成评论相当于回复好友的说说
comment_text = await self.content_service.generate_comment_reply(
story_content=content or rt_con or "说说内容",
comment_content="", # 评论说说时没有评论内容
commenter_name=target_name
)
if comment_text:
success = await api_client["comment"](target_qq, fid, comment_text)
if success:
@@ -465,61 +539,6 @@ class QZoneService:
return result
def _load_local_images(self, image_dir: str) -> list[bytes]:
"""随机加载本地图片(不删除文件)"""
images = []
if not image_dir or not os.path.exists(image_dir):
logger.warning(f"图片目录不存在或未配置: {image_dir}")
return images
try:
# 获取所有图片文件
all_files = [
f
for f in os.listdir(image_dir)
if os.path.isfile(os.path.join(image_dir, f))
and f.lower().endswith((".jpg", ".jpeg", ".png", ".gif", ".bmp"))
]
if not all_files:
logger.warning(f"图片目录中没有找到图片文件: {image_dir}")
return images
# 检查是否启用配图
enable_image = bool(self.get_config("send.enable_image", False))
if not enable_image:
logger.info("说说配图功能已关闭")
return images
# 根据配置选择图片数量
config_image_number = self.get_config("send.image_number", 1)
try:
config_image_number = int(config_image_number)
except (ValueError, TypeError):
config_image_number = 1
logger.warning("配置项 image_number 值无效,使用默认值 1")
max_images = min(min(config_image_number, 9), len(all_files)) # 最多9张最少1张
selected_count = max(1, max_images) # 确保至少选择1张
selected_files = random.sample(all_files, selected_count)
logger.info(f"{len(all_files)} 张图片中随机选择了 {selected_count} 张配图")
for filename in selected_files:
full_path = os.path.join(image_dir, filename)
try:
with open(full_path, "rb") as f:
image_data = f.read()
images.append(image_data)
logger.info(f"加载图片: {filename} ({len(image_data)} bytes)")
except Exception as e:
logger.error(f"加载图片 {filename} 失败: {e}")
return images
except Exception as e:
logger.error(f"加载本地图片失败: {e}")
return []
def _generate_gtk(self, skey: str) -> str:
hash_val = 5381
for char in skey:

View File

@@ -414,7 +414,22 @@ class NapcatAdapterPlugin(BasePlugin):
"enable_emoji_like": ConfigField(type=bool, default=True, description="是否启用群聊表情回复处理"),
"enable_reply_at": ConfigField(type=bool, default=True, description="是否在回复时自动@原消息发送者"),
"reply_at_rate": ConfigField(type=float, default=0.5, description="回复时@的概率0.0-1.0"),
"enable_video_processing": ConfigField(type=bool, default=True, description="是否启用视频消息处理(下载和解析)"),
# ========== 视频消息处理配置 ==========
"enable_video_processing": ConfigField(
type=bool,
default=True,
description="是否启用视频消息处理(下载和解析)。关闭后视频消息将显示为 [视频消息] 占位符,不会进行下载"
),
"video_max_size_mb": ConfigField(
type=int,
default=100,
description="允许下载的视频文件最大大小MB超过此大小的视频将被跳过"
),
"video_download_timeout": ConfigField(
type=int,
default=60,
description="视频下载超时时间(秒),若超时将中止下载"
),
},
}

View File

@@ -37,10 +37,21 @@ class MessageHandler:
def __init__(self, adapter: "NapcatAdapter"):
self.adapter = adapter
self.plugin_config: dict[str, Any] | None = None
self._video_downloader = None
def set_plugin_config(self, config: dict[str, Any]) -> None:
"""设置插件配置"""
"""设置插件配置,并根据配置初始化视频下载器"""
self.plugin_config = config
# 如果启用了视频处理,根据配置初始化视频下载器
if config_api.get_plugin_config(config, "features.enable_video_processing", True):
from ..video_handler import VideoDownloader
max_size = config_api.get_plugin_config(config, "features.video_max_size_mb", 100)
timeout = config_api.get_plugin_config(config, "features.video_download_timeout", 60)
self._video_downloader = VideoDownloader(max_size_mb=max_size, download_timeout=timeout)
logger.debug(f"视频下载器已初始化: max_size={max_size}MB, timeout={timeout}s")
async def handle_raw_message(self, raw: dict[str, Any]):
"""
@@ -105,6 +116,11 @@ class MessageHandler:
if seg_message:
seg_list.append(seg_message)
# 防御性检查:确保至少有一个消息段,避免消息为空导致构建失败
if not seg_list:
logger.warning("消息内容为空,添加占位符文本")
seg_list.append({"type": "text", "data": "[消息内容为空]"})
msg_builder.format_info(
content_format=[seg["type"] for seg in seg_list],
accept_format=ACCEPT_FORMAT,
@@ -302,7 +318,7 @@ class MessageHandler:
video_source = file_path if file_path else video_url
if not video_source:
logger.warning("视频消息缺少URL或文件路径信息")
return None
return {"type": "text", "data": "[视频消息]"}
try:
if file_path and Path(file_path).exists():
@@ -320,14 +336,17 @@ class MessageHandler:
},
}
elif video_url:
# URL下载处理
from ..video_handler import get_video_downloader
video_downloader = get_video_downloader()
download_result = await video_downloader.download_video(video_url)
# URL下载处理 - 使用配置中的下载器实例
downloader = self._video_downloader
if not downloader:
from ..video_handler import get_video_downloader
downloader = get_video_downloader()
download_result = await downloader.download_video(video_url)
if not download_result["success"]:
logger.warning(f"视频下载失败: {download_result.get('error', '未知错误')}")
return None
return {"type": "text", "data": f"[视频消息] ({download_result.get('error', '下载失败')})"}
video_base64 = base64.b64encode(download_result["data"]).decode("utf-8")
logger.debug(f"视频下载成功,大小: {len(download_result['data']) / (1024 * 1024):.2f} MB")
@@ -343,11 +362,11 @@ class MessageHandler:
}
else:
logger.warning("既没有有效的本地文件路径也没有有效的视频URL")
return None
return {"type": "text", "data": "[视频消息]"}
except Exception as e:
logger.error(f"视频消息处理失败: {e!s}")
return None
return {"type": "text", "data": "[视频消息处理出错]"}
async def _handle_rps_message(self, segment: dict) -> SegPayload:
"""处理猜拳消息"""

View File

@@ -1,5 +1,5 @@
[inner]
version = "8.0.1"
version = "8.0.2"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -309,6 +309,7 @@ perceptual_activation_threshold = 3 # 激活阈值(召回次数→短期)
# 短期记忆层配置
short_term_max_memories = 30 # 短期记忆最大数量
short_term_transfer_threshold = 0.6 # 转移到长期记忆的重要性阈值
short_term_enable_force_cleanup = true # 开启压力泄压(建议高频场景开启)
short_term_search_top_k = 5 # 搜索时返回的最大数量
short_term_decay_factor = 0.98 # 衰减因子