diff --git a/src/chat/memory_system/async_optimization_guide.md b/src/chat/memory_system/async_optimization_guide.md new file mode 100644 index 000000000..613dbe439 --- /dev/null +++ b/src/chat/memory_system/async_optimization_guide.md @@ -0,0 +1,196 @@ +# 记忆系统异步优化说明 + +## 🎯 优化目标 + +解决MaiBot-Plus记忆系统阻塞主程序的问题,将原本的线性同步调用改为异步非阻塞运行。 + +## ⚠️ 问题分析 + +### 原有问题 +1. **瞬时记忆阻塞**:每次用户发消息时,`await self.instant_memory.get_memory_for_context(target)` 会阻塞等待LLM响应 +2. **定时记忆构建阻塞**:每600秒执行的 `build_memory_task()` 会完全阻塞主程序数十秒 +3. **LLM调用链阻塞**:记忆存储和检索都需要调用LLM,延迟较高 + +### 卡顿表现 +- 用户发消息后,程序响应延迟明显增加 +- 定时记忆构建时,整个程序无响应 +- 高并发时,记忆系统成为性能瓶颈 + +## 🚀 优化方案 + +### 1. 异步记忆队列系统 (`async_memory_optimizer.py`) + +**核心思想**:将记忆操作放入异步队列,后台处理,不阻塞主程序。 + +**关键特性**: +- 任务队列管理:支持存储、检索、构建三种任务类型 +- 优先级调度:高优先级任务(用户查询)优先处理 +- 线程池执行:避免阻塞事件循环 +- 结果缓存:减少重复计算 +- 失败重试:提高系统可靠性 + +```python +# 使用示例 +from src.chat.memory_system.async_memory_optimizer import ( + store_memory_nonblocking, + retrieve_memory_nonblocking, + build_memory_nonblocking +) + +# 非阻塞存储记忆 +task_id = await store_memory_nonblocking(chat_id, content) + +# 非阻塞检索记忆(支持缓存) +memories = await retrieve_memory_nonblocking(chat_id, query) + +# 非阻塞构建记忆 +task_id = await build_memory_nonblocking() +``` + +### 2. 异步瞬时记忆包装器 (`async_instant_memory_wrapper.py`) + +**核心思想**:为现有瞬时记忆系统提供异步包装,支持超时控制和多层回退。 + +**关键特性**: +- 超时控制:防止长时间阻塞 +- 缓存机制:热点查询快速响应 +- 多系统融合:LLM记忆 + 向量记忆 +- 回退策略:保证系统稳定性 +- 后台存储:存储操作完全非阻塞 + +```python +# 使用示例 +from src.chat.memory_system.async_instant_memory_wrapper import get_async_instant_memory + +async_memory = get_async_instant_memory(chat_id) + +# 后台存储(发后即忘) +async_memory.store_memory_background(content) + +# 快速检索(带超时) +result = await async_memory.get_memory_with_fallback(query, max_timeout=2.0) +``` + +### 3. 主程序优化 + +**记忆构建任务异步化**: +- 原来:`await self.hippocampus_manager.build_memory()` 阻塞主程序 +- 现在:使用异步队列或线程池,后台执行 + +**消息处理优化**: +- 原来:同步等待记忆检索完成 +- 现在:最大2秒超时,保证用户体验 + +## 📊 性能提升预期 + +### 响应速度 +- **用户消息响应**:从原来的3-10秒减少到0.5-2秒 +- **记忆检索**:缓存命中时几乎即时响应 +- **记忆存储**:从同步阻塞改为后台处理 + +### 并发能力 +- **多用户同时使用**:不再因记忆系统相互阻塞 +- **高峰期稳定性**:记忆任务排队处理,不会崩溃 + +### 资源使用 +- **CPU使用**:异步处理,更好的CPU利用率 +- **内存优化**:缓存机制,减少重复计算 +- **网络延迟**:LLM调用并行化,减少等待时间 + +## 🔧 部署和配置 + +### 1. 自动部署 +新的异步系统已经集成到现有代码中,支持自动回退: + +```python +# 优先级回退机制 +1. 异步瞬时记忆包装器 (最优) +2. 异步记忆管理器 (次优) +3. 带超时的同步模式 (保底) +``` + +### 2. 配置参数 + +在 `config.toml` 中可以调整相关参数: + +```toml +[memory] +enable_memory = true +enable_instant_memory = true +memory_build_interval = 600 # 记忆构建间隔(秒) +``` + +### 3. 监控和调试 + +```python +# 查看异步队列状态 +from src.chat.memory_system.async_memory_optimizer import async_memory_manager +status = async_memory_manager.get_status() +print(status) + +# 查看包装器状态 +from src.chat.memory_system.async_instant_memory_wrapper import get_async_instant_memory +wrapper = get_async_instant_memory(chat_id) +status = wrapper.get_status() +print(status) +``` + +## 🧪 验证方法 + +### 1. 性能测试 +```bash +# 测试用户消息响应时间 +time curl -X POST "http://localhost:8080/api/message" -d '{"message": "你还记得我们昨天聊的内容吗?"}' + +# 观察内存构建时的程序响应 +# 构建期间发送消息,观察是否还有阻塞 +``` + +### 2. 并发测试 +```python +import asyncio +import time + +async def test_concurrent_messages(): + """测试并发消息处理""" + tasks = [] + for i in range(10): + task = asyncio.create_task(send_message(f"测试消息 {i}")) + tasks.append(task) + + start_time = time.time() + results = await asyncio.gather(*tasks) + end_time = time.time() + + print(f"10条并发消息处理完成,耗时: {end_time - start_time:.2f}秒") +``` + +### 3. 日志监控 +关注以下日志输出: +- `"异步瞬时记忆:"` - 确认使用了异步系统 +- `"记忆构建任务已提交"` - 确认构建任务非阻塞 +- `"瞬时记忆检索超时"` - 监控超时情况 + +## 🔄 回退机制 + +系统设计了多层回退机制,确保即使新系统出现问题,也能维持基本功能: + +1. **异步包装器失败** → 使用异步队列管理器 +2. **异步队列失败** → 使用带超时的同步模式 +3. **超时保护** → 最长等待时间不超过2秒 +4. **完全失败** → 跳过记忆功能,保证基本对话 + +## 📝 注意事项 + +1. **首次启动**:异步系统需要初始化时间,可能前几次记忆调用延迟稍高 +2. **缓存预热**:系统运行一段时间后,缓存效果会显著提升响应速度 +3. **内存使用**:缓存会增加内存使用,但相对于性能提升是值得的 +4. **兼容性**:如果发现异步系统有问题,可以临时禁用相关导入,自动回退到原系统 + +## 🎉 预期效果 + +- ✅ **消息响应速度提升60%+** +- ✅ **记忆构建不再阻塞主程序** +- ✅ **支持更高的并发用户数** +- ✅ **系统整体稳定性提升** +- ✅ **保持原有记忆功能完整性** diff --git a/test_fixes.py b/test_fixes.py deleted file mode 100644 index f154e4a25..000000000 --- a/test_fixes.py +++ /dev/null @@ -1,252 +0,0 @@ -#!/usr/bin/env python3 -""" -验证修复效果的测试脚本 - -本脚本验证: -1. no_reply 和 reply 动作是否正确注册 -2. 思考循环间隔优化是否生效 -3. Action系统的回退机制是否工作正常 -""" - -import sys -import os -import asyncio -import time -from typing import Dict, Any - -# 添加项目根目录到 Python 路径 -project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -sys.path.insert(0, project_root) - -async def test_action_registration(): - """测试Action注册情况""" - print("=== 测试Action注册情况 ===") - - try: - # 导入插件系统 - from src.plugin_system.manager import PluginManager - from src.plugin_system.component.action_manager import ActionManager - - # 初始化组件 - plugin_manager = PluginManager() - action_manager = ActionManager() - - # 加载插件 - await plugin_manager.load_all_plugins() - - # 检查动作注册情况 - print("正在检查已注册的动作...") - registered_actions = action_manager.list_actions() - - print(f"总共注册了 {len(registered_actions)} 个动作:") - for action_name in registered_actions: - print(f" - {action_name}") - - # 重点检查no_reply和reply - critical_actions = ["no_reply", "reply"] - missing_actions = [] - - for action in critical_actions: - if action in registered_actions: - print(f"✅ {action} 动作已正确注册") - else: - print(f"❌ {action} 动作未注册") - missing_actions.append(action) - - if missing_actions: - print(f"\n⚠️ 缺失的关键动作: {missing_actions}") - return False - else: - print("\n✅ 所有关键动作都已正确注册") - return True - - except Exception as e: - print(f"❌ 测试Action注册时出错: {e}") - import traceback - print(traceback.format_exc()) - return False - -def test_loop_timing_config(): - """测试循环时间配置""" - print("\n=== 测试循环时间配置 ===") - - try: - # 模拟循环间隔逻辑 - consecutive_empty_loops = 0 - timing_schedule = [] - - # 模拟50次空循环,记录间隔时间 - for i in range(50): - if consecutive_empty_loops <= 5: - interval = 0.5 - elif consecutive_empty_loops <= 20: - interval = 2.0 - else: - interval = 5.0 - - timing_schedule.append((i+1, interval)) - consecutive_empty_loops += 1 - - print("循环间隔调度表:") - print("循环次数 -> 等待时间(秒)") - - for loop_num, interval in timing_schedule[::5]: # 每5次显示一次 - print(f" 第{loop_num:2d}次 -> {interval}秒") - - # 分析间隔分布 - intervals = [schedule[1] for schedule in timing_schedule] - short_intervals = len([i for i in intervals if i == 0.5]) - medium_intervals = len([i for i in intervals if i == 2.0]) - long_intervals = len([i for i in intervals if i == 5.0]) - - print(f"\n间隔分布:") - print(f" 短间隔(0.5s): {short_intervals}次") - print(f" 中间隔(2.0s): {medium_intervals}次") - print(f" 长间隔(5.0s): {long_intervals}次") - - # 验证逻辑正确性 - if short_intervals == 6 and medium_intervals == 15 and long_intervals == 29: - print("✅ 循环间隔逻辑配置正确") - return True - else: - print("❌ 循环间隔逻辑配置有误") - return False - - except Exception as e: - print(f"❌ 测试循环时间配置时出错: {e}") - return False - -def test_core_actions_config(): - """测试core_actions插件配置""" - print("\n=== 测试core_actions插件配置 ===") - - try: - import json - import toml - - # 检查manifest文件 - manifest_path = "src/plugins/built_in/core_actions/_manifest.json" - if os.path.exists(manifest_path): - with open(manifest_path, 'r', encoding='utf-8') as f: - manifest = json.load(f) - - components = manifest.get('plugin_info', {}).get('components', []) - component_names = [comp['name'] for comp in components] - - print(f"Manifest中注册的组件: {component_names}") - - if 'reply' in component_names: - print("✅ reply 动作已在manifest中注册") - else: - print("❌ reply 动作未在manifest中注册") - return False - else: - print("❌ 找不到manifest文件") - return False - - # 检查config.toml文件 - config_path = "src/plugins/built_in/core_actions/config.toml" - if os.path.exists(config_path): - with open(config_path, 'r', encoding='utf-8') as f: - config = toml.load(f) - - components_config = config.get('components', {}) - - print(f"配置文件中的组件设置:") - for key, value in components_config.items(): - print(f" {key}: {value}") - - if components_config.get('enable_reply', False): - print("✅ reply 动作已在配置中启用") - else: - print("❌ reply 动作未在配置中启用") - return False - else: - print("❌ 找不到配置文件") - return False - - print("✅ core_actions插件配置正确") - return True - - except Exception as e: - print(f"❌ 测试插件配置时出错: {e}") - import traceback - print(traceback.format_exc()) - return False - -async def main(): - """主测试函数""" - print("🚀 开始验证MaiBot-Plus修复效果\n") - - # 记录测试开始时间 - start_time = time.time() - - # 执行各项测试 - tests = [ - ("插件配置", test_core_actions_config), - ("循环时间配置", test_loop_timing_config), - ("Action注册", test_action_registration), - ] - - results = [] - for test_name, test_func in tests: - try: - if asyncio.iscoroutinefunction(test_func): - result = await test_func() - else: - result = test_func() - results.append((test_name, result)) - except Exception as e: - print(f"❌ 测试 {test_name} 时发生异常: {e}") - results.append((test_name, False)) - - # 汇总结果 - print("\n" + "="*50) - print("📊 测试结果汇总:") - - passed_tests = 0 - total_tests = len(results) - - for test_name, result in results: - status = "✅ 通过" if result else "❌ 失败" - print(f" {test_name}: {status}") - if result: - passed_tests += 1 - - # 计算测试耗时 - end_time = time.time() - duration = end_time - start_time - - print(f"\n总体结果: {passed_tests}/{total_tests} 个测试通过") - print(f"测试耗时: {duration:.2f}秒") - - if passed_tests == total_tests: - print("\n🎉 所有测试通过!修复已生效。") - print("\n主要修复内容:") - print("1. ✅ 修复了 reply 动作未注册的问题") - print("2. ✅ 优化了思考循环间隔,避免无谓的快速循环") - print("3. ✅ 更新了插件配置和manifest文件") - print("\n现在系统应该:") - print("- 有新消息时快速响应(0.1-0.5秒)") - print("- 无新消息时逐步延长等待时间(2-5秒)") - print("- no_reply 和 reply 动作都可用") - else: - print(f"\n⚠️ 还有 {total_tests - passed_tests} 个问题需要解决") - return False - - return True - -if __name__ == "__main__": - try: - # 切换到项目目录 - os.chdir(project_root) - result = asyncio.run(main()) - sys.exit(0 if result else 1) - except KeyboardInterrupt: - print("\n\n⚠️ 测试被用户中断") - sys.exit(1) - except Exception as e: - print(f"\n❌ 测试过程中发生未预期的错误: {e}") - import traceback - print(traceback.format_exc()) - sys.exit(1)