diff --git a/bot.py b/bot.py index 5b12b0389..c9568ecba 100644 --- a/bot.py +++ b/bot.py @@ -7,12 +7,16 @@ from pathlib import Path import time import platform from dotenv import load_dotenv -from src.common.logger import get_module_logger +from src.common.logger import get_module_logger, LogConfig, CONFIRM_STYLE_CONFIG from src.common.crash_logger import install_crash_handler from src.main import MainSystem logger = get_module_logger("main_bot") - +confirm_logger_config = LogConfig( + console_format=CONFIRM_STYLE_CONFIG["console_format"], + file_format=CONFIRM_STYLE_CONFIG["file_format"], +) +confirm_logger = get_module_logger("main_bot", config=confirm_logger_config) # 获取没有加载env时的环境变量 env_mask = {key: os.getenv(key) for key in os.environ} @@ -166,8 +170,8 @@ def check_eula(): # 如果EULA或隐私条款有更新,提示用户重新确认 if eula_updated or privacy_updated: - print("EULA或隐私条款内容已更新,请在阅读后重新确认,继续运行视为同意更新后的以上两款协议") - print( + confirm_logger.critical("EULA或隐私条款内容已更新,请在阅读后重新确认,继续运行视为同意更新后的以上两款协议") + confirm_logger.critical( f'输入"同意"或"confirmed"或设置环境变量"EULA_AGREE={eula_new_hash}"和"PRIVACY_AGREE={privacy_new_hash}"继续运行' ) while True: @@ -176,14 +180,14 @@ def check_eula(): # print("确认成功,继续运行") # print(f"确认成功,继续运行{eula_updated} {privacy_updated}") if eula_updated: - print(f"更新EULA确认文件{eula_new_hash}") + logger.info(f"更新EULA确认文件{eula_new_hash}") eula_confirm_file.write_text(eula_new_hash, encoding="utf-8") if privacy_updated: - print(f"更新隐私条款确认文件{privacy_new_hash}") + logger.info(f"更新隐私条款确认文件{privacy_new_hash}") privacy_confirm_file.write_text(privacy_new_hash, encoding="utf-8") break else: - print('请输入"同意"或"confirmed"以继续运行') + confirm_logger.critical('请输入"同意"或"confirmed"以继续运行') return elif eula_confirmed and privacy_confirmed: return diff --git a/src/common/logger.py b/src/common/logger.py index 9e118622d..cded9467c 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -290,6 +290,12 @@ WILLING_STYLE_CONFIG = { }, } +CONFIRM_STYLE_CONFIG = { + "console_format": ( + "{message}" + ), # noqa: E501 + "file_format": ("{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {extra[module]: <15} | EULA与PRIVACY确认 | {message}"), +} # 根据SIMPLE_OUTPUT选择配置 MEMORY_STYLE_CONFIG = MEMORY_STYLE_CONFIG["simple"] if SIMPLE_OUTPUT else MEMORY_STYLE_CONFIG["advanced"] diff --git a/src/heart_flow/heartflow.py b/src/heart_flow/heartflow.py index f5b394f2e..3ea51917c 100644 --- a/src/heart_flow/heartflow.py +++ b/src/heart_flow/heartflow.py @@ -9,6 +9,7 @@ from src.common.logger import get_module_logger, LogConfig, HEARTFLOW_STYLE_CONF from src.individuality.individuality import Individuality import time import random +from typing import Dict, Any heartflow_config = LogConfig( # 使用海马体专用样式 @@ -39,7 +40,7 @@ class Heartflow: model=global_config.llm_heartflow, temperature=0.6, max_tokens=1000, request_type="heart_flow" ) - self._subheartflows = {} + self._subheartflows: Dict[Any, SubHeartflow] = {} self.active_subheartflows_nums = 0 async def _cleanup_inactive_subheartflows(self): diff --git a/src/plugins/chat/auto_speak.py b/src/plugins/chat/auto_speak.py index 62a5a20a5..ac76a2714 100644 --- a/src/plugins/chat/auto_speak.py +++ b/src/plugins/chat/auto_speak.py @@ -142,7 +142,11 @@ class AutoSpeakManager: message_manager.add_message(thinking_message) # 生成自主发言内容 - response, raw_content = await self.gpt.generate_response(message) + try: + response, raw_content = await self.gpt.generate_response(message) + except Exception as e: + logger.error(f"生成自主发言内容时发生错误: {e}") + return False if response: message_set = MessageSet(None, think_id) # 
不需要chat_stream
diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_chat.py b/src/plugins/chat_module/reasoning_chat/reasoning_chat.py
index b9e94e4fe..c097427de 100644
--- a/src/plugins/chat_module/reasoning_chat/reasoning_chat.py
+++ b/src/plugins/chat_module/reasoning_chat/reasoning_chat.py
@@ -59,11 +59,7 @@ class ReasoningChat:
 
         return thinking_id
 
-    async def _send_response_messages(self,
-                                      message,
-                                      chat,
-                                      response_set:List[str],
-                                      thinking_id) -> MessageSending:
+    async def _send_response_messages(self, message, chat, response_set: List[str], thinking_id) -> MessageSending:
         """发送回复消息"""
         container = message_manager.get_container(chat.stream_id)
         thinking_message = None
@@ -240,19 +236,23 @@ class ReasoningChat:
             thinking_id = await self._create_thinking_message(message, chat, userinfo, messageinfo)
         timer2 = time.time()
         timing_results["创建思考消息"] = timer2 - timer1
-        
+
         logger.debug(f"创建捕捉器,thinking_id:{thinking_id}")
-        
+
         info_catcher = info_catcher_manager.get_info_catcher(thinking_id)
         info_catcher.catch_decide_to_response(message)
 
         # 生成回复
         timer1 = time.time()
-        response_set = await self.gpt.generate_response(message,thinking_id)
-        timer2 = time.time()
-        timing_results["生成回复"] = timer2 - timer1
-
-        info_catcher.catch_after_generate_response(timing_results["生成回复"])
+        try:
+            response_set = await self.gpt.generate_response(message, thinking_id)
+            timer2 = time.time()
+            timing_results["生成回复"] = timer2 - timer1
+
+            info_catcher.catch_after_generate_response(timing_results["生成回复"])
+        except Exception as e:
+            logger.error(f"回复生成出现错误:{str(e)}")
+            response_set = None
 
         if not response_set:
             logger.info("为什么生成回复失败?")
@@ -263,10 +263,9 @@ class ReasoningChat:
             first_bot_msg = await self._send_response_messages(message, chat, response_set, thinking_id)
         timer2 = time.time()
         timing_results["发送消息"] = timer2 - timer1
-        
-        info_catcher.catch_after_response(timing_results["发送消息"],response_set,first_bot_msg)
-        
-        
+
+        info_catcher.catch_after_response(timing_results["发送消息"], response_set, first_bot_msg)
+
         info_catcher.done_catch()
 
         # 处理表情包
diff --git a/src/plugins/memory_system/Hippocampus.py b/src/plugins/memory_system/Hippocampus.py
index 8e2cd21e7..407d20540 100644
--- a/src/plugins/memory_system/Hippocampus.py
+++ b/src/plugins/memory_system/Hippocampus.py
@@ -506,319 +506,6 @@ class EntorhinalCortex:
         logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边")
 
 
-# 负责整合,遗忘,合并记忆
-class ParahippocampalGyrus:
-    def __init__(self, hippocampus):
-        self.hippocampus = hippocampus
-        self.memory_graph = hippocampus.memory_graph
-        self.config = hippocampus.config
-
-    async def memory_compress(self, messages: list, compress_rate=0.1):
-        """压缩和总结消息内容,生成记忆主题和摘要。
-
-        Args:
-            messages (list): 消息列表,每个消息是一个字典,包含以下字段:
-                - time: float, 消息的时间戳
-                - detailed_plain_text: str, 消息的详细文本内容
-            compress_rate (float, optional): 压缩率,用于控制生成的主题数量。默认为0.1。
-
-        Returns:
-            tuple: (compressed_memory, similar_topics_dict)
-                - compressed_memory: set, 压缩后的记忆集合,每个元素是一个元组 (topic, summary)
-                    - topic: str, 记忆主题
-                    - summary: str, 主题的摘要描述
-                - similar_topics_dict: dict, 相似主题字典,key为主题,value为相似主题列表
-                    每个相似主题是一个元组 (similar_topic, similarity)
-                    - similar_topic: str, 相似的主题
-                    - similarity: float, 相似度分数(0-1之间)
-
-        Process:
-            1. 合并消息文本并生成时间信息
-            2. 使用LLM提取关键主题
-            3. 过滤掉包含禁用关键词的主题
-            4. 为每个主题生成摘要
-            5. 
查找与现有记忆中的相似主题 - """ - if not messages: - return set(), {} - - # 合并消息文本,同时保留时间信息 - input_text = "" - time_info = "" - # 计算最早和最晚时间 - earliest_time = min(msg["time"] for msg in messages) - latest_time = max(msg["time"] for msg in messages) - - earliest_dt = datetime.datetime.fromtimestamp(earliest_time) - latest_dt = datetime.datetime.fromtimestamp(latest_time) - - # 如果是同一年 - if earliest_dt.year == latest_dt.year: - earliest_str = earliest_dt.strftime("%m-%d %H:%M:%S") - latest_str = latest_dt.strftime("%m-%d %H:%M:%S") - time_info += f"是在{earliest_dt.year}年,{earliest_str} 到 {latest_str} 的对话:\n" - else: - earliest_str = earliest_dt.strftime("%Y-%m-%d %H:%M:%S") - latest_str = latest_dt.strftime("%Y-%m-%d %H:%M:%S") - time_info += f"是从 {earliest_str} 到 {latest_str} 的对话:\n" - - for msg in messages: - input_text += f"{msg['detailed_plain_text']}\n" - - logger.debug(input_text) - - topic_num = self.hippocampus.calculate_topic_num(input_text, compress_rate) - topics_response = await self.hippocampus.llm_topic_judge.generate_response( - self.hippocampus.find_topic_llm(input_text, topic_num) - ) - - # 使用正则表达式提取<>中的内容 - topics = re.findall(r"<([^>]+)>", topics_response[0]) - - # 如果没有找到<>包裹的内容,返回['none'] - if not topics: - topics = ["none"] - else: - # 处理提取出的话题 - topics = [ - topic.strip() - for topic in ",".join(topics).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") - if topic.strip() - ] - - # 过滤掉包含禁用关键词的topic - filtered_topics = [ - topic for topic in topics if not any(keyword in topic for keyword in self.config.memory_ban_words) - ] - - logger.debug(f"过滤后话题: {filtered_topics}") - - # 创建所有话题的请求任务 - tasks = [] - for topic in filtered_topics: - topic_what_prompt = self.hippocampus.topic_what(input_text, topic, time_info) - task = self.hippocampus.llm_summary_by_topic.generate_response_async(topic_what_prompt) - tasks.append((topic.strip(), task)) - - # 等待所有任务完成 - compressed_memory = set() - similar_topics_dict = {} - - for topic, task in tasks: - response = await task - if response: - compressed_memory.add((topic, response[0])) - - existing_topics = list(self.memory_graph.G.nodes()) - similar_topics = [] - - for existing_topic in existing_topics: - topic_words = set(jieba.cut(topic)) - existing_words = set(jieba.cut(existing_topic)) - - all_words = topic_words | existing_words - v1 = [1 if word in topic_words else 0 for word in all_words] - v2 = [1 if word in existing_words else 0 for word in all_words] - - similarity = cosine_similarity(v1, v2) - - if similarity >= 0.7: - similar_topics.append((existing_topic, similarity)) - - similar_topics.sort(key=lambda x: x[1], reverse=True) - similar_topics = similar_topics[:3] - similar_topics_dict[topic] = similar_topics - - return compressed_memory, similar_topics_dict - - async def operation_build_memory(self): - logger.debug("------------------------------------开始构建记忆--------------------------------------") - start_time = time.time() - memory_samples = self.hippocampus.entorhinal_cortex.get_memory_sample() - all_added_nodes = [] - all_connected_nodes = [] - all_added_edges = [] - for i, messages in enumerate(memory_samples, 1): - all_topics = [] - progress = (i / len(memory_samples)) * 100 - bar_length = 30 - filled_length = int(bar_length * i // len(memory_samples)) - bar = "█" * filled_length + "-" * (bar_length - filled_length) - logger.debug(f"进度: [{bar}] {progress:.1f}% ({i}/{len(memory_samples)})") - - compress_rate = self.config.memory_compress_rate - compressed_memory, similar_topics_dict = await self.memory_compress(messages, 
compress_rate) - logger.debug(f"压缩后记忆数量: {compressed_memory},似曾相识的话题: {similar_topics_dict}") - - current_time = datetime.datetime.now().timestamp() - logger.debug(f"添加节点: {', '.join(topic for topic, _ in compressed_memory)}") - all_added_nodes.extend(topic for topic, _ in compressed_memory) - - for topic, memory in compressed_memory: - self.memory_graph.add_dot(topic, memory) - all_topics.append(topic) - - if topic in similar_topics_dict: - similar_topics = similar_topics_dict[topic] - for similar_topic, similarity in similar_topics: - if topic != similar_topic: - strength = int(similarity * 10) - - logger.debug(f"连接相似节点: {topic} 和 {similar_topic} (强度: {strength})") - all_added_edges.append(f"{topic}-{similar_topic}") - - all_connected_nodes.append(topic) - all_connected_nodes.append(similar_topic) - - self.memory_graph.G.add_edge( - topic, - similar_topic, - strength=strength, - created_time=current_time, - last_modified=current_time, - ) - - for i in range(len(all_topics)): - for j in range(i + 1, len(all_topics)): - logger.debug(f"连接同批次节点: {all_topics[i]} 和 {all_topics[j]}") - all_added_edges.append(f"{all_topics[i]}-{all_topics[j]}") - self.memory_graph.connect_dot(all_topics[i], all_topics[j]) - - logger.success(f"更新记忆: {', '.join(all_added_nodes)}") - logger.debug(f"强化连接: {', '.join(all_added_edges)}") - logger.info(f"强化连接节点: {', '.join(all_connected_nodes)}") - - await self.hippocampus.entorhinal_cortex.sync_memory_to_db() - - end_time = time.time() - logger.success(f"---------------------记忆构建耗时: {end_time - start_time:.2f} 秒---------------------") - - async def operation_forget_topic(self, percentage=0.005): - start_time = time.time() - logger.info("[遗忘] 开始检查数据库...") - - # 验证百分比参数 - if not 0 <= percentage <= 1: - logger.warning(f"[遗忘] 无效的遗忘百分比: {percentage}, 使用默认值 0.005") - percentage = 0.005 - - all_nodes = list(self.memory_graph.G.nodes()) - all_edges = list(self.memory_graph.G.edges()) - - if not all_nodes and not all_edges: - logger.info("[遗忘] 记忆图为空,无需进行遗忘操作") - return - - # 确保至少检查1个节点和边,且不超过总数 - check_nodes_count = max(1, min(len(all_nodes), int(len(all_nodes) * percentage))) - check_edges_count = max(1, min(len(all_edges), int(len(all_edges) * percentage))) - - # 只有在有足够的节点和边时才进行采样 - if len(all_nodes) >= check_nodes_count and len(all_edges) >= check_edges_count: - try: - nodes_to_check = random.sample(all_nodes, check_nodes_count) - edges_to_check = random.sample(all_edges, check_edges_count) - except ValueError as e: - logger.error(f"[遗忘] 采样错误: {str(e)}") - return - else: - logger.info("[遗忘] 没有足够的节点或边进行遗忘操作") - return - - # 使用列表存储变化信息 - edge_changes = { - "weakened": [], # 存储减弱的边 - "removed": [], # 存储移除的边 - } - node_changes = { - "reduced": [], # 存储减少记忆的节点 - "removed": [], # 存储移除的节点 - } - - current_time = datetime.datetime.now().timestamp() - - logger.info("[遗忘] 开始检查连接...") - edge_check_start = time.time() - for source, target in edges_to_check: - edge_data = self.memory_graph.G[source][target] - last_modified = edge_data.get("last_modified") - - if current_time - last_modified > 3600 * self.config.memory_forget_time: - current_strength = edge_data.get("strength", 1) - new_strength = current_strength - 1 - - if new_strength <= 0: - self.memory_graph.G.remove_edge(source, target) - edge_changes["removed"].append(f"{source} -> {target}") - else: - edge_data["strength"] = new_strength - edge_data["last_modified"] = current_time - edge_changes["weakened"].append(f"{source}-{target} (强度: {current_strength} -> {new_strength})") - edge_check_end = time.time() - logger.info(f"[遗忘] 
连接检查耗时: {edge_check_end - edge_check_start:.2f}秒") - - logger.info("[遗忘] 开始检查节点...") - node_check_start = time.time() - for node in nodes_to_check: - node_data = self.memory_graph.G.nodes[node] - last_modified = node_data.get("last_modified", current_time) - - if current_time - last_modified > 3600 * 24: - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - if memory_items: - current_count = len(memory_items) - removed_item = random.choice(memory_items) - memory_items.remove(removed_item) - - if memory_items: - self.memory_graph.G.nodes[node]["memory_items"] = memory_items - self.memory_graph.G.nodes[node]["last_modified"] = current_time - node_changes["reduced"].append(f"{node} (数量: {current_count} -> {len(memory_items)})") - else: - self.memory_graph.G.remove_node(node) - node_changes["removed"].append(node) - node_check_end = time.time() - logger.info(f"[遗忘] 节点检查耗时: {node_check_end - node_check_start:.2f}秒") - - if any(edge_changes.values()) or any(node_changes.values()): - sync_start = time.time() - - await self.hippocampus.entorhinal_cortex.resync_memory_to_db() - - sync_end = time.time() - logger.info(f"[遗忘] 数据库同步耗时: {sync_end - sync_start:.2f}秒") - - # 汇总输出所有变化 - logger.info("[遗忘] 遗忘操作统计:") - if edge_changes["weakened"]: - logger.info( - f"[遗忘] 减弱的连接 ({len(edge_changes['weakened'])}个): {', '.join(edge_changes['weakened'])}" - ) - - if edge_changes["removed"]: - logger.info( - f"[遗忘] 移除的连接 ({len(edge_changes['removed'])}个): {', '.join(edge_changes['removed'])}" - ) - - if node_changes["reduced"]: - logger.info( - f"[遗忘] 减少记忆的节点 ({len(node_changes['reduced'])}个): {', '.join(node_changes['reduced'])}" - ) - - if node_changes["removed"]: - logger.info( - f"[遗忘] 移除的节点 ({len(node_changes['removed'])}个): {', '.join(node_changes['removed'])}" - ) - else: - logger.info("[遗忘] 本次检查没有节点或连接满足遗忘条件") - - end_time = time.time() - logger.info(f"[遗忘] 总耗时: {end_time - start_time:.2f}秒") - - # 海马体 class Hippocampus: def __init__(self): @@ -1247,6 +934,327 @@ class Hippocampus: return activation_ratio +# 负责整合,遗忘,合并记忆 +class ParahippocampalGyrus: + def __init__(self, hippocampus: Hippocampus): + self.hippocampus = hippocampus + self.memory_graph = hippocampus.memory_graph + self.config = hippocampus.config + + async def memory_compress(self, messages: list, compress_rate=0.1): + """压缩和总结消息内容,生成记忆主题和摘要。 + + Args: + messages (list): 消息列表,每个消息是一个字典,包含以下字段: + - time: float, 消息的时间戳 + - detailed_plain_text: str, 消息的详细文本内容 + compress_rate (float, optional): 压缩率,用于控制生成的主题数量。默认为0.1。 + + Returns: + tuple: (compressed_memory, similar_topics_dict) + - compressed_memory: set, 压缩后的记忆集合,每个元素是一个元组 (topic, summary) + - topic: str, 记忆主题 + - summary: str, 主题的摘要描述 + - similar_topics_dict: dict, 相似主题字典,key为主题,value为相似主题列表 + 每个相似主题是一个元组 (similar_topic, similarity) + - similar_topic: str, 相似的主题 + - similarity: float, 相似度分数(0-1之间) + + Process: + 1. 合并消息文本并生成时间信息 + 2. 使用LLM提取关键主题 + 3. 过滤掉包含禁用关键词的主题 + 4. 为每个主题生成摘要 + 5. 
查找与现有记忆中的相似主题
+        """
+        if not messages:
+            return set(), {}
+
+        # 合并消息文本,同时保留时间信息
+        input_text = ""
+        time_info = ""
+        # 计算最早和最晚时间
+        earliest_time = min(msg["time"] for msg in messages)
+        latest_time = max(msg["time"] for msg in messages)
+
+        earliest_dt = datetime.datetime.fromtimestamp(earliest_time)
+        latest_dt = datetime.datetime.fromtimestamp(latest_time)
+
+        # 如果是同一年
+        if earliest_dt.year == latest_dt.year:
+            earliest_str = earliest_dt.strftime("%m-%d %H:%M:%S")
+            latest_str = latest_dt.strftime("%m-%d %H:%M:%S")
+            time_info += f"是在{earliest_dt.year}年,{earliest_str} 到 {latest_str} 的对话:\n"
+        else:
+            earliest_str = earliest_dt.strftime("%Y-%m-%d %H:%M:%S")
+            latest_str = latest_dt.strftime("%Y-%m-%d %H:%M:%S")
+            time_info += f"是从 {earliest_str} 到 {latest_str} 的对话:\n"
+
+        for msg in messages:
+            input_text += f"{msg['detailed_plain_text']}\n"
+
+        logger.debug(input_text)
+
+        topic_num = self.hippocampus.calculate_topic_num(input_text, compress_rate)
+        topics_response = await self.hippocampus.llm_topic_judge.generate_response(
+            self.hippocampus.find_topic_llm(input_text, topic_num)
+        )
+
+        # 使用正则表达式提取<>中的内容
+        topics = re.findall(r"<([^>]+)>", topics_response[0])
+
+        # 如果没有找到<>包裹的内容,返回['none']
+        if not topics:
+            topics = ["none"]
+        else:
+            # 处理提取出的话题
+            topics = [
+                topic.strip()
+                for topic in ",".join(topics).replace(",", ",").replace("、", ",").replace(" ", ",").split(",")
+                if topic.strip()
+            ]
+
+        # 过滤掉包含禁用关键词的topic
+        filtered_topics = [
+            topic for topic in topics if not any(keyword in topic for keyword in self.config.memory_ban_words)
+        ]
+
+        logger.debug(f"过滤后话题: {filtered_topics}")
+
+        # 创建所有话题的请求任务(调用 async def 只创建协程,异常在 await 时才会抛出)
+        tasks = []
+        for topic in filtered_topics:
+            topic_what_prompt = self.hippocampus.topic_what(input_text, topic, time_info)
+            task = self.hippocampus.llm_summary_by_topic.generate_response_async(topic_what_prompt)
+            tasks.append((topic.strip(), task))
+
+        # 等待所有任务完成
+        compressed_memory = set()
+        similar_topics_dict = {}
+
+        for topic, task in tasks:
+            try:
+                response = await task
+            except Exception as e:
+                logger.error(f"生成话题 '{topic}' 的摘要时发生错误: {e}")
+                continue
+            if response:
+                compressed_memory.add((topic, response[0]))
+
+                existing_topics = list(self.memory_graph.G.nodes())
+                similar_topics = []
+
+                for existing_topic in existing_topics:
+                    topic_words = set(jieba.cut(topic))
+                    existing_words = set(jieba.cut(existing_topic))
+
+                    all_words = topic_words | existing_words
+                    v1 = [1 if word in topic_words else 0 for word in all_words]
+                    v2 = [1 if word in existing_words else 0 for word in all_words]
+
+                    similarity = cosine_similarity(v1, v2)
+
+                    if similarity >= 0.7:
+                        similar_topics.append((existing_topic, similarity))
+
+                similar_topics.sort(key=lambda x: x[1], reverse=True)
+                similar_topics = similar_topics[:3]
+                similar_topics_dict[topic] = similar_topics
+
+        return compressed_memory, similar_topics_dict
+
+    async def operation_build_memory(self):
+        logger.debug("------------------------------------开始构建记忆--------------------------------------")
+        start_time = time.time()
+        memory_samples = self.hippocampus.entorhinal_cortex.get_memory_sample()
+        all_added_nodes = []
+        all_connected_nodes = []
+        all_added_edges = []
+        for i, messages in enumerate(memory_samples, 1):
+            all_topics = []
+            progress = (i / len(memory_samples)) * 100
+            bar_length = 30
+            filled_length = int(bar_length * i // len(memory_samples))
+            bar = "█" * filled_length + "-" * (bar_length - filled_length)
+            logger.debug(f"进度: [{bar}] {progress:.1f}% ({i}/{len(memory_samples)})")
+
+            compress_rate = self.config.memory_compress_rate
+            try:
+                compressed_memory, similar_topics_dict = await self.memory_compress(messages, compress_rate)
+            except Exception as e:
+                logger.error(f"压缩记忆时发生错误: {e}")
+                continue
+            logger.debug(f"压缩后记忆: {compressed_memory},似曾相识的话题: {similar_topics_dict}")
+
+            current_time = datetime.datetime.now().timestamp()
+            logger.debug(f"添加节点: {', '.join(topic for topic, _ in compressed_memory)}")
+            all_added_nodes.extend(topic for topic, _ in compressed_memory)
+
+            for topic, memory in compressed_memory:
+                self.memory_graph.add_dot(topic, memory)
+                all_topics.append(topic)
+
+                if topic in similar_topics_dict:
+                    similar_topics = similar_topics_dict[topic]
+                    for similar_topic, similarity in similar_topics:
+                        if topic != similar_topic:
+                            strength = int(similarity * 10)
+
+                            logger.debug(f"连接相似节点: {topic} 和 {similar_topic} (强度: {strength})")
+                            all_added_edges.append(f"{topic}-{similar_topic}")
+
+                            all_connected_nodes.append(topic)
+                            all_connected_nodes.append(similar_topic)
+
+                            self.memory_graph.G.add_edge(
+                                topic,
+                                similar_topic,
+                                strength=strength,
+                                created_time=current_time,
+                                last_modified=current_time,
+                            )
+
+            for i in range(len(all_topics)):
+                for j in range(i + 1, len(all_topics)):
+                    logger.debug(f"连接同批次节点: {all_topics[i]} 和 {all_topics[j]}")
+                    all_added_edges.append(f"{all_topics[i]}-{all_topics[j]}")
+                    self.memory_graph.connect_dot(all_topics[i], all_topics[j])
+
+        logger.success(f"更新记忆: {', '.join(all_added_nodes)}")
+        logger.debug(f"强化连接: {', '.join(all_added_edges)}")
+        logger.info(f"强化连接节点: {', '.join(all_connected_nodes)}")
+
+        await self.hippocampus.entorhinal_cortex.sync_memory_to_db()
+
+        end_time = time.time()
+        logger.success(f"---------------------记忆构建耗时: {end_time - start_time:.2f} 秒---------------------")
+
+    async def operation_forget_topic(self, percentage=0.005):
+        start_time = time.time()
+        logger.info("[遗忘] 开始检查数据库...")
+
+        # 验证百分比参数
+        if not 0 <= percentage <= 1:
+            logger.warning(f"[遗忘] 无效的遗忘百分比: {percentage}, 使用默认值 0.005")
+            percentage = 0.005
+
+        all_nodes = list(self.memory_graph.G.nodes())
+        all_edges = list(self.memory_graph.G.edges())
+
+        if not all_nodes and not all_edges:
+            logger.info("[遗忘] 记忆图为空,无需进行遗忘操作")
+            return
+
+        # 确保至少检查1个节点和边,且不超过总数
+        check_nodes_count = max(1, min(len(all_nodes), int(len(all_nodes) * percentage)))
+        check_edges_count = max(1, min(len(all_edges), int(len(all_edges) * percentage)))
+
+        # 只有在有足够的节点和边时才进行采样
+        if len(all_nodes) >= check_nodes_count and len(all_edges) >= check_edges_count:
+            try:
+                nodes_to_check = random.sample(all_nodes, check_nodes_count)
+                edges_to_check = random.sample(all_edges, check_edges_count)
+            except ValueError as e:
+                logger.error(f"[遗忘] 采样错误: {str(e)}")
+                return
+        else:
+            logger.info("[遗忘] 没有足够的节点或边进行遗忘操作")
+            return
+
+        # 使用列表存储变化信息
+        edge_changes = {
+            "weakened": [],  # 存储减弱的边
+            "removed": [],  # 存储移除的边
+        }
+        node_changes = {
+            "reduced": [],  # 存储减少记忆的节点
+            "removed": [],  # 存储移除的节点
+        }
+
+        current_time = datetime.datetime.now().timestamp()
+
+        logger.info("[遗忘] 开始检查连接...")
+        edge_check_start = time.time()
+        for source, target in edges_to_check:
+            edge_data = self.memory_graph.G[source][target]
+            last_modified = edge_data.get("last_modified", current_time)
+
+            if current_time - last_modified > 3600 * self.config.memory_forget_time:
+                current_strength = edge_data.get("strength", 1)
+                new_strength = current_strength - 1
+
+                if new_strength <= 0:
+                    self.memory_graph.G.remove_edge(source, target)
+                    edge_changes["removed"].append(f"{source} -> {target}")
+                else:
+                    edge_data["strength"] = new_strength
+                    edge_data["last_modified"] = current_time
+                    edge_changes["weakened"].append(f"{source}-{target} (强度: {current_strength} -> {new_strength})")
+        edge_check_end = time.time()
+        logger.info(f"[遗忘] 连接检查耗时: {edge_check_end - edge_check_start:.2f}秒")
+
+        logger.info("[遗忘] 开始检查节点...")
+        node_check_start = time.time()
+        for node in nodes_to_check:
+            node_data = self.memory_graph.G.nodes[node]
+            last_modified = node_data.get("last_modified", current_time)
+
+            if current_time - last_modified > 3600 * 24:
+                memory_items = node_data.get("memory_items", [])
+                if not isinstance(memory_items, list):
+                    memory_items = [memory_items] if memory_items else []
+
+                if memory_items:
+                    current_count = len(memory_items)
+                    removed_item = random.choice(memory_items)
+                    memory_items.remove(removed_item)
+
+                    if memory_items:
+                        self.memory_graph.G.nodes[node]["memory_items"] = memory_items
+                        self.memory_graph.G.nodes[node]["last_modified"] = current_time
+                        node_changes["reduced"].append(f"{node} (数量: {current_count} -> {len(memory_items)})")
+                    else:
+                        self.memory_graph.G.remove_node(node)
+                        node_changes["removed"].append(node)
+        node_check_end = time.time()
+        logger.info(f"[遗忘] 节点检查耗时: {node_check_end - node_check_start:.2f}秒")
+
+        if any(edge_changes.values()) or any(node_changes.values()):
+            sync_start = time.time()
+
+            await self.hippocampus.entorhinal_cortex.resync_memory_to_db()
+
+            sync_end = time.time()
+            logger.info(f"[遗忘] 数据库同步耗时: {sync_end - sync_start:.2f}秒")
+
+            # 汇总输出所有变化
+            logger.info("[遗忘] 遗忘操作统计:")
+            if edge_changes["weakened"]:
+                logger.info(
+                    f"[遗忘] 减弱的连接 ({len(edge_changes['weakened'])}个): {', '.join(edge_changes['weakened'])}"
+                )
+
+            if edge_changes["removed"]:
+                logger.info(
+                    f"[遗忘] 移除的连接 ({len(edge_changes['removed'])}个): {', '.join(edge_changes['removed'])}"
+                )
+
+            if node_changes["reduced"]:
+                logger.info(
+                    f"[遗忘] 减少记忆的节点 ({len(node_changes['reduced'])}个): {', '.join(node_changes['reduced'])}"
+                )
+
+            if node_changes["removed"]:
+                logger.info(
+                    f"[遗忘] 移除的节点 ({len(node_changes['removed'])}个): {', '.join(node_changes['removed'])}"
+                )
+        else:
+            logger.info("[遗忘] 本次检查没有节点或连接满足遗忘条件")
+
+        end_time = time.time()
+        logger.info(f"[遗忘] 总耗时: {end_time - start_time:.2f}秒")
+
+
 class HippocampusManager:
     _instance = None
     _hippocampus = None
@@ -1317,12 +1325,13 @@ class HippocampusManager:
         if not self._initialized:
            raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
         try:
-            response = await self._hippocampus.get_memory_from_text(text, max_memory_num, max_memory_length, max_depth, fast_retrieval)
+            response = await self._hippocampus.get_memory_from_text(
+                text, max_memory_num, max_memory_length, max_depth, fast_retrieval
+            )
         except Exception as e:
             logger.error(f"文本激活记忆失败: {e}")
             response = []
         return response
-
 
     async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float:
         """从文本中获取激活值的公共接口"""
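Note on the error handling added in this patch (auto_speak.py, reasoning_chat.py, Hippocampus.py): assuming generate_response_async is an async def, as its name suggests, calling it only creates a coroutine object; the body runs, and any exception surfaces, at the point where the coroutine is awaited. The try/except therefore belongs around the await, as in the summary loop of memory_compress above. A minimal, self-contained sketch of the pattern, with hypothetical names (generate_summary, main) standing in for the real LLM calls:

    import asyncio

    async def generate_summary(topic: str) -> str:
        # Stand-in for an LLM request that may fail at runtime.
        if topic == "bad":
            raise RuntimeError(f"summary failed for {topic!r}")
        return f"summary of {topic}"

    async def main() -> None:
        tasks = []
        for topic in ["ok", "bad"]:
            # Nothing can be caught here: calling an async def merely
            # builds the coroutine object without executing its body.
            tasks.append((topic, generate_summary(topic)))

        for topic, coro in tasks:
            try:
                # The coroutine body executes here, so failures from the
                # underlying call are raised (and caught) at this await.
                response = await coro
            except Exception as e:
                print(f"error for topic {topic!r}: {e}")
                continue
            print(topic, "->", response)

    asyncio.run(main())

Running this prints the summary for "ok" and an error for "bad"; the failing topic is simply skipped, mirroring how memory_compress drops topics whose summary generation fails while still committing the rest.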