删除不再使用的清理记忆数据脚本和时间解析器测试文件

This commit is contained in:
Windpicker-owo
2025-11-14 10:29:17 +08:00
parent c125c89fcd
commit c9f4e42dc6
3 changed files with 0 additions and 628 deletions

View File

@@ -1,355 +0,0 @@
#!/usr/bin/env python3
"""
清理记忆数据中的向量数据
此脚本用于清理现有 JSON 文件中的 embedding 字段,确保向量数据只存储在专门的向量数据库中。
这样可以:
1. 减少 JSON 文件大小
2. 提高读写性能
3. 避免数据冗余
4. 确保数据一致性
使用方法:
python clean_embedding_data.py [--dry-run]
--dry-run: 仅显示将要清理的统计信息,不实际修改文件
"""
import argparse
import json
import logging
from pathlib import Path
from typing import Any
import orjson
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("embedding_cleanup.log", encoding="utf-8")
]
)
logger = logging.getLogger(__name__)
class EmbeddingCleaner:
"""向量数据清理器"""
def __init__(self, data_dir: str = "data"):
"""
初始化清理器
Args:
data_dir: 数据目录路径
"""
self.data_dir = Path(data_dir)
self.cleaned_files = []
self.errors = []
self.stats = {
"files_processed": 0,
"embedings_removed": 0,
"bytes_saved": 0,
"nodes_processed": 0
}
def find_json_files(self) -> list[Path]:
"""查找可能包含向量数据的 JSON 文件"""
json_files = []
# 记忆图数据文件
memory_graph_file = self.data_dir / "memory_graph" / "memory_graph.json"
if memory_graph_file.exists():
json_files.append(memory_graph_file)
# 测试数据文件
self.data_dir / "test_*"
for test_path in self.data_dir.glob("test_*/memory_graph.json"):
if test_path.exists():
json_files.append(test_path)
# 其他可能的记忆相关文件
potential_files = [
self.data_dir / "memory_metadata_index.json",
]
for file_path in potential_files:
if file_path.exists():
json_files.append(file_path)
logger.info(f"找到 {len(json_files)} 个需要处理的 JSON 文件")
return json_files
def analyze_embedding_in_data(self, data: dict[str, Any]) -> int:
"""
分析数据中的 embedding 字段数量
Args:
data: 要分析的数据
Returns:
embedding 字段的数量
"""
embedding_count = 0
def count_embeddings(obj):
nonlocal embedding_count
if isinstance(obj, dict):
if "embedding" in obj:
embedding_count += 1
for value in obj.values():
count_embeddings(value)
elif isinstance(obj, list):
for item in obj:
count_embeddings(item)
count_embeddings(data)
return embedding_count
def clean_embedding_from_data(self, data: dict[str, Any]) -> tuple[dict[str, Any], int]:
"""
从数据中移除 embedding 字段
Args:
data: 要清理的数据
Returns:
(清理后的数据, 移除的 embedding 数量)
"""
removed_count = 0
def remove_embeddings(obj):
nonlocal removed_count
if isinstance(obj, dict):
if "embedding" in obj:
del obj["embedding"]
removed_count += 1
for value in obj.values():
remove_embeddings(value)
elif isinstance(obj, list):
for item in obj:
remove_embeddings(item)
# 创建深拷贝以避免修改原数据
import copy
cleaned_data = copy.deepcopy(data)
remove_embeddings(cleaned_data)
return cleaned_data, removed_count
def process_file(self, file_path: Path, dry_run: bool = False) -> bool:
"""
处理单个文件
Args:
file_path: 文件路径
dry_run: 是否为试运行模式
Returns:
是否处理成功
"""
try:
logger.info(f"处理文件: {file_path}")
# 读取原文件
original_content = file_path.read_bytes()
original_size = len(original_content)
# 解析 JSON 数据
try:
data = orjson.loads(original_content)
except orjson.JSONDecodeError:
# 回退到标准 json
with open(file_path, encoding="utf-8") as f:
data = json.load(f)
# 分析 embedding 数据
embedding_count = self.analyze_embedding_in_data(data)
if embedding_count == 0:
logger.info(" ✓ 文件中没有 embedding 数据,跳过")
return True
logger.info(f" 发现 {embedding_count} 个 embedding 字段")
if not dry_run:
# 清理 embedding 数据
cleaned_data, removed_count = self.clean_embedding_from_data(data)
if removed_count != embedding_count:
logger.warning(f" ⚠️ 清理数量不一致: 分析发现 {embedding_count}, 实际清理 {removed_count}")
# 序列化清理后的数据
try:
cleaned_content = orjson.dumps(
cleaned_data,
option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY
)
except Exception:
# 回退到标准 json
cleaned_content = json.dumps(
cleaned_data,
indent=2,
ensure_ascii=False
).encode("utf-8")
cleaned_size = len(cleaned_content)
bytes_saved = original_size - cleaned_size
# 原子写入
temp_file = file_path.with_suffix(".tmp")
temp_file.write_bytes(cleaned_content)
temp_file.replace(file_path)
logger.info(" ✓ 清理完成:")
logger.info(f" - 移除 embedding 字段: {removed_count}")
logger.info(f" - 节省空间: {bytes_saved:,} 字节 ({bytes_saved/original_size*100:.1f}%)")
logger.info(f" - 新文件大小: {cleaned_size:,} 字节")
# 更新统计
self.stats["embedings_removed"] += removed_count
self.stats["bytes_saved"] += bytes_saved
else:
logger.info(f" [试运行] 将移除 {embedding_count} 个 embedding 字段")
self.stats["embedings_removed"] += embedding_count
self.stats["files_processed"] += 1
self.cleaned_files.append(file_path)
return True
except Exception as e:
logger.error(f" ❌ 处理失败: {e}")
self.errors.append((str(file_path), str(e)))
return False
def analyze_nodes_in_file(self, file_path: Path) -> int:
"""
分析文件中的节点数量
Args:
file_path: 文件路径
Returns:
节点数量
"""
try:
with open(file_path, encoding="utf-8") as f:
data = json.load(f)
node_count = 0
if "nodes" in data and isinstance(data["nodes"], list):
node_count = len(data["nodes"])
return node_count
except Exception as e:
logger.warning(f"分析节点数量失败: {e}")
return 0
def run(self, dry_run: bool = False):
"""
运行清理过程
Args:
dry_run: 是否为试运行模式
"""
logger.info("开始向量数据清理")
logger.info(f"模式: {'试运行' if dry_run else '正式执行'}")
# 查找要处理的文件
json_files = self.find_json_files()
if not json_files:
logger.info("没有找到需要处理的文件")
return
# 统计总节点数
total_nodes = sum(self.analyze_nodes_in_file(f) for f in json_files)
self.stats["nodes_processed"] = total_nodes
logger.info(f"总计 {len(json_files)} 个文件,{total_nodes} 个节点")
# 处理每个文件
success_count = 0
for file_path in json_files:
if self.process_file(file_path, dry_run):
success_count += 1
# 输出统计信息
self.print_summary(dry_run, success_count, len(json_files))
def print_summary(self, dry_run: bool, success_count: int, total_files: int):
"""打印清理摘要"""
logger.info("=" * 60)
logger.info("清理摘要")
logger.info("=" * 60)
mode = "试运行" if dry_run else "正式执行"
logger.info(f"执行模式: {mode}")
logger.info(f"处理文件: {success_count}/{total_files}")
logger.info(f"处理节点: {self.stats['nodes_processed']}")
logger.info(f"清理 embedding 字段: {self.stats['embedings_removed']}")
if not dry_run:
logger.info(f"节省空间: {self.stats['bytes_saved']:,} 字节")
if self.stats["bytes_saved"] > 0:
mb_saved = self.stats["bytes_saved"] / 1024 / 1024
logger.info(f"节省空间: {mb_saved:.2f} MB")
if self.errors:
logger.warning(f"遇到 {len(self.errors)} 个错误:")
for file_path, error in self.errors:
logger.warning(f" {file_path}: {error}")
if success_count == total_files and not self.errors:
logger.info("所有文件处理成功!")
logger.info("=" * 60)
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description="清理记忆数据中的向量数据",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例用法:
python clean_embedding_data.py --dry-run # 试运行,查看统计信息
python clean_embedding_data.py # 正式执行清理
"""
)
parser.add_argument(
"--dry-run",
action="store_true",
help="试运行模式,不实际修改文件"
)
parser.add_argument(
"--data-dir",
default="data",
help="数据目录路径 (默认: data)"
)
args = parser.parse_args()
# 确认操作
if not args.dry_run:
print("警告:此操作将永久删除 JSON 文件中的 embedding 数据!")
print(" 请确保向量数据库正在正常工作。")
print()
response = input("确认继续?(yes/no): ")
if response.lower() not in ["yes", "y", ""]:
print("操作已取消")
return
# 执行清理
cleaner = EmbeddingCleaner(args.data_dir)
cleaner.run(dry_run=args.dry_run)
if __name__ == "__main__":
main()

View File

@@ -1,126 +0,0 @@
"""
测试记忆系统插件集成
验证:
1. 插件能否正常加载
2. 工具能否被识别为 LLM 可用工具
3. 工具能否正常执行
"""
import asyncio
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
async def test_plugin_integration():
"""测试插件集成"""
print("=" * 60)
print("测试记忆系统插件集成")
print("=" * 60)
print()
# 1. 测试导入插件工具
print("[1] 测试导入插件工具...")
try:
from src.memory_graph.plugin_tools.memory_plugin_tools import (
CreateMemoryTool,
LinkMemoriesTool,
SearchMemoriesTool,
)
print(f" ✅ CreateMemoryTool: {CreateMemoryTool.name}")
print(f" ✅ LinkMemoriesTool: {LinkMemoriesTool.name}")
print(f" ✅ SearchMemoriesTool: {SearchMemoriesTool.name}")
except Exception as e:
print(f" ❌ 导入失败: {e}")
return False
# 2. 测试工具定义
print("\n[2] 测试工具定义...")
try:
create_def = CreateMemoryTool.get_tool_definition()
link_def = LinkMemoriesTool.get_tool_definition()
search_def = SearchMemoriesTool.get_tool_definition()
print(f" ✅ create_memory: {len(create_def['parameters'])} 个参数")
print(f" ✅ link_memories: {len(link_def['parameters'])} 个参数")
print(f" ✅ search_memories: {len(search_def['parameters'])} 个参数")
except Exception as e:
print(f" ❌ 获取工具定义失败: {e}")
return False
# 3. 测试初始化 MemoryManager
print("\n[3] 测试初始化 MemoryManager...")
try:
from src.memory_graph.manager_singleton import (
get_memory_manager,
initialize_memory_manager,
)
# 初始化
manager = await initialize_memory_manager(data_dir="data/test_plugin_integration")
print(f" ✅ MemoryManager 初始化成功")
# 获取单例
manager2 = get_memory_manager()
assert manager is manager2, "单例模式失败"
print(f" ✅ 单例模式正常")
except Exception as e:
print(f" ❌ 初始化失败: {e}")
import traceback
traceback.print_exc()
return False
# 4. 测试工具执行
print("\n[4] 测试工具执行...")
try:
# 创建记忆
create_tool = CreateMemoryTool()
result = await create_tool.execute(
{
"subject": "",
"memory_type": "事件",
"topic": "测试记忆系统插件",
"attributes": {"时间": "今天"},
"importance": 0.8,
}
)
print(f" ✅ create_memory: {result['content']}")
# 搜索记忆
search_tool = SearchMemoriesTool()
result = await search_tool.execute({"query": "测试", "top_k": 5})
print(f" ✅ search_memories: 找到记忆")
except Exception as e:
print(f" ❌ 工具执行失败: {e}")
import traceback
traceback.print_exc()
return False
# 5. 测试关闭
print("\n[5] 测试关闭...")
try:
from src.memory_graph.manager_singleton import shutdown_memory_manager
await shutdown_memory_manager()
print(f" ✅ MemoryManager 关闭成功")
except Exception as e:
print(f" ❌ 关闭失败: {e}")
return False
print("\n" + "=" * 60)
print("[SUCCESS] 所有测试通过!")
print("=" * 60)
return True
if __name__ == "__main__":
result = asyncio.run(test_plugin_integration())
sys.exit(0 if result else 1)

View File

@@ -1,147 +0,0 @@
"""
测试增强版时间解析器
验证各种时间表达式的解析能力
"""
from datetime import datetime, timedelta
from src.memory_graph.utils.time_parser import TimeParser
def test_time_parser():
"""测试时间解析器的各种情况"""
# 使用固定的参考时间进行测试
reference_time = datetime(2025, 11, 5, 15, 30, 0) # 2025年11月5日 15:30
parser = TimeParser(reference_time=reference_time)
print("=" * 60)
print("时间解析器增强测试")
print("=" * 60)
print(f"参考时间: {reference_time.strftime('%Y-%m-%d %H:%M:%S')}")
print()
test_cases = [
# 相对日期
("今天", "应该是今天0点"),
("明天", "应该是明天0点"),
("昨天", "应该是昨天0点"),
("前天", "应该是前天0点"),
("后天", "应该是后天0点"),
# X天前/后
("1天前", "应该是昨天0点"),
("2天前", "应该是前天0点"),
("5天前", "应该是5天前0点"),
("3天后", "应该是3天后0点"),
# X周前/后(新增)
("1周前", "应该是1周前0点"),
("2周前", "应该是2周前0点"),
("3周后", "应该是3周后0点"),
# X个月前/后(新增)
("1个月前", "应该是约30天前"),
("2月前", "应该是约60天前"),
("3个月后", "应该是约90天后"),
# X年前/后(新增)
("1年前", "应该是约365天前"),
("2年后", "应该是约730天后"),
# X小时前/后
("1小时前", "应该是1小时前"),
("3小时前", "应该是3小时前"),
("2小时后", "应该是2小时后"),
# X分钟前/后
("30分钟前", "应该是30分钟前"),
("15分钟后", "应该是15分钟后"),
# 时间段
("早上", "应该是今天早上8点"),
("上午", "应该是今天上午10点"),
("中午", "应该是今天中午12点"),
("下午", "应该是今天下午15点"),
("晚上", "应该是今天晚上20点"),
# 组合表达(新增)
("今天下午", "应该是今天下午15点"),
("昨天晚上", "应该是昨天晚上20点"),
("明天早上", "应该是明天早上8点"),
("前天中午", "应该是前天中午12点"),
# 具体时间点
("早上8点", "应该是今天早上8点"),
("下午3点", "应该是今天下午15点"),
("晚上9点", "应该是今天晚上21点"),
# 具体日期
("2025-11-05", "应该是2025年11月5日"),
("11月5日", "应该是今年11月5日"),
("11-05", "应该是今年11月5日"),
# 周/月/年
("上周", "应该是上周"),
("上个月", "应该是上个月"),
("去年", "应该是去年"),
# 中文数字
("一天前", "应该是昨天"),
("三天前", "应该是3天前"),
("五天后", "应该是5天后"),
("十天前", "应该是10天前"),
]
success_count = 0
fail_count = 0
for time_str, expected_desc in test_cases:
result = parser.parse(time_str)
# 计算与参考时间的差异
if result:
diff = result - reference_time
# 格式化输出
if diff.total_seconds() == 0:
diff_str = "当前时间"
elif abs(diff.days) > 0:
if diff.days > 0:
diff_str = f"+{diff.days}"
else:
diff_str = f"{diff.days}"
else:
hours = diff.seconds // 3600
minutes = (diff.seconds % 3600) // 60
if hours > 0:
diff_str = f"{hours}小时"
else:
diff_str = f"{minutes}分钟"
result_str = result.strftime("%Y-%m-%d %H:%M")
status = "[OK]"
success_count += 1
else:
result_str = "解析失败"
diff_str = "N/A"
status = "[FAILED]"
fail_count += 1
print(f"{status} '{time_str:15s}' -> {result_str:20s} ({diff_str:10s}) | {expected_desc}")
print()
print("=" * 60)
print(f"测试结果: 成功 {success_count}/{len(test_cases)}, 失败 {fail_count}/{len(test_cases)}")
if fail_count == 0:
print("[SUCCESS] 所有测试通过!")
else:
print(f"[WARNING] 有 {fail_count} 个测试失败")
print("=" * 60)
if __name__ == "__main__":
test_time_parser()