diff --git a/src/plugins/memory_system/manually_alter_memory.py b/src/plugins/memory_system/manually_alter_memory.py new file mode 100644 index 000000000..98999e4f8 --- /dev/null +++ b/src/plugins/memory_system/manually_alter_memory.py @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +import os +import sys +import time +from pathlib import Path +import datetime +from rich.console import Console + +from dotenv import load_dotenv + + +''' +我想 总有那么一个瞬间 +你会想和某天才变态少女助手一样 +往Bot的海马体里插上几个电极 不是吗 + +Let's do some dirty job. +''' + +# 获取当前文件的目录 +current_dir = Path(__file__).resolve().parent +# 获取项目根目录(上三层目录) +project_root = current_dir.parent.parent.parent +# env.dev文件路径 +env_path = project_root / ".env.dev" + +# from chat.config import global_config +root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")) +sys.path.append(root_path) + +from src.common.logger import get_module_logger +from src.common.database import db +from src.plugins.memory_system.offline_llm import LLMModel + +logger = get_module_logger('mem_alter') +console = Console() + +# 加载环境变量 +if env_path.exists(): + logger.info(f"从 {env_path} 加载环境变量") + load_dotenv(env_path) +else: + logger.warning(f"未找到环境变量文件: {env_path}") + logger.info("将使用默认配置") + +from memory_manual_build import Memory_graph, Hippocampus #海马体和记忆图 + +# 查询节点信息 +def query_mem_info(memory_graph: Memory_graph): + while True: + query = input("\n请输入新的查询概念(输入'退出'以结束):") + if query.lower() == '退出': + break + + items_list = memory_graph.get_related_item(query) + if items_list: + have_memory = False + first_layer, second_layer = items_list + if first_layer: + have_memory = True + print("\n直接相关的记忆:") + for item in first_layer: + print(f"- {item}") + if second_layer: + have_memory = True + print("\n间接相关的记忆:") + for item in second_layer: + print(f"- {item}") + if not have_memory: + print("\n未找到相关记忆。") + else: + print("未找到相关记忆。") + +# 增加概念节点 +def add_mem_node(hippocampus: Hippocampus): + while True: + concept = input("请输入节点概念名:\n") + result = db.graph_data.nodes.count_documents({'concept': concept}) + + if result != 0: + console.print("[yellow]已存在名为“{concept}”的节点,行为已取消[/yellow]") + continue + + memory_items = list() + while True: + context = input("请输入节点描述信息(输入'终止'以结束)") + if context.lower() == "终止": break + memory_items.append(context) + + current_time = datetime.datetime.now().timestamp() + hippocampus.memory_graph.G.add_node(concept, + memory_items=memory_items, + created_time=current_time, + last_modified=current_time) +# 删除概念节点(及连接到它的边) +def remove_mem_node(hippocampus: Hippocampus): + concept = input("请输入节点概念名:\n") + result = db.graph_data.nodes.count_documents({'concept': concept}) + + if result == 0: + console.print(f"[red]不存在名为“{concept}”的节点[/red]") + + edges = db.graph_data.edges.find({ + '$or': [ + {'source': concept}, + {'target': concept} + ] + }) + + for edge in edges: + console.print(f"[yellow]存在边“{edge['source']} -> {edge['target']}”, 请慎重考虑[/yellow]") + + console.print(f"[yellow]确定要移除名为“{concept}”的节点以及其相关边吗[/yellow]") + destory = console.input(f"[orange]请输入“{concept}”以删除节点 其他输入将被视为取消操作[/orange]\n") + if destory == concept: + hippocampus.memory_graph.G.remove_node(concept) + else: + logger.info("[green]删除操作已取消[/green]") +# 增加节点间边 +def add_mem_edge(hippocampus: Hippocampus): + while True: + source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n") + if source.lower() == "退出": break + if db.graph_data.nodes.count_documents({'concept': source}) == 0: + console.print("[yellow]“{source}”节点不存在,操作已取消。[/yellow]") + continue + + target = input("请输入 **第二个节点** 名称:\n") + if db.graph_data.nodes.count_documents({'concept': target}) == 0: + console.print("[yellow]“{target}”节点不存在,操作已取消。[/yellow]") + continue + + if source == target: + console.print("[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]") + continue + + hippocampus.memory_graph.connect_dot(source, target) + edge = hippocampus.memory_graph.G.get_edge_data(source, target) + if edge['strength'] == 1: + console.print(f"[green]成功创建边“{source} <-> {target}”,默认权重1[/green]") + else: + console.print(f"[yellow]边“{source} <-> {target}”已存在,更新权重: {edge['strength']-1} <-> {edge['strength']}[/yellow]") +# 删除节点间边 +def remove_mem_edge(hippocampus: Hippocampus): + while True: + source = input("请输入 **第一个节点** 名称(输入'退出'以结束):\n") + if source.lower() == "退出": break + if db.graph_data.nodes.count_documents({'concept': source}) == 0: + console.print("[yellow]“{source}”节点不存在,操作已取消。[/yellow]") + continue + + target = input("请输入 **第二个节点** 名称:\n") + if db.graph_data.nodes.count_documents({'concept': target}) == 0: + console.print("[yellow]“{target}”节点不存在,操作已取消。[/yellow]") + continue + + if source == target: + console.print("[yellow]试图创建“{source} <-> {target}”自环,操作已取消。[/yellow]") + continue + + edge = hippocampus.memory_graph.G.get_edge_data(source, target) + if edge is None: + console.print("[yellow]边“{source} <-> {target}”不存在,操作已取消。[/yellow]") + continue + else: + accept = console.input("[orange]请输入“确认”以确认删除操作(其他输入视为取消)[/orange]\n") + if accept.lower() == "确认": + hippocampus.memory_graph.G.remove_edge(source, target) + console.print(f"[green]边“{source} <-> {target}”已删除。[green]") +# 修改节点信息 +def alter_mem_node(hippocampus: Hippocampus): + #todo... + #需要允许修改memory_items, last_modified + return +# 修改边信息 +def alter_mem_edge(hippocampus: Hippocampus): + #todo... + #需要允许修改strength, last_modified + return + +async def main(): + start_time = time.time() + + # 创建记忆图 + memory_graph = Memory_graph() + + # 创建海马体 + hippocampus = Hippocampus(memory_graph) + + # 从数据库同步数据 + hippocampus.sync_memory_from_db() + + end_time = time.time() + logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m") + + while True: + query = int(input("请输入操作类型\n0 -> 查询节点; 1 -> 增加节点; 2 -> 移除节点; 3 -> 增加边; 4 -> 移除边;\n其他任意输入 -> 退出\n")) + + if query == 0: + query_mem_info(memory_graph) + elif query == 1: + add_mem_node(hippocampus) + elif query == 2: + remove_mem_node(hippocampus) + elif query == 3: + add_mem_edge(hippocampus) + elif query == 4: + remove_mem_edge(hippocampus) + elif query == 5: + continue + elif query == 6: + continue + else: + print("已结束操作") + break + + hippocampus.sync_memory_to_db() + + + +if __name__ == "__main__": + import asyncio + asyncio.run(main())