移除多余的测试文件

This commit is contained in:
Furina-1013-create
2025-08-22 13:49:20 +08:00
parent 021a1a5906
commit 60eaef6b20
2 changed files with 196 additions and 252 deletions

View File

@@ -0,0 +1,196 @@
# 记忆系统异步优化说明
## 🎯 优化目标
解决MaiBot-Plus记忆系统阻塞主程序的问题将原本的线性同步调用改为异步非阻塞运行。
## ⚠️ 问题分析
### 原有问题
1. **瞬时记忆阻塞**:每次用户发消息时,`await self.instant_memory.get_memory_for_context(target)` 会阻塞等待LLM响应
2. **定时记忆构建阻塞**每600秒执行的 `build_memory_task()` 会完全阻塞主程序数十秒
3. **LLM调用链阻塞**记忆存储和检索都需要调用LLM延迟较高
### 卡顿表现
- 用户发消息后,程序响应延迟明显增加
- 定时记忆构建时,整个程序无响应
- 高并发时,记忆系统成为性能瓶颈
## 🚀 优化方案
### 1. 异步记忆队列系统 (`async_memory_optimizer.py`)
**核心思想**:将记忆操作放入异步队列,后台处理,不阻塞主程序。
**关键特性**
- 任务队列管理:支持存储、检索、构建三种任务类型
- 优先级调度:高优先级任务(用户查询)优先处理
- 线程池执行:避免阻塞事件循环
- 结果缓存:减少重复计算
- 失败重试:提高系统可靠性
```python
# 使用示例
from src.chat.memory_system.async_memory_optimizer import (
store_memory_nonblocking,
retrieve_memory_nonblocking,
build_memory_nonblocking
)
# 非阻塞存储记忆
task_id = await store_memory_nonblocking(chat_id, content)
# 非阻塞检索记忆(支持缓存)
memories = await retrieve_memory_nonblocking(chat_id, query)
# 非阻塞构建记忆
task_id = await build_memory_nonblocking()
```
### 2. 异步瞬时记忆包装器 (`async_instant_memory_wrapper.py`)
**核心思想**:为现有瞬时记忆系统提供异步包装,支持超时控制和多层回退。
**关键特性**
- 超时控制:防止长时间阻塞
- 缓存机制:热点查询快速响应
- 多系统融合LLM记忆 + 向量记忆
- 回退策略:保证系统稳定性
- 后台存储:存储操作完全非阻塞
```python
# 使用示例
from src.chat.memory_system.async_instant_memory_wrapper import get_async_instant_memory
async_memory = get_async_instant_memory(chat_id)
# 后台存储(发后即忘)
async_memory.store_memory_background(content)
# 快速检索(带超时)
result = await async_memory.get_memory_with_fallback(query, max_timeout=2.0)
```
### 3. 主程序优化
**记忆构建任务异步化**
- 原来:`await self.hippocampus_manager.build_memory()` 阻塞主程序
- 现在:使用异步队列或线程池,后台执行
**消息处理优化**
- 原来:同步等待记忆检索完成
- 现在最大2秒超时保证用户体验
## 📊 性能提升预期
### 响应速度
- **用户消息响应**从原来的3-10秒减少到0.5-2秒
- **记忆检索**:缓存命中时几乎即时响应
- **记忆存储**:从同步阻塞改为后台处理
### 并发能力
- **多用户同时使用**:不再因记忆系统相互阻塞
- **高峰期稳定性**:记忆任务排队处理,不会崩溃
### 资源使用
- **CPU使用**异步处理更好的CPU利用率
- **内存优化**:缓存机制,减少重复计算
- **网络延迟**LLM调用并行化减少等待时间
## 🔧 部署和配置
### 1. 自动部署
新的异步系统已经集成到现有代码中,支持自动回退:
```python
# 优先级回退机制
1. 异步瞬时记忆包装器 (最优)
2. 异步记忆管理器 (次优)
3. 带超时的同步模式 (保底)
```
### 2. 配置参数
`config.toml` 中可以调整相关参数:
```toml
[memory]
enable_memory = true
enable_instant_memory = true
memory_build_interval = 600 # 记忆构建间隔(秒)
```
### 3. 监控和调试
```python
# 查看异步队列状态
from src.chat.memory_system.async_memory_optimizer import async_memory_manager
status = async_memory_manager.get_status()
print(status)
# 查看包装器状态
from src.chat.memory_system.async_instant_memory_wrapper import get_async_instant_memory
wrapper = get_async_instant_memory(chat_id)
status = wrapper.get_status()
print(status)
```
## 🧪 验证方法
### 1. 性能测试
```bash
# 测试用户消息响应时间
time curl -X POST "http://localhost:8080/api/message" -d '{"message": "你还记得我们昨天聊的内容吗?"}'
# 观察内存构建时的程序响应
# 构建期间发送消息,观察是否还有阻塞
```
### 2. 并发测试
```python
import asyncio
import time
async def test_concurrent_messages():
"""测试并发消息处理"""
tasks = []
for i in range(10):
task = asyncio.create_task(send_message(f"测试消息 {i}"))
tasks.append(task)
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"10条并发消息处理完成耗时: {end_time - start_time:.2f}秒")
```
### 3. 日志监控
关注以下日志输出:
- `"异步瞬时记忆:"` - 确认使用了异步系统
- `"记忆构建任务已提交"` - 确认构建任务非阻塞
- `"瞬时记忆检索超时"` - 监控超时情况
## 🔄 回退机制
系统设计了多层回退机制,确保即使新系统出现问题,也能维持基本功能:
1. **异步包装器失败** → 使用异步队列管理器
2. **异步队列失败** → 使用带超时的同步模式
3. **超时保护** → 最长等待时间不超过2秒
4. **完全失败** → 跳过记忆功能,保证基本对话
## 📝 注意事项
1. **首次启动**:异步系统需要初始化时间,可能前几次记忆调用延迟稍高
2. **缓存预热**:系统运行一段时间后,缓存效果会显著提升响应速度
3. **内存使用**:缓存会增加内存使用,但相对于性能提升是值得的
4. **兼容性**:如果发现异步系统有问题,可以临时禁用相关导入,自动回退到原系统
## 🎉 预期效果
-**消息响应速度提升60%+**
-**记忆构建不再阻塞主程序**
-**支持更高的并发用户数**
-**系统整体稳定性提升**
-**保持原有记忆功能完整性**

View File

@@ -1,252 +0,0 @@
#!/usr/bin/env python3
"""
验证修复效果的测试脚本
本脚本验证:
1. no_reply 和 reply 动作是否正确注册
2. 思考循环间隔优化是否生效
3. Action系统的回退机制是否工作正常
"""
import sys
import os
import asyncio
import time
from typing import Dict, Any
# 添加项目根目录到 Python 路径
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)
async def test_action_registration():
"""测试Action注册情况"""
print("=== 测试Action注册情况 ===")
try:
# 导入插件系统
from src.plugin_system.manager import PluginManager
from src.plugin_system.component.action_manager import ActionManager
# 初始化组件
plugin_manager = PluginManager()
action_manager = ActionManager()
# 加载插件
await plugin_manager.load_all_plugins()
# 检查动作注册情况
print("正在检查已注册的动作...")
registered_actions = action_manager.list_actions()
print(f"总共注册了 {len(registered_actions)} 个动作:")
for action_name in registered_actions:
print(f" - {action_name}")
# 重点检查no_reply和reply
critical_actions = ["no_reply", "reply"]
missing_actions = []
for action in critical_actions:
if action in registered_actions:
print(f"{action} 动作已正确注册")
else:
print(f"{action} 动作未注册")
missing_actions.append(action)
if missing_actions:
print(f"\n⚠️ 缺失的关键动作: {missing_actions}")
return False
else:
print("\n✅ 所有关键动作都已正确注册")
return True
except Exception as e:
print(f"❌ 测试Action注册时出错: {e}")
import traceback
print(traceback.format_exc())
return False
def test_loop_timing_config():
"""测试循环时间配置"""
print("\n=== 测试循环时间配置 ===")
try:
# 模拟循环间隔逻辑
consecutive_empty_loops = 0
timing_schedule = []
# 模拟50次空循环记录间隔时间
for i in range(50):
if consecutive_empty_loops <= 5:
interval = 0.5
elif consecutive_empty_loops <= 20:
interval = 2.0
else:
interval = 5.0
timing_schedule.append((i+1, interval))
consecutive_empty_loops += 1
print("循环间隔调度表:")
print("循环次数 -> 等待时间(秒)")
for loop_num, interval in timing_schedule[::5]: # 每5次显示一次
print(f"{loop_num:2d}次 -> {interval}")
# 分析间隔分布
intervals = [schedule[1] for schedule in timing_schedule]
short_intervals = len([i for i in intervals if i == 0.5])
medium_intervals = len([i for i in intervals if i == 2.0])
long_intervals = len([i for i in intervals if i == 5.0])
print(f"\n间隔分布:")
print(f" 短间隔(0.5s): {short_intervals}")
print(f" 中间隔(2.0s): {medium_intervals}")
print(f" 长间隔(5.0s): {long_intervals}")
# 验证逻辑正确性
if short_intervals == 6 and medium_intervals == 15 and long_intervals == 29:
print("✅ 循环间隔逻辑配置正确")
return True
else:
print("❌ 循环间隔逻辑配置有误")
return False
except Exception as e:
print(f"❌ 测试循环时间配置时出错: {e}")
return False
def test_core_actions_config():
"""测试core_actions插件配置"""
print("\n=== 测试core_actions插件配置 ===")
try:
import json
import toml
# 检查manifest文件
manifest_path = "src/plugins/built_in/core_actions/_manifest.json"
if os.path.exists(manifest_path):
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
components = manifest.get('plugin_info', {}).get('components', [])
component_names = [comp['name'] for comp in components]
print(f"Manifest中注册的组件: {component_names}")
if 'reply' in component_names:
print("✅ reply 动作已在manifest中注册")
else:
print("❌ reply 动作未在manifest中注册")
return False
else:
print("❌ 找不到manifest文件")
return False
# 检查config.toml文件
config_path = "src/plugins/built_in/core_actions/config.toml"
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
config = toml.load(f)
components_config = config.get('components', {})
print(f"配置文件中的组件设置:")
for key, value in components_config.items():
print(f" {key}: {value}")
if components_config.get('enable_reply', False):
print("✅ reply 动作已在配置中启用")
else:
print("❌ reply 动作未在配置中启用")
return False
else:
print("❌ 找不到配置文件")
return False
print("✅ core_actions插件配置正确")
return True
except Exception as e:
print(f"❌ 测试插件配置时出错: {e}")
import traceback
print(traceback.format_exc())
return False
async def main():
"""主测试函数"""
print("🚀 开始验证MaiBot-Plus修复效果\n")
# 记录测试开始时间
start_time = time.time()
# 执行各项测试
tests = [
("插件配置", test_core_actions_config),
("循环时间配置", test_loop_timing_config),
("Action注册", test_action_registration),
]
results = []
for test_name, test_func in tests:
try:
if asyncio.iscoroutinefunction(test_func):
result = await test_func()
else:
result = test_func()
results.append((test_name, result))
except Exception as e:
print(f"❌ 测试 {test_name} 时发生异常: {e}")
results.append((test_name, False))
# 汇总结果
print("\n" + "="*50)
print("📊 测试结果汇总:")
passed_tests = 0
total_tests = len(results)
for test_name, result in results:
status = "✅ 通过" if result else "❌ 失败"
print(f" {test_name}: {status}")
if result:
passed_tests += 1
# 计算测试耗时
end_time = time.time()
duration = end_time - start_time
print(f"\n总体结果: {passed_tests}/{total_tests} 个测试通过")
print(f"测试耗时: {duration:.2f}")
if passed_tests == total_tests:
print("\n🎉 所有测试通过!修复已生效。")
print("\n主要修复内容:")
print("1. ✅ 修复了 reply 动作未注册的问题")
print("2. ✅ 优化了思考循环间隔,避免无谓的快速循环")
print("3. ✅ 更新了插件配置和manifest文件")
print("\n现在系统应该:")
print("- 有新消息时快速响应(0.1-0.5秒)")
print("- 无新消息时逐步延长等待时间(2-5秒)")
print("- no_reply 和 reply 动作都可用")
else:
print(f"\n⚠️ 还有 {total_tests - passed_tests} 个问题需要解决")
return False
return True
if __name__ == "__main__":
try:
# 切换到项目目录
os.chdir(project_root)
result = asyncio.run(main())
sys.exit(0 if result else 1)
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
sys.exit(1)
except Exception as e:
print(f"\n❌ 测试过程中发生未预期的错误: {e}")
import traceback
print(traceback.format_exc())
sys.exit(1)