diff --git a/docs/installation_cute.md b/docs/installation_cute.md
index ca97f18e9..5eb5dfdcd 100644
--- a/docs/installation_cute.md
+++ b/docs/installation_cute.md
@@ -147,9 +147,7 @@ enable_check = false # 是否要检查表情包是不是合适的喵
 check_prompt = "符合公序良俗" # 检查表情包的标准呢
 
 [others]
-enable_advance_output = true # 是否要显示更多的运行信息呢
 enable_kuuki_read = true # 让机器人能够"察言观色"喵
-enable_debug_output = false # 是否启用调试输出喵
 enable_friend_chat = false # 是否启用好友聊天喵
 
 [groups]
diff --git a/docs/installation_standard.md b/docs/installation_standard.md
index dcbbf0c99..a2e60f22a 100644
--- a/docs/installation_standard.md
+++ b/docs/installation_standard.md
@@ -115,9 +115,7 @@ talk_frequency_down = [] # 降低回复频率的群号
 ban_user_id = [] # 禁止回复的用户QQ号
 
 [others]
-enable_advance_output = true # 是否启用高级输出
 enable_kuuki_read = true # 是否启用读空气功能
-enable_debug_output = false # 是否启用调试输出
 enable_friend_chat = false # 是否启用好友聊天
 
 # 模型配置
diff --git a/src/common/logger.py b/src/common/logger.py
index f0b2dfe5c..2673275a5 100644
--- a/src/common/logger.py
+++ b/src/common/logger.py
@@ -31,9 +31,12 @@ _handler_registry: Dict[str, List[int]] = {}
 current_file_path = Path(__file__).resolve()
 
 LOG_ROOT = "logs"
-ENABLE_ADVANCE_OUTPUT = False
+# 由环境变量 SIMPLE_OUTPUT 控制;os.getenv 返回的是字符串,
+# 必须显式转成布尔值,否则 "false" 这样的非空字符串也会被当作真值
+ENABLE_ADVANCE_OUTPUT = os.getenv("SIMPLE_OUTPUT", "false").lower() == "true"
+print(f"ENABLE_ADVANCE_OUTPUT: {ENABLE_ADVANCE_OUTPUT}")
 
-if ENABLE_ADVANCE_OUTPUT:
+if not ENABLE_ADVANCE_OUTPUT:
     # 默认全局配置
     DEFAULT_CONFIG = {
         # 日志级别配置
diff --git a/src/plugins/chat/__init__.py b/src/plugins/chat/__init__.py
index a54f781a0..e73d0a230 100644
--- a/src/plugins/chat/__init__.py
+++ b/src/plugins/chat/__init__.py
@@ -110,7 +110,7 @@ async def build_memory_task():
     """每build_memory_interval秒执行一次记忆构建"""
     logger.debug("[记忆构建]------------------------------------开始构建记忆--------------------------------------")
     start_time = time.time()
-    await hippocampus.operation_build_memory(chat_size=20)
+    await hippocampus.operation_build_memory()
     end_time = time.time()
     logger.success(
         f"[记忆构建]--------------------------记忆构建完成:耗时: {end_time - start_time:.2f} "
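For context on the `operation_build_memory()` change above — a minimal sketch of the periodic build loop, with `hippocampus` and `interval` as stand-ins for the real objects; the point is that sampling count and snippet length now come from `global_config` instead of a `chat_size` argument:

```python
import asyncio
import time

async def build_memory_loop(hippocampus, interval: float) -> None:
    """Hypothetical periodic runner mirroring build_memory_task."""
    while True:
        await asyncio.sleep(interval)  # build_memory_interval seconds
        start = time.time()
        # Sampling parameters (count, length, time distribution) are read
        # from config inside the call; there is no chat_size argument anymore.
        await hippocampus.operation_build_memory()
        print(f"memory build finished in {time.time() - start:.2f}s")
```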
diff --git a/src/plugins/chat/config.py b/src/plugins/chat/config.py
index ce30b280b..d0cb18822 100644
--- a/src/plugins/chat/config.py
+++ b/src/plugins/chat/config.py
@@ -68,9 +68,9 @@ class BotConfig:
     MODEL_V3_PROBABILITY: float = 0.1 # V3模型概率
     MODEL_R1_DISTILL_PROBABILITY: float = 0.1 # R1蒸馏模型概率
 
-    enable_advance_output: bool = False # 是否启用高级输出
+    # enable_advance_output: bool = False # 是否启用高级输出(已改由环境变量 SIMPLE_OUTPUT 控制)
     enable_kuuki_read: bool = True # 是否启用读空气功能
-    enable_debug_output: bool = False # 是否启用调试输出
+    # enable_debug_output: bool = False # 是否启用调试输出(已废弃)
     enable_friend_chat: bool = False # 是否启用好友聊天
 
     mood_update_interval: float = 1.0 # 情绪更新间隔 单位秒
@@ -106,6 +106,11 @@ class BotConfig:
     memory_forget_time: int = 24 # 记忆遗忘时间(小时)
     memory_forget_percentage: float = 0.01 # 记忆遗忘比例
     memory_compress_rate: float = 0.1 # 记忆压缩率
+    build_memory_sample_num: int = 10 # 记忆构建采样数量
+    build_memory_sample_length: int = 20 # 记忆构建采样长度
+    memory_build_distribution: list = field(
+        default_factory=lambda: [4, 2, 0.6, 24, 8, 0.4]
+    ) # 记忆构建分布,参数:分布1均值,标准差,权重,分布2均值,标准差,权重
     memory_ban_words: list = field(
         default_factory=lambda: ["表情包", "图片", "回复", "聊天记录"]
     ) # 添加新的配置项默认值
@@ -315,6 +320,11 @@ class BotConfig:
                 "memory_forget_percentage", config.memory_forget_percentage
             )
             config.memory_compress_rate = memory_config.get("memory_compress_rate", config.memory_compress_rate)
+            if config.INNER_VERSION in SpecifierSet(">=0.0.11"):
+                config.memory_build_distribution = memory_config.get("memory_build_distribution", config.memory_build_distribution)
+                config.build_memory_sample_num = memory_config.get("build_memory_sample_num", config.build_memory_sample_num)
+                config.build_memory_sample_length = memory_config.get("build_memory_sample_length", config.build_memory_sample_length)
+
 
         def remote(parent: dict):
             remote_config = parent["remote"]
@@ -351,10 +361,10 @@ class BotConfig:
         def others(parent: dict):
             others_config = parent["others"]
 
-            config.enable_advance_output = others_config.get("enable_advance_output", config.enable_advance_output)
+            # config.enable_advance_output = others_config.get("enable_advance_output", config.enable_advance_output)
             config.enable_kuuki_read = others_config.get("enable_kuuki_read", config.enable_kuuki_read)
             if config.INNER_VERSION in SpecifierSet(">=0.0.7"):
-                config.enable_debug_output = others_config.get("enable_debug_output", config.enable_debug_output)
+                # config.enable_debug_output = others_config.get("enable_debug_output", config.enable_debug_output)
                 config.enable_friend_chat = others_config.get("enable_friend_chat", config.enable_friend_chat)
 
 # 版本表达式:>=1.0.0,<2.0.0
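The `INNER_VERSION in SpecifierSet(...)` gate above is what keeps older config files loadable: new keys are only read when the file declares a new enough `[inner].version`. A minimal sketch of the check, assuming the `packaging` library that `SpecifierSet` comes from:

```python
from packaging.specifiers import SpecifierSet

inner_version = "0.0.11"  # hypothetical value parsed from [inner].version

# A 0.0.10 config file skips the new [memory] keys and keeps the dataclass
# defaults; a 0.0.11+ file gets memory_build_distribution etc. read from toml.
if inner_version in SpecifierSet(">=0.0.11"):
    print("read memory_build_distribution / build_memory_sample_* from [memory]")
else:
    print("fall back to the dataclass defaults")
```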
diff --git a/src/plugins/memory_system/memory.py b/src/plugins/memory_system/memory.py
index 07a7fb2ee..b55dcf7b3 100644
--- a/src/plugins/memory_system/memory.py
+++ b/src/plugins/memory_system/memory.py
@@ -18,6 +18,7 @@ from ..chat.utils import (
 )
 from ..models.utils_model import LLM_request
 from src.common.logger import get_module_logger, LogConfig, MEMORY_STYLE_CONFIG
+from src.plugins.memory_system.sample_distribution import MemoryBuildScheduler
 
 # 定义日志配置
 memory_config = LogConfig(
@@ -195,19 +196,9 @@ class Hippocampus:
         return hash(f"{nodes[0]}:{nodes[1]}")
 
     def random_get_msg_snippet(self, target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list:
-        """随机抽取一段时间内的消息片段
-        Args:
-        - target_timestamp: 目标时间戳
-        - chat_size: 抽取的消息数量
-        - max_memorized_time_per_msg: 每条消息的最大记忆次数
-
-        Returns:
-        - list: 抽取出的消息记录列表
-
-        """
         try_count = 0
-        # 最多尝试三次抽取
-        while try_count < 3:
+        # 最多尝试2次抽取
+        while try_count < 2:
             messages = get_closest_chat_from_db(length=chat_size, timestamp=target_timestamp)
             if messages:
                 # 检查messages是否均没有达到记忆次数限制
                 )
                 return messages
             try_count += 1
-        # 三次尝试均失败
         return None
 
-    def get_memory_sample(self, chat_size=20, time_frequency=None):
-        """获取记忆样本
-
-        Returns:
-            list: 消息记录列表,每个元素是一个消息记录字典列表
-        """
+    def get_memory_sample(self):
         # 硬编码:每条消息最大记忆次数
         # 如有需求可写入global_config
-        if time_frequency is None:
-            time_frequency = {"near": 2, "mid": 4, "far": 3}
         max_memorized_time_per_msg = 3
-        current_timestamp = datetime.datetime.now().timestamp()
+        # 创建双峰分布的记忆调度器(参数均来自 global_config.memory_build_distribution)
+        scheduler = MemoryBuildScheduler(
+            n_hours1=global_config.memory_build_distribution[0],  # 第一个分布均值(默认4小时前)
+            std_hours1=global_config.memory_build_distribution[1],  # 第一个分布标准差
+            weight1=global_config.memory_build_distribution[2],  # 第一个分布权重(默认0.6)
+            n_hours2=global_config.memory_build_distribution[3],  # 第二个分布均值(默认24小时前)
+            std_hours2=global_config.memory_build_distribution[4],  # 第二个分布标准差
+            weight2=global_config.memory_build_distribution[5],  # 第二个分布权重(默认0.4)
+            total_samples=global_config.build_memory_sample_num  # 生成的时间点总数
+        )
+
+        # 生成时间戳数组
+        timestamps = scheduler.get_timestamp_array()
+        logger.debug(f"生成的时间戳数组: {timestamps}")
+
         chat_samples = []
-
-        # 短期:1h 中期:4h 长期:24h
-        logger.debug("正在抽取短期消息样本")
-        for i in range(time_frequency.get("near")):
-            random_time = current_timestamp - random.randint(1, 3600)
-            messages = self.random_get_msg_snippet(random_time, chat_size, max_memorized_time_per_msg)
+        for timestamp in timestamps:
+            messages = self.random_get_msg_snippet(timestamp, global_config.build_memory_sample_length, max_memorized_time_per_msg)
             if messages:
-                logger.debug(f"成功抽取短期消息样本{len(messages)}条")
+                time_diff = (datetime.datetime.now().timestamp() - timestamp) / 3600
+                logger.debug(f"成功抽取 {time_diff:.1f} 小时前的消息样本,共{len(messages)}条")
                 chat_samples.append(messages)
             else:
-                logger.warning(f"第{i}次短期消息样本抽取失败")
-
-        logger.debug("正在抽取中期消息样本")
-        for i in range(time_frequency.get("mid")):
-            random_time = current_timestamp - random.randint(3600, 3600 * 4)
-            messages = self.random_get_msg_snippet(random_time, chat_size, max_memorized_time_per_msg)
-            if messages:
-                logger.debug(f"成功抽取中期消息样本{len(messages)}条")
-                chat_samples.append(messages)
-            else:
-                logger.warning(f"第{i}次中期消息样本抽取失败")
-
-        logger.debug("正在抽取长期消息样本")
-        for i in range(time_frequency.get("far")):
-            random_time = current_timestamp - random.randint(3600 * 4, 3600 * 24)
-            messages = self.random_get_msg_snippet(random_time, chat_size, max_memorized_time_per_msg)
-            if messages:
-                logger.debug(f"成功抽取长期消息样本{len(messages)}条")
-                chat_samples.append(messages)
-            else:
-                logger.warning(f"第{i}次长期消息样本抽取失败")
+                logger.warning(f"时间戳 {timestamp} 的消息样本抽取失败")
 
         return chat_samples
 
@@ -372,9 +346,8 @@ class Hippocampus:
         )
         return topic_num
 
-    async def operation_build_memory(self, chat_size=20):
-        time_frequency = {"near": 1, "mid": 4, "far": 4}
-        memory_samples = self.get_memory_sample(chat_size, time_frequency)
+    async def operation_build_memory(self):
+        memory_samples = self.get_memory_sample()
 
         for i, messages in enumerate(memory_samples, 1):
             all_topics = []
diff --git a/src/plugins/memory_system/memory_manual_build.py b/src/plugins/memory_system/memory_manual_build.py
index 0bf276ddd..4d6596e9f 100644
--- a/src/plugins/memory_system/memory_manual_build.py
+++ b/src/plugins/memory_system/memory_manual_build.py
@@ -7,11 +7,9 @@ import sys
 import time
 from collections import Counter
 from pathlib import Path
-
 import matplotlib.pyplot as plt
 import networkx as nx
 from dotenv import load_dotenv
-from src.common.logger import get_module_logger
 import jieba
 
 # from chat.config import global_config
@@ -19,6 +17,7 @@ import jieba
 root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
 sys.path.append(root_path)
 
+from src.common.logger import get_module_logger
 from src.common.database import db  # noqa E402
 from src.plugins.memory_system.offline_llm import LLMModel  # noqa E402
diff --git a/src/plugins/memory_system/memory_test1.py b/src/plugins/memory_system/memory_test1.py
deleted file mode 100644
index df4f892d0..000000000
--- a/src/plugins/memory_system/memory_test1.py
+++ /dev/null
@@ -1,1185 +0,0 @@
-# -*- coding: utf-8 -*-
-import datetime
-import math
-import random
-import sys
-import time
-from collections import Counter
-from pathlib import Path
-
-import matplotlib.pyplot as plt
-import networkx as nx
-from dotenv import load_dotenv
-from src.common.logger import get_module_logger
-import jieba
-
-logger = get_module_logger("mem_test")
-
-"""
-该理论认为,当两个或多个事物在形态上具有相似性时,
-它们在记忆中会形成关联。
-例如,梨和苹果在形状和都是水果这一属性上有相似性,
-所以当我们看到梨时,很容易通过形态学联想记忆联想到苹果。
-这种相似性联想有助于我们对新事物进行分类和理解,
-当遇到一个新的类似水果时,
-我们可以通过与已有的水果记忆进行相似性匹配,
-来推测它的一些特征。
-
-
-
-时空关联性联想:
-除了相似性联想,MAM 还强调时空关联性联想。
-如果两个事物在时间或空间上经常同时出现,它们也会在记忆中形成关联。
-比如,每次在公园里看到花的时候,都能听到鸟儿的叫声,
-那么花和鸟儿叫声的形态特征(花的视觉形态和鸟叫的听觉形态)就会在记忆中形成关联,
-以后听到鸟叫可能就会联想到公园里的花。
-
-"""
-
-# from chat.config import global_config
-sys.path.append("C:/GitHub/MaiMBot")  # 添加项目根目录到 Python 路径
-from src.common.database import db  # noqa E402
-from 
src.plugins.memory_system.offline_llm import LLMModel # noqa E402 - -# 获取当前文件的目录 -current_dir = Path(__file__).resolve().parent -# 获取项目根目录(上三层目录) -project_root = current_dir.parent.parent.parent -# env.dev文件路径 -env_path = project_root / ".env.dev" - -# 加载环境变量 -if env_path.exists(): - logger.info(f"从 {env_path} 加载环境变量") - load_dotenv(env_path) -else: - logger.warning(f"未找到环境变量文件: {env_path}") - logger.info("将使用默认配置") - - -def calculate_information_content(text): - """计算文本的信息量(熵)""" - char_count = Counter(text) - total_chars = len(text) - - entropy = 0 - for count in char_count.values(): - probability = count / total_chars - entropy -= probability * math.log2(probability) - - return entropy - - -def get_closest_chat_from_db(length: int, timestamp: str): - """从数据库中获取最接近指定时间戳的聊天记录,并记录读取次数 - - Returns: - list: 消息记录字典列表,每个字典包含消息内容和时间信息 - """ - chat_records = [] - closest_record = db.messages.find_one({"time": {"$lte": timestamp}}, sort=[("time", -1)]) - - if closest_record and closest_record.get("memorized", 0) < 4: - closest_time = closest_record["time"] - group_id = closest_record["group_id"] - # 获取该时间戳之后的length条消息,且groupid相同 - records = list( - db.messages.find({"time": {"$gt": closest_time}, "group_id": group_id}).sort("time", 1).limit(length) - ) - - # 更新每条消息的memorized属性 - for record in records: - current_memorized = record.get("memorized", 0) - if current_memorized > 3: - print("消息已读取3次,跳过") - return "" - - # 更新memorized值 - db.messages.update_one({"_id": record["_id"]}, {"$set": {"memorized": current_memorized + 1}}) - - # 添加到记录列表中 - chat_records.append( - {"text": record["detailed_plain_text"], "time": record["time"], "group_id": record["group_id"]} - ) - - return chat_records - - -class Memory_cortex: - def __init__(self, memory_graph: "Memory_graph"): - self.memory_graph = memory_graph - - def sync_memory_from_db(self): - """ - 从数据库同步数据到内存中的图结构 - 将清空当前内存中的图,并从数据库重新加载所有节点和边 - """ - # 清空当前图 - self.memory_graph.G.clear() - - # 获取当前时间作为默认时间 - default_time = datetime.datetime.now().timestamp() - - # 从数据库加载所有节点 - nodes = db.graph_data.nodes.find() - for node in nodes: - concept = node["concept"] - memory_items = node.get("memory_items", []) - # 确保memory_items是列表 - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 获取时间属性,如果不存在则使用默认时间 - created_time = node.get("created_time") - last_modified = node.get("last_modified") - - # 如果时间属性不存在,则更新数据库 - if created_time is None or last_modified is None: - created_time = default_time - last_modified = default_time - # 更新数据库中的节点 - db.graph_data.nodes.update_one( - {"concept": concept}, {"$set": {"created_time": created_time, "last_modified": last_modified}} - ) - logger.info(f"为节点 {concept} 添加默认时间属性") - - # 添加节点到图中,包含时间属性 - self.memory_graph.G.add_node( - concept, memory_items=memory_items, created_time=created_time, last_modified=last_modified - ) - - # 从数据库加载所有边 - edges = db.graph_data.edges.find() - for edge in edges: - source = edge["source"] - target = edge["target"] - - # 只有当源节点和目标节点都存在时才添加边 - if source in self.memory_graph.G and target in self.memory_graph.G: - # 获取时间属性,如果不存在则使用默认时间 - created_time = edge.get("created_time") - last_modified = edge.get("last_modified") - - # 如果时间属性不存在,则更新数据库 - if created_time is None or last_modified is None: - created_time = default_time - last_modified = default_time - # 更新数据库中的边 - db.graph_data.edges.update_one( - {"source": source, "target": target}, - {"$set": {"created_time": created_time, "last_modified": last_modified}}, - ) - logger.info(f"为边 {source} - 
{target} 添加默认时间属性") - - self.memory_graph.G.add_edge( - source, - target, - strength=edge.get("strength", 1), - created_time=created_time, - last_modified=last_modified, - ) - - logger.success("从数据库同步记忆图谱完成") - - def calculate_node_hash(self, concept, memory_items): - """ - 计算节点的特征值 - """ - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - # 将记忆项排序以确保相同内容生成相同的哈希值 - sorted_items = sorted(memory_items) - # 组合概念和记忆项生成特征值 - content = f"{concept}:{'|'.join(sorted_items)}" - return hash(content) - - def calculate_edge_hash(self, source, target): - """ - 计算边的特征值 - """ - # 对源节点和目标节点排序以确保相同的边生成相同的哈希值 - nodes = sorted([source, target]) - return hash(f"{nodes[0]}:{nodes[1]}") - - def sync_memory_to_db(self): - """ - 检查并同步内存中的图结构与数据库 - 使用特征值(哈希值)快速判断是否需要更新 - """ - current_time = datetime.datetime.now().timestamp() - - # 获取数据库中所有节点和内存中所有节点 - db_nodes = list(db.graph_data.nodes.find()) - memory_nodes = list(self.memory_graph.G.nodes(data=True)) - - # 转换数据库节点为字典格式,方便查找 - db_nodes_dict = {node["concept"]: node for node in db_nodes} - - # 检查并更新节点 - for concept, data in memory_nodes: - memory_items = data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 计算内存中节点的特征值 - memory_hash = self.calculate_node_hash(concept, memory_items) - - if concept not in db_nodes_dict: - # 数据库中缺少的节点,添加 - node_data = { - "concept": concept, - "memory_items": memory_items, - "hash": memory_hash, - "created_time": data.get("created_time", current_time), - "last_modified": data.get("last_modified", current_time), - } - db.graph_data.nodes.insert_one(node_data) - else: - # 获取数据库中节点的特征值 - db_node = db_nodes_dict[concept] - db_hash = db_node.get("hash", None) - - # 如果特征值不同,则更新节点 - if db_hash != memory_hash: - db.graph_data.nodes.update_one( - {"concept": concept}, - {"$set": {"memory_items": memory_items, "hash": memory_hash, "last_modified": current_time}}, - ) - - # 检查并删除数据库中多余的节点 - memory_concepts = set(node[0] for node in memory_nodes) - for db_node in db_nodes: - if db_node["concept"] not in memory_concepts: - db.graph_data.nodes.delete_one({"concept": db_node["concept"]}) - - # 处理边的信息 - db_edges = list(db.graph_data.edges.find()) - memory_edges = list(self.memory_graph.G.edges(data=True)) - - # 创建边的哈希值字典 - db_edge_dict = {} - for edge in db_edges: - edge_hash = self.calculate_edge_hash(edge["source"], edge["target"]) - db_edge_dict[(edge["source"], edge["target"])] = {"hash": edge_hash, "strength": edge.get("strength", 1)} - - # 检查并更新边 - for source, target, data in memory_edges: - edge_hash = self.calculate_edge_hash(source, target) - edge_key = (source, target) - strength = data.get("strength", 1) - - if edge_key not in db_edge_dict: - # 添加新边 - edge_data = { - "source": source, - "target": target, - "strength": strength, - "hash": edge_hash, - "created_time": data.get("created_time", current_time), - "last_modified": data.get("last_modified", current_time), - } - db.graph_data.edges.insert_one(edge_data) - else: - # 检查边的特征值是否变化 - if db_edge_dict[edge_key]["hash"] != edge_hash: - db.graph_data.edges.update_one( - {"source": source, "target": target}, - {"$set": {"hash": edge_hash, "strength": strength, "last_modified": current_time}}, - ) - - # 删除多余的边 - memory_edge_set = set((source, target) for source, target, _ in memory_edges) - for edge_key in db_edge_dict: - if edge_key not in memory_edge_set: - source, target = edge_key - db.graph_data.edges.delete_one({"source": source, "target": target}) - - 
logger.success("完成记忆图谱与数据库的差异同步") - - def remove_node_from_db(self, topic): - """ - 从数据库中删除指定节点及其相关的边 - - Args: - topic: 要删除的节点概念 - """ - # 删除节点 - db.graph_data.nodes.delete_one({"concept": topic}) - # 删除所有涉及该节点的边 - db.graph_data.edges.delete_many({"$or": [{"source": topic}, {"target": topic}]}) - - -class Memory_graph: - def __init__(self): - self.G = nx.Graph() # 使用 networkx 的图结构 - - def connect_dot(self, concept1, concept2): - # 避免自连接 - if concept1 == concept2: - return - - current_time = datetime.datetime.now().timestamp() - - # 如果边已存在,增加 strength - if self.G.has_edge(concept1, concept2): - self.G[concept1][concept2]["strength"] = self.G[concept1][concept2].get("strength", 1) + 1 - # 更新最后修改时间 - self.G[concept1][concept2]["last_modified"] = current_time - else: - # 如果是新边,初始化 strength 为 1 - self.G.add_edge(concept1, concept2, strength=1, created_time=current_time, last_modified=current_time) - - def add_dot(self, concept, memory): - current_time = datetime.datetime.now().timestamp() - - if concept in self.G: - # 如果节点已存在,将新记忆添加到现有列表中 - if "memory_items" in self.G.nodes[concept]: - if not isinstance(self.G.nodes[concept]["memory_items"], list): - # 如果当前不是列表,将其转换为列表 - self.G.nodes[concept]["memory_items"] = [self.G.nodes[concept]["memory_items"]] - self.G.nodes[concept]["memory_items"].append(memory) - # 更新最后修改时间 - self.G.nodes[concept]["last_modified"] = current_time - else: - self.G.nodes[concept]["memory_items"] = [memory] - self.G.nodes[concept]["last_modified"] = current_time - else: - # 如果是新节点,创建新的记忆列表 - self.G.add_node(concept, memory_items=[memory], created_time=current_time, last_modified=current_time) - - def get_dot(self, concept): - # 检查节点是否存在于图中 - if concept in self.G: - # 从图中获取节点数据 - node_data = self.G.nodes[concept] - return concept, node_data - return None - - def get_related_item(self, topic, depth=1): - if topic not in self.G: - return [], [] - - first_layer_items = [] - second_layer_items = [] - - # 获取相邻节点 - neighbors = list(self.G.neighbors(topic)) - - # 获取当前节点的记忆项 - node_data = self.get_dot(topic) - if node_data: - concept, data = node_data - if "memory_items" in data: - memory_items = data["memory_items"] - if isinstance(memory_items, list): - first_layer_items.extend(memory_items) - else: - first_layer_items.append(memory_items) - - # 只在depth=2时获取第二层记忆 - if depth >= 2: - # 获取相邻节点的记忆项 - for neighbor in neighbors: - node_data = self.get_dot(neighbor) - if node_data: - concept, data = node_data - if "memory_items" in data: - memory_items = data["memory_items"] - if isinstance(memory_items, list): - second_layer_items.extend(memory_items) - else: - second_layer_items.append(memory_items) - - return first_layer_items, second_layer_items - - @property - def dots(self): - # 返回所有节点对应的 Memory_dot 对象 - return [self.get_dot(node) for node in self.G.nodes()] - - -# 海马体 -class Hippocampus: - def __init__(self, memory_graph: Memory_graph): - self.memory_graph = memory_graph - self.memory_cortex = Memory_cortex(memory_graph) - self.llm_model = LLMModel() - self.llm_model_small = LLMModel(model_name="deepseek-ai/DeepSeek-V2.5") - self.llm_model_get_topic = LLMModel(model_name="Pro/Qwen/Qwen2.5-7B-Instruct") - self.llm_model_summary = LLMModel(model_name="Qwen/Qwen2.5-32B-Instruct") - - def get_memory_sample(self, chat_size=20, time_frequency=None): - """获取记忆样本 - - Returns: - list: 消息记录列表,每个元素是一个消息记录字典列表 - """ - if time_frequency is None: - time_frequency = {"near": 2, "mid": 4, "far": 3} - current_timestamp = datetime.datetime.now().timestamp() - chat_samples = [] - - # 短期:1h 中期:4h 
长期:24h - for _ in range(time_frequency.get("near")): - random_time = current_timestamp - random.randint(1, 3600 * 4) - messages = get_closest_chat_from_db(length=chat_size, timestamp=random_time) - if messages: - chat_samples.append(messages) - - for _ in range(time_frequency.get("mid")): - random_time = current_timestamp - random.randint(3600 * 4, 3600 * 24) - messages = get_closest_chat_from_db(length=chat_size, timestamp=random_time) - if messages: - chat_samples.append(messages) - - for _ in range(time_frequency.get("far")): - random_time = current_timestamp - random.randint(3600 * 24, 3600 * 24 * 7) - messages = get_closest_chat_from_db(length=chat_size, timestamp=random_time) - if messages: - chat_samples.append(messages) - - return chat_samples - - def calculate_topic_num(self, text, compress_rate): - """计算文本的话题数量""" - information_content = calculate_information_content(text) - topic_by_length = text.count("\n") * compress_rate - topic_by_information_content = max(1, min(5, int((information_content - 3) * 2))) - topic_num = int((topic_by_length + topic_by_information_content) / 2) - print( - f"topic_by_length: {topic_by_length}, topic_by_information_content: {topic_by_information_content}, " - f"topic_num: {topic_num}" - ) - return topic_num - - async def memory_compress(self, messages: list, compress_rate=0.1): - """压缩消息记录为记忆 - - Args: - messages: 消息记录字典列表,每个字典包含text和time字段 - compress_rate: 压缩率 - - Returns: - tuple: (压缩记忆集合, 相似主题字典) - - 压缩记忆集合: set of (话题, 记忆) 元组 - - 相似主题字典: dict of {话题: [(相似主题, 相似度), ...]} - """ - if not messages: - return set(), {} - - # 合并消息文本,同时保留时间信息 - input_text = "" - time_info = "" - # 计算最早和最晚时间 - earliest_time = min(msg["time"] for msg in messages) - latest_time = max(msg["time"] for msg in messages) - - earliest_dt = datetime.datetime.fromtimestamp(earliest_time) - latest_dt = datetime.datetime.fromtimestamp(latest_time) - - # 如果是同一年 - if earliest_dt.year == latest_dt.year: - earliest_str = earliest_dt.strftime("%m-%d %H:%M:%S") - latest_str = latest_dt.strftime("%m-%d %H:%M:%S") - time_info += f"是在{earliest_dt.year}年,{earliest_str} 到 {latest_str} 的对话:\n" - else: - earliest_str = earliest_dt.strftime("%Y-%m-%d %H:%M:%S") - latest_str = latest_dt.strftime("%Y-%m-%d %H:%M:%S") - time_info += f"是从 {earliest_str} 到 {latest_str} 的对话:\n" - - for msg in messages: - input_text += f"{msg['text']}\n" - - print(input_text) - - topic_num = self.calculate_topic_num(input_text, compress_rate) - topics_response = self.llm_model_get_topic.generate_response(self.find_topic_llm(input_text, topic_num)) - - # 过滤topics - filter_keywords = ["表情包", "图片", "回复", "聊天记录"] - topics = [ - topic.strip() - for topic in topics_response[0].replace(",", ",").replace("、", ",").replace(" ", ",").split(",") - if topic.strip() - ] - filtered_topics = [topic for topic in topics if not any(keyword in topic for keyword in filter_keywords)] - - print(f"过滤后话题: {filtered_topics}") - - # 为每个话题查找相似的已存在主题 - print("\n检查相似主题:") - similar_topics_dict = {} # 存储每个话题的相似主题列表 - - for topic in filtered_topics: - # 获取所有现有节点 - existing_topics = list(self.memory_graph.G.nodes()) - similar_topics = [] - - # 对每个现有节点计算相似度 - for existing_topic in existing_topics: - # 使用jieba分词并计算余弦相似度 - topic_words = set(jieba.cut(topic)) - existing_words = set(jieba.cut(existing_topic)) - - # 计算词向量 - all_words = topic_words | existing_words - v1 = [1 if word in topic_words else 0 for word in all_words] - v2 = [1 if word in existing_words else 0 for word in all_words] - - # 计算余弦相似度 - similarity = cosine_similarity(v1, v2) - - # 
如果相似度超过阈值,添加到结果中 - if similarity >= 0.6: # 设置相似度阈值 - similar_topics.append((existing_topic, similarity)) - - # 按相似度降序排序 - similar_topics.sort(key=lambda x: x[1], reverse=True) - # 只保留前5个最相似的主题 - similar_topics = similar_topics[:5] - - # 存储到字典中 - similar_topics_dict[topic] = similar_topics - - # 输出结果 - if similar_topics: - print(f"\n主题「{topic}」的相似主题:") - for similar_topic, score in similar_topics: - print(f"- {similar_topic} (相似度: {score:.3f})") - else: - print(f"\n主题「{topic}」没有找到相似主题") - - # 创建所有话题的请求任务 - tasks = [] - for topic in filtered_topics: - topic_what_prompt = self.topic_what(input_text, topic, time_info) - # 创建异步任务 - task = self.llm_model_small.generate_response_async(topic_what_prompt) - tasks.append((topic.strip(), task)) - - # 等待所有任务完成 - compressed_memory = set() - for topic, task in tasks: - response = await task - if response: - compressed_memory.add((topic, response[0])) - - return compressed_memory, similar_topics_dict - - async def operation_build_memory(self, chat_size=12): - # 最近消息获取频率 - time_frequency = {"near": 3, "mid": 8, "far": 5} - memory_samples = self.get_memory_sample(chat_size, time_frequency) - - all_topics = [] # 用于存储所有话题 - - for i, messages in enumerate(memory_samples, 1): - # 加载进度可视化 - all_topics = [] - progress = (i / len(memory_samples)) * 100 - bar_length = 30 - filled_length = int(bar_length * i // len(memory_samples)) - bar = "█" * filled_length + "-" * (bar_length - filled_length) - print(f"\n进度: [{bar}] {progress:.1f}% ({i}/{len(memory_samples)})") - - # 生成压缩后记忆 - compress_rate = 0.1 - compressed_memory, similar_topics_dict = await self.memory_compress(messages, compress_rate) - print( - f"\033[1;33m压缩后记忆数量\033[0m: {len(compressed_memory)},似曾相识的话题: {len(similar_topics_dict)}" - ) - - # 将记忆加入到图谱中 - for topic, memory in compressed_memory: - print(f"\033[1;32m添加节点\033[0m: {topic}") - self.memory_graph.add_dot(topic, memory) - all_topics.append(topic) - - # 连接相似的已存在主题 - if topic in similar_topics_dict: - similar_topics = similar_topics_dict[topic] - for similar_topic, similarity in similar_topics: - # 避免自连接 - if topic != similar_topic: - # 根据相似度设置连接强度 - strength = int(similarity * 10) # 将0.3-1.0的相似度映射到3-10的强度 - print(f"\033[1;36m连接相似节点\033[0m: {topic} 和 {similar_topic} (强度: {strength})") - # 使用相似度作为初始连接强度 - self.memory_graph.G.add_edge(topic, similar_topic, strength=strength) - - # 连接同批次的相关话题 - for i in range(len(all_topics)): - for j in range(i + 1, len(all_topics)): - print(f"\033[1;32m连接同批次节点\033[0m: {all_topics[i]} 和 {all_topics[j]}") - self.memory_graph.connect_dot(all_topics[i], all_topics[j]) - - self.memory_cortex.sync_memory_to_db() - - def forget_connection(self, source, target): - """ - 检查并可能遗忘一个连接 - - Args: - source: 连接的源节点 - target: 连接的目标节点 - - Returns: - tuple: (是否有变化, 变化类型, 变化详情) - 变化类型: 0-无变化, 1-强度减少, 2-连接移除 - """ - current_time = datetime.datetime.now().timestamp() - # 获取边的属性 - edge_data = self.memory_graph.G[source][target] - last_modified = edge_data.get("last_modified", current_time) - - # 如果连接超过7天未更新 - if current_time - last_modified > 6000: # test - # 获取当前强度 - current_strength = edge_data.get("strength", 1) - # 减少连接强度 - new_strength = current_strength - 1 - edge_data["strength"] = new_strength - edge_data["last_modified"] = current_time - - # 如果强度降为0,移除连接 - if new_strength <= 0: - self.memory_graph.G.remove_edge(source, target) - return True, 2, f"移除连接: {source} - {target} (强度降至0)" - else: - return True, 1, f"减弱连接: {source} - {target} (强度: {current_strength} -> {new_strength})" - - return False, 0, "" - - def forget_topic(self, 
topic): - """ - 检查并可能遗忘一个话题的记忆 - - Args: - topic: 要检查的话题 - - Returns: - tuple: (是否有变化, 变化类型, 变化详情) - 变化类型: 0-无变化, 1-记忆减少, 2-节点移除 - """ - current_time = datetime.datetime.now().timestamp() - # 获取节点的最后修改时间 - node_data = self.memory_graph.G.nodes[topic] - last_modified = node_data.get("last_modified", current_time) - - # 如果话题超过7天未更新 - if current_time - last_modified > 3000: # test - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - if memory_items: - # 获取当前记忆数量 - current_count = len(memory_items) - # 随机选择一条记忆删除 - removed_item = random.choice(memory_items) - memory_items.remove(removed_item) - - if memory_items: - # 更新节点的记忆项和最后修改时间 - self.memory_graph.G.nodes[topic]["memory_items"] = memory_items - self.memory_graph.G.nodes[topic]["last_modified"] = current_time - return ( - True, - 1, - f"减少记忆: {topic} (记忆数量: {current_count} -> " - f"{len(memory_items)})\n被移除的记忆: {removed_item}", - ) - else: - # 如果没有记忆了,删除节点及其所有连接 - self.memory_graph.G.remove_node(topic) - return True, 2, f"移除节点: {topic} (无剩余记忆)\n最后一条记忆: {removed_item}" - - return False, 0, "" - - async def operation_forget_topic(self, percentage=0.1): - """ - 随机选择图中一定比例的节点和边进行检查,根据时间条件决定是否遗忘 - - Args: - percentage: 要检查的节点和边的比例,默认为0.1(10%) - """ - # 获取所有节点和边 - all_nodes = list(self.memory_graph.G.nodes()) - all_edges = list(self.memory_graph.G.edges()) - - # 计算要检查的数量 - check_nodes_count = max(1, int(len(all_nodes) * percentage)) - check_edges_count = max(1, int(len(all_edges) * percentage)) - - # 随机选择要检查的节点和边 - nodes_to_check = random.sample(all_nodes, check_nodes_count) - edges_to_check = random.sample(all_edges, check_edges_count) - - # 用于统计不同类型的变化 - edge_changes = {"weakened": 0, "removed": 0} - node_changes = {"reduced": 0, "removed": 0} - - # 检查并遗忘连接 - print("\n开始检查连接...") - for source, target in edges_to_check: - changed, change_type, details = self.forget_connection(source, target) - if changed: - if change_type == 1: - edge_changes["weakened"] += 1 - logger.info(f"\033[1;34m[连接减弱]\033[0m {details}") - elif change_type == 2: - edge_changes["removed"] += 1 - logger.info(f"\033[1;31m[连接移除]\033[0m {details}") - - # 检查并遗忘话题 - print("\n开始检查节点...") - for node in nodes_to_check: - changed, change_type, details = self.forget_topic(node) - if changed: - if change_type == 1: - node_changes["reduced"] += 1 - logger.info(f"\033[1;33m[记忆减少]\033[0m {details}") - elif change_type == 2: - node_changes["removed"] += 1 - logger.info(f"\033[1;31m[节点移除]\033[0m {details}") - - # 同步到数据库 - if any(count > 0 for count in edge_changes.values()) or any(count > 0 for count in node_changes.values()): - self.memory_cortex.sync_memory_to_db() - print("\n遗忘操作统计:") - print(f"连接变化: {edge_changes['weakened']} 个减弱, {edge_changes['removed']} 个移除") - print(f"节点变化: {node_changes['reduced']} 个减少记忆, {node_changes['removed']} 个移除") - else: - print("\n本次检查没有节点或连接满足遗忘条件") - - async def merge_memory(self, topic): - """ - 对指定话题的记忆进行合并压缩 - - Args: - topic: 要合并的话题节点 - """ - # 获取节点的记忆项 - memory_items = self.memory_graph.G.nodes[topic].get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 如果记忆项不足,直接返回 - if len(memory_items) < 10: - return - - # 随机选择10条记忆 - selected_memories = random.sample(memory_items, 10) - - # 拼接成文本 - merged_text = "\n".join(selected_memories) - print(f"\n[合并记忆] 话题: {topic}") - print(f"选择的记忆:\n{merged_text}") - - # 使用memory_compress生成新的压缩记忆 - compressed_memories, _ = await 
self.memory_compress(selected_memories, 0.1) - - # 从原记忆列表中移除被选中的记忆 - for memory in selected_memories: - memory_items.remove(memory) - - # 添加新的压缩记忆 - for _, compressed_memory in compressed_memories: - memory_items.append(compressed_memory) - print(f"添加压缩记忆: {compressed_memory}") - - # 更新节点的记忆项 - self.memory_graph.G.nodes[topic]["memory_items"] = memory_items - print(f"完成记忆合并,当前记忆数量: {len(memory_items)}") - - async def operation_merge_memory(self, percentage=0.1): - """ - 随机检查一定比例的节点,对内容数量超过100的节点进行记忆合并 - - Args: - percentage: 要检查的节点比例,默认为0.1(10%) - """ - # 获取所有节点 - all_nodes = list(self.memory_graph.G.nodes()) - # 计算要检查的节点数量 - check_count = max(1, int(len(all_nodes) * percentage)) - # 随机选择节点 - nodes_to_check = random.sample(all_nodes, check_count) - - merged_nodes = [] - for node in nodes_to_check: - # 获取节点的内容条数 - memory_items = self.memory_graph.G.nodes[node].get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - content_count = len(memory_items) - - # 如果内容数量超过100,进行合并 - if content_count > 100: - print(f"\n检查节点: {node}, 当前记忆数量: {content_count}") - await self.merge_memory(node) - merged_nodes.append(node) - - # 同步到数据库 - if merged_nodes: - self.memory_cortex.sync_memory_to_db() - print(f"\n完成记忆合并操作,共处理 {len(merged_nodes)} 个节点") - else: - print("\n本次检查没有需要合并的节点") - - async def _identify_topics(self, text: str) -> list: - """从文本中识别可能的主题""" - topics_response = self.llm_model_get_topic.generate_response(self.find_topic_llm(text, 5)) - topics = [ - topic.strip() - for topic in topics_response[0].replace(",", ",").replace("、", ",").replace(" ", ",").split(",") - if topic.strip() - ] - return topics - - def _find_similar_topics(self, topics: list, similarity_threshold: float = 0.4, debug_info: str = "") -> list: - """查找与给定主题相似的记忆主题""" - all_memory_topics = list(self.memory_graph.G.nodes()) - all_similar_topics = [] - - for topic in topics: - if debug_info: - pass - - topic_vector = text_to_vector(topic) - - for memory_topic in all_memory_topics: - memory_vector = text_to_vector(memory_topic) - all_words = set(topic_vector.keys()) | set(memory_vector.keys()) - v1 = [topic_vector.get(word, 0) for word in all_words] - v2 = [memory_vector.get(word, 0) for word in all_words] - similarity = cosine_similarity(v1, v2) - - if similarity >= similarity_threshold: - all_similar_topics.append((memory_topic, similarity)) - - return all_similar_topics - - def _get_top_topics(self, similar_topics: list, max_topics: int = 5) -> list: - """获取相似度最高的主题""" - seen_topics = set() - top_topics = [] - - for topic, score in sorted(similar_topics, key=lambda x: x[1], reverse=True): - if topic not in seen_topics and len(top_topics) < max_topics: - seen_topics.add(topic) - top_topics.append((topic, score)) - - return top_topics - - async def memory_activate_value(self, text: str, max_topics: int = 5, similarity_threshold: float = 0.3) -> int: - """计算输入文本对记忆的激活程度""" - logger.info(f"[记忆激活]识别主题: {await self._identify_topics(text)}") - - identified_topics = await self._identify_topics(text) - if not identified_topics: - return 0 - - all_similar_topics = self._find_similar_topics( - identified_topics, similarity_threshold=similarity_threshold, debug_info="记忆激活" - ) - - if not all_similar_topics: - return 0 - - top_topics = self._get_top_topics(all_similar_topics, max_topics) - - if len(top_topics) == 1: - topic, score = top_topics[0] - memory_items = self.memory_graph.G.nodes[topic].get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = 
[memory_items] if memory_items else [] - content_count = len(memory_items) - penalty = 1.0 / (1 + math.log(content_count + 1)) - - activation = int(score * 50 * penalty) - print( - f"\033[1;32m[记忆激活]\033[0m 单主题「{topic}」- 相似度: {score:.3f}, 内容数: {content_count}, " - f"激活值: {activation}" - ) - return activation - - matched_topics = set() - topic_similarities = {} - - for memory_topic, _similarity in top_topics: - memory_items = self.memory_graph.G.nodes[memory_topic].get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - content_count = len(memory_items) - penalty = 1.0 / (1 + math.log(content_count + 1)) - - for input_topic in identified_topics: - topic_vector = text_to_vector(input_topic) - memory_vector = text_to_vector(memory_topic) - all_words = set(topic_vector.keys()) | set(memory_vector.keys()) - v1 = [topic_vector.get(word, 0) for word in all_words] - v2 = [memory_vector.get(word, 0) for word in all_words] - sim = cosine_similarity(v1, v2) - if sim >= similarity_threshold: - matched_topics.add(input_topic) - adjusted_sim = sim * penalty - topic_similarities[input_topic] = max(topic_similarities.get(input_topic, 0), adjusted_sim) - print( - f"\033[1;32m[记忆激活]\033[0m 主题「{input_topic}」-> " - f"「{memory_topic}」(内容数: {content_count}, " - f"相似度: {adjusted_sim:.3f})" - ) - - topic_match = len(matched_topics) / len(identified_topics) - average_similarities = sum(topic_similarities.values()) / len(topic_similarities) if topic_similarities else 0 - - activation = int((topic_match + average_similarities) / 2 * 100) - print( - f"\033[1;32m[记忆激活]\033[0m 匹配率: {topic_match:.3f}, 平均相似度: {average_similarities:.3f}, " - f"激活值: {activation}" - ) - - return activation - - async def get_relevant_memories( - self, text: str, max_topics: int = 5, similarity_threshold: float = 0.4, max_memory_num: int = 5 - ) -> list: - """根据输入文本获取相关的记忆内容""" - identified_topics = await self._identify_topics(text) - - all_similar_topics = self._find_similar_topics( - identified_topics, similarity_threshold=similarity_threshold, debug_info="记忆检索" - ) - - relevant_topics = self._get_top_topics(all_similar_topics, max_topics) - - relevant_memories = [] - for topic, score in relevant_topics: - first_layer, _ = self.memory_graph.get_related_item(topic, depth=1) - if first_layer: - if len(first_layer) > max_memory_num / 2: - first_layer = random.sample(first_layer, max_memory_num // 2) - for memory in first_layer: - relevant_memories.append({"topic": topic, "similarity": score, "content": memory}) - - relevant_memories.sort(key=lambda x: x["similarity"], reverse=True) - - if len(relevant_memories) > max_memory_num: - relevant_memories = random.sample(relevant_memories, max_memory_num) - - return relevant_memories - - def find_topic_llm(self, text, topic_num): - prompt = ( - f"这是一段文字:{text}。请你从这段话中总结出{topic_num}个关键的概念,可以是名词,动词,或者特定人物,帮我列出来," - f"用逗号,隔开,尽可能精简。只需要列举{topic_num}个话题就好,不要有序号,不要告诉我其他内容。" - ) - return prompt - - def topic_what(self, text, topic, time_info): - prompt = ( - f'这是一段文字,{time_info}:{text}。我想让你基于这段文字来概括"{topic}"这个概念,帮我总结成一句自然的话,' - f"可以包含时间和人物,以及具体的观点。只输出这句话就好" - ) - return prompt - - -def segment_text(text): - """使用jieba进行文本分词""" - seg_text = list(jieba.cut(text)) - return seg_text - - -def text_to_vector(text): - """将文本转换为词频向量""" - words = segment_text(text) - vector = {} - for word in words: - vector[word] = vector.get(word, 0) + 1 - return vector - - -def cosine_similarity(v1, v2): - """计算两个向量的余弦相似度""" - dot_product = sum(a * b for a, b in 
zip(v1, v2)) - norm1 = math.sqrt(sum(a * a for a in v1)) - norm2 = math.sqrt(sum(b * b for b in v2)) - if norm1 == 0 or norm2 == 0: - return 0 - return dot_product / (norm1 * norm2) - - -def visualize_graph_lite(memory_graph: Memory_graph, color_by_memory: bool = False): - # 设置中文字体 - plt.rcParams["font.sans-serif"] = ["SimHei"] # 用来正常显示中文标签 - plt.rcParams["axes.unicode_minus"] = False # 用来正常显示负号 - - G = memory_graph.G - - # 创建一个新图用于可视化 - H = G.copy() - - # 过滤掉内容数量小于2的节点 - nodes_to_remove = [] - for node in H.nodes(): - memory_items = H.nodes[node].get("memory_items", []) - memory_count = len(memory_items) if isinstance(memory_items, list) else (1 if memory_items else 0) - if memory_count < 2: - nodes_to_remove.append(node) - - H.remove_nodes_from(nodes_to_remove) - - # 如果没有符合条件的节点,直接返回 - if len(H.nodes()) == 0: - print("没有找到内容数量大于等于2的节点") - return - - # 计算节点大小和颜色 - node_colors = [] - node_sizes = [] - nodes = list(H.nodes()) - - # 获取最大记忆数用于归一化节点大小 - max_memories = 1 - for node in nodes: - memory_items = H.nodes[node].get("memory_items", []) - memory_count = len(memory_items) if isinstance(memory_items, list) else (1 if memory_items else 0) - max_memories = max(max_memories, memory_count) - - # 计算每个节点的大小和颜色 - for node in nodes: - # 计算节点大小(基于记忆数量) - memory_items = H.nodes[node].get("memory_items", []) - memory_count = len(memory_items) if isinstance(memory_items, list) else (1 if memory_items else 0) - # 使用指数函数使变化更明显 - ratio = memory_count / max_memories - size = 400 + 2000 * (ratio**2) # 增大节点大小 - node_sizes.append(size) - - # 计算节点颜色(基于连接数) - degree = H.degree(node) - if degree >= 30: - node_colors.append((1.0, 0, 0)) # 亮红色 (#FF0000) - else: - # 将1-10映射到0-1的范围 - color_ratio = (degree - 1) / 29.0 if degree > 1 else 0 - # 使用蓝到红的渐变 - red = min(0.9, color_ratio) - blue = max(0.0, 1.0 - color_ratio) - node_colors.append((red, 0, blue)) - - # 绘制图形 - plt.figure(figsize=(16, 12)) # 减小图形尺寸 - pos = nx.spring_layout( - H, - k=1, # 调整节点间斥力 - iterations=100, # 增加迭代次数 - scale=1.5, # 减小布局尺寸 - weight="strength", - ) # 使用边的strength属性作为权重 - - nx.draw( - H, - pos, - with_labels=True, - node_color=node_colors, - node_size=node_sizes, - font_size=12, # 保持增大的字体大小 - font_family="SimHei", - font_weight="bold", - edge_color="gray", - width=1.5, - ) # 统一的边宽度 - - title = """记忆图谱可视化(仅显示内容≥2的节点) -节点大小表示记忆数量 -节点颜色:蓝(弱连接)到红(强连接)渐变,边的透明度表示连接强度 -连接强度越大的节点距离越近""" - plt.title(title, fontsize=16, fontfamily="SimHei") - plt.show() - - -async def main(): - # 初始化数据库 - logger.info("正在初始化数据库连接...") - start_time = time.time() - - test_pare = { - "do_build_memory": True, - "do_forget_topic": False, - "do_visualize_graph": True, - "do_query": False, - "do_merge_memory": False, - } - - # 创建记忆图 - memory_graph = Memory_graph() - - # 创建海马体 - hippocampus = Hippocampus(memory_graph) - - # 从数据库同步数据 - hippocampus.memory_cortex.sync_memory_from_db() - - end_time = time.time() - logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m") - - # 构建记忆 - if test_pare["do_build_memory"]: - logger.info("开始构建记忆...") - chat_size = 20 - await hippocampus.operation_build_memory(chat_size=chat_size) - - end_time = time.time() - logger.info( - f"\033[32m[构建记忆耗时: {end_time - start_time:.2f} 秒,chat_size={chat_size},chat_count = 16]\033[0m" - ) - - if test_pare["do_forget_topic"]: - logger.info("开始遗忘记忆...") - await hippocampus.operation_forget_topic(percentage=0.01) - - end_time = time.time() - logger.info(f"\033[32m[遗忘记忆耗时: {end_time - start_time:.2f} 秒]\033[0m") - - if test_pare["do_merge_memory"]: - logger.info("开始合并记忆...") - await 
hippocampus.operation_merge_memory(percentage=0.1) - - end_time = time.time() - logger.info(f"\033[32m[合并记忆耗时: {end_time - start_time:.2f} 秒]\033[0m") - - if test_pare["do_visualize_graph"]: - # 展示优化后的图形 - logger.info("生成记忆图谱可视化...") - print("\n生成优化后的记忆图谱:") - visualize_graph_lite(memory_graph) - - if test_pare["do_query"]: - # 交互式查询 - while True: - query = input("\n请输入新的查询概念(输入'退出'以结束):") - if query.lower() == "退出": - break - - items_list = memory_graph.get_related_item(query) - if items_list: - first_layer, second_layer = items_list - if first_layer: - print("\n直接相关的记忆:") - for item in first_layer: - print(f"- {item}") - if second_layer: - print("\n间接相关的记忆:") - for item in second_layer: - print(f"- {item}") - else: - print("未找到相关记忆。") - - -if __name__ == "__main__": - import asyncio - - asyncio.run(main()) diff --git a/src/plugins/memory_system/sample_distribution.py b/src/plugins/memory_system/sample_distribution.py new file mode 100644 index 000000000..1d285f7b4 --- /dev/null +++ b/src/plugins/memory_system/sample_distribution.py @@ -0,0 +1,172 @@ +import numpy as np +import matplotlib.pyplot as plt +from scipy import stats +import time +from datetime import datetime, timedelta + +class DistributionVisualizer: + def __init__(self, mean=0, std=1, skewness=0, sample_size=10): + """ + 初始化分布可视化器 + + 参数: + mean (float): 期望均值 + std (float): 标准差 + skewness (float): 偏度 + sample_size (int): 样本大小 + """ + self.mean = mean + self.std = std + self.skewness = skewness + self.sample_size = sample_size + self.samples = None + + def generate_samples(self): + """生成具有指定参数的样本""" + if self.skewness == 0: + # 对于无偏度的情况,直接使用正态分布 + self.samples = np.random.normal(loc=self.mean, scale=self.std, size=self.sample_size) + else: + # 使用 scipy.stats 生成具有偏度的分布 + self.samples = stats.skewnorm.rvs(a=self.skewness, + loc=self.mean, + scale=self.std, + size=self.sample_size) + + def get_weighted_samples(self): + """获取加权后的样本数列""" + if self.samples is None: + self.generate_samples() + # 将样本值乘以样本大小 + return self.samples * self.sample_size + + def get_statistics(self): + """获取分布的统计信息""" + if self.samples is None: + self.generate_samples() + + return { + "均值": np.mean(self.samples), + "标准差": np.std(self.samples), + "实际偏度": stats.skew(self.samples) + } + +class MemoryBuildScheduler: + def __init__(self, + n_hours1, std_hours1, weight1, + n_hours2, std_hours2, weight2, + total_samples=50): + """ + 初始化记忆构建调度器 + + 参数: + n_hours1 (float): 第一个分布的均值(距离现在的小时数) + std_hours1 (float): 第一个分布的标准差(小时) + weight1 (float): 第一个分布的权重 + n_hours2 (float): 第二个分布的均值(距离现在的小时数) + std_hours2 (float): 第二个分布的标准差(小时) + weight2 (float): 第二个分布的权重 + total_samples (int): 要生成的总时间点数量 + """ + # 归一化权重 + total_weight = weight1 + weight2 + self.weight1 = weight1 / total_weight + self.weight2 = weight2 / total_weight + + self.n_hours1 = n_hours1 + self.std_hours1 = std_hours1 + self.n_hours2 = n_hours2 + self.std_hours2 = std_hours2 + self.total_samples = total_samples + self.base_time = datetime.now() + + def generate_time_samples(self): + """生成混合分布的时间采样点""" + # 根据权重计算每个分布的样本数 + samples1 = int(self.total_samples * self.weight1) + samples2 = self.total_samples - samples1 + + # 生成两个正态分布的小时偏移 + hours_offset1 = np.random.normal( + loc=self.n_hours1, + scale=self.std_hours1, + size=samples1 + ) + + hours_offset2 = np.random.normal( + loc=self.n_hours2, + scale=self.std_hours2, + size=samples2 + ) + + # 合并两个分布的偏移 + hours_offset = np.concatenate([hours_offset1, hours_offset2]) + + # 将偏移转换为实际时间戳(使用绝对值确保时间点在过去) + timestamps = [self.base_time - timedelta(hours=abs(offset)) 
for offset in hours_offset] + + # 按时间排序(从最早到最近) + return sorted(timestamps) + + def get_timestamp_array(self): + """返回时间戳数组""" + timestamps = self.generate_time_samples() + return [int(t.timestamp()) for t in timestamps] + +def print_time_samples(timestamps, show_distribution=True): + """打印时间样本和分布信息""" + print(f"\n生成的{len(timestamps)}个时间点分布:") + print("序号".ljust(5), "时间戳".ljust(25), "距现在(小时)") + print("-" * 50) + + now = datetime.now() + time_diffs = [] + + for i, timestamp in enumerate(timestamps, 1): + hours_diff = (now - timestamp).total_seconds() / 3600 + time_diffs.append(hours_diff) + print(f"{str(i).ljust(5)} {timestamp.strftime('%Y-%m-%d %H:%M:%S').ljust(25)} {hours_diff:.2f}") + + # 打印统计信息 + print("\n统计信息:") + print(f"平均时间偏移:{np.mean(time_diffs):.2f}小时") + print(f"标准差:{np.std(time_diffs):.2f}小时") + print(f"最早时间:{min(timestamps).strftime('%Y-%m-%d %H:%M:%S')} ({max(time_diffs):.2f}小时前)") + print(f"最近时间:{max(timestamps).strftime('%Y-%m-%d %H:%M:%S')} ({min(time_diffs):.2f}小时前)") + + if show_distribution: + # 计算时间分布的直方图 + hist, bins = np.histogram(time_diffs, bins=40) + print("\n时间分布(每个*代表一个时间点):") + for i in range(len(hist)): + if hist[i] > 0: + print(f"{bins[i]:6.1f}-{bins[i+1]:6.1f}小时: {'*' * int(hist[i])}") + +# 使用示例 +if __name__ == "__main__": + # 创建一个双峰分布的记忆调度器 + scheduler = MemoryBuildScheduler( + n_hours1=12, # 第一个分布均值(12小时前) + std_hours1=8, # 第一个分布标准差 + weight1=0.7, # 第一个分布权重 70% + n_hours2=36, # 第二个分布均值(36小时前) + std_hours2=24, # 第二个分布标准差 + weight2=0.3, # 第二个分布权重 30% + total_samples=50 # 总共生成50个时间点 + ) + + # 生成时间分布 + timestamps = scheduler.generate_time_samples() + + # 打印结果,包含分布可视化 + print_time_samples(timestamps, show_distribution=True) + + # 打印时间戳数组 + timestamp_array = scheduler.get_timestamp_array() + print("\n时间戳数组(Unix时间戳):") + print("[", end="") + for i, ts in enumerate(timestamp_array): + if i > 0: + print(", ", end="") + print(ts, end="") + print("]") \ No newline at end of file diff --git a/src/plugins/schedule/offline_llm.py b/src/plugins/schedule/offline_llm.py new file mode 100644 index 000000000..e4dc23f93 --- /dev/null +++ b/src/plugins/schedule/offline_llm.py @@ -0,0 +1,123 @@ +import asyncio +import os +import time +from typing import Tuple, Union + +import aiohttp +import requests +from src.common.logger import get_module_logger + +logger = get_module_logger("offline_llm") + + +class LLMModel: + def __init__(self, model_name="deepseek-ai/DeepSeek-V3", **kwargs): + self.model_name = model_name + self.params = kwargs + self.api_key = os.getenv("SILICONFLOW_KEY") + self.base_url = os.getenv("SILICONFLOW_BASE_URL") + + if not self.api_key or not self.base_url: + raise ValueError("环境变量未正确加载:SILICONFLOW_KEY 或 SILICONFLOW_BASE_URL 未设置") + + logger.info(f"API URL: {self.base_url}") # 使用 logger 记录 base_url + + def generate_response(self, prompt: str) -> Union[str, Tuple[str, str]]: + """根据输入的提示生成模型的响应""" + headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} + + # 构建请求体 + data = { + "model": self.model_name, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.5, + **self.params, + } + + # 发送请求到完整的 chat/completions 端点 + api_url = f"{self.base_url.rstrip('/')}/chat/completions" + logger.info(f"Request URL: {api_url}") # 记录请求的 URL + + max_retries = 3 + base_wait_time = 15 # 基础等待时间(秒) + + for retry in range(max_retries): + try: + response = requests.post(api_url, headers=headers, json=data) + + if response.status_code == 429: + wait_time = base_wait_time * (2**retry) # 指数退避 + 
logger.warning(f"遇到请求限制(429),等待{wait_time}秒后重试...") + time.sleep(wait_time) + continue + + response.raise_for_status() # 检查其他响应状态 + + result = response.json() + if "choices" in result and len(result["choices"]) > 0: + content = result["choices"][0]["message"]["content"] + reasoning_content = result["choices"][0]["message"].get("reasoning_content", "") + return content, reasoning_content + return "没有返回结果", "" + + except Exception as e: + if retry < max_retries - 1: # 如果还有重试机会 + wait_time = base_wait_time * (2**retry) + logger.error(f"[回复]请求失败,等待{wait_time}秒后重试... 错误: {str(e)}") + time.sleep(wait_time) + else: + logger.error(f"请求失败: {str(e)}") + return f"请求失败: {str(e)}", "" + + logger.error("达到最大重试次数,请求仍然失败") + return "达到最大重试次数,请求仍然失败", "" + + async def generate_response_async(self, prompt: str) -> Union[str, Tuple[str, str]]: + """异步方式根据输入的提示生成模型的响应""" + headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} + + # 构建请求体 + data = { + "model": self.model_name, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.5, + **self.params, + } + + # 发送请求到完整的 chat/completions 端点 + api_url = f"{self.base_url.rstrip('/')}/chat/completions" + logger.info(f"Request URL: {api_url}") # 记录请求的 URL + + max_retries = 3 + base_wait_time = 15 + + async with aiohttp.ClientSession() as session: + for retry in range(max_retries): + try: + async with session.post(api_url, headers=headers, json=data) as response: + if response.status == 429: + wait_time = base_wait_time * (2**retry) # 指数退避 + logger.warning(f"遇到请求限制(429),等待{wait_time}秒后重试...") + await asyncio.sleep(wait_time) + continue + + response.raise_for_status() # 检查其他响应状态 + + result = await response.json() + if "choices" in result and len(result["choices"]) > 0: + content = result["choices"][0]["message"]["content"] + reasoning_content = result["choices"][0]["message"].get("reasoning_content", "") + return content, reasoning_content + return "没有返回结果", "" + + except Exception as e: + if retry < max_retries - 1: # 如果还有重试机会 + wait_time = base_wait_time * (2**retry) + logger.error(f"[回复]请求失败,等待{wait_time}秒后重试... 
错误: {str(e)}")
+                        await asyncio.sleep(wait_time)
+                    else:
+                        logger.error(f"请求失败: {str(e)}")
+                        return f"请求失败: {str(e)}", ""
+
+        logger.error("达到最大重试次数,请求仍然失败")
+        return "达到最大重试次数,请求仍然失败", ""
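A hypothetical usage sketch for the `LLMModel` above — `SILICONFLOW_KEY` and `SILICONFLOW_BASE_URL` must be set in the environment before construction, and every return path yields a `(content, reasoning_content)` pair, so unpacking is always safe:

```python
from src.plugins.schedule.offline_llm import LLMModel

model = LLMModel(model_name="deepseek-ai/DeepSeek-V3")
# On HTTP 429 the call retries internally with exponential backoff
# (15s, 30s, 60s) before giving up and returning an error string.
content, reasoning = model.generate_response("用一句话介绍你自己")
print(content)
```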
diff --git a/src/plugins/schedule/schedule_generator copy.py b/src/plugins/schedule/schedule_generator copy.py
new file mode 100644
index 000000000..7ebc00a54
--- /dev/null
+++ b/src/plugins/schedule/schedule_generator copy.py
@@ -0,0 +1,192 @@
+import datetime
+import json
+import re
+import os
+import sys
+from typing import Dict, Tuple, Union
+
+# 添加项目根目录到 Python 路径
+root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))
+sys.path.append(root_path)
+
+from src.plugins.chat.config import global_config
+from src.common.database import db  # 使用正确的导入语法
+from src.plugins.schedule.offline_llm import LLMModel
+from src.common.logger import get_module_logger
+
+logger = get_module_logger("scheduler")
+
+
+class ScheduleGenerator:
+    enable_output: bool = True
+
+    def __init__(self):
+        # 使用离线LLM模型
+        self.llm_scheduler = LLMModel(model_name="Pro/deepseek-ai/DeepSeek-V3", temperature=0.9)
+        self.today_schedule_text = ""
+        self.today_schedule = {}
+        self.tomorrow_schedule_text = ""
+        self.tomorrow_schedule = {}
+        self.yesterday_schedule_text = ""
+        self.yesterday_schedule = {}
+
+    async def initialize(self):
+        today = datetime.datetime.now()
+        tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
+        yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
+
+        self.today_schedule_text, self.today_schedule = await self.generate_daily_schedule(target_date=today)
+        self.tomorrow_schedule_text, self.tomorrow_schedule = await self.generate_daily_schedule(
+            target_date=tomorrow, read_only=True
+        )
+        self.yesterday_schedule_text, self.yesterday_schedule = await self.generate_daily_schedule(
+            target_date=yesterday, read_only=True
+        )
+
+    async def generate_daily_schedule(
+        self, target_date: datetime.datetime = None, read_only: bool = False
+    ) -> Tuple[str, Union[Dict[str, str], None]]:
+        if target_date is None:
+            target_date = datetime.datetime.now()
+        date_str = target_date.strftime("%Y-%m-%d")
+        weekday = target_date.strftime("%A")
+
+        schedule_text = ""
+
+        existing_schedule = db.schedule.find_one({"date": date_str})
+        if existing_schedule:
+            if self.enable_output:
+                logger.debug(f"{date_str}的日程已存在:")
+            schedule_text = existing_schedule["schedule"]
+        elif not read_only:
+            logger.debug(f"{date_str}的日程不存在,准备生成新的日程。")
+            prompt = (
+                f"""我是{global_config.BOT_NICKNAME},{global_config.PROMPT_SCHEDULE_GEN},请为我生成{date_str}({weekday})的日程安排,包括:"""
+                + """
+            1. 早上的学习和工作安排
+            2. 下午的活动和任务
+            3. 晚上的计划和休息时间
+            请按照时间顺序列出具体时间点和对应的活动,用一个时间点而不是时间段来表示时间,用JSON格式返回日程表,
+            仅返回内容,不要返回注释,不要添加任何markdown或代码块样式,时间采用24小时制,
+            格式为{"时间": "活动","时间": "活动",...}。"""
+            )
+
+            try:
+                schedule_text, _ = self.llm_scheduler.generate_response(prompt)
+                db.schedule.insert_one({"date": date_str, "schedule": schedule_text})
+                self.enable_output = True
+            except Exception as e:
+                logger.error(f"生成日程失败: {str(e)}")
+                schedule_text = "生成日程时出错了"
+        else:
+            if self.enable_output:
+                logger.debug(f"{date_str}的日程不存在。")
+            schedule_text = "忘了"
+            return schedule_text, None
+
+        schedule_form = self._parse_schedule(schedule_text)
+        return schedule_text, schedule_form
+
+    def _parse_schedule(self, schedule_text: str) -> Union[Dict[str, str], None]:
+        """解析日程文本,转换为时间和活动的字典"""
+        reg = r"\{(.|\r|\n)+\}"
+        matched = re.search(reg, schedule_text)
+        if not matched:
+            logger.error("解析日程失败: {}".format(schedule_text))
+            return None
+        try:
+            return json.loads(matched[0])
+        except json.JSONDecodeError:
+            logger.exception("解析日程失败: {}".format(schedule_text))
+            return None
+
+    def _parse_time(self, time_str: str) -> datetime.datetime:
+        """解析时间字符串,转换为时间"""
+        return datetime.datetime.strptime(time_str, "%H:%M")
+
+    def get_current_task(self) -> Tuple[str, str]:
+        """获取当前时间应该进行的任务,返回 (时间点, 活动)"""
+        current_time = datetime.datetime.now().strftime("%H:%M")
+
+        if not self.today_schedule:
+            return current_time, "摸鱼"
+
+        # 找到最接近当前时间的任务
+        closest_time = None
+        closest_schedule = self.today_schedule
+        min_diff = float("inf")
+
+        # 检查今天的日程
+        for time_str in self.today_schedule.keys():
+            diff = abs(self._time_diff(current_time, time_str))
+            if closest_time is None or diff < min_diff:
+                closest_time = time_str
+                min_diff = diff
+
+        # 检查昨天日程中的晚间任务(只考虑晚上8点之后的任务)
+        if self.yesterday_schedule:
+            for time_str in self.yesterday_schedule.keys():
+                if time_str >= "20:00":
+                    diff = abs(self._time_diff(current_time, time_str))
+                    if diff < min_diff:
+                        closest_time = time_str
+                        min_diff = diff
+                        closest_schedule = self.yesterday_schedule
+
+        if closest_time:
+            return closest_time, closest_schedule[closest_time]
+        return current_time, "摸鱼"
+
+    def _time_diff(self, time1: str, time2: str) -> int:
+        """计算两个时间字符串之间的分钟差"""
+        if time1 == "24:00":
+            time1 = "23:59"
+        if time2 == "24:00":
+            time2 = "23:59"
+        t1 = datetime.datetime.strptime(time1, "%H:%M")
+        t2 = datetime.datetime.strptime(time2, "%H:%M")
+        diff = int((t2 - t1).total_seconds() / 60)
+        # 考虑时间的循环性
+        if diff < -720:
+            diff += 1440  # 加一天的分钟
+        elif diff > 720:
+            diff -= 1440  # 减一天的分钟
+        return diff
+
+    def print_schedule(self):
+        """打印完整的日程安排"""
+        if not self._parse_schedule(self.today_schedule_text):
+            logger.warning("今日日程有误,将在下次运行时重新生成")
+            db.schedule.delete_one({"date": datetime.datetime.now().strftime("%Y-%m-%d")})
+        else:
+            logger.info("=== 今日日程安排 ===")
+            for time_str, activity in self.today_schedule.items():
+                logger.info(f"时间[{time_str}]: 活动[{activity}]")
+            logger.info("==================")
+            self.enable_output = False
+
+
+async def main():
+    # 使用示例
+    scheduler = ScheduleGenerator()
+    await scheduler.initialize()
+    scheduler.print_schedule()
+    print("\n当前任务:")
+    print(scheduler.get_current_task())
+
+    print("昨天日程:")
+    print(scheduler.yesterday_schedule)
+    print("今天日程:")
+    print(scheduler.today_schedule)
+    print("明天日程:")
+    print(scheduler.tomorrow_schedule)
+
+
+# 当作为组件导入时使用的实例
+bot_schedule = ScheduleGenerator()
+
+if __name__ == "__main__":
+    import asyncio
+
+    # 当直接运行此文件时执行
+    asyncio.run(main())
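A worked example of the wraparound adjustment in `_time_diff` above: the raw difference from 23:00 to 01:00 is -1320 minutes, which is below -720, so a full day (1440 minutes) is added and 01:00 counts as 120 minutes after 23:00:

```python
raw = (1 * 60 + 0) - (23 * 60 + 0)  # t2 - t1 in minutes = -1320
if raw < -720:
    raw += 1440  # wrap forward past midnight
elif raw > 720:
    raw -= 1440  # wrap backward
assert raw == 120  # 01:00 is treated as 2 hours after 23:00
```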
diff --git a/src/plugins/schedule/schedule_generator.py b/src/plugins/schedule/schedule_generator.py
index 11db6664d..3fabfa389 100644
--- a/src/plugins/schedule/schedule_generator.py
+++ b/src/plugins/schedule/schedule_generator.py
@@ -165,24 +165,5 @@ class ScheduleGenerator:
             logger.info(f"时间[{time_str}]: 活动[{activity}]")
         logger.info("==================")
         self.enable_output = False
-
-
-# def main():
-#     # 使用示例
-#     scheduler = ScheduleGenerator()
-#     # new_schedule = scheduler.generate_daily_schedule()
-#     scheduler.print_schedule()
-#     print("\n当前任务:")
-#     print(scheduler.get_current_task())
-
-#     print("昨天日程:")
-#     print(scheduler.yesterday_schedule)
-#     print("今天日程:")
-#     print(scheduler.today_schedule)
-#     print("明天日程:")
-#     print(scheduler.tomorrow_schedule)
-
-# if __name__ == "__main__":
-#     main()
-
+# 当作为组件导入时使用的实例
 bot_schedule = ScheduleGenerator()
diff --git a/template.env b/template.env
index 6791c5842..934a331d0 100644
--- a/template.env
+++ b/template.env
@@ -1,8 +1,6 @@
 HOST=127.0.0.1
 PORT=8080
 
-ENABLE_ADVANCE_OUTPUT=false
-
 # 插件配置
 PLUGINS=["src2.plugins.chat"]
 
@@ -31,6 +29,7 @@ CHAT_ANY_WHERE_KEY=
 SILICONFLOW_KEY=
 
 # 定义日志相关配置
+SIMPLE_OUTPUT=true # 精简控制台输出格式
 CONSOLE_LOG_LEVEL=INFO # 自定义日志的默认控制台输出日志级别
 FILE_LOG_LEVEL=DEBUG # 自定义日志的默认文件输出日志级别
 DEFAULT_CONSOLE_LOG_LEVEL=SUCCESS # 原生日志的控制台输出日志级别(nonebot就是这一类)
diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml
index ec2b5fbd4..e5cf1df86 100644
--- a/template/bot_config_template.toml
+++ b/template/bot_config_template.toml
@@ -1,5 +1,5 @@
 [inner]
-version = "0.0.10"
+version = "0.0.11"
 
 #以下是给开发人员阅读的,一般用户不需要阅读
 #如果你想要修改配置文件,请在修改后将version的值进行变更
@@ -66,12 +66,15 @@ model_r1_distill_probability = 0.1 # 麦麦回答时选择次要回复模型3
 max_response_length = 1024 # 麦麦回答的最大token数
 
 [willing]
-willing_mode = "classical"
-# willing_mode = "dynamic"
-# willing_mode = "custom"
+willing_mode = "classical" # 回复意愿模式 经典模式
+# willing_mode = "dynamic" # 动态模式(可能不兼容)
+# willing_mode = "custom" # 自定义模式(可自行调整)
 
 [memory]
 build_memory_interval = 2000 # 记忆构建间隔 单位秒 间隔越低,麦麦学习越多,但是冗余信息也会增多
+memory_build_distribution = [4, 2, 0.6, 24, 8, 0.4] # 记忆构建分布,参数:分布1均值,标准差,权重,分布2均值,标准差,权重
+build_memory_sample_num = 10 # 采样数量,数值越高记忆采样次数越多
+build_memory_sample_length = 20 # 采样长度,数值越高一段记忆内容越丰富
 memory_compress_rate = 0.1 # 记忆压缩率 控制记忆精简程度 建议保持默认,调高可以获得更多信息,但是冗余信息也会增多
 
 forget_memory_interval = 1000 # 记忆遗忘间隔 单位秒 间隔越低,麦麦遗忘越频繁,记忆更精简,但更难学习
@@ -109,9 +112,7 @@ tone_error_rate=0.2 # 声调错误概率
 word_replace_rate=0.006 # 整词替换概率
 
 [others]
-enable_advance_output = false # 是否启用高级输出
 enable_kuuki_read = true # 是否启用读空气功能
-enable_debug_output = false # 是否启用调试输出
 enable_friend_chat = false # 是否启用好友聊天
 
 [groups]
@@ -120,9 +121,9 @@ talk_allowed = [
     123,
 ] #可以回复消息的群
 talk_frequency_down = [] #降低回复频率的群
-ban_user_id = [] #禁止回复消息的QQ号
+ban_user_id = [] #禁止回复和读取消息的QQ号
 
-[remote] #测试功能,发送统计信息,主要是看全球有多少只麦麦
+[remote] #发送统计信息,主要是看全球有多少只麦麦
 enable = true
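To tie the template values back to the code: a sketch of how the six numbers in `memory_build_distribution` feed `MemoryBuildScheduler` (the config key must match the name read in config.py; values shown are the template defaults, not a recommendation):

```python
from src.plugins.memory_system.sample_distribution import MemoryBuildScheduler

# mean1, std1, weight1, mean2, std2, weight2 — hours before now
dist = [4, 2, 0.6, 24, 8, 0.4]
scheduler = MemoryBuildScheduler(
    n_hours1=dist[0], std_hours1=dist[1], weight1=dist[2],
    n_hours2=dist[3], std_hours2=dist[4], weight2=dist[5],
    total_samples=10,  # build_memory_sample_num
)
# 10 Unix timestamps, clustered around ~4h and ~24h ago, sorted oldest first
print(scheduler.get_timestamp_array())
```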