This commit is contained in:
雅诺狐
2025-08-19 15:35:11 +08:00
5 changed files with 676 additions and 120 deletions

View File

@@ -1,66 +0,0 @@
# 全新三层记忆系统架构 (V2.0) 设计文档
## 1. 核心思想
本架构旨在建立一个清晰、有序的信息处理流水线,模拟人类记忆从瞬时感知到长期知识沉淀的过程。信息将经历**短期记忆 (STM)**、**中期记忆 (MTM)** 和 **长期记忆 (LTM)** 三个阶段,实现从海量、零散到结构化、深刻的转化。
## 2. 架构分层详解
### 2.1. 短期记忆 (STM - Short-Term Memory) - “消息缓冲区”
* **职责**: 捕获并暂存所有进入核心的最新消息,为即时对话提供上下文,实现快速响应。
* **实现方式**:
* **内存队列**: 采用定长的内存队列(如 `collections.deque`),存储最近的 N 条原始消息(建议初始值为 200
* **实时向量化**: 消息入队时,异步进行文本内容的语义向量化,生成“意义指纹”。
* **快速检索**: 利用高效的向量相似度计算库(如 FAISS, Annoy在新消息到来时快速从队列中检索最相关的历史消息构建即时上下文。
* **触发机制**: 当队列达到容量上限时,将最老的一批消息(例如前 50 条)打包,移交给中期记忆模块处理。
### 2.2. 中期记忆 (MTM - Mid-Term Memory) - “记忆压缩器”
* **职责**: 对来自短期记忆的大量零散信息进行压缩、总结,形成结构化的“记忆片段”。
* **实现方式**:
* **LLM 总结**: 调用大语言模型LLM对 STM 移交的消息包进行深度分析和总结提炼成一段精简的“记忆陈述”Memory Statement
* **信息结构化**: 每个记忆片段都将包含以下元数据:
* `memory_text`: 记忆陈述本身。
* `keywords`: 关联的关键词列表。
* `time_range`: 记忆所涉及的时间范围。
* `importance_score`: LLM 评估的重要性评分。
* `access_count`: 访问计数器,初始为 0。
* **持久化存储**: 将结构化的记忆片段存储在数据库中,可复用或改造现有 `Memory` 表。
* **触发机制**: 由 STM 的队列溢出事件触发。
### 2.3. 长期记忆 (LTM - Long-Term Memory) - “知识图谱”
* **职责**: 将经过验证的、具有高价值的中期记忆,内化为系统核心知识的一部分,构建深层联系。
* **实现方式**:
* **晋升机制**: 通过一个定期的“记忆整理”任务,扫描中期记忆数据库。当某个记忆片段的 `access_count` 达到预设阈值(例如 10 次),则触发晋升。
* **融入图谱**: 晋升的记忆片段将被送往 `Hippocampus` 模块。`Hippocampus` 将不再直接处理原始聊天记录,而是处理这些高质量、经过预处理的记忆片段。它会从中提取核心概念(节点)和它们之间的关系(边),然后将这些信息融入并更新现有的知识图谱。
* **触发机制**: 由定时任务(例如每天执行一次)触发。
## 3. 信息处理流程
```mermaid
graph TD
A[输入: 新消息] --> B{短期记忆 STM};
B --> |实时向量检索| C[输出: 对话上下文];
B --> |队列满| D{中期记忆 MTM};
D --> |LLM 总结| E[存入数据库: 记忆片段];
E --> |关键词/时间检索| C;
E --> |访问次数高| F{长期记忆 LTM};
F --> |LLM 提取概念/关系| G[更新: 知识图谱];
G --> |图谱扩散激活检索| C;
subgraph "内存中 (高速)"
B
end
subgraph "数据库中 (持久化)"
E
G
end
```
## 4. 现有模块改造计划
* **`InstantMemory`**: 将被新的 **STM****MTM** 模块取代。其原有的“判断是否需要记忆”和“总结”的功能,将融入到 MTM 的处理流程中。
* **`Hippocampus`**: 将保留其作为 **LTM** 的核心地位,但其输入源将从“随机抽样的历史聊天记录”变更为“从 MTM 晋升的高价值记忆片段”。这将极大提升其构建知识图谱的效率和质量。

View File

@@ -0,0 +1,526 @@
# 麦麦记忆系统架构 v3.0 设计文档
*融合 Mem0 先进理念的生产级记忆系统*
## 🎯 1. 核心思想
本架构借鉴 **Mem0** 的先进设计思路,建立一个**智能化、高效率、可扩展**的记忆系统。系统采用**事实导向**的记忆管理,通过**双阶段智能处理**实现从碎片化信息到结构化知识的转化,支持**语义检索**和**关系推理**。
### 设计原则
- **智能优于规则**使用LLM进行事实提取和冲突解决而非简单的阈值判断
- **效率优于完整**:关注核心事实,避免信息冗余和重复处理
- **语义优于字面**:基于向量相似度的语义检索,而非关键词匹配
- **异步优于同步**:记忆处理不阻塞主对话流程
## 📚 2. 三层架构详解
### 2.1. 智能短期记忆 (STM - Smart Short-Term Memory)
**定位**:高速语义缓存 + 智能事实提取器
#### 核心功能
- **消息缓冲**内存队列存储最近的对话消息建议200条
- **实时向量化**:消息入队时异步生成语义向量
- **语义检索**:基于相似度快速检索相关历史上下文
- **事实提取**:智能识别并提取对话中的重要事实
#### 技术架构
- **内存队列**使用定长双端队列deque存储最近消息
- **向量缓存**每条消息的embedding向量缓存在内存中
- **索引结构**使用FAISS或Annoy等高效向量索引库
- **处理流程**:消息入队 → 异步向量化 → 索引更新 → 批量转移判断
#### 触发机制
- **定时批处理**每隔一定时间如5分钟处理一批消息
- **队列满载**:队列达到容量时触发批量转移
- **对话间隙**:检测到对话暂停时进行处理
### 2.2. 智能中期记忆 (MTM - Smart Mid-Term Memory)
**定位**:事实管理器 + 冲突解决器借鉴Mem0双阶段处理
#### 双阶段处理流程
##### 阶段一:事实提取 (Extraction Phase)
**输入源**:消息批次 + 对话摘要 + 最近10条消息
**输出结果**:结构化的候选事实列表
**提取内容**
1. 用户偏好和习惯
2. 重要的个人信息
3. 关键的决定和计划
4. 情感状态变化
**处理方式**使用LLM分析对话内容输出JSON格式的事实数据包含内容、重要性评分、事实类型等元数据。
##### 阶段二:冲突解决 (Update Phase)
**处理流程**
1. 检索相似的现有记忆top-k语义搜索
2. LLM分析新事实与现有记忆的关系
3. 智能决策操作类型
**操作类型**
- **ADD**:添加全新记忆
- **UPDATE**:更新现有记忆内容
- **DELETE**:删除矛盾或过时记忆
- **NOOP**:无需任何操作
**决策机制**基于语义相似度和时间戳LLM判断最合适的操作确保记忆库的一致性和准确性。
#### 记忆元数据结构
每个记忆事实包含以下属性:
- **基础信息**唯一ID、内容文本、关键词列表
- **语义信息**向量化embedding、重要性评分、事实类型
- **时间信息**:创建时间、最后访问时间、访问计数
- **归属信息**对话ID、用户ID
- **事实类型**:用户偏好、个人信息、计划安排、情感状态等
### 2.3. 增强长期记忆 (LTM - Enhanced Long-Term Memory)
**定位**:知识图谱 + 关系推理引擎集成现有Hippocampus
#### 晋升机制
- **访问频次**`access_count >= 10` 的记忆事实
- **重要性评分**`importance_score >= 0.8` 的高价值信息
- **时间持久性**存在超过7天且仍被访问的记忆
#### 图谱增强借鉴Mem0g
**技术组件**
- **实体提取器**:从记忆事实中识别人物、地点、概念等实体
- **关系构建器**:分析实体间的语义关系,构建三元组
- **图谱集成器**:将新的实体关系融入现有知识图谱
**处理流程**
1. 对晋升的记忆事实进行实体提取
2. 构建实体间的关系三元组
3. 与现有Hippocampus知识图谱进行集成
4. 支持复杂的多跳推理和关系查询
## 🔄 3. 信息处理流程
```mermaid
graph TD
A[新消息] --> B{智能STM}
B --> |实时向量检索| C[即时上下文]
B --> |批量处理| D{智能MTM}
D --> |事实提取| E[候选事实]
E --> |冲突解决| F{决策引擎}
F --> |ADD| G[新增记忆]
F --> |UPDATE| H[更新记忆]
F --> |DELETE| I[删除冲突]
F --> |NOOP| J[无操作]
G --> K[(MTM数据库)]
H --> K
K --> |语义检索| C
K --> |访问频次高| L{增强LTM}
L --> |实体关系提取| M[知识图谱]
M --> |图谱扩散检索| C
subgraph "内存层 (毫秒级)"
B
end
subgraph "事实层 (秒级)"
D
E
F
K
end
subgraph "知识层 (分钟级)"
L
M
end
```
## ⚙️ 4. 配置体系
### 4.1. 核心配置
```toml
[memory_v3]
enable = true
processing_mode = "async" # async/sync
[memory_v3.stm]
max_size = 200
batch_size = 50
vector_index_type = "faiss" # faiss/annoy
similarity_threshold = 0.75
embedding_model = "text-embedding-3-small"
[memory_v3.mtm]
fact_extraction_batch_size = 20
importance_threshold = 0.6
conflict_resolution_top_k = 10
max_facts_per_batch = 50
[memory_v3.ltm]
promotion_access_threshold = 10
promotion_importance_threshold = 0.8
promotion_time_threshold = 604800 # 7天(秒)
enable_graph_enhancement = true
```
### 4.2. 性能配置
```toml
[memory_v3.performance]
max_concurrent_extractions = 3
llm_timeout = 30
vector_cache_size = 10000
enable_compression = true
compression_ratio = 0.1
[memory_v3.personalization]
enable_user_profiling = true
enable_context_adaptation = true
enable_emotional_weighting = true
```
## 🚀 5. 性能优化策略
### 5.1. 异步处理管道
**设计理念**:主流程与记忆处理分离,确保对话响应速度
**处理模式**
- **前台处理**立即从STM检索相关上下文快速响应用户
- **后台处理**:异步进行事实提取、冲突解决和记忆更新
- **流水线处理**:多个消息可以并行处理,提高整体吞吐量
### 5.2. 批量优化
- **批量向量化**:减少模型调用次数
- **批量数据库操作**提高I/O效率
- **批量LLM推理**降低API成本
### 5.3. 缓存策略
- **向量缓存**常用embedding保存在内存中
- **查询缓存**:相似查询复用结果
- **LRU淘汰**:自动清理过期缓存
## 🛡️ 6. 错误处理与降级
### 6.1. 分层降级机制
**LLM失效降级**
- 事实提取失败 → 基于规则的关键词提取
- 冲突解决失败 → 简单的时间戳去重
- 重要性评分失败 → 基于消息长度的启发式评分
**向量服务降级**
- 向量检索失败 → 降级到关键词匹配
- 向量化失败 → 使用TF-IDF等传统方法
- 索引异常 → 临时使用线性搜索
### 6.2. 容错机制
- **超时保护**LLM调用超时自动降级
- **重试机制**:网络错误自动重试(指数退避)
- **数据备份**:关键记忆多副本存储
- **状态恢复**:系统重启后自动恢复处理状态
## 🎨 7. 个性化与适应性
### 7.1. 用户画像集成
**画像维度**
- **兴趣领域**:用户关注的话题和偏好
- **沟通风格**:正式/非正式、简洁/详细等
- **记忆偏好**:希望记住的信息类型和重点
**个性化机制**
- 根据用户兴趣调整事实重要性评分
- 基于沟通风格优化记忆表达方式
- 考虑用户偏好决定记忆保留策略
### 7.2. 情境感知记忆
- **时间感知**:工作时间 vs 休闲时间的记忆优先级
- **场景感知**:群聊 vs 私聊的记忆策略差异
- **情绪感知**:情感状态影响记忆权重
### 7.3. 动态参数调整
**调整策略**
- **性能导向**:根据准确性和延迟指标自动调整阈值
- **使用模式导向**:基于用户行为模式优化配置
- **资源导向**:根据系统负载动态调整处理参数
**调整范围**
- 事实提取的重要性阈值
- 批处理的大小和频率
- 向量检索的相似度阈值
- 记忆晋升的访问次数阈值
## 📊 8. 监控与分析
### 8.1. 关键指标
- **记忆质量**:事实准确性、相关性评分
- **系统性能**:检索延迟、处理吞吐量
- **用户体验**:记忆命中率、回复连贯性
- **资源使用**token消耗、内存占用
### 8.2. 分析面板
**统计维度**
- **STM指标**:队列使用率、命中率、向量化效率
- **MTM指标**:事实提取成功率、冲突解决准确性、存储增长
- **LTM指标**:晋升率、图谱规模、推理查询性能
- **用户行为**:记忆访问模式、偏好变化趋势
- **系统性能**:处理延迟、资源消耗、错误率
## 🛣️ 9. 实施路线图
### 阶段一基础重构2周
- [ ] 重写STM为真正的内存队列
- [ ] 实现基础向量检索
- [ ] 添加异步处理框架
### 阶段二智能升级3周
- [ ] 实现双阶段MTM处理
- [ ] 集成事实提取和冲突解决
- [ ] 完善配置体系
### 阶段三性能优化2周
- [ ] 批量处理优化
- [ ] 缓存策略实现
- [ ] 错误处理完善
### 阶段四个性化增强2周
- [ ] 用户画像集成
- [ ] 动态参数调整
- [ ] 监控分析系统
## 🎯 10. 预期效果
基于Mem0的benchmark数据预期实现
- **准确性提升**比现有系统提高20-30%
- **延迟降低**检索延迟控制在200ms以内
- **成本节约**token使用量减少80%以上
- **用户体验**:记忆连贯性显著改善
---
*本设计文档融合了Mem0的先进理念与MMC项目的实际需求旨在构建一个真正生产级的智能记忆系统。*
## 🔍 11. MemU 架构分析与融合思路
### 11.1. MemU vs Mem0 设计哲学对比
在调研过程中,我们发现了另一个优秀的记忆框架 **MemU**,其设计理念为我们的架构提供了新的思路。
#### 核心差异分析
| 维度 | 当前架构 (基于Mem0) | MemU架构 | 优势对比 |
|------|------------------|---------|----------|
| **存储方式** | 向量数据库 + 结构化事实 | 文档化记忆 + 文件系统 | MemU: 上下文完整性Mem0: 精确检索 |
| **处理流程** | 双阶段:提取→冲突解决 | 代理驱动:自主决策 | MemU: 自适应性Mem0: 可控性 |
| **知识组织** | 层级晋升 (STM→MTM→LTM) | 网络化超链接 | MemU: 关联推理Mem0: 层次清晰 |
| **性能指标** | 26%提升 vs OpenAI | 92.09% Locomo准确率 | MemU: 更高准确率Mem0: 更低延迟 |
#### MemU 的关键创新
**记忆即文件系统 (Memory as File System)**
- **🗂️ 自主组织**Memory Agent 自动决定记录、修改、归档
- **🔗 智能链接**:自动创建记忆间的语义连接
- **🌱 持续演化**:离线时仍在分析和生成新见解
- **🧠 自适应遗忘**:基于使用模式的智能优先级调整
### 11.2. 融合架构设计
#### 混合存储策略
**两套存储系统并行工作:**
**文档存储系统借鉴MemU**
- 把相关记忆整理成完整的"文档",就像写日记一样
- 每个文档都有主题,比如"用户的饮食偏好"、"工作安排"等
- 文档之间可以互相引用,形成知识网络
**向量数据库保留Mem0**
- 把每个具体事实转换成数字向量存储
- 适合精确查找特定信息
- 作为文档系统的补充和备用方案
**工作模式:**
- 优先使用文档系统回答问题(速度快,上下文完整)
- 文档找不到时,降级使用向量搜索(精确度高)
- 两个系统的结果可以合并,给出更全面的答案
#### 增强架构流程
```mermaid
graph TD
A[新消息] --> B{增强STM}
B --> |实时混合检索| C[文档+向量上下文]
B --> |批量处理| D{智能MTM + Agent}
D --> |事实提取| E[候选事实]
E --> |文档化组织| F[Memory Agent]
F --> |自主决策| G{融合操作}
G --> |CREATE_DOC| H[文档化记忆]
G --> |LINK| I[建立超链接]
G --> |ADD_FACT| J[添加事实]
G --> |UPDATE| K[更新内容]
G --> |DELETE| L[删除冲突]
G --> |FORGET| M[自适应遗忘]
H --> N[(混合存储层)]
I --> N
J --> N
K --> N
N --> |文档检索| C
N --> |向量检索| C
N --> |网络推理| O[关联发现]
subgraph "文档层 (新增)"
H
I
O
end
subgraph "智能代理层 (新增)"
F
G
M
end
subgraph "混合存储层"
N
end
```
### 11.3. Memory Agent 实现方案
#### 智能记忆管家的工作方式
**Memory Agent就像一个聪明的图书管理员**
**日常工作流程:**
1. **收集新信息** - 接收从对话中提取的事实
2. **决定存储方式** - 判断是创建新文档还是添加到现有文档
3. **建立连接** - 发现新信息与已有记忆的关联关系
4. **智能操作** - 自主决定采取什么行动
**后台整理工作:**
- **分析使用模式** - 观察哪些记忆经常被访问
- **生成新见解** - 通过分析现有记忆发现新的规律和联系
- **调整优先级** - 把重要的记忆放在容易找到的地方
- **智能遗忘** - 让不重要的记忆逐渐淡化
#### 操作类型扩展
**在原有Mem0的四种基础操作基础上新增五种智能操作**
**原有操作(保留):**
- **添加** - 加入全新的记忆事实
- **更新** - 修改现有记忆内容
- **删除** - 移除矛盾或错误的记忆
- **无操作** - 信息重复或无价值时不做处理
**新增操作MemU启发**
- **文档化** - 将相关事实整理成主题文档
- **建立链接** - 在相关记忆之间建立引用关系
- **重新组织** - 调整记忆的分类和结构
- **自适应遗忘** - 根据重要性和使用频率淡化记忆
- **合成见解** - 通过分析多个记忆生成新的理解
### 11.4. 性能优化策略
#### 批量处理的智慧
**学习MemU的"批量处理"思路:**
**为什么要批量处理?**
- 就像洗衣服,积攒一堆一起洗比每件衣服单独洗要高效
- 减少LLM调用次数大幅降低成本
- 一次处理长对话比如8000个token比多次处理短对话效果更好
**什么时候触发批量处理?**
- 对话积累到一定长度时比如50条消息
- 检测到对话主题发生转换时
- 用户长时间不活跃时进行后台整理
#### 混合检索的策略
**四步检索法,确保既快又准:**
1. **文档优先搜索** - 先在整理好的文档中找答案
- 优势:速度快,上下文完整
- 适用:大部分常见问题都能解决
2. **向量精确搜索** - 文档找不到时的备用方案
- 优势:精确度高,能找到细节
- 适用:特定事实查询
3. **关联扩展搜索** - 通过记忆之间的链接发现更多相关信息
- 优势:能发现间接相关的内容
- 适用:需要推理的复杂问题
4. **智能结果合并** - 把不同来源的结果整合排序
- 按相关性、时效性、重要性综合排序
- 去除重复信息,提供最佳答案
### 11.5. 实施路线图更新
#### 第一步MemU融合试验 (2周)
**目标:验证混合架构的可行性**
- [ ] 搭建文档存储系统,让记忆能以文档形式保存
- [ ] 开发基础版智能记忆管家,能自动决定记忆操作
- [ ] 建立双重检索机制,文档+向量并行工作
- [ ] 对比新旧方案的准确性和速度
#### 第二步:智能化能力提升 (2周)
**目标:让记忆系统真正"聪明"起来**
- [ ] 让记忆管家能自动整理和分类记忆
- [ ] 建立记忆间的智能连接网络
- [ ] 实现智能遗忘,重要的记住,不重要的淡化
- [ ] 优化批量处理,降低运行成本
#### 第三步:效果验证与调优 (1周)
**目标:确认融合架构达到预期效果**
- [ ] 用标准测试验证记忆准确性
- [ ] 向MemU的92%准确率目标靠拢
- [ ] 验证成本是否真的降低了
- [ ] 根据测试结果调整各部分的权重
### 11.6. 预期效果提升
基于MemU的融合设计预期在原有基础上进一步提升
- **准确性提升**从当前20-30%提升目标提高到40-50%朝着MemU的92%努力)
- **上下文完整性**:通过文档化存储显著改善
- **关联推理能力**:通过超链接网络大幅增强
- **自适应性**通过Memory Agent实现真正的智能化记忆管理
- **成本效率**:通过批量处理和文档检索进一步降低
### 11.7. 风险与挑战
#### 主要担心的问题
**系统变复杂了:**
- 原来只有一套存储,现在要维护两套(文档+向量)
- 增加了智能代理,需要更多的协调和管理
- 新功能多了,出问题的可能性也增加了
**数据一致性问题:**
- 文档存储和向量存储的信息可能不同步
- 记忆管家的自动操作可能产生意外结果
- 不同存储系统的更新时间可能不一致
**性能可能受影响:**
- 文档生成需要时间,可能增加响应延迟
- 混合检索比单一检索复杂,耗时可能更长
- 智能代理的后台处理可能占用资源
#### 怎么降低风险
**稳妥推进:**
- 保留原有的向量检索作为保底方案
- 新功能先小范围测试,确认没问题再全面推广
- 随时可以回退到简单的架构
**持续监控:**
- 实时观察系统的准确性、速度、成本变化
- 发现问题及时调整参数或策略
- 定期对比新旧方案的效果
---
*本设计文档现已融合了Mem0与MemU的先进理念旨在构建一个更加智能、高效、自适应的记忆系统。通过混合架构和智能代理的引入我们期望能够实现真正的生产级AI记忆管理。*

View File

@@ -644,7 +644,7 @@ class HeartFChatting:
else:
logger.warning(f"{self.log_prefix} 预生成的回复任务未生成有效内容")
action_message = message_data or target_message
action_message = target_message or message_data
if action_type == "reply":
# 等待回复生成完毕
if self.loop_mode == ChatMode.NORMAL:

View File

@@ -98,7 +98,7 @@ class MainSystem:
from random import choices
# 分离彩蛋和权重
egg_texts, weights = zip(*phrases, strict=False)
egg_texts, weights = zip(*phrases, strict=True)
# 使用choices进行带权重的随机选择
selected_egg = choices(egg_texts, weights=weights, k=1)

View File

@@ -238,17 +238,87 @@ class QZoneService:
hash_val += (hash_val << 5) + ord(char)
return str(hash_val & 2147483647)
async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]:
cookies = await self.cookie_service.get_cookies(qq_account, stream_id)
if not cookies:
async def _renew_and_load_cookies(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict[str, str]]:
cookie_dir = Path(__file__).resolve().parent.parent / "cookies"
cookie_dir.mkdir(exist_ok=True)
cookie_file_path = cookie_dir / f"cookies-{qq_account}.json"
try:
# 使用HTTP服务器方式获取Cookie
host = self.get_config("cookie.http_fallback_host", "127.0.0.1")
port = self.get_config("cookie.http_fallback_port", "8080")
napcat_token = self.get_config("plugin.napcat_token", "")
cookie_data = await self._fetch_cookies_http(host, port, napcat_token)
if cookie_data and "cookies" in cookie_data:
cookie_str = cookie_data["cookies"]
parsed_cookies = {k.strip(): v.strip() for k, v in (p.split('=', 1) for p in cookie_str.split('; ') if '=' in p)}
with open(cookie_file_path, "w", encoding="utf-8") as f:
json.dump(parsed_cookies, f)
logger.info(f"Cookie已更新并保存至: {cookie_file_path}")
return parsed_cookies
# 如果HTTP获取失败尝试读取本地文件
if cookie_file_path.exists():
with open(cookie_file_path, "r", encoding="utf-8") as f:
return json.load(f)
return None
except Exception as e:
logger.error(f"更新或加载Cookie时发生异常: {e}")
return None
p_skey = cookies.get("p_skey") or cookies.get("p_skey".upper())
if not p_skey:
return None
async def _fetch_cookies_http(self, host: str, port: str, napcat_token: str) -> Optional[Dict]:
"""通过HTTP服务器获取Cookie"""
url = f"http://{host}:{port}/get_cookies"
max_retries = 5
retry_delay = 1
for attempt in range(max_retries):
try:
headers = {"Content-Type": "application/json"}
if napcat_token:
headers["Authorization"] = f"Bearer {napcat_token}"
payload = {"domain": "user.qzone.qq.com"}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30.0)) as session:
async with session.post(url, json=payload, headers=headers) as resp:
resp.raise_for_status()
if resp.status != 200:
error_msg = f"Napcat服务返回错误状态码: {resp.status}"
if resp.status == 403:
error_msg += " (Token验证失败)"
raise RuntimeError(error_msg)
data = await resp.json()
if data.get("status") != "ok" or "cookies" not in data.get("data", {}):
raise RuntimeError(f"获取 cookie 失败: {data}")
return data["data"]
except aiohttp.ClientError as e:
if attempt < max_retries - 1:
logger.warning(f"无法连接到Napcat服务(尝试 {attempt + 1}/{max_retries}): {url},错误: {str(e)}")
await asyncio.sleep(retry_delay)
retry_delay *= 2
continue
logger.error(f"无法连接到Napcat服务(最终尝试): {url},错误: {str(e)}")
raise RuntimeError(f"无法连接到Napcat服务: {url}")
except Exception as e:
logger.error(f"获取cookie异常: {str(e)}")
raise
raise RuntimeError(f"无法连接到Napcat服务: 超过最大重试次数({max_retries})")
async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]:
cookies = await self._renew_and_load_cookies(qq_account, stream_id)
if not cookies: return None
p_skey = cookies.get('p_skey') or cookies.get('p_skey'.upper())
if not p_skey: return None
gtk = self._generate_gtk(p_skey)
uin = cookies.get("uin", "").lstrip("o")
uin = cookies.get('uin', '').lstrip('o')
async def _request(method, url, params=None, data=None, headers=None):
final_headers = {"referer": f"https://user.qzone.qq.com/{uin}", "origin": "https://user.qzone.qq.com"}
@@ -421,64 +491,90 @@ class QZoneService:
"""监控好友动态"""
try:
params = {
"uin": uin,
"scope": 0,
"view": 1,
"filter": "all",
"flag": 1,
"applist": "all",
"pagenum": 1,
"count": num,
"format": "json",
"g_tk": gtk,
"useutf8": 1,
"outputhtmlfeed": 1,
"uin": uin, "scope": 0, "view": 1, "filter": "all", "flag": 1,
"applist": "all", "pagenum": 1, "count": num, "format": "json",
"g_tk": gtk, "useutf8": 1, "outputhtmlfeed": 1
}
res_text = await _request("GET", self.ZONE_LIST_URL, params=params)
# 增加对返回内容的校验
if res_text.startswith("_Callback("):
# 兼容旧版jsonp格式
json_str = res_text[len("_Callback(") : -2]
# 处理不同的响应格式
json_str = ""
# 使用strip()处理可能存在的前后空白字符
stripped_res_text = res_text.strip()
if stripped_res_text.startswith('_Callback(') and stripped_res_text.endswith(');'):
# JSONP格式
json_str = stripped_res_text[len('_Callback('):-2]
elif stripped_res_text.startswith('{') and stripped_res_text.endswith('}'):
# 直接JSON格式
json_str = stripped_res_text
else:
# 兼容新版纯json格式
json_str = res_text
logger.warning(f"意外的响应格式: {res_text[:100]}...")
return []
# 清理和标准化JSON字符串
json_str = json_str.replace('undefined', 'null').strip()
try:
# 替换 undefined 为 null
json_data = json5.loads(json_str.replace("undefined", "null"))
except Exception:
logger.warning(f"监控好友动态返回格式异常: {res_text}")
json_data = json5.loads(json_str)
# 检查API返回的错误码
if json_data.get('code') != 0:
error_code = json_data.get('code')
error_msg = json_data.get('message', '未知错误')
logger.warning(f"QQ空间API返回错误: code={error_code}, message={error_msg}")
return []
except Exception as parse_error:
logger.error(f"JSON解析失败: {parse_error}, 原始数据: {json_str[:200]}...")
return []
feeds_data = []
if isinstance(json_data, dict):
data_level1 = json_data.get("data")
data_level1 = json_data.get('data')
if isinstance(data_level1, dict):
feeds_data = data_level1.get("data", [])
feeds_data = data_level1.get('data', [])
feeds_list = []
for feed in feeds_data:
if feed is None:
continue
if str(feed.get("appid", "")) != "311" or str(feed.get("uin", "")) == str(uin):
if not feed: continue
# 过滤非说说动态
if str(feed.get('appid', '')) != '311':
continue
html_content = feed.get("html", "")
soup = bs4.BeautifulSoup(html_content, "html.parser")
like_btn = soup.find("a", class_="qz_like_btn_v3")
if isinstance(like_btn, bs4.element.Tag) and like_btn.get("data-islike") == "1":
target_qq = str(feed.get('uin', ''))
tid = feed.get('key', '')
if not target_qq or not tid:
continue
text_div = soup.find("div", class_="f-info")
# 跳过自己的说说(监控是看好友的)
if target_qq == str(uin):
continue
html_content = feed.get('html', '')
if not html_content:
continue
soup = bs4.BeautifulSoup(html_content, 'html.parser')
# 通过点赞状态判断是否已读/处理过
like_btn = soup.find('a', class_='qz_like_btn_v3')
is_liked = False
if like_btn:
is_liked = like_btn.get('data-islike') == '1'
if is_liked:
continue # 如果已经点赞过,说明是已处理的说说,跳过
# 提取内容
text_div = soup.find('div', class_='f-info')
text = text_div.get_text(strip=True) if text_div else ""
feeds_list.append(
{
"target_qq": feed.get("uin"),
"tid": feed.get("key"),
"content": text,
}
)
feeds_list.append({
'target_qq': target_qq,
'tid': tid,
'content': text,
})
logger.info(f"监控任务发现 {len(feeds_list)} 条未处理的新说说。")
return feeds_list
except Exception as e:
logger.error(f"监控好友动态失败: {e}", exc_info=True)