diff --git a/src/plugins/memory_system/manually_alter_memory.py b/src/plugins/memory_system/manually_alter_memory.py new file mode 100644 index 000000000..e049bd2a9 --- /dev/null +++ b/src/plugins/memory_system/manually_alter_memory.py @@ -0,0 +1,319 @@ +# -*- coding: utf-8 -*- +import os +import sys +import time +from pathlib import Path +import datetime +from rich.console import Console + +from dotenv import load_dotenv + + +''' +我想 总有那么一个瞬间 +你会想和某天才变态少女助手一样 +往Bot的海马体里插上几个电极 不是吗 + +Let's do some dirty job. +''' + +# 获取当前文件的目录 +current_dir = Path(__file__).resolve().parent +# 获取项目根目录(上三层目录) +project_root = current_dir.parent.parent.parent +# env.dev文件路径 +env_path = project_root / ".env.dev" + +# from chat.config import global_config +root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) +sys.path.append(root_path) + +from src.common.logger import get_module_logger +from src.common.database import db +from src.plugins.memory_system.offline_llm import LLMModel + +logger = get_module_logger('mem_alter') +console = Console() + +# 加载环境变量 +if env_path.exists(): + logger.info(f"从 {env_path} 加载环境变量") + load_dotenv(env_path) +else: + logger.warning(f"未找到环境变量文件: {env_path}") + logger.info("将使用默认配置") + +from memory_manual_build import Memory_graph, Hippocampus #海马体和记忆图 + +# 查询节点信息 +def query_mem_info(memory_graph: Memory_graph): + while True: + query = input("\n请输入新的查询概念(输入'退出'以结束):") + if query.lower() == '退出': + break + + items_list = memory_graph.get_related_item(query) + if items_list: + have_memory = False + first_layer, second_layer = items_list + if first_layer: + have_memory = True + print("\n直接相关的记忆:") + for item in first_layer: + print(f"- {item}") + if second_layer: + have_memory = True + print("\n间接相关的记忆:") + for item in second_layer: + print(f"- {item}") + if not have_memory: + print("\n未找到相关记忆。") + else: + print("未找到相关记忆。") + +# 增加概念节点 +def add_mem_node(hippocampus: Hippocampus): + while True: + concept = input("请输入节点概念名:\n") + result = db.graph_data.nodes.count_documents({'concept': concept}) + + if result != 0: + console.print("[yellow]已存在名为“{concept}”的节点,行为已取消[/yellow]") + continue + + memory_items = list() + while True: + context = input("请输入节点描述信息(输入'终止'以结束)") + if context.lower() == "终止": break + memory_items.append(context) + + current_time = datetime.datetime.now().timestamp() + hippocampus.memory_graph.G.add_node(concept, + memory_items=memory_items, + created_time=current_time, + last_modified=current_time) +# 删除概念节点(及连接到它的边) +def remove_mem_node(hippocampus: Hippocampus): + concept = input("请输入节点概念名:\n") + result = db.graph_data.nodes.count_documents({'concept': concept}) + + if result == 0: + console.print(f"[red]不存在名为“{concept}”的节点[/red]") + + edges = db.graph_data.edges.find({ + '$or': [ + {'source': concept}, + {'target': concept} + ] + }) + + for edge in edges: + console.print(f"[yellow]存在边“{edge['source']} -> {edge['target']}”, 请慎重考虑[/yellow]") + + console.print(f"[yellow]确定要移除名为“{concept}”的节点以及其相关边吗[/yellow]") + destory = console.input(f"[red]请输入“{concept}”以删除节点 其他输入将被视为取消操作[/red]\n") + if destory == concept: + hippocampus.memory_graph.G.remove_node(concept) + else: + logger.info("[green]删除操作已取消[/green]") +# 增加节点间边 +def add_mem_edge(hippocampus: Hippocampus): + while True: + source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n") + if source.lower() == "退出": break + if db.graph_data.nodes.count_documents({'concept': source}) == 0: + console.print(f"[yellow]“{source}”节点不存在,操作已取消。[/yellow]") + continue + + target = input("请输入 **第二个节点** 名称:\n") + if db.graph_data.nodes.count_documents({'concept': target}) == 0: + console.print(f"[yellow]“{target}”节点不存在,操作已取消。[/yellow]") + continue + + if source == target: + console.print(f"[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]") + continue + + hippocampus.memory_graph.connect_dot(source, target) + edge = hippocampus.memory_graph.G.get_edge_data(source, target) + if edge['strength'] == 1: + console.print(f"[green]成功创建边“{source} <-> {target}”,默认权重1[/green]") + else: + console.print(f"[yellow]边“{source} <-> {target}”已存在,更新权重: {edge['strength']-1} <-> {edge['strength']}[/yellow]") +# 删除节点间边 +def remove_mem_edge(hippocampus: Hippocampus): + while True: + source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n") + if source.lower() == "退出": break + if db.graph_data.nodes.count_documents({'concept': source}) == 0: + console.print("[yellow]“{source}”节点不存在,操作已取消。[/yellow]") + continue + + target = input("请输入 **第二个节点** 名称:\n") + if db.graph_data.nodes.count_documents({'concept': target}) == 0: + console.print("[yellow]“{target}”节点不存在,操作已取消。[/yellow]") + continue + + if source == target: + console.print("[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]") + continue + + edge = hippocampus.memory_graph.G.get_edge_data(source, target) + if edge is None: + console.print("[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]") + continue + else: + accept = console.input("[orange]请输入“确认”以确认删除操作(其他输入视为取消)[/orange]\n") + if accept.lower() == "确认": + hippocampus.memory_graph.G.remove_edge(source, target) + console.print(f"[green]边“{source} <-> {target}”已删除。[green]") + +# 修改节点信息 +def alter_mem_node(hippocampus: Hippocampus): + batchEnviroment = dict() + while True: + concept = input("请输入节点概念名(输入'终止'以结束):\n") + if concept.lower() == "终止": break + _, node = hippocampus.memory_graph.get_dot(concept) + if node is None: + console.print(f"[yellow]“{concept}”节点不存在,操作已取消。[/yellow]") + continue + + console.print("[yellow]注意,请确保你知道自己在做什么[/yellow]") + console.print("[yellow]你将获得一个执行任意代码的环境[/yellow]") + console.print("[red]你已经被警告过了。[/red]\n") + + nodeEnviroment = {"concept": '<节点名>', 'memory_items': '<记忆文本数组>'} + console.print("[green]环境变量中会有env与batchEnv两个dict, env在切换节点时会清空, batchEnv在操作终止时才会清空[/green]") + console.print(f"[green] env 会被初始化为[/green]\n{nodeEnviroment}\n[green]且会在用户代码执行完毕后被提交 [/green]") + console.print("[yellow]为便于书写临时脚本,请手动在输入代码通过Ctrl+C等方式触发KeyboardInterrupt来结束代码执行[/yellow]") + + # 拷贝数据以防操作炸了 + nodeEnviroment = dict(node) + nodeEnviroment['concept'] = concept + + while True: + userexec = lambda script, env, batchEnv: eval(script) + try: + command = console.input() + except KeyboardInterrupt: + # 稍微防一下小天才 + try: + if isinstance(nodeEnviroment['memory_items'], list): + node['memory_items'] = nodeEnviroment['memory_items'] + else: + raise Exception + + except: + console.print("[red]我不知道你做了什么,但显然nodeEnviroment['memory_items']已经不是个数组了,操作已取消[/red]") + break + + try: + userexec(command, nodeEnviroment, batchEnviroment) + except Exception as e: + console.print(e) + console.print("[red]自定义代码执行时发生异常,已捕获,请重试(可通过 console.print(locals()) 检查环境状态)[/red]") +# 修改边信息 +def alter_mem_edge(hippocampus: Hippocampus): + batchEnviroment = dict() + while True: + source = input("请输入 **第一个节点** 名称(输入'终止'以结束):\n") + if source.lower() == "终止": break + if hippocampus.memory_graph.get_dot(source) is None: + console.print(f"[yellow]“{source}”节点不存在,操作已取消。[/yellow]") + continue + + target = input("请输入 **第二个节点** 名称:\n") + if hippocampus.memory_graph.get_dot(target) is None: + console.print(f"[yellow]“{target}”节点不存在,操作已取消。[/yellow]") + continue + + edge = hippocampus.memory_graph.G.get_edge_data(source, target) + if edge is None: + console.print(f"[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]") + continue + + console.print("[yellow]注意,请确保你知道自己在做什么[/yellow]") + console.print("[yellow]你将获得一个执行任意代码的环境[/yellow]") + console.print("[red]你已经被警告过了。[/red]\n") + + edgeEnviroment = {"source": '<节点名>', "target": '<节点名>', 'strength': '<强度值,装在一个list里>'} + console.print("[green]环境变量中会有env与batchEnv两个dict, env在切换节点时会清空, batchEnv在操作终止时才会清空[/green]") + console.print(f"[green] env 会被初始化为[/green]\n{edgeEnviroment}\n[green]且会在用户代码执行完毕后被提交 [/green]") + console.print("[yellow]为便于书写临时脚本,请手动在输入代码通过Ctrl+C等方式触发KeyboardInterrupt来结束代码执行[/yellow]") + + # 拷贝数据以防操作炸了 + edgeEnviroment['strength'] = [edge["strength"]] + edgeEnviroment['source'] = source + edgeEnviroment['target'] = target + + while True: + userexec = lambda script, env, batchEnv: eval(script) + try: + command = console.input() + except KeyboardInterrupt: + # 稍微防一下小天才 + try: + if isinstance(edgeEnviroment['strength'][0], int): + edge['strength'] = edgeEnviroment['strength'][0] + else: + raise Exception + + except: + console.print("[red]我不知道你做了什么,但显然edgeEnviroment['strength']已经不是个int了,操作已取消[/red]") + break + + try: + userexec(command, edgeEnviroment, batchEnviroment) + except Exception as e: + console.print(e) + console.print("[red]自定义代码执行时发生异常,已捕获,请重试(可通过 console.print(locals()) 检查环境状态)[/red]") + + + +async def main(): + start_time = time.time() + + # 创建记忆图 + memory_graph = Memory_graph() + + # 创建海马体 + hippocampus = Hippocampus(memory_graph) + + # 从数据库同步数据 + hippocampus.sync_memory_from_db() + + end_time = time.time() + logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m") + + while True: + try: + query = int(input("请输入操作类型\n0 -> 查询节点; 1 -> 增加节点; 2 -> 移除节点; 3 -> 增加边; 4 -> 移除边;\n5 -> 修改节点; 6 -> 修改边; 其他任意输入 -> 退出\n")) + except: + query = -1 + + if query == 0: + query_mem_info(memory_graph) + elif query == 1: + add_mem_node(hippocampus) + elif query == 2: + remove_mem_node(hippocampus) + elif query == 3: + add_mem_edge(hippocampus) + elif query == 4: + remove_mem_edge(hippocampus) + elif query == 5: + alter_mem_node(hippocampus) + elif query == 6: + alter_mem_edge(hippocampus) + else: + print("已结束操作") + break + + hippocampus.sync_memory_to_db() + + + +if __name__ == "__main__": + import asyncio + asyncio.run(main())