feat(memory-graph): Step 1 - 集成记忆工具到插件系统

完成记忆系统工具的插件化集成:

1. 创建记忆工具适配器 (memory_plugin_tools.py)
   - CreateMemoryTool: 创建新记忆
   - LinkMemoriesTool: 关联两条记忆
   - SearchMemoriesTool: 搜索相关记忆
   - 适配 BaseTool 接口,支持 LLM 调用

2. 创建全局 MemoryManager 单例 (manager_singleton.py)
   - initialize_memory_manager(): 初始化全局实例
   - get_memory_manager(): 获取单例实例
   - shutdown_memory_manager(): 关闭管理器
   - 线程安全的单例模式

3. 创建记忆系统插件 (plugins/memory_graph_plugin/)
   - MemoryGraphPlugin: 插件主类
   - 自动注册3个记忆工具到系统
   - on_plugin_loaded(): 初始化 MemoryManager
   - on_unload(): 清理资源

4. 修复类型问题
   - ToolParamType.OBJECT  STRING (JSON格式)
   - ToolParamType.NUMBER  FLOAT
   - attributes 参数支持 JSON 字符串解析
   - 修复 min_importance None 比较错误

5. 添加集成测试 (test_plugin_integration.py)
   - 测试工具导入和定义
   - 测试 MemoryManager 初始化
   - 测试工具执行(创建、搜索记忆)
   - 测试单例模式

测试结果:  所有测试通过
LLM 现在可以通过工具调用主动管理记忆!
This commit is contained in:
Windpicker-owo
2025-11-05 18:42:27 +08:00
parent 64b8636e9e
commit fc71aad817
6 changed files with 543 additions and 1 deletions

View File

@@ -0,0 +1,20 @@
"""
记忆系统插件
集成记忆管理功能到 Bot 系统中
"""
from src.plugin_system.base.plugin_metadata import PluginMetadata
__plugin_meta__ = PluginMetadata(
name="记忆图系统 (Memory Graph)",
description="基于图的记忆管理系统,支持记忆创建、关联和检索",
usage="LLM 可以通过工具调用创建和管理记忆,系统自动在回复时检索相关记忆",
version="0.1.0",
author="MoFox-Studio",
license="GPL-v3.0",
repository_url="https://github.com/MoFox-Studio",
keywords=["记忆", "知识图谱", "RAG", "长期记忆"],
categories=["AI", "Knowledge Management"],
extra={"is_built_in": False, "plugin_type": "memory"},
)

View File

@@ -0,0 +1,79 @@
"""
记忆系统插件主类
"""
from typing import ClassVar
from src.common.logger import get_logger
from src.plugin_system import BasePlugin, register_plugin
from src.plugin_system.base.component_types import ComponentInfo, ToolInfo
logger = get_logger("memory_graph_plugin")
@register_plugin
class MemoryGraphPlugin(BasePlugin):
"""记忆图系统插件"""
plugin_name = "memory_graph_plugin"
enable_plugin = True
dependencies: ClassVar = []
python_dependencies: ClassVar = []
config_file_name = "config.toml"
config_schema: ClassVar = {}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
logger.info(f"{self.log_prefix} 插件已加载")
def get_plugin_components(self):
"""返回插件组件列表"""
from src.memory_graph.plugin_tools.memory_plugin_tools import (
CreateMemoryTool,
LinkMemoriesTool,
SearchMemoriesTool,
)
components = []
# 添加工具组件
for tool_class in [CreateMemoryTool, LinkMemoriesTool, SearchMemoriesTool]:
tool_info = tool_class.get_tool_info()
components.append((tool_info, tool_class))
return components
async def on_plugin_loaded(self):
"""插件加载后的回调"""
try:
from src.memory_graph.manager_singleton import initialize_memory_manager
logger.info(f"{self.log_prefix} 正在初始化记忆系统...")
await initialize_memory_manager()
logger.info(f"{self.log_prefix} ✅ 记忆系统初始化成功")
except Exception as e:
logger.error(f"{self.log_prefix} 初始化记忆系统失败: {e}", exc_info=True)
raise
def on_unload(self):
"""插件卸载时的回调"""
try:
import asyncio
from src.memory_graph.manager_singleton import shutdown_memory_manager
logger.info(f"{self.log_prefix} 正在关闭记忆系统...")
# 在事件循环中运行异步关闭
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果循环正在运行,创建任务
asyncio.create_task(shutdown_memory_manager())
else:
# 如果循环未运行,直接运行
loop.run_until_complete(shutdown_memory_manager())
logger.info(f"{self.log_prefix} ✅ 记忆系统已关闭")
except Exception as e:
logger.error(f"{self.log_prefix} 关闭记忆系统时出错: {e}", exc_info=True)

View File

@@ -362,7 +362,7 @@ class MemoryManager:
continue
# 重要性过滤
if memory.importance < min_importance:
if min_importance is not None and memory.importance < min_importance:
continue
# 遗忘状态过滤

View File

@@ -0,0 +1,93 @@
"""
记忆系统管理单例
提供全局访问的 MemoryManager 实例
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional
from src.common.logger import get_logger
from src.memory_graph.manager import MemoryManager
logger = get_logger(__name__)
# 全局 MemoryManager 实例
_memory_manager: Optional[MemoryManager] = None
_initialized: bool = False
async def initialize_memory_manager(data_dir: Optional[Path | str] = None) -> MemoryManager:
"""
初始化全局 MemoryManager
Args:
data_dir: 数据目录,默认使用 data/memory_graph
Returns:
MemoryManager 实例
"""
global _memory_manager, _initialized
if _initialized and _memory_manager:
logger.info("MemoryManager 已经初始化,返回现有实例")
return _memory_manager
try:
if data_dir is None:
data_dir = Path("data/memory_graph")
elif isinstance(data_dir, str):
data_dir = Path(data_dir)
logger.info(f"正在初始化全局 MemoryManager (data_dir={data_dir})...")
_memory_manager = MemoryManager(data_dir=data_dir)
await _memory_manager.initialize()
_initialized = True
logger.info("✅ 全局 MemoryManager 初始化成功")
return _memory_manager
except Exception as e:
logger.error(f"初始化 MemoryManager 失败: {e}", exc_info=True)
_initialized = False
_memory_manager = None
raise
def get_memory_manager() -> Optional[MemoryManager]:
"""
获取全局 MemoryManager 实例
Returns:
MemoryManager 实例,如果未初始化则返回 None
"""
if not _initialized or _memory_manager is None:
logger.warning("MemoryManager 尚未初始化,请先调用 initialize_memory_manager()")
return None
return _memory_manager
async def shutdown_memory_manager():
"""关闭全局 MemoryManager"""
global _memory_manager, _initialized
if _memory_manager:
try:
logger.info("正在关闭全局 MemoryManager...")
await _memory_manager.shutdown()
logger.info("✅ 全局 MemoryManager 已关闭")
except Exception as e:
logger.error(f"关闭 MemoryManager 时出错: {e}", exc_info=True)
finally:
_memory_manager = None
_initialized = False
def is_initialized() -> bool:
"""检查 MemoryManager 是否已初始化"""
return _initialized and _memory_manager is not None

View File

@@ -0,0 +1,224 @@
"""
记忆系统插件工具
将 MemoryTools 适配为 BaseTool 格式,供 LLM 使用
"""
from __future__ import annotations
from typing import Any, ClassVar
from src.common.logger import get_logger
from src.plugin_system.base.base_tool import BaseTool
from src.plugin_system.base.component_types import ToolParamType
logger = get_logger(__name__)
class CreateMemoryTool(BaseTool):
"""创建记忆工具"""
name = "create_memory"
description = "创建一个新的记忆。记忆由主体、类型、主题、客体(可选)和属性组成。用于记录重要的信息、事件、想法等。"
parameters: ClassVar[list[tuple[str, ToolParamType, str, bool, list[str] | None]]] = [
("subject", ToolParamType.STRING, "记忆的主体,通常是'''用户'或具体的人名", True, None),
("memory_type", ToolParamType.STRING, "记忆类型", True, ["事件", "事实", "关系", "观点"]),
("topic", ToolParamType.STRING, "记忆的主题,即发生的事情或状态", True, None),
("object", ToolParamType.STRING, "记忆的客体,即主题作用的对象(可选)", False, None),
("attributes", ToolParamType.STRING, "记忆的属性JSON格式字符串{\"时间\":\"今天\",\"地点\":\"家里\"}", False, None),
("importance", ToolParamType.FLOAT, "记忆的重要性0.0-1.0默认0.5", False, None),
]
available_for_llm = True
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行创建记忆"""
try:
# 获取全局 memory_manager
from src.memory_graph.manager_singleton import get_memory_manager
manager = get_memory_manager()
if not manager:
return {
"name": self.name,
"content": "记忆系统未初始化"
}
# 提取参数
subject = function_args.get("subject", "")
memory_type = function_args.get("memory_type", "")
topic = function_args.get("topic", "")
obj = function_args.get("object")
# 处理 attributes可能是字符串或字典
attributes_raw = function_args.get("attributes", {})
if isinstance(attributes_raw, str):
import orjson
try:
attributes = orjson.loads(attributes_raw)
except Exception:
attributes = {}
else:
attributes = attributes_raw
importance = function_args.get("importance", 0.5)
# 创建记忆
memory = await manager.create_memory(
subject=subject,
memory_type=memory_type,
topic=topic,
object_=obj,
attributes=attributes,
importance=importance,
)
if memory:
logger.info(f"[CreateMemoryTool] 成功创建记忆: {memory.id}")
return {
"name": self.name,
"content": f"成功创建记忆ID: {memory.id}"
}
else:
return {
"name": self.name,
"content": "创建记忆失败"
}
except Exception as e:
logger.error(f"[CreateMemoryTool] 执行失败: {e}", exc_info=True)
return {
"name": self.name,
"content": f"创建记忆时出错: {str(e)}"
}
class LinkMemoriesTool(BaseTool):
"""关联记忆工具"""
name = "link_memories"
description = "在两个记忆之间建立关联关系。用于连接相关的记忆,形成知识网络。"
parameters: ClassVar[list[tuple[str, ToolParamType, str, bool, list[str] | None]]] = [
("source_query", ToolParamType.STRING, "源记忆的搜索查询(如记忆的主题关键词)", True, None),
("target_query", ToolParamType.STRING, "目标记忆的搜索查询", True, None),
("relation", ToolParamType.STRING, "关系类型", True, ["导致", "引用", "相似", "相反", "部分"]),
("strength", ToolParamType.FLOAT, "关系强度0.0-1.0默认0.7", False, None),
]
available_for_llm = True
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行关联记忆"""
try:
from src.memory_graph.manager_singleton import get_memory_manager
manager = get_memory_manager()
if not manager:
return {
"name": self.name,
"content": "记忆系统未初始化"
}
source_query = function_args.get("source_query", "")
target_query = function_args.get("target_query", "")
relation = function_args.get("relation", "引用")
strength = function_args.get("strength", 0.7)
# 关联记忆
success = await manager.link_memories(
source_description=source_query,
target_description=target_query,
relation_type=relation,
importance=strength,
)
if success:
logger.info(f"[LinkMemoriesTool] 成功关联记忆: {source_query} -> {target_query}")
return {
"name": self.name,
"content": f"成功建立关联: {source_query} --{relation}--> {target_query}"
}
else:
return {
"name": self.name,
"content": "关联记忆失败,可能找不到匹配的记忆"
}
except Exception as e:
logger.error(f"[LinkMemoriesTool] 执行失败: {e}", exc_info=True)
return {
"name": self.name,
"content": f"关联记忆时出错: {str(e)}"
}
class SearchMemoriesTool(BaseTool):
"""搜索记忆工具"""
name = "search_memories"
description = "搜索相关的记忆。根据查询词搜索记忆库,返回最相关的记忆。"
parameters: ClassVar[list[tuple[str, ToolParamType, str, bool, list[str] | None]]] = [
("query", ToolParamType.STRING, "搜索查询词,描述想要找什么样的记忆", True, None),
("top_k", ToolParamType.INTEGER, "返回的记忆数量默认5", False, None),
("min_importance", ToolParamType.FLOAT, "最低重要性阈值0.0-1.0),只返回重要性不低于此值的记忆", False, None),
]
available_for_llm = True
async def execute(self, function_args: dict[str, Any]) -> dict[str, Any]:
"""执行搜索记忆"""
try:
from src.memory_graph.manager_singleton import get_memory_manager
manager = get_memory_manager()
if not manager:
return {
"name": self.name,
"content": "记忆系统未初始化"
}
query = function_args.get("query", "")
top_k = function_args.get("top_k", 5)
min_importance_raw = function_args.get("min_importance")
min_importance = float(min_importance_raw) if min_importance_raw is not None else None
# 搜索记忆
memories = await manager.search_memories(
query=query,
top_k=top_k,
min_importance=min_importance,
)
if memories:
# 格式化结果
result_lines = [f"找到 {len(memories)} 条相关记忆:\n"]
for i, mem in enumerate(memories, 1):
topic = mem.metadata.get("topic", "N/A")
mem_type = mem.metadata.get("memory_type", "N/A")
importance = mem.importance
result_lines.append(
f"{i}. [{mem_type}] {topic} (重要性: {importance:.2f})"
)
result_text = "\n".join(result_lines)
logger.info(f"[SearchMemoriesTool] 搜索成功: 查询='{query}', 结果数={len(memories)}")
return {
"name": self.name,
"content": result_text
}
else:
return {
"name": self.name,
"content": f"未找到与 '{query}' 相关的记忆"
}
except Exception as e:
logger.error(f"[SearchMemoriesTool] 执行失败: {e}", exc_info=True)
return {
"name": self.name,
"content": f"搜索记忆时出错: {str(e)}"
}

View File

@@ -0,0 +1,126 @@
"""
测试记忆系统插件集成
验证:
1. 插件能否正常加载
2. 工具能否被识别为 LLM 可用工具
3. 工具能否正常执行
"""
import asyncio
import sys
from pathlib import Path
# 添加项目根目录到路径
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
async def test_plugin_integration():
"""测试插件集成"""
print("=" * 60)
print("测试记忆系统插件集成")
print("=" * 60)
print()
# 1. 测试导入插件工具
print("[1] 测试导入插件工具...")
try:
from src.memory_graph.plugin_tools.memory_plugin_tools import (
CreateMemoryTool,
LinkMemoriesTool,
SearchMemoriesTool,
)
print(f" ✅ CreateMemoryTool: {CreateMemoryTool.name}")
print(f" ✅ LinkMemoriesTool: {LinkMemoriesTool.name}")
print(f" ✅ SearchMemoriesTool: {SearchMemoriesTool.name}")
except Exception as e:
print(f" ❌ 导入失败: {e}")
return False
# 2. 测试工具定义
print("\n[2] 测试工具定义...")
try:
create_def = CreateMemoryTool.get_tool_definition()
link_def = LinkMemoriesTool.get_tool_definition()
search_def = SearchMemoriesTool.get_tool_definition()
print(f" ✅ create_memory: {len(create_def['parameters'])} 个参数")
print(f" ✅ link_memories: {len(link_def['parameters'])} 个参数")
print(f" ✅ search_memories: {len(search_def['parameters'])} 个参数")
except Exception as e:
print(f" ❌ 获取工具定义失败: {e}")
return False
# 3. 测试初始化 MemoryManager
print("\n[3] 测试初始化 MemoryManager...")
try:
from src.memory_graph.manager_singleton import (
get_memory_manager,
initialize_memory_manager,
)
# 初始化
manager = await initialize_memory_manager(data_dir="data/test_plugin_integration")
print(f" ✅ MemoryManager 初始化成功")
# 获取单例
manager2 = get_memory_manager()
assert manager is manager2, "单例模式失败"
print(f" ✅ 单例模式正常")
except Exception as e:
print(f" ❌ 初始化失败: {e}")
import traceback
traceback.print_exc()
return False
# 4. 测试工具执行
print("\n[4] 测试工具执行...")
try:
# 创建记忆
create_tool = CreateMemoryTool()
result = await create_tool.execute(
{
"subject": "",
"memory_type": "事件",
"topic": "测试记忆系统插件",
"attributes": {"时间": "今天"},
"importance": 0.8,
}
)
print(f" ✅ create_memory: {result['content']}")
# 搜索记忆
search_tool = SearchMemoriesTool()
result = await search_tool.execute({"query": "测试", "top_k": 5})
print(f" ✅ search_memories: 找到记忆")
except Exception as e:
print(f" ❌ 工具执行失败: {e}")
import traceback
traceback.print_exc()
return False
# 5. 测试关闭
print("\n[5] 测试关闭...")
try:
from src.memory_graph.manager_singleton import shutdown_memory_manager
await shutdown_memory_manager()
print(f" ✅ MemoryManager 关闭成功")
except Exception as e:
print(f" ❌ 关闭失败: {e}")
return False
print("\n" + "=" * 60)
print("[SUCCESS] 所有测试通过!")
print("=" * 60)
return True
if __name__ == "__main__":
result = asyncio.run(test_plugin_integration())
sys.exit(0 if result else 1)