feat:添加了一个脚本来手动修改内存,包括添加、删除和查询节点和边
添加了一个脚本来手动修改内存,包括添加、删除和查询节点和边
This commit is contained in:
319
src/plugins/memory_system/manually_alter_memory.py
Normal file
319
src/plugins/memory_system/manually_alter_memory.py
Normal file
@@ -0,0 +1,319 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
import datetime
|
||||
from rich.console import Console
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
'''
|
||||
我想 总有那么一个瞬间
|
||||
你会想和某天才变态少女助手一样
|
||||
往Bot的海马体里插上几个电极 不是吗
|
||||
|
||||
Let's do some dirty job.
|
||||
'''
|
||||
|
||||
# 获取当前文件的目录
|
||||
current_dir = Path(__file__).resolve().parent
|
||||
# 获取项目根目录(上三层目录)
|
||||
project_root = current_dir.parent.parent.parent
|
||||
# env.dev文件路径
|
||||
env_path = project_root / ".env.dev"
|
||||
|
||||
# from chat.config import global_config
|
||||
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
|
||||
sys.path.append(root_path)
|
||||
|
||||
from src.common.logger import get_module_logger
|
||||
from src.common.database import db
|
||||
from src.plugins.memory_system.offline_llm import LLMModel
|
||||
|
||||
logger = get_module_logger('mem_alter')
|
||||
console = Console()
|
||||
|
||||
# 加载环境变量
|
||||
if env_path.exists():
|
||||
logger.info(f"从 {env_path} 加载环境变量")
|
||||
load_dotenv(env_path)
|
||||
else:
|
||||
logger.warning(f"未找到环境变量文件: {env_path}")
|
||||
logger.info("将使用默认配置")
|
||||
|
||||
from memory_manual_build import Memory_graph, Hippocampus #海马体和记忆图
|
||||
|
||||
# 查询节点信息
|
||||
def query_mem_info(memory_graph: Memory_graph):
|
||||
while True:
|
||||
query = input("\n请输入新的查询概念(输入'退出'以结束):")
|
||||
if query.lower() == '退出':
|
||||
break
|
||||
|
||||
items_list = memory_graph.get_related_item(query)
|
||||
if items_list:
|
||||
have_memory = False
|
||||
first_layer, second_layer = items_list
|
||||
if first_layer:
|
||||
have_memory = True
|
||||
print("\n直接相关的记忆:")
|
||||
for item in first_layer:
|
||||
print(f"- {item}")
|
||||
if second_layer:
|
||||
have_memory = True
|
||||
print("\n间接相关的记忆:")
|
||||
for item in second_layer:
|
||||
print(f"- {item}")
|
||||
if not have_memory:
|
||||
print("\n未找到相关记忆。")
|
||||
else:
|
||||
print("未找到相关记忆。")
|
||||
|
||||
# 增加概念节点
|
||||
def add_mem_node(hippocampus: Hippocampus):
|
||||
while True:
|
||||
concept = input("请输入节点概念名:\n")
|
||||
result = db.graph_data.nodes.count_documents({'concept': concept})
|
||||
|
||||
if result != 0:
|
||||
console.print("[yellow]已存在名为“{concept}”的节点,行为已取消[/yellow]")
|
||||
continue
|
||||
|
||||
memory_items = list()
|
||||
while True:
|
||||
context = input("请输入节点描述信息(输入'终止'以结束)")
|
||||
if context.lower() == "终止": break
|
||||
memory_items.append(context)
|
||||
|
||||
current_time = datetime.datetime.now().timestamp()
|
||||
hippocampus.memory_graph.G.add_node(concept,
|
||||
memory_items=memory_items,
|
||||
created_time=current_time,
|
||||
last_modified=current_time)
|
||||
# 删除概念节点(及连接到它的边)
|
||||
def remove_mem_node(hippocampus: Hippocampus):
|
||||
concept = input("请输入节点概念名:\n")
|
||||
result = db.graph_data.nodes.count_documents({'concept': concept})
|
||||
|
||||
if result == 0:
|
||||
console.print(f"[red]不存在名为“{concept}”的节点[/red]")
|
||||
|
||||
edges = db.graph_data.edges.find({
|
||||
'$or': [
|
||||
{'source': concept},
|
||||
{'target': concept}
|
||||
]
|
||||
})
|
||||
|
||||
for edge in edges:
|
||||
console.print(f"[yellow]存在边“{edge['source']} -> {edge['target']}”, 请慎重考虑[/yellow]")
|
||||
|
||||
console.print(f"[yellow]确定要移除名为“{concept}”的节点以及其相关边吗[/yellow]")
|
||||
destory = console.input(f"[red]请输入“{concept}”以删除节点 其他输入将被视为取消操作[/red]\n")
|
||||
if destory == concept:
|
||||
hippocampus.memory_graph.G.remove_node(concept)
|
||||
else:
|
||||
logger.info("[green]删除操作已取消[/green]")
|
||||
# 增加节点间边
|
||||
def add_mem_edge(hippocampus: Hippocampus):
|
||||
while True:
|
||||
source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n")
|
||||
if source.lower() == "退出": break
|
||||
if db.graph_data.nodes.count_documents({'concept': source}) == 0:
|
||||
console.print(f"[yellow]“{source}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
target = input("请输入 **第二个节点** 名称:\n")
|
||||
if db.graph_data.nodes.count_documents({'concept': target}) == 0:
|
||||
console.print(f"[yellow]“{target}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
if source == target:
|
||||
console.print(f"[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
hippocampus.memory_graph.connect_dot(source, target)
|
||||
edge = hippocampus.memory_graph.G.get_edge_data(source, target)
|
||||
if edge['strength'] == 1:
|
||||
console.print(f"[green]成功创建边“{source} <-> {target}”,默认权重1[/green]")
|
||||
else:
|
||||
console.print(f"[yellow]边“{source} <-> {target}”已存在,更新权重: {edge['strength']-1} <-> {edge['strength']}[/yellow]")
|
||||
# 删除节点间边
|
||||
def remove_mem_edge(hippocampus: Hippocampus):
|
||||
while True:
|
||||
source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n")
|
||||
if source.lower() == "退出": break
|
||||
if db.graph_data.nodes.count_documents({'concept': source}) == 0:
|
||||
console.print("[yellow]“{source}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
target = input("请输入 **第二个节点** 名称:\n")
|
||||
if db.graph_data.nodes.count_documents({'concept': target}) == 0:
|
||||
console.print("[yellow]“{target}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
if source == target:
|
||||
console.print("[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
edge = hippocampus.memory_graph.G.get_edge_data(source, target)
|
||||
if edge is None:
|
||||
console.print("[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
else:
|
||||
accept = console.input("[orange]请输入“确认”以确认删除操作(其他输入视为取消)[/orange]\n")
|
||||
if accept.lower() == "确认":
|
||||
hippocampus.memory_graph.G.remove_edge(source, target)
|
||||
console.print(f"[green]边“{source} <-> {target}”已删除。[green]")
|
||||
|
||||
# 修改节点信息
|
||||
def alter_mem_node(hippocampus: Hippocampus):
|
||||
batchEnviroment = dict()
|
||||
while True:
|
||||
concept = input("请输入节点概念名(输入'终止'以结束):\n")
|
||||
if concept.lower() == "终止": break
|
||||
_, node = hippocampus.memory_graph.get_dot(concept)
|
||||
if node is None:
|
||||
console.print(f"[yellow]“{concept}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
console.print("[yellow]注意,请确保你知道自己在做什么[/yellow]")
|
||||
console.print("[yellow]你将获得一个执行任意代码的环境[/yellow]")
|
||||
console.print("[red]你已经被警告过了。[/red]\n")
|
||||
|
||||
nodeEnviroment = {"concept": '<节点名>', 'memory_items': '<记忆文本数组>'}
|
||||
console.print("[green]环境变量中会有env与batchEnv两个dict, env在切换节点时会清空, batchEnv在操作终止时才会清空[/green]")
|
||||
console.print(f"[green] env 会被初始化为[/green]\n{nodeEnviroment}\n[green]且会在用户代码执行完毕后被提交 [/green]")
|
||||
console.print("[yellow]为便于书写临时脚本,请手动在输入代码通过Ctrl+C等方式触发KeyboardInterrupt来结束代码执行[/yellow]")
|
||||
|
||||
# 拷贝数据以防操作炸了
|
||||
nodeEnviroment = dict(node)
|
||||
nodeEnviroment['concept'] = concept
|
||||
|
||||
while True:
|
||||
userexec = lambda script, env, batchEnv: eval(script)
|
||||
try:
|
||||
command = console.input()
|
||||
except KeyboardInterrupt:
|
||||
# 稍微防一下小天才
|
||||
try:
|
||||
if isinstance(nodeEnviroment['memory_items'], list):
|
||||
node['memory_items'] = nodeEnviroment['memory_items']
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
except:
|
||||
console.print("[red]我不知道你做了什么,但显然nodeEnviroment['memory_items']已经不是个数组了,操作已取消[/red]")
|
||||
break
|
||||
|
||||
try:
|
||||
userexec(command, nodeEnviroment, batchEnviroment)
|
||||
except Exception as e:
|
||||
console.print(e)
|
||||
console.print("[red]自定义代码执行时发生异常,已捕获,请重试(可通过 console.print(locals()) 检查环境状态)[/red]")
|
||||
# 修改边信息
|
||||
def alter_mem_edge(hippocampus: Hippocampus):
|
||||
batchEnviroment = dict()
|
||||
while True:
|
||||
source = input("请输入 **第一个节点** 名称(输入'终止'以结束):\n")
|
||||
if source.lower() == "终止": break
|
||||
if hippocampus.memory_graph.get_dot(source) is None:
|
||||
console.print(f"[yellow]“{source}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
target = input("请输入 **第二个节点** 名称:\n")
|
||||
if hippocampus.memory_graph.get_dot(target) is None:
|
||||
console.print(f"[yellow]“{target}”节点不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
edge = hippocampus.memory_graph.G.get_edge_data(source, target)
|
||||
if edge is None:
|
||||
console.print(f"[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]")
|
||||
continue
|
||||
|
||||
console.print("[yellow]注意,请确保你知道自己在做什么[/yellow]")
|
||||
console.print("[yellow]你将获得一个执行任意代码的环境[/yellow]")
|
||||
console.print("[red]你已经被警告过了。[/red]\n")
|
||||
|
||||
edgeEnviroment = {"source": '<节点名>', "target": '<节点名>', 'strength': '<强度值,装在一个list里>'}
|
||||
console.print("[green]环境变量中会有env与batchEnv两个dict, env在切换节点时会清空, batchEnv在操作终止时才会清空[/green]")
|
||||
console.print(f"[green] env 会被初始化为[/green]\n{edgeEnviroment}\n[green]且会在用户代码执行完毕后被提交 [/green]")
|
||||
console.print("[yellow]为便于书写临时脚本,请手动在输入代码通过Ctrl+C等方式触发KeyboardInterrupt来结束代码执行[/yellow]")
|
||||
|
||||
# 拷贝数据以防操作炸了
|
||||
edgeEnviroment['strength'] = [edge["strength"]]
|
||||
edgeEnviroment['source'] = source
|
||||
edgeEnviroment['target'] = target
|
||||
|
||||
while True:
|
||||
userexec = lambda script, env, batchEnv: eval(script)
|
||||
try:
|
||||
command = console.input()
|
||||
except KeyboardInterrupt:
|
||||
# 稍微防一下小天才
|
||||
try:
|
||||
if isinstance(edgeEnviroment['strength'][0], int):
|
||||
edge['strength'] = edgeEnviroment['strength'][0]
|
||||
else:
|
||||
raise Exception
|
||||
|
||||
except:
|
||||
console.print("[red]我不知道你做了什么,但显然edgeEnviroment['strength']已经不是个int了,操作已取消[/red]")
|
||||
break
|
||||
|
||||
try:
|
||||
userexec(command, edgeEnviroment, batchEnviroment)
|
||||
except Exception as e:
|
||||
console.print(e)
|
||||
console.print("[red]自定义代码执行时发生异常,已捕获,请重试(可通过 console.print(locals()) 检查环境状态)[/red]")
|
||||
|
||||
|
||||
|
||||
async def main():
|
||||
start_time = time.time()
|
||||
|
||||
# 创建记忆图
|
||||
memory_graph = Memory_graph()
|
||||
|
||||
# 创建海马体
|
||||
hippocampus = Hippocampus(memory_graph)
|
||||
|
||||
# 从数据库同步数据
|
||||
hippocampus.sync_memory_from_db()
|
||||
|
||||
end_time = time.time()
|
||||
logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m")
|
||||
|
||||
while True:
|
||||
try:
|
||||
query = int(input("请输入操作类型\n0 -> 查询节点; 1 -> 增加节点; 2 -> 移除节点; 3 -> 增加边; 4 -> 移除边;\n5 -> 修改节点; 6 -> 修改边; 其他任意输入 -> 退出\n"))
|
||||
except:
|
||||
query = -1
|
||||
|
||||
if query == 0:
|
||||
query_mem_info(memory_graph)
|
||||
elif query == 1:
|
||||
add_mem_node(hippocampus)
|
||||
elif query == 2:
|
||||
remove_mem_node(hippocampus)
|
||||
elif query == 3:
|
||||
add_mem_edge(hippocampus)
|
||||
elif query == 4:
|
||||
remove_mem_edge(hippocampus)
|
||||
elif query == 5:
|
||||
alter_mem_node(hippocampus)
|
||||
elif query == 6:
|
||||
alter_mem_edge(hippocampus)
|
||||
else:
|
||||
print("已结束操作")
|
||||
break
|
||||
|
||||
hippocampus.sync_memory_to_db()
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user