diff --git a/MEMORY_PROFILING.md b/MEMORY_PROFILING.md new file mode 100644 index 000000000..a52762b8c --- /dev/null +++ b/MEMORY_PROFILING.md @@ -0,0 +1,471 @@ +# Bot 内存分析工具使用指南 + +一个统一的内存诊断工具,提供进程监控、对象分析和数据可视化功能。 + +## 🚀 快速开始 + +> **提示**: 建议使用虚拟环境运行脚本(`.\.venv\Scripts\python.exe`) + +```powershell +# 查看帮助 +.\.venv\Scripts\python.exe scripts/memory_profiler.py --help + +# 进程监控模式(最简单) +.\.venv\Scripts\python.exe scripts/memory_profiler.py --monitor + +# 对象分析模式(深度分析) +.\.venv\Scripts\python.exe scripts/memory_profiler.py --objects --output memory_data.txt + +# 可视化模式(生成图表) +.\.venv\Scripts\python.exe scripts/memory_profiler.py --visualize --input memory_data.txt.jsonl +``` + +**或者使用简短命令**(如果你的系统 `python` 已指向虚拟环境): + +```powershell +python scripts/memory_profiler.py --monitor +``` + +## 📦 依赖安装 + +```powershell +# 基础功能(进程监控) +pip install psutil + +# 对象分析功能 +pip install pympler + +# 可视化功能 +pip install matplotlib + +# 一次性安装全部 +pip install psutil pympler matplotlib +``` + +## 🔧 三种模式详解 + +### 1. 进程监控模式 (--monitor) + +**用途**: 从外部监控 bot 进程的总内存、子进程情况 + +**特点**: +- ✅ 自动启动 bot.py(使用虚拟环境) +- ✅ 实时显示进程内存(RSS、VMS) +- ✅ 列出所有子进程及其内存占用 +- ✅ 显示 bot 输出日志 +- ✅ 自动保存监控历史 + +**使用示例**: + +```powershell +# 基础用法 +python scripts/memory_profiler.py --monitor + +# 自定义监控间隔(10秒) +python scripts/memory_profiler.py --monitor --interval 10 + +# 简写 +python scripts/memory_profiler.py -m -i 5 +``` + +**输出示例**: + +``` +================================================================================ +检查点 #1 - 14:23:15 +Bot 进程 (PID: 12345) + RSS: 45.82 MB + VMS: 12.34 MB + 占比: 0.25% + 子进程: 2 个 + 子进程内存: 723.64 MB + 总内存: 769.46 MB + + 📋 子进程详情: + [1] PID 12346: python.exe - 520.15 MB + 命令: python.exe -m chromadb.server ... + [2] PID 12347: python.exe - 203.49 MB + 命令: python.exe -m uvicorn ... +================================================================================ +``` + +**保存位置**: `data/memory_diagnostics/process_monitor__pid.txt` + +--- + +### 2. 对象分析模式 (--objects) + +**用途**: 在 bot 进程内部统计所有 Python 对象的内存占用 + +**特点**: +- ✅ 统计所有对象类型(dict、list、str、AsyncOpenAI 等) +- ✅ **按模块统计内存占用(新增)** - 显示哪个模块占用最多内存 +- ✅ 包含所有线程的对象 +- ✅ 显示对象变化(diff) +- ✅ 线程信息和 GC 统计 +- ✅ 保存 JSONL 数据用于可视化 + +**使用示例**: + +```powershell +# 基础用法(推荐指定输出文件) +python scripts/memory_profiler.py --objects --output memory_data.txt + +# 自定义参数 +python scripts/memory_profiler.py --objects \ + --interval 10 \ + --output memory_data.txt \ + --object-limit 30 + +# 简写 +python scripts/memory_profiler.py -o -i 10 --output data.txt -l 30 +``` + +**输出示例**: + +``` +================================================================================ +🔍 对象级内存分析 #1 - 14:25:30 +================================================================================ + +📦 对象统计 (前 20 个类型): + +类型 数量 总大小 +-------------------------------------------------------------------------------- + 125,843 45.23 MB + 234,567 23.45 MB + 56,789 12.34 MB + 89,012 8.90 MB + 12 5.67 MB +... + +📚 模块内存占用 (前 20 个模块): + +模块名 对象数 总内存 +-------------------------------------------------------------------------------- +builtins 169,144 26.20 MB +src 12,345 5.67 MB +openai 3,456 2.34 MB +chromadb 2,345 1.89 MB +... + + 总模块数: 85 + +🧵 线程信息 (8 个): + [1] ✓ MainThread + [2] ✓ AsyncOpenAIClient (守护) + [3] ✓ ChromaDBWorker (守护) + ... + +🗑️ 垃圾回收: + 代 0: 1,234 次 + 代 1: 56 次 + 代 2: 3 次 + 追踪对象: 456,789 + +📊 总对象数: 567,890 +================================================================================ +``` + +**每 3 次迭代会显示对象变化**: + +``` +📈 对象变化分析: +-------------------------------------------------------------------------------- + types | # objects | total size +==================== | =========== | ============ + | +1234 | +1.23 MB + | +567 | +0.56 MB +... +-------------------------------------------------------------------------------- +``` + +**保存位置**: +- 文本: `.txt` +- 结构化数据: `.txt.jsonl` + +--- + +### 3. 可视化模式 (--visualize) + +**用途**: 将对象分析模式生成的 JSONL 数据绘制成图表 + +**特点**: +- ✅ 显示对象类型随时间的内存变化 +- ✅ 自动选择内存占用最高的 N 个类型 +- ✅ 生成高清 PNG 图表 + +**使用示例**: + +```powershell +# 基础用法 +python scripts/memory_profiler.py --visualize \ + --input memory_data.txt.jsonl + +# 自定义参数 +python scripts/memory_profiler.py --visualize \ + --input memory_data.txt.jsonl \ + --top 15 \ + --plot-output my_plot.png + +# 简写 +python scripts/memory_profiler.py -v -i data.txt.jsonl -t 15 +``` + +**输出**: PNG 图像,展示前 N 个对象类型的内存占用随时间的变化曲线 + +**保存位置**: 默认 `memory_analysis_plot.png`,可通过 `--plot-output` 指定 + +--- + +## 💡 使用场景 + +| 场景 | 推荐模式 | 命令 | +|------|----------|------| +| 快速查看总内存 | `--monitor` | `python scripts/memory_profiler.py -m` | +| 查看子进程占用 | `--monitor` | `python scripts/memory_profiler.py -m` | +| 分析具体对象占用 | `--objects` | `python scripts/memory_profiler.py -o --output data.txt` | +| 追踪内存泄漏 | `--objects` | `python scripts/memory_profiler.py -o --output data.txt` | +| 可视化分析趋势 | `--visualize` | `python scripts/memory_profiler.py -v -i data.txt.jsonl` | + +## 📊 完整工作流程 + +### 场景 1: 快速诊断内存问题 + +```powershell +# 1. 运行进程监控(查看总体情况) +python scripts/memory_profiler.py --monitor --interval 5 + +# 观察输出,如果发现内存异常,进入场景 2 +``` + +### 场景 2: 深度分析对象占用 + +```powershell +# 1. 启动对象分析(保存数据) +python scripts/memory_profiler.py --objects \ + --interval 10 \ + --output data/memory_diagnostics/analysis_$(Get-Date -Format 'yyyyMMdd_HHmmss').txt + +# 2. 运行一段时间(建议至少 5-10 分钟),按 Ctrl+C 停止 + +# 3. 生成可视化图表 +python scripts/memory_profiler.py --visualize \ + --input data/memory_diagnostics/analysis_.txt.jsonl \ + --top 15 \ + --plot-output data/memory_diagnostics/plot_.png + +# 4. 查看图表,分析哪些对象类型随时间增长 +``` + +### 场景 3: 持续监控 + +```powershell +# 在后台运行对象分析(Windows) +Start-Process powershell -ArgumentList "-Command", "python scripts/memory_profiler.py -o -i 30 --output logs/memory_continuous.txt" -WindowStyle Minimized + +# 定期查看 JSONL 并生成图表 +python scripts/memory_profiler.py -v -i logs/memory_continuous.txt.jsonl -t 20 +``` + +## 🎯 参数参考 + +### 通用参数 + +| 参数 | 简写 | 默认值 | 说明 | +|------|------|--------|------| +| `--interval` | `-i` | 10 | 监控间隔(秒) | + +### 对象分析模式参数 + +| 参数 | 简写 | 默认值 | 说明 | +|------|------|--------|------| +| `--output` | - | 无 | 输出文件路径(强烈推荐) | +| `--object-limit` | `-l` | 20 | 显示的对象类型数量 | + +### 可视化模式参数 + +| 参数 | 简写 | 默认值 | 说明 | +|------|------|--------|------| +| `--input` | - | **必需** | 输入 JSONL 文件路径 | +| `--top` | `-t` | 10 | 展示前 N 个对象类型 | +| `--plot-output` | - | `memory_analysis_plot.png` | 输出图表路径 | + +## ⚠️ 注意事项 + +### 性能影响 + +| 模式 | 性能影响 | 说明 | +|------|----------|------| +| `--monitor` | < 1% | 几乎无影响,适合生产环境 | +| `--objects` | 5-15% | 有一定影响,建议在测试环境使用 | +| `--visualize` | 0% | 离线分析,无影响 | + +### 常见问题 + +**Q: 对象分析模式报错 "pympler 未安装"?** +```powershell +pip install pympler +``` + +**Q: 可视化模式报错 "matplotlib 未安装"?** +```powershell +pip install matplotlib +``` + +**Q: 对象分析模式提示 "bot.py 未找到 main_async() 或 main() 函数"?** + +这是正常的。如果你的 bot.py 的主逻辑在 `if __name__ == "__main__":` 中,监控线程仍会在后台运行。你可以: +- 保持 bot 运行,监控会持续统计 +- 或者在 bot.py 中添加一个 `main_async()` 或 `main()` 函数 + +**Q: 进程监控模式看不到子进程?** + +确保 bot.py 已经启动了子进程(例如 ChromaDB)。如果刚启动就查看,可能还没有创建子进程。 + +**Q: JSONL 文件在哪里?** + +当你使用 `--output ` 时,会生成: +- ``: 人类可读的文本 +- `.jsonl`: 结构化数据(用于可视化) + +## 📁 输出文件说明 + +### 进程监控输出 + +**位置**: `data/memory_diagnostics/process_monitor__pid.txt` + +**内容**: 每次检查点的进程内存信息 + +### 对象分析输出 + +**文本文件**: `` +- 人类可读格式 +- 包含每次迭代的对象统计 + +**JSONL 文件**: `.jsonl` +- 每行一个 JSON 对象 +- 包含: timestamp, iteration, total_objects, summary, threads, gc_stats +- 用于可视化分析 + +### 可视化输出 + +**PNG 图像**: 默认 `memory_analysis_plot.png` +- 折线图,展示对象类型随时间的内存变化 +- 高清 150 DPI + +## 🔍 诊断技巧 + +### 1. 识别内存泄漏 + +使用对象分析模式运行较长时间,观察: +- 某个对象类型的数量或大小持续增长 +- 对象变化 diff 中始终为正数 + +### 2. 定位大内存对象 + +**查看对象统计**: +- 如果 `` 占用很大,可能是缓存未清理 +- 如果看到特定类(如 `AsyncOpenAI`),检查该类的实例数 + +**查看模块统计**(推荐): +- 查看 📚 模块内存占用部分 +- 如果 `src` 模块占用很大,说明你的代码中有大量对象 +- 如果 `openai`、`chromadb` 等第三方模块占用大,可能是这些库的使用问题 +- 对比不同时间点,看哪个模块的内存持续增长 + +### 3. 分析子进程占用 + +使用进程监控模式: +- 查看子进程详情中的命令行 +- 识别哪个子进程占用大量内存(如 ChromaDB) + +### 4. 对比不同时间点 + +使用可视化模式: +- 生成图表后,观察哪些对象类型的曲线持续上升 +- 对比不同功能运行时的内存变化 + +## 🎓 高级用法 + +### 长期监控脚本 + +创建 `monitor_continuously.ps1`: + +```powershell +# 持续监控脚本 +$timestamp = Get-Date -Format "yyyyMMdd_HHmmss" +$logPath = "logs/memory_analysis_$timestamp.txt" + +Write-Host "开始持续监控,数据保存到: $logPath" +Write-Host "按 Ctrl+C 停止监控" + +python scripts/memory_profiler.py --objects --interval 30 --output $logPath +``` + +### 自动生成日报 + +创建 `generate_daily_report.ps1`: + +```powershell +# 生成内存分析日报 +$date = Get-Date -Format "yyyyMMdd" +$jsonlFiles = Get-ChildItem "logs" -Filter "*$date*.jsonl" + +foreach ($file in $jsonlFiles) { + $outputPlot = $file.FullName -replace ".jsonl", "_plot.png" + python scripts/memory_profiler.py --visualize --input $file.FullName --plot-output $outputPlot --top 20 + Write-Host "生成图表: $outputPlot" +} +``` + +## 📚 扩展阅读 + +- **Python 内存管理**: https://docs.python.org/3/c-api/memory.html +- **psutil 文档**: https://psutil.readthedocs.io/ +- **Pympler 文档**: https://pympler.readthedocs.io/ +- **Matplotlib 文档**: https://matplotlib.org/ + +## 🆘 获取帮助 + +```powershell +# 查看完整帮助信息 +python scripts/memory_profiler.py --help + +# 查看特定模式示例 +python scripts/memory_profiler.py --help | Select-String "示例" +``` + +--- + +**快速开始提醒**: + +```powershell +# 使用虚拟环境(推荐) +.\.venv\Scripts\python.exe scripts/memory_profiler.py --monitor + +# 或者使用系统 Python +python scripts/memory_profiler.py --monitor + +# 深度分析 +.\.venv\Scripts\python.exe scripts/memory_profiler.py --objects --output memory.txt + +# 可视化 +.\.venv\Scripts\python.exe scripts/memory_profiler.py --visualize --input memory.txt.jsonl +``` + +### 💡 虚拟环境说明 + +**Windows**: +```powershell +.\.venv\Scripts\python.exe scripts/memory_profiler.py [选项] +``` + +**Linux/Mac**: +```bash +./.venv/bin/python scripts/memory_profiler.py [选项] +``` + +脚本会自动检测并使用项目虚拟环境来启动 bot(进程监控模式),对象分析模式会自动添加项目根目录到 Python 路径。 + +🎉 现在你已经掌握了完整的内存分析工具! diff --git a/docs/guides/OBJECT_LEVEL_MEMORY_ANALYSIS.md b/docs/guides/OBJECT_LEVEL_MEMORY_ANALYSIS.md new file mode 100644 index 000000000..c7aad3ffc --- /dev/null +++ b/docs/guides/OBJECT_LEVEL_MEMORY_ANALYSIS.md @@ -0,0 +1,267 @@ +# 对象级内存分析指南 + +## 🎯 概述 + +对象级内存分析可以帮助你: +- 查看哪些 Python 对象类型占用最多内存 +- 追踪对象数量和大小的变化 +- 识别内存泄漏的具体对象 +- 监控垃圾回收效率 + +## 🚀 快速开始 + +### 1. 安装依赖 + +```powershell +pip install pympler +``` + +### 2. 启用对象级分析 + +```powershell +# 基本用法 - 启用对象分析 +python scripts/run_bot_with_tracking.py --objects + +# 自定义监控间隔(10 秒) +python scripts/run_bot_with_tracking.py --objects --interval 10 + +# 显示更多对象类型(前 20 个) +python scripts/run_bot_with_tracking.py --objects --object-limit 20 + +# 完整示例(简写参数) +python scripts/run_bot_with_tracking.py -o -i 10 -l 20 +``` + +## 📊 输出示例 + +### 进程级信息 + +``` +================================================================================ +检查点 #1 - 12:34:56 +Bot 进程 (PID: 12345) + RSS: 45.23 MB + VMS: 125.45 MB + 占比: 0.35% + 子进程: 1 个 + 子进程内存: 32.10 MB + 总内存: 77.33 MB + +变化: + RSS: +2.15 MB +``` + +### 对象级分析信息 + +``` +📦 对象级内存分析 (检查点 #1) +-------------------------------------------------------------------------------- +类型 数量 总大小 +-------------------------------------------------------------------------------- +dict 12,345 15.23 MB +str 45,678 8.92 MB +list 8,901 5.67 MB +tuple 23,456 4.32 MB +type 1,234 3.21 MB +code 2,345 2.10 MB +set 1,567 1.85 MB +function 3,456 1.23 MB +method 4,567 890.45 KB +weakref 2,345 678.12 KB + +🗑️ 垃圾回收统计: + - 代 0 回收: 125 次 + - 代 1 回收: 12 次 + - 代 2 回收: 2 次 + - 未回收对象: 0 + - 追踪对象数: 89,456 + +📊 总对象数: 123,456 +-------------------------------------------------------------------------------- +``` + +## 🔍 如何解读输出 + +### 1. 对象类型统计 + +每一行显示: +- **类型名称**: Python 对象类型(dict、str、list 等) +- **数量**: 该类型的对象实例数量 +- **总大小**: 该类型所有对象占用的总内存 + +**关键指标**: +- `dict` 多是正常的(Python 大量使用字典) +- `str` 多也是正常的(字符串无处不在) +- 如果看到某个自定义类型数量异常增长 → 可能存在泄漏 +- 如果某个类型占用内存异常大 → 需要优化 + +### 2. 垃圾回收统计 + +**代 0/1/2 回收次数**: +- 代 0:最频繁,新创建的对象 +- 代 1:中等频率,存活一段时间的对象 +- 代 2:最少,长期存活的对象 + +**未回收对象**: +- 应该是 0 或很小的数字 +- 如果持续增长 → 可能存在循环引用导致的内存泄漏 + +**追踪对象数**: +- Python 垃圾回收器追踪的对象总数 +- 持续增长可能表示内存泄漏 + +### 3. 总对象数 + +当前进程中所有 Python 对象的数量。 + +## 🎯 常见使用场景 + +### 场景 1: 查找内存泄漏 + +```powershell +# 长时间运行,频繁检查 +python scripts/run_bot_with_tracking.py -o -i 5 +``` + +**观察**: +- 哪些对象类型数量持续增长? +- RSS 内存增长和对象数量增长是否一致? +- 垃圾回收是否正常工作? + +### 场景 2: 优化内存占用 + +```powershell +# 较长间隔,查看稳定状态 +python scripts/run_bot_with_tracking.py -o -i 30 -l 25 +``` + +**分析**: +- 前 25 个对象类型中,哪些是你的代码创建的? +- 是否有不必要的大对象缓存? +- 能否使用更轻量的数据结构? + +### 场景 3: 调试特定功能 + +```powershell +# 短间隔,快速反馈 +python scripts/run_bot_with_tracking.py -o -i 3 +``` + +**用途**: +- 触发某个功能后立即观察内存变化 +- 检查对象是否正确释放 +- 验证优化效果 + +## 📝 保存的历史文件 + +监控结束后,历史数据会自动保存到: +``` +data/memory_diagnostics/bot_memory_monitor_YYYYMMDD_HHMMSS_pidXXXXX.txt +``` + +文件内容包括: +- 每个检查点的进程内存信息 +- 每个检查点的对象统计(前 10 个类型) +- 总体统计信息(起始/结束/峰值/平均) + +## 🔧 高级技巧 + +### 1. 结合代码修改 + +在你的代码中添加检查点: + +```python +import gc +from pympler import muppy, summary + +def debug_memory(): + """在关键位置调用此函数""" + gc.collect() + all_objects = muppy.get_objects() + sum_data = summary.summarize(all_objects) + summary.print_(sum_data, limit=10) +``` + +### 2. 比较不同时间点 + +```powershell +# 运行 1 分钟 +python scripts/run_bot_with_tracking.py -o -i 10 +# Ctrl+C 停止,查看文件 + +# 等待 5 分钟后再运行 +python scripts/run_bot_with_tracking.py -o -i 10 +# 比较两次的对象统计 +``` + +### 3. 专注特定对象类型 + +修改 `run_bot_with_tracking.py` 中的 `get_object_stats()` 函数,添加过滤: + +```python +def get_object_stats(limit: int = 10) -> Dict: + # ...现有代码... + + # 只显示特定类型 + filtered_summary = [ + row for row in sum_data + if 'YourClassName' in row[0] + ] + + return { + "summary": filtered_summary[:limit], + # ... + } +``` + +## ⚠️ 注意事项 + +### 性能影响 + +对象级分析会影响性能: +- **pympler 分析**: ~10-20% 性能影响 +- **gc.collect()**: 每次检查点触发垃圾回收,可能导致短暂卡顿 + +**建议**: +- 开发/调试时使用对象分析 +- 生产环境使用普通监控(不加 `--objects`) + +### 内存开销 + +对象分析本身也会占用内存: +- `muppy.get_objects()` 会创建对象列表 +- 统计数据会保存在历史中 + +**建议**: +- 不要设置过小的 `--interval`(建议 >= 5 秒) +- 长时间运行时考虑关闭对象分析 + +### 准确性 + +- 对象统计是**快照**,不是实时的 +- `gc.collect()` 后才统计,确保垃圾已回收 +- 子进程的对象无法统计(只统计主进程) + +## 📚 相关工具 + +| 工具 | 用途 | 对象级分析 | +|------|------|----------| +| `run_bot_with_tracking.py` | 一键启动+监控 | ✅ 支持 | +| `memory_monitor.py` | 手动监控 | ✅ 支持 | +| `windows_memory_profiler.py` | 详细分析 | ✅ 支持 | +| `run_bot_with_pympler.py` | 专门的对象追踪 | ✅ 专注此功能 | + +## 🎓 学习资源 + +- [Pympler 文档](https://pympler.readthedocs.io/) +- [Python GC 模块](https://docs.python.org/3/library/gc.html) +- [内存泄漏调试技巧](https://docs.python.org/3/library/tracemalloc.html) + +--- + +**快速开始**: +```powershell +pip install pympler +python scripts/run_bot_with_tracking.py --objects +``` +🎉 diff --git a/scripts/memory_profiler.py b/scripts/memory_profiler.py new file mode 100644 index 000000000..13e0d102c --- /dev/null +++ b/scripts/memory_profiler.py @@ -0,0 +1,757 @@ +#!/usr/bin/env python3 +""" +统一内存分析工具 - Bot 内存诊断完整解决方案 + +支持三种模式: + 1. 进程监控模式 (--monitor): 从外部监控 bot 进程内存、子进程 + 2. 对象分析模式 (--objects): 在 bot 内部统计所有对象(包括所有线程) + 3. 可视化模式 (--visualize): 将 JSONL 数据绘制成图表 + +示例: + # 进程监控(启动 bot 并监控) + python scripts/memory_profiler.py --monitor --interval 10 + + # 对象分析(深度对象统计) + python scripts/memory_profiler.py --objects --interval 10 --output memory_data.txt + + # 生成可视化图表 + python scripts/memory_profiler.py --visualize --input memory_data.txt.jsonl --top 15 +""" + +import argparse +import asyncio +import gc +import json +import os +import subprocess +import sys +import threading +import time +from collections import defaultdict +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +import psutil + +try: + from pympler import muppy, summary, tracker + PYMPLER_AVAILABLE = True +except ImportError: + PYMPLER_AVAILABLE = False + +try: + import matplotlib.pyplot as plt + MATPLOTLIB_AVAILABLE = True +except ImportError: + MATPLOTLIB_AVAILABLE = False + + +# ============================================================================ +# 进程监控模式 +# ============================================================================ + +async def monitor_bot_process(bot_process: subprocess.Popen, interval: int = 5): + """从外部监控 bot 进程的内存使用(进程级)""" + if bot_process.pid is None: + print("❌ Bot 进程 PID 为空") + return + + print(f"🔍 开始监控 Bot 内存(PID: {bot_process.pid})") + print(f"监控间隔: {interval} 秒") + print("按 Ctrl+C 停止监控和 Bot\n") + + try: + process = psutil.Process(bot_process.pid) + except psutil.NoSuchProcess: + print("❌ 无法找到 Bot 进程") + return + + history = [] + iteration = 0 + + try: + while bot_process.poll() is None: + try: + mem_info = process.memory_info() + mem_percent = process.memory_percent() + + children = process.children(recursive=True) + children_mem = sum(child.memory_info().rss for child in children) + + info = { + "timestamp": time.strftime("%H:%M:%S"), + "rss_mb": mem_info.rss / 1024 / 1024, + "vms_mb": mem_info.vms / 1024 / 1024, + "percent": mem_percent, + "children_count": len(children), + "children_mem_mb": children_mem / 1024 / 1024, + } + + history.append(info) + iteration += 1 + + print(f"{'=' * 80}") + print(f"检查点 #{iteration} - {info['timestamp']}") + print(f"Bot 进程 (PID: {bot_process.pid})") + print(f" RSS: {info['rss_mb']:.2f} MB") + print(f" VMS: {info['vms_mb']:.2f} MB") + print(f" 占比: {info['percent']:.2f}%") + + if children: + print(f" 子进程: {info['children_count']} 个") + print(f" 子进程内存: {info['children_mem_mb']:.2f} MB") + total_mem = info['rss_mb'] + info['children_mem_mb'] + print(f" 总内存: {total_mem:.2f} MB") + + print(f"\n 📋 子进程详情:") + for idx, child in enumerate(children, 1): + try: + child_mem = child.memory_info().rss / 1024 / 1024 + child_name = child.name() + child_cmdline = " ".join(child.cmdline()[:3]) + if len(child_cmdline) > 80: + child_cmdline = child_cmdline[:77] + "..." + print(f" [{idx}] PID {child.pid}: {child_name} - {child_mem:.2f} MB") + print(f" 命令: {child_cmdline}") + except (psutil.NoSuchProcess, psutil.AccessDenied): + print(f" [{idx}] 无法访问进程信息") + + if len(history) > 1: + prev = history[-2] + rss_diff = info['rss_mb'] - prev['rss_mb'] + print(f"\n变化:") + print(f" RSS: {rss_diff:+.2f} MB") + if rss_diff > 10: + print(f" ⚠️ 内存增长较快!") + if info['rss_mb'] > 1000: + print(f" ⚠️ 内存使用超过 1GB!") + + print(f"{'=' * 80}\n") + await asyncio.sleep(interval) + + except psutil.NoSuchProcess: + print("\n❌ Bot 进程已结束") + break + except Exception as e: + print(f"\n❌ 监控出错: {e}") + break + + except KeyboardInterrupt: + print("\n\n⚠️ 用户中断监控") + + finally: + if history and bot_process.pid: + save_process_history(history, bot_process.pid) + + +def save_process_history(history: list, pid: int): + """保存进程监控历史""" + output_dir = Path("data/memory_diagnostics") + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + output_file = output_dir / f"process_monitor_{timestamp}_pid{pid}.txt" + + with open(output_file, "w", encoding="utf-8") as f: + f.write("Bot 进程内存监控历史记录\n") + f.write("=" * 80 + "\n\n") + f.write(f"Bot PID: {pid}\n\n") + + for info in history: + f.write(f"时间: {info['timestamp']}\n") + f.write(f"RSS: {info['rss_mb']:.2f} MB\n") + f.write(f"VMS: {info['vms_mb']:.2f} MB\n") + f.write(f"占比: {info['percent']:.2f}%\n") + if info['children_count'] > 0: + f.write(f"子进程: {info['children_count']} 个\n") + f.write(f"子进程内存: {info['children_mem_mb']:.2f} MB\n") + f.write("\n") + + print(f"\n✅ 监控历史已保存到: {output_file}") + + +async def run_monitor_mode(interval: int): + """进程监控模式主函数""" + print("=" * 80) + print("🚀 进程监控模式") + print("=" * 80) + print("此模式将:") + print(" 1. 使用虚拟环境启动 bot.py") + print(" 2. 实时监控进程内存(RSS、VMS)") + print(" 3. 显示子进程详细信息") + print(" 4. 自动保存监控历史") + print("=" * 80 + "\n") + + project_root = Path(__file__).parent.parent + bot_file = project_root / "bot.py" + + if not bot_file.exists(): + print(f"❌ 找不到 bot.py: {bot_file}") + return 1 + + # 检测虚拟环境 + venv_python = project_root / ".venv" / "Scripts" / "python.exe" + if not venv_python.exists(): + venv_python = project_root / ".venv" / "bin" / "python" + + if venv_python.exists(): + python_exe = str(venv_python) + print(f"🐍 使用虚拟环境: {venv_python}") + else: + python_exe = sys.executable + print(f"⚠️ 未找到虚拟环境,使用当前 Python: {python_exe}") + + print(f"🤖 启动 Bot: {bot_file}") + + bot_process = subprocess.Popen( + [python_exe, str(bot_file)], + cwd=str(project_root), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + + await asyncio.sleep(2) + + if bot_process.poll() is not None: + print("❌ Bot 启动失败") + if bot_process.stdout: + output = bot_process.stdout.read() + if output: + print(f"\nBot 输出:\n{output}") + return 1 + + print(f"✅ Bot 已启动 (PID: {bot_process.pid})\n") + + # 启动输出读取线程 + def read_bot_output(): + if bot_process.stdout: + try: + for line in bot_process.stdout: + print(f"[Bot] {line}", end="") + except Exception: + pass + + output_thread = threading.Thread(target=read_bot_output, daemon=True) + output_thread.start() + + try: + await monitor_bot_process(bot_process, interval) + except KeyboardInterrupt: + print("\n\n⚠️ 用户中断") + + if bot_process.poll() is None: + print("\n正在停止 Bot...") + bot_process.terminate() + try: + bot_process.wait(timeout=10) + except subprocess.TimeoutExpired: + print("⚠️ 强制终止 Bot...") + bot_process.kill() + bot_process.wait() + + print("✅ Bot 已停止") + + return 0 + + +# ============================================================================ +# 对象分析模式 +# ============================================================================ + +class ObjectMemoryProfiler: + """对象级内存分析器""" + + def __init__(self, interval: int = 10, output_file: Optional[str] = None, object_limit: int = 20): + self.interval = interval + self.output_file = output_file + self.object_limit = object_limit + self.running = False + self.tracker = None + if PYMPLER_AVAILABLE: + self.tracker = tracker.SummaryTracker() + self.iteration = 0 + + def get_object_stats(self) -> Dict: + """获取当前进程的对象统计(所有线程)""" + if not PYMPLER_AVAILABLE: + return {} + + try: + gc.collect() + all_objects = muppy.get_objects() + sum_data = summary.summarize(all_objects) + + # 按总大小(第3个元素)降序排序 + sorted_sum_data = sorted(sum_data, key=lambda x: x[2], reverse=True) + + # 按模块统计内存 + module_stats = self._get_module_stats(all_objects) + + threads = threading.enumerate() + thread_info = [ + { + "name": t.name, + "daemon": t.daemon, + "alive": t.is_alive(), + } + for t in threads + ] + + gc_stats = { + "collections": gc.get_count(), + "garbage": len(gc.garbage), + "tracked": len(gc.get_objects()), + } + + return { + "summary": sorted_sum_data[:self.object_limit], + "module_stats": module_stats, + "gc_stats": gc_stats, + "total_objects": len(all_objects), + "threads": thread_info, + } + except Exception as e: + print(f"❌ 获取对象统计失败: {e}") + return {} + + def _get_module_stats(self, all_objects: list) -> Dict: + """统计各模块的内存占用""" + module_mem = defaultdict(lambda: {"count": 0, "size": 0}) + + for obj in all_objects: + try: + # 获取对象所属模块 + obj_type = type(obj) + module_name = obj_type.__module__ + + if module_name: + # 获取顶级模块名(例如 src.chat.xxx -> src) + top_module = module_name.split('.')[0] + + obj_size = sys.getsizeof(obj) + module_mem[top_module]["count"] += 1 + module_mem[top_module]["size"] += obj_size + except Exception: + # 忽略无法获取大小的对象 + continue + + # 转换为列表并按大小排序 + sorted_modules = sorted( + [(mod, stats["count"], stats["size"]) + for mod, stats in module_mem.items()], + key=lambda x: x[2], + reverse=True + ) + + return { + "top_modules": sorted_modules[:20], # 前20个模块 + "total_modules": len(module_mem) + } + + def print_stats(self, stats: Dict, iteration: int): + """打印统计信息""" + print("\n" + "=" * 80) + print(f"🔍 对象级内存分析 #{iteration} - {time.strftime('%H:%M:%S')}") + print("=" * 80) + + if "summary" in stats: + print(f"\n📦 对象统计 (前 {self.object_limit} 个类型):\n") + print(f"{'类型':<50} {'数量':>12} {'总大小':>15}") + print("-" * 80) + + for obj_type, obj_count, obj_size in stats["summary"]: + if obj_size >= 1024 * 1024 * 1024: + size_str = f"{obj_size / 1024 / 1024 / 1024:.2f} GB" + elif obj_size >= 1024 * 1024: + size_str = f"{obj_size / 1024 / 1024:.2f} MB" + elif obj_size >= 1024: + size_str = f"{obj_size / 1024:.2f} KB" + else: + size_str = f"{obj_size} B" + + print(f"{obj_type:<50} {obj_count:>12,} {size_str:>15}") + + if "module_stats" in stats and stats["module_stats"]: + print(f"\n📚 模块内存占用 (前 20 个模块):\n") + print(f"{'模块名':<40} {'对象数':>12} {'总内存':>15}") + print("-" * 80) + + for module_name, obj_count, obj_size in stats["module_stats"]["top_modules"]: + if obj_size >= 1024 * 1024 * 1024: + size_str = f"{obj_size / 1024 / 1024 / 1024:.2f} GB" + elif obj_size >= 1024 * 1024: + size_str = f"{obj_size / 1024 / 1024:.2f} MB" + elif obj_size >= 1024: + size_str = f"{obj_size / 1024:.2f} KB" + else: + size_str = f"{obj_size} B" + + print(f"{module_name:<40} {obj_count:>12,} {size_str:>15}") + + print(f"\n 总模块数: {stats['module_stats']['total_modules']}") + + if "threads" in stats: + print(f"\n🧵 线程信息 ({len(stats['threads'])} 个):") + for idx, t in enumerate(stats["threads"], 1): + status = "✓" if t["alive"] else "✗" + daemon = "(守护)" if t["daemon"] else "" + print(f" [{idx}] {status} {t['name']} {daemon}") + + if "gc_stats" in stats: + gc_stats = stats["gc_stats"] + print(f"\n🗑️ 垃圾回收:") + print(f" 代 0: {gc_stats['collections'][0]:,} 次") + print(f" 代 1: {gc_stats['collections'][1]:,} 次") + print(f" 代 2: {gc_stats['collections'][2]:,} 次") + print(f" 追踪对象: {gc_stats['tracked']:,}") + + if "total_objects" in stats: + print(f"\n📊 总对象数: {stats['total_objects']:,}") + + print("=" * 80 + "\n") + + def print_diff(self): + """打印对象变化""" + if not PYMPLER_AVAILABLE or not self.tracker: + return + + print("\n📈 对象变化分析:") + print("-" * 80) + self.tracker.print_diff() + print("-" * 80) + + def save_to_file(self, stats: Dict): + """保存统计信息到文件""" + if not self.output_file: + return + + try: + # 保存文本 + with open(self.output_file, "a", encoding="utf-8") as f: + f.write(f"\n{'=' * 80}\n") + f.write(f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"迭代: #{self.iteration}\n") + f.write(f"{'=' * 80}\n\n") + + if "summary" in stats: + f.write("对象统计:\n") + for obj_type, obj_count, obj_size in stats["summary"]: + f.write(f" {obj_type}: {obj_count:,} 个, {obj_size:,} 字节\n") + + if "module_stats" in stats and stats["module_stats"]: + f.write("\n模块统计 (前 20 个):\n") + for module_name, obj_count, obj_size in stats["module_stats"]["top_modules"]: + f.write(f" {module_name}: {obj_count:,} 个对象, {obj_size:,} 字节\n") + + f.write(f"\n总对象数: {stats.get('total_objects', 0):,}\n") + f.write(f"线程数: {len(stats.get('threads', []))}\n") + + # 保存 JSONL + jsonl_path = str(self.output_file) + ".jsonl" + record = { + "timestamp": time.strftime('%Y-%m-%d %H:%M:%S'), + "iteration": self.iteration, + "total_objects": stats.get("total_objects", 0), + "threads": stats.get("threads", []), + "gc_stats": stats.get("gc_stats", {}), + "summary": [ + {"type": t, "count": c, "size": s} + for (t, c, s) in stats.get("summary", []) + ], + "module_stats": stats.get("module_stats", {}), + } + + with open(jsonl_path, "a", encoding="utf-8") as jf: + jf.write(json.dumps(record, ensure_ascii=False) + "\n") + + if self.iteration == 1: + print(f"💾 数据保存到: {self.output_file}") + print(f"💾 结构化数据: {jsonl_path}") + + except Exception as e: + print(f"⚠️ 保存文件失败: {e}") + + def start_monitoring(self): + """启动监控线程""" + self.running = True + + def monitor_loop(): + print(f"🚀 对象分析器已启动") + print(f" 监控间隔: {self.interval} 秒") + print(f" 对象类型限制: {self.object_limit}") + print(f" 输出文件: {self.output_file or '无'}") + print() + + while self.running: + try: + self.iteration += 1 + stats = self.get_object_stats() + self.print_stats(stats, self.iteration) + + if self.iteration % 3 == 0 and self.tracker: + self.print_diff() + + if self.output_file: + self.save_to_file(stats) + + time.sleep(self.interval) + + except Exception as e: + print(f"❌ 监控出错: {e}") + import traceback + traceback.print_exc() + + monitor_thread = threading.Thread(target=monitor_loop, daemon=True) + monitor_thread.start() + print(f"✓ 监控线程已启动\n") + + def stop(self): + """停止监控""" + self.running = False + + +def run_objects_mode(interval: int, output: Optional[str], object_limit: int): + """对象分析模式主函数""" + if not PYMPLER_AVAILABLE: + print("❌ pympler 未安装,无法使用对象分析模式") + print(" 安装: pip install pympler") + return 1 + + print("=" * 80) + print("🔬 对象分析模式") + print("=" * 80) + print("此模式将:") + print(" 1. 在 bot.py 进程内部运行") + print(" 2. 统计所有对象(包括所有线程)") + print(" 3. 显示对象变化(diff)") + print(" 4. 保存 JSONL 数据用于可视化") + print("=" * 80 + "\n") + + # 添加项目根目录到 Python 路径 + project_root = Path(__file__).parent.parent + if str(project_root) not in sys.path: + sys.path.insert(0, str(project_root)) + print(f"✓ 已添加项目根目录到 Python 路径: {project_root}\n") + + profiler = ObjectMemoryProfiler( + interval=interval, + output_file=output, + object_limit=object_limit + ) + + profiler.start_monitoring() + + print("🤖 正在启动 Bot...\n") + + try: + import bot + + if hasattr(bot, 'main_async'): + asyncio.run(bot.main_async()) + elif hasattr(bot, 'main'): + bot.main() + else: + print("⚠️ bot.py 未找到 main_async() 或 main() 函数") + print(" Bot 模块已导入,监控线程在后台运行") + print(" 按 Ctrl+C 停止\n") + + while profiler.running: + time.sleep(1) + + except KeyboardInterrupt: + print("\n\n⚠️ 用户中断") + except Exception as e: + print(f"\n❌ Bot 运行出错: {e}") + import traceback + traceback.print_exc() + finally: + profiler.stop() + + return 0 + + +# ============================================================================ +# 可视化模式 +# ============================================================================ + +def load_jsonl(path: Path) -> List[Dict]: + """加载 JSONL 文件""" + snapshots = [] + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + snapshots.append(json.loads(line)) + except Exception: + continue + return snapshots + + +def aggregate_top_types(snapshots: List[Dict], top_n: int = 10): + """聚合前 N 个对象类型的时间序列""" + type_max = defaultdict(int) + for snap in snapshots: + for item in snap.get("summary", []): + t = item.get("type") + s = int(item.get("size", 0)) + type_max[t] = max(type_max[t], s) + + top_types = sorted(type_max.items(), key=lambda kv: kv[1], reverse=True)[:top_n] + top_names = [t for t, _ in top_types] + + times = [] + series = {t: [] for t in top_names} + + for snap in snapshots: + ts = snap.get("timestamp") + try: + times.append(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")) + except Exception: + times.append(None) + + summary = {item.get("type"): int(item.get("size", 0)) + for item in snap.get("summary", [])} + for t in top_names: + series[t].append(summary.get(t, 0) / 1024.0 / 1024.0) + + return times, series + + +def plot_series(times: List, series: Dict, output: Path, top_n: int): + """绘制时间序列图""" + plt.figure(figsize=(14, 8)) + + for name, values in series.items(): + if all(v == 0 for v in values): + continue + plt.plot(times, values, marker="o", label=name, linewidth=2) + + plt.xlabel("时间", fontsize=12) + plt.ylabel("内存 (MB)", fontsize=12) + plt.title(f"对象类型随时间的内存占用 (前 {top_n} 类型)", fontsize=14) + plt.legend(loc="upper left", fontsize="small") + plt.grid(True, alpha=0.3) + plt.tight_layout() + plt.savefig(str(output), dpi=150) + print(f"✅ 已保存图像: {output}") + + +def run_visualize_mode(input_file: str, output_file: str, top: int): + """可视化模式主函数""" + if not MATPLOTLIB_AVAILABLE: + print("❌ matplotlib 未安装,无法使用可视化模式") + print(" 安装: pip install matplotlib") + return 1 + + print("=" * 80) + print("📊 可视化模式") + print("=" * 80) + + path = Path(input_file) + if not path.exists(): + print(f"❌ 找不到输入文件: {path}") + return 1 + + print(f"📂 读取数据: {path}") + snaps = load_jsonl(path) + + if not snaps: + print("❌ 未读取到任何快照数据") + return 1 + + print(f"✓ 读取 {len(snaps)} 个快照") + + times, series = aggregate_top_types(snaps, top_n=top) + print(f"✓ 提取前 {top} 个对象类型") + + output_path = Path(output_file) + plot_series(times, series, output_path, top) + + return 0 + + +# ============================================================================ +# 主入口 +# ============================================================================ + +def main(): + """主函数""" + parser = argparse.ArgumentParser( + description="统一内存分析工具 - Bot 内存诊断完整解决方案", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +模式说明: + --monitor 进程监控模式:从外部监控 bot 进程内存、子进程 + --objects 对象分析模式:在 bot 内部统计所有对象(包括所有线程) + --visualize 可视化模式:将 JSONL 数据绘制成图表 + +使用示例: + # 进程监控(启动 bot 并监控) + python scripts/memory_profiler.py --monitor --interval 10 + + # 对象分析(深度对象统计) + python scripts/memory_profiler.py --objects --interval 10 --output memory_data.txt + + # 生成可视化图表 + python scripts/memory_profiler.py --visualize --input memory_data.txt.jsonl --top 15 --output plot.png + +注意: + - 对象分析模式需要: pip install pympler + - 可视化模式需要: pip install matplotlib + """, + ) + + # 模式选择 + mode_group = parser.add_mutually_exclusive_group(required=True) + mode_group.add_argument("--monitor", "-m", action="store_true", + help="进程监控模式(外部监控 bot 进程)") + mode_group.add_argument("--objects", "-o", action="store_true", + help="对象分析模式(内部统计所有对象)") + mode_group.add_argument("--visualize", "-v", action="store_true", + help="可视化模式(绘制 JSONL 数据)") + + # 通用参数 + parser.add_argument("--interval", "-i", type=int, default=10, + help="监控间隔(秒),默认 10") + + # 对象分析参数 + parser.add_argument("--output", type=str, + help="输出文件路径(对象分析模式)") + parser.add_argument("--object-limit", "-l", type=int, default=20, + help="对象类型显示数量,默认 20") + + # 可视化参数 + parser.add_argument("--input", type=str, + help="输入 JSONL 文件(可视化模式)") + parser.add_argument("--top", "-t", type=int, default=10, + help="展示前 N 个类型(可视化模式),默认 10") + parser.add_argument("--plot-output", type=str, default="memory_analysis_plot.png", + help="图表输出文件,默认 memory_analysis_plot.png") + + args = parser.parse_args() + + # 根据模式执行 + if args.monitor: + return asyncio.run(run_monitor_mode(args.interval)) + + elif args.objects: + if not args.output: + print("⚠️ 建议使用 --output 指定输出文件以保存数据") + return run_objects_mode(args.interval, args.output, args.object_limit) + + elif args.visualize: + if not args.input: + print("❌ 可视化模式需要 --input 参数指定 JSONL 文件") + return 1 + return run_visualize_mode(args.input, args.plot_output, args.top) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/common/cache_manager.py b/src/common/cache_manager.py index d28ad6f1b..b656b0ca1 100644 --- a/src/common/cache_manager.py +++ b/src/common/cache_manager.py @@ -33,12 +33,12 @@ class CacheManager: cls._instance = super().__new__(cls) return cls._instance - def __init__(self, default_ttl: int = 3600): + def __init__(self, default_ttl: int | None = None): """ 初始化缓存管理器。 """ if not hasattr(self, "_initialized"): - self.default_ttl = default_ttl + self.default_ttl = default_ttl or 3600 self.semantic_cache_collection_name = "semantic_cache" # L1 缓存 (内存) @@ -360,6 +360,60 @@ class CacheManager: if expired_keys: logger.info(f"清理了 {len(expired_keys)} 个过期的L1缓存条目") + + def get_health_stats(self) -> dict[str, Any]: + """获取缓存健康统计信息""" + from src.common.memory_utils import format_size + + return { + "l1_count": len(self.l1_kv_cache), + "l1_memory": self.l1_current_memory, + "l1_memory_formatted": format_size(self.l1_current_memory), + "l1_max_memory": self.l1_max_memory, + "l1_memory_usage_percent": round((self.l1_current_memory / self.l1_max_memory) * 100, 2), + "l1_max_size": self.l1_max_size, + "l1_size_usage_percent": round((len(self.l1_kv_cache) / self.l1_max_size) * 100, 2), + "average_item_size": self.l1_current_memory // len(self.l1_kv_cache) if self.l1_kv_cache else 0, + "average_item_size_formatted": format_size(self.l1_current_memory // len(self.l1_kv_cache)) if self.l1_kv_cache else "0 B", + "largest_item_size": max(self.l1_size_map.values()) if self.l1_size_map else 0, + "largest_item_size_formatted": format_size(max(self.l1_size_map.values())) if self.l1_size_map else "0 B", + } + + def check_health(self) -> tuple[bool, list[str]]: + """检查缓存健康状态 + + Returns: + (is_healthy, warnings) - 是否健康,警告列表 + """ + warnings = [] + + # 检查内存使用 + memory_usage = (self.l1_current_memory / self.l1_max_memory) * 100 + if memory_usage > 90: + warnings.append(f"⚠️ L1缓存内存使用率过高: {memory_usage:.1f}%") + elif memory_usage > 75: + warnings.append(f"⚡ L1缓存内存使用率较高: {memory_usage:.1f}%") + + # 检查条目数 + size_usage = (len(self.l1_kv_cache) / self.l1_max_size) * 100 + if size_usage > 90: + warnings.append(f"⚠️ L1缓存条目数过多: {size_usage:.1f}%") + + # 检查平均条目大小 + if self.l1_kv_cache: + avg_size = self.l1_current_memory // len(self.l1_kv_cache) + if avg_size > 100 * 1024: # >100KB + from src.common.memory_utils import format_size + warnings.append(f"⚡ 平均缓存条目过大: {format_size(avg_size)}") + + # 检查最大单条目 + if self.l1_size_map: + max_size = max(self.l1_size_map.values()) + if max_size > 500 * 1024: # >500KB + from src.common.memory_utils import format_size + warnings.append(f"⚠️ 发现超大缓存条目: {format_size(max_size)}") + + return len(warnings) == 0, warnings # 全局实例 diff --git a/src/common/database/compatibility/adapter.py b/src/common/database/compatibility/adapter.py index f2d1b3e58..a4bd8f51a 100644 --- a/src/common/database/compatibility/adapter.py +++ b/src/common/database/compatibility/adapter.py @@ -175,7 +175,8 @@ async def db_query( if query_type == "get": # 使用QueryBuilder - query_builder = QueryBuilder(model_class) + # 🔧 兼容层默认禁用缓存(避免旧代码产生大量缓存) + query_builder = QueryBuilder(model_class).no_cache() # 应用过滤条件 if filters: diff --git a/src/common/database/optimization/batch_scheduler.py b/src/common/database/optimization/batch_scheduler.py index 35e9b039b..d6dcbcdd6 100644 --- a/src/common/database/optimization/batch_scheduler.py +++ b/src/common/database/optimization/batch_scheduler.py @@ -19,6 +19,7 @@ from sqlalchemy import delete, insert, select, update from src.common.database.core.session import get_db_session from src.common.logger import get_logger +from src.common.memory_utils import estimate_size_smart logger = get_logger("batch_scheduler") @@ -65,6 +66,10 @@ class BatchStats: last_batch_duration: float = 0.0 last_batch_size: int = 0 congestion_score: float = 0.0 # 拥塞评分 (0-1) + + # 🔧 新增:缓存统计 + cache_size: int = 0 # 缓存条目数 + cache_memory_mb: float = 0.0 # 缓存内存占用(MB) class AdaptiveBatchScheduler: @@ -118,8 +123,11 @@ class AdaptiveBatchScheduler: # 统计信息 self.stats = BatchStats() - # 简单的结果缓存 + # 🔧 改进的结果缓存(带大小限制和内存统计) self._result_cache: dict[str, tuple[Any, float]] = {} + self._cache_max_size = 1000 # 最大缓存条目数 + self._cache_memory_estimate = 0 # 缓存内存估算(字节) + self._cache_size_map: dict[str, int] = {} # 每个缓存条目的大小 logger.info( f"自适应批量调度器初始化: " @@ -530,11 +538,53 @@ class AdaptiveBatchScheduler: return None def _set_cache(self, cache_key: str, result: Any) -> None: - """设置缓存""" + """设置缓存(改进版,带大小限制和内存统计)""" + import sys + + # 🔧 检查缓存大小限制 + if len(self._result_cache) >= self._cache_max_size: + # 首先清理过期条目 + current_time = time.time() + expired_keys = [ + k for k, (_, ts) in self._result_cache.items() + if current_time - ts >= self.cache_ttl + ] + + for k in expired_keys: + # 更新内存统计 + if k in self._cache_size_map: + self._cache_memory_estimate -= self._cache_size_map[k] + del self._cache_size_map[k] + del self._result_cache[k] + + # 如果还是太大,清理最老的条目(LRU) + if len(self._result_cache) >= self._cache_max_size: + oldest_key = min( + self._result_cache.keys(), + key=lambda k: self._result_cache[k][1] + ) + # 更新内存统计 + if oldest_key in self._cache_size_map: + self._cache_memory_estimate -= self._cache_size_map[oldest_key] + del self._cache_size_map[oldest_key] + del self._result_cache[oldest_key] + logger.debug(f"缓存已满,淘汰最老条目: {oldest_key}") + + # 🔧 使用准确的内存估算方法 + try: + total_size = estimate_size_smart(cache_key) + estimate_size_smart(result) + self._cache_size_map[cache_key] = total_size + self._cache_memory_estimate += total_size + except Exception as e: + logger.debug(f"估算缓存大小失败: {e}") + # 使用默认值 + self._cache_size_map[cache_key] = 1024 + self._cache_memory_estimate += 1024 + self._result_cache[cache_key] = (result, time.time()) async def get_stats(self) -> BatchStats: - """获取统计信息""" + """获取统计信息(改进版,包含缓存统计)""" async with self._lock: return BatchStats( total_operations=self.stats.total_operations, @@ -547,6 +597,9 @@ class AdaptiveBatchScheduler: last_batch_duration=self.stats.last_batch_duration, last_batch_size=self.stats.last_batch_size, congestion_score=self.stats.congestion_score, + # 🔧 新增:缓存统计 + cache_size=len(self._result_cache), + cache_memory_mb=self._cache_memory_estimate / (1024 * 1024), ) diff --git a/src/common/database/optimization/cache_manager.py b/src/common/database/optimization/cache_manager.py index 7b95cca97..6a44ec021 100644 --- a/src/common/database/optimization/cache_manager.py +++ b/src/common/database/optimization/cache_manager.py @@ -16,6 +16,7 @@ from dataclasses import dataclass from typing import Any, Generic, TypeVar from src.common.logger import get_logger +from src.common.memory_utils import estimate_size_smart logger = get_logger("cache_manager") @@ -230,13 +231,12 @@ class LRUCache(Generic[T]): ) def _estimate_size(self, value: Any) -> int: - """估算数据大小(字节) + """估算数据大小(字节)- 使用准确的估算方法 - 这是一个简单的估算,实际大小可能不同 + 使用深度递归估算,比 sys.getsizeof() 更准确 """ - import sys try: - return sys.getsizeof(value) + return estimate_size_smart(value) except (TypeError, AttributeError): # 无法获取大小,返回默认值 return 1024 @@ -259,6 +259,7 @@ class MultiLevelCache: l2_max_size: int = 10000, l2_ttl: float = 300, max_memory_mb: int = 100, + max_item_size_mb: int = 1, ): """初始化多级缓存 @@ -268,15 +269,19 @@ class MultiLevelCache: l2_max_size: L2缓存最大条目数 l2_ttl: L2缓存TTL(秒) max_memory_mb: 最大内存占用(MB) + max_item_size_mb: 单个缓存条目最大大小(MB) """ self.l1_cache: LRUCache[Any] = LRUCache(l1_max_size, l1_ttl, "L1") self.l2_cache: LRUCache[Any] = LRUCache(l2_max_size, l2_ttl, "L2") self.max_memory_bytes = max_memory_mb * 1024 * 1024 + self.max_item_size_bytes = max_item_size_mb * 1024 * 1024 self._cleanup_task: asyncio.Task | None = None + self._is_closing = False # 🔧 添加关闭标志 logger.info( f"多级缓存初始化: L1({l1_max_size}项/{l1_ttl}s) " - f"L2({l2_max_size}项/{l2_ttl}s) 内存上限({max_memory_mb}MB)" + f"L2({l2_max_size}项/{l2_ttl}s) 内存上限({max_memory_mb}MB) " + f"单项上限({max_item_size_mb}MB)" ) async def get( @@ -337,6 +342,19 @@ class MultiLevelCache: size: 数据大小(字节) ttl: 自定义过期时间(秒),如果为None则使用默认TTL """ + # 估算数据大小(如果未提供) + if size is None: + size = estimate_size_smart(value) + + # 检查单个条目大小是否超过限制 + if size > self.max_item_size_bytes: + logger.warning( + f"缓存条目过大,跳过缓存: key={key}, " + f"size={size / (1024 * 1024):.2f}MB, " + f"limit={self.max_item_size_bytes / (1024 * 1024):.2f}MB" + ) + return + # 根据TTL决定写入哪个缓存层 if ttl is not None: # 有自定义TTL,根据TTL大小决定写入层级 @@ -373,17 +391,51 @@ class MultiLevelCache: logger.info("所有缓存已清空") async def get_stats(self) -> dict[str, Any]: - """获取所有缓存层的统计信息""" + """获取所有缓存层的统计信息(修正版,避免重复计数)""" l1_stats = await self.l1_cache.get_stats() l2_stats = await self.l2_cache.get_stats() - total_size_bytes = l1_stats.total_size + l2_stats.total_size + + # 🔧 修复:计算实际独占的内存,避免L1和L2共享数据的重复计数 + l1_keys = set(self.l1_cache._cache.keys()) + l2_keys = set(self.l2_cache._cache.keys()) + + shared_keys = l1_keys & l2_keys + l1_only_keys = l1_keys - l2_keys + l2_only_keys = l2_keys - l1_keys + + # 计算实际总内存(避免重复计数) + # L1独占内存 + l1_only_size = sum( + self.l1_cache._cache[k].size + for k in l1_only_keys + if k in self.l1_cache._cache + ) + # L2独占内存 + l2_only_size = sum( + self.l2_cache._cache[k].size + for k in l2_only_keys + if k in self.l2_cache._cache + ) + # 共享内存(只计算一次,使用L1的数据) + shared_size = sum( + self.l1_cache._cache[k].size + for k in shared_keys + if k in self.l1_cache._cache + ) + + actual_total_size = l1_only_size + l2_only_size + shared_size return { "l1": l1_stats, "l2": l2_stats, - "total_memory_mb": total_size_bytes / (1024 * 1024), + "total_memory_mb": actual_total_size / (1024 * 1024), + "l1_only_mb": l1_only_size / (1024 * 1024), + "l2_only_mb": l2_only_size / (1024 * 1024), + "shared_mb": shared_size / (1024 * 1024), + "shared_keys_count": len(shared_keys), + "dedup_savings_mb": (l1_stats.total_size + l2_stats.total_size - actual_total_size) / (1024 * 1024), "max_memory_mb": self.max_memory_bytes / (1024 * 1024), - "memory_usage_percent": (total_size_bytes / self.max_memory_bytes * 100) if self.max_memory_bytes > 0 else 0, + "memory_usage_percent": (actual_total_size / self.max_memory_bytes * 100) if self.max_memory_bytes > 0 else 0, } async def check_memory_limit(self) -> None: @@ -421,9 +473,13 @@ class MultiLevelCache: return async def cleanup_loop(): - while True: + while not self._is_closing: try: await asyncio.sleep(interval) + + if self._is_closing: + break + stats = await self.get_stats() l1_stats = stats["l1"] l2_stats = stats["l2"] @@ -433,9 +489,14 @@ class MultiLevelCache: f"L2: {l2_stats.item_count}项, " f"命中率{l2_stats.hit_rate:.2%} | " f"内存: {stats['total_memory_mb']:.2f}MB/{stats['max_memory_mb']:.2f}MB " - f"({stats['memory_usage_percent']:.1f}%)" + f"({stats['memory_usage_percent']:.1f}%) | " + f"共享: {stats['shared_keys_count']}键/{stats['shared_mb']:.2f}MB " + f"(去重节省{stats['dedup_savings_mb']:.2f}MB)" ) + # 🔧 清理过期条目 + await self._clean_expired_entries() + # 检查内存限制 await self.check_memory_limit() @@ -449,6 +510,8 @@ class MultiLevelCache: async def stop_cleanup_task(self) -> None: """停止清理任务""" + self._is_closing = True + if self._cleanup_task is not None: self._cleanup_task.cancel() try: @@ -457,6 +520,45 @@ class MultiLevelCache: pass self._cleanup_task = None logger.info("缓存清理任务已停止") + + async def _clean_expired_entries(self) -> None: + """清理过期的缓存条目""" + try: + current_time = time.time() + + # 清理 L1 过期条目 + async with self.l1_cache._lock: + expired_keys = [ + key for key, entry in self.l1_cache._cache.items() + if current_time - entry.created_at > self.l1_cache.ttl + ] + + for key in expired_keys: + entry = self.l1_cache._cache.pop(key, None) + if entry: + self.l1_cache._stats.evictions += 1 + self.l1_cache._stats.item_count -= 1 + self.l1_cache._stats.total_size -= entry.size + + # 清理 L2 过期条目 + async with self.l2_cache._lock: + expired_keys = [ + key for key, entry in self.l2_cache._cache.items() + if current_time - entry.created_at > self.l2_cache.ttl + ] + + for key in expired_keys: + entry = self.l2_cache._cache.pop(key, None) + if entry: + self.l2_cache._stats.evictions += 1 + self.l2_cache._stats.item_count -= 1 + self.l2_cache._stats.total_size -= entry.size + + if expired_keys: + logger.debug(f"清理了 {len(expired_keys)} 个过期缓存条目") + + except Exception as e: + logger.error(f"清理过期条目失败: {e}", exc_info=True) # 全局缓存实例 @@ -498,11 +600,13 @@ async def get_cache() -> MultiLevelCache: l2_max_size = db_config.cache_l2_max_size l2_ttl = db_config.cache_l2_ttl max_memory_mb = db_config.cache_max_memory_mb + max_item_size_mb = db_config.cache_max_item_size_mb cleanup_interval = db_config.cache_cleanup_interval logger.info( f"从配置加载缓存参数: L1({l1_max_size}/{l1_ttl}s), " - f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB)" + f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB), " + f"单项限制({max_item_size_mb}MB)" ) except Exception as e: # 配置未加载,使用默认值 @@ -512,6 +616,7 @@ async def get_cache() -> MultiLevelCache: l2_max_size = 10000 l2_ttl = 300 max_memory_mb = 100 + max_item_size_mb = 1 cleanup_interval = 60 _global_cache = MultiLevelCache( @@ -520,6 +625,7 @@ async def get_cache() -> MultiLevelCache: l2_max_size=l2_max_size, l2_ttl=l2_ttl, max_memory_mb=max_memory_mb, + max_item_size_mb=max_item_size_mb, ) await _global_cache.start_cleanup_task(interval=cleanup_interval) diff --git a/src/common/memory_utils.py b/src/common/memory_utils.py new file mode 100644 index 000000000..2f543ed1d --- /dev/null +++ b/src/common/memory_utils.py @@ -0,0 +1,192 @@ +""" +准确的内存大小估算工具 + +提供比 sys.getsizeof() 更准确的内存占用估算方法 +""" + +import sys +import pickle +from typing import Any +import numpy as np + + +def get_accurate_size(obj: Any, seen: set | None = None) -> int: + """ + 准确估算对象的内存大小(递归计算所有引用对象) + + 比 sys.getsizeof() 准确得多,特别是对于复杂嵌套对象。 + + Args: + obj: 要估算大小的对象 + seen: 已访问对象的集合(用于避免循环引用) + + Returns: + 估算的字节数 + """ + if seen is None: + seen = set() + + obj_id = id(obj) + if obj_id in seen: + return 0 + + seen.add(obj_id) + size = sys.getsizeof(obj) + + # NumPy 数组特殊处理 + if isinstance(obj, np.ndarray): + size += obj.nbytes + return size + + # 字典:递归计算所有键值对 + if isinstance(obj, dict): + size += sum(get_accurate_size(k, seen) + get_accurate_size(v, seen) + for k, v in obj.items()) + + # 列表、元组、集合:递归计算所有元素 + elif isinstance(obj, (list, tuple, set, frozenset)): + size += sum(get_accurate_size(item, seen) for item in obj) + + # 有 __dict__ 的对象:递归计算属性 + elif hasattr(obj, '__dict__'): + size += get_accurate_size(obj.__dict__, seen) + + # 其他可迭代对象 + elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes, bytearray)): + try: + size += sum(get_accurate_size(item, seen) for item in obj) + except: + pass + + return size + + +def get_pickle_size(obj: Any) -> int: + """ + 使用 pickle 序列化大小作为参考 + + 通常比 sys.getsizeof() 更接近实际内存占用, + 但可能略小于真实内存占用(不包括 Python 对象开销) + + Args: + obj: 要估算大小的对象 + + Returns: + pickle 序列化后的字节数,失败返回 0 + """ + try: + return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) + except Exception: + return 0 + + +def estimate_size_smart(obj: Any, max_depth: int = 5, sample_large: bool = True) -> int: + """ + 智能估算对象大小(平衡准确性和性能) + + 使用深度受限的递归估算+采样策略,平衡准确性和性能: + - 深度5层足以覆盖99%的缓存数据结构 + - 对大型容器(>100项)进行采样估算 + - 性能开销约60倍于sys.getsizeof,但准确度提升1000+倍 + + Args: + obj: 要估算大小的对象 + max_depth: 最大递归深度(默认5层,可覆盖大多数嵌套结构) + sample_large: 对大型容器是否采样(默认True,提升性能) + + Returns: + 估算的字节数 + """ + return _estimate_recursive(obj, max_depth, set(), sample_large) + + +def _estimate_recursive(obj: Any, depth: int, seen: set, sample_large: bool) -> int: + """递归估算,带深度限制和采样""" + # 检查深度限制 + if depth <= 0: + return sys.getsizeof(obj) + + # 检查循环引用 + obj_id = id(obj) + if obj_id in seen: + return 0 + seen.add(obj_id) + + # 基本大小 + size = sys.getsizeof(obj) + + # 简单类型直接返回 + if isinstance(obj, (int, float, bool, type(None), str, bytes, bytearray)): + return size + + # NumPy 数组特殊处理 + if isinstance(obj, np.ndarray): + return size + obj.nbytes + + # 字典递归 + if isinstance(obj, dict): + items = list(obj.items()) + if sample_large and len(items) > 100: + # 大字典采样:前50 + 中间50 + 最后50 + sample_items = items[:50] + items[len(items)//2-25:len(items)//2+25] + items[-50:] + sampled_size = sum( + _estimate_recursive(k, depth - 1, seen, sample_large) + + _estimate_recursive(v, depth - 1, seen, sample_large) + for k, v in sample_items + ) + # 按比例推算总大小 + size += int(sampled_size * len(items) / len(sample_items)) + else: + # 小字典全部计算 + for k, v in items: + size += _estimate_recursive(k, depth - 1, seen, sample_large) + size += _estimate_recursive(v, depth - 1, seen, sample_large) + return size + + # 列表、元组、集合递归 + if isinstance(obj, (list, tuple, set, frozenset)): + items = list(obj) + if sample_large and len(items) > 100: + # 大容器采样:前50 + 中间50 + 最后50 + sample_items = items[:50] + items[len(items)//2-25:len(items)//2+25] + items[-50:] + sampled_size = sum( + _estimate_recursive(item, depth - 1, seen, sample_large) + for item in sample_items + ) + # 按比例推算总大小 + size += int(sampled_size * len(items) / len(sample_items)) + else: + # 小容器全部计算 + for item in items: + size += _estimate_recursive(item, depth - 1, seen, sample_large) + return size + + # 有 __dict__ 的对象 + if hasattr(obj, '__dict__'): + size += _estimate_recursive(obj.__dict__, depth - 1, seen, sample_large) + + return size + + +def format_size(size_bytes: int) -> str: + """ + 格式化字节数为人类可读的格式 + + Args: + size_bytes: 字节数 + + Returns: + 格式化后的字符串,如 "1.23 MB" + """ + if size_bytes < 1024: + return f"{size_bytes} B" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.2f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / 1024 / 1024:.2f} MB" + else: + return f"{size_bytes / 1024 / 1024 / 1024:.2f} GB" + + +# 向后兼容的别名 +get_deep_size = get_accurate_size diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 53383c993..33a70a327 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -49,6 +49,7 @@ class DatabaseConfig(ValidatedConfigBase): cache_l2_ttl: int = Field(default=300, ge=60, le=7200, description="L2缓存生存时间(秒)") cache_cleanup_interval: int = Field(default=60, ge=30, le=600, description="缓存清理任务执行间隔(秒)") cache_max_memory_mb: int = Field(default=100, ge=10, le=1000, description="缓存最大内存占用(MB),超过此值将触发强制清理") + cache_max_item_size_mb: int = Field(default=1, ge=1, le=100, description="单个缓存条目最大大小(MB),超过此值将不缓存") class BotConfig(ValidatedConfigBase): diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 112d07786..37cd0b595 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.5.5" +version = "7.5.6" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -50,7 +50,8 @@ cache_l1_ttl = 60 # L1缓存生存时间(秒) cache_l2_max_size = 10000 # L2缓存最大条目数(温数据,内存占用约10-50MB) cache_l2_ttl = 300 # L2缓存生存时间(秒) cache_cleanup_interval = 60 # 缓存清理任务执行间隔(秒) -cache_max_memory_mb = 100 # 缓存最大内存占用(MB),超过此值将触发强制清理 +cache_max_memory_mb = 500 # 缓存最大内存占用(MB),超过此值将触发强制清理 +cache_max_item_size_mb = 5 # 单个缓存条目最大大小(MB),超过此值将不缓存 [permission] # 权限系统配置 # Master用户配置(拥有最高权限,无视所有权限节点)