From 388392b9c515dcdfb1f3697ef8020a5b2574ff1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=A5=E6=B2=B3=E6=99=B4?= Date: Mon, 21 Apr 2025 14:16:13 +0900 Subject: [PATCH 1/6] =?UTF-8?q?fix:=20=E5=88=A0=E9=99=A4=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E7=9A=84=E7=B1=BB=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugins/memory_system/Hippocampus.py | 995 +++++++---------------- 1 file changed, 281 insertions(+), 714 deletions(-) diff --git a/src/plugins/memory_system/Hippocampus.py b/src/plugins/memory_system/Hippocampus.py index 557b42f2b..f25f1d452 100644 --- a/src/plugins/memory_system/Hippocampus.py +++ b/src/plugins/memory_system/Hippocampus.py @@ -342,720 +342,6 @@ class Hippocampus: memories.sort(key=lambda x: x[2], reverse=True) return memories - async def get_memory_from_text( - self, - text: str, - max_memory_num: int = 3, - max_memory_length: int = 2, - max_depth: int = 3, - fast_retrieval: bool = False, - ) -> list: - """从文本中提取关键词并获取相关记忆。 - - Args: - text (str): 输入文本 - max_memory_num (int, optional): 记忆数量限制。默认为3。 - max_memory_length (int, optional): 记忆长度限制。默认为2。 - max_depth (int, optional): 记忆检索深度。默认为2。 - fast_retrieval (bool, optional): 是否使用快速检索。默认为False。 - 如果为True,使用jieba分词和TF-IDF提取关键词,速度更快但可能不够准确。 - 如果为False,使用LLM提取关键词,速度较慢但更准确。 - - Returns: - list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity) - - topic: str, 记忆主题 - - memory_items: list, 该主题下的记忆项列表 - - similarity: float, 与文本的相似度 - """ - if not text: - return [] - - if fast_retrieval: - # 使用jieba分词提取关键词 - words = jieba.cut(text) - # 过滤掉停用词和单字词 - keywords = [word for word in words if len(word) > 1] - # 去重 - keywords = list(set(keywords)) - # 限制关键词数量 - keywords = keywords[:5] - else: - # 使用LLM提取关键词 - topic_num = min(5, max(1, int(len(text) * 0.1))) # 根据文本长度动态调整关键词数量 - # logger.info(f"提取关键词数量: {topic_num}") - topics_response = await self.llm_topic_judge.generate_response(self.find_topic_llm(text, topic_num)) - - # 提取关键词 - keywords = re.findall(r"<([^>]+)>", topics_response[0]) - if not keywords: - keywords = [] - else: - keywords = [ - keyword.strip() - for keyword in ",".join(keywords).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") - if keyword.strip() - ] - - # logger.info(f"提取的关键词: {', '.join(keywords)}") - - # 过滤掉不存在于记忆图中的关键词 - valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G] - if not valid_keywords: - # logger.info("没有找到有效的关键词节点") - return [] - - logger.info(f"有效的关键词: {', '.join(valid_keywords)}") - - # 从每个关键词获取记忆 - all_memories = [] - activate_map = {} # 存储每个词的累计激活值 - - # 对每个关键词进行扩散式检索 - for keyword in valid_keywords: - logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):") - # 初始化激活值 - activation_values = {keyword: 1.0} - # 记录已访问的节点 - visited_nodes = {keyword} - # 待处理的节点队列,每个元素是(节点, 激活值, 当前深度) - nodes_to_process = [(keyword, 1.0, 0)] - - while nodes_to_process: - current_node, current_activation, current_depth = nodes_to_process.pop(0) - - # 如果激活值小于0或超过最大深度,停止扩散 - if current_activation <= 0 or current_depth >= max_depth: - continue - - # 获取当前节点的所有邻居 - neighbors = list(self.memory_graph.G.neighbors(current_node)) - - for neighbor in neighbors: - if neighbor in visited_nodes: - continue - - # 获取连接强度 - edge_data = self.memory_graph.G[current_node][neighbor] - strength = edge_data.get("strength", 1) - - # 计算新的激活值 - new_activation = current_activation - (1 / strength) - - if new_activation > 0: - activation_values[neighbor] = new_activation - visited_nodes.add(neighbor) - nodes_to_process.append((neighbor, new_activation, current_depth + 1)) - logger.trace( - f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})" - ) # noqa: E501 - - # 更新激活映射 - for node, activation_value in activation_values.items(): - if activation_value > 0: - if node in activate_map: - activate_map[node] += activation_value - else: - activate_map[node] = activation_value - - # 输出激活映射 - # logger.info("激活映射统计:") - # for node, total_activation in sorted(activate_map.items(), key=lambda x: x[1], reverse=True): - # logger.info(f"节点 '{node}': 累计激活值 = {total_activation:.2f}") - - # 基于激活值平方的独立概率选择 - remember_map = {} - # logger.info("基于激活值平方的归一化选择:") - - # 计算所有激活值的平方和 - total_squared_activation = sum(activation**2 for activation in activate_map.values()) - if total_squared_activation > 0: - # 计算归一化的激活值 - normalized_activations = { - node: (activation**2) / total_squared_activation for node, activation in activate_map.items() - } - - # 按归一化激活值排序并选择前max_memory_num个 - sorted_nodes = sorted(normalized_activations.items(), key=lambda x: x[1], reverse=True)[:max_memory_num] - - # 将选中的节点添加到remember_map - for node, normalized_activation in sorted_nodes: - remember_map[node] = activate_map[node] # 使用原始激活值 - logger.debug( - f"节点 '{node}' (归一化激活值: {normalized_activation:.2f}, 激活值: {activate_map[node]:.2f})" - ) - else: - logger.info("没有有效的激活值") - - # 从选中的节点中提取记忆 - all_memories = [] - # logger.info("开始从选中的节点中提取记忆:") - for node, activation in remember_map.items(): - logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):") - node_data = self.memory_graph.G.nodes[node] - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - if memory_items: - logger.debug(f"节点包含 {len(memory_items)} 条记忆") - # 计算每条记忆与输入文本的相似度 - memory_similarities = [] - for memory in memory_items: - # 计算与输入文本的相似度 - memory_words = set(jieba.cut(memory)) - text_words = set(jieba.cut(text)) - all_words = memory_words | text_words - v1 = [1 if word in memory_words else 0 for word in all_words] - v2 = [1 if word in text_words else 0 for word in all_words] - similarity = cosine_similarity(v1, v2) - memory_similarities.append((memory, similarity)) - - # 按相似度排序 - memory_similarities.sort(key=lambda x: x[1], reverse=True) - # 获取最匹配的记忆 - top_memories = memory_similarities[:max_memory_length] - - # 添加到结果中 - for memory, similarity in top_memories: - all_memories.append((node, [memory], similarity)) - # logger.info(f"选中记忆: {memory} (相似度: {similarity:.2f})") - else: - logger.info("节点没有记忆") - - # 去重(基于记忆内容) - logger.debug("开始记忆去重:") - seen_memories = set() - unique_memories = [] - for topic, memory_items, activation_value in all_memories: - memory = memory_items[0] # 因为每个topic只有一条记忆 - if memory not in seen_memories: - seen_memories.add(memory) - unique_memories.append((topic, memory_items, activation_value)) - logger.debug(f"保留记忆: {memory} (来自节点: {topic}, 激活值: {activation_value:.2f})") - else: - logger.debug(f"跳过重复记忆: {memory} (来自节点: {topic})") - - # 转换为(关键词, 记忆)格式 - result = [] - for topic, memory_items, _ in unique_memories: - memory = memory_items[0] # 因为每个topic只有一条记忆 - result.append((topic, memory)) - logger.info(f"选中记忆: {memory} (来自节点: {topic})") - - return result - - async def get_activate_from_text(self, text: str, max_depth: int = 3, fast_retrieval: bool = False) -> float: - """从文本中提取关键词并获取相关记忆。 - - Args: - text (str): 输入文本 - max_depth (int, optional): 记忆检索深度。默认为2。 - fast_retrieval (bool, optional): 是否使用快速检索。默认为False。 - 如果为True,使用jieba分词和TF-IDF提取关键词,速度更快但可能不够准确。 - 如果为False,使用LLM提取关键词,速度较慢但更准确。 - - Returns: - float: 激活节点数与总节点数的比值 - """ - if not text: - return 0 - - if fast_retrieval: - # 使用jieba分词提取关键词 - words = jieba.cut(text) - # 过滤掉停用词和单字词 - keywords = [word for word in words if len(word) > 1] - # 去重 - keywords = list(set(keywords)) - # 限制关键词数量 - keywords = keywords[:5] - else: - # 使用LLM提取关键词 - topic_num = min(5, max(1, int(len(text) * 0.1))) # 根据文本长度动态调整关键词数量 - # logger.info(f"提取关键词数量: {topic_num}") - topics_response = await self.llm_topic_judge.generate_response(self.find_topic_llm(text, topic_num)) - - # 提取关键词 - keywords = re.findall(r"<([^>]+)>", topics_response[0]) - if not keywords: - keywords = [] - else: - keywords = [ - keyword.strip() - for keyword in ",".join(keywords).replace(",", ",").replace("、", ",").replace(" ", ",").split(",") - if keyword.strip() - ] - - # logger.info(f"提取的关键词: {', '.join(keywords)}") - - # 过滤掉不存在于记忆图中的关键词 - valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G] - if not valid_keywords: - # logger.info("没有找到有效的关键词节点") - return 0 - - logger.info(f"有效的关键词: {', '.join(valid_keywords)}") - - # 从每个关键词获取记忆 - activate_map = {} # 存储每个词的累计激活值 - - # 对每个关键词进行扩散式检索 - for keyword in valid_keywords: - logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):") - # 初始化激活值 - activation_values = {keyword: 1.0} - # 记录已访问的节点 - visited_nodes = {keyword} - # 待处理的节点队列,每个元素是(节点, 激活值, 当前深度) - nodes_to_process = [(keyword, 1.0, 0)] - - while nodes_to_process: - current_node, current_activation, current_depth = nodes_to_process.pop(0) - - # 如果激活值小于0或超过最大深度,停止扩散 - if current_activation <= 0 or current_depth >= max_depth: - continue - - # 获取当前节点的所有邻居 - neighbors = list(self.memory_graph.G.neighbors(current_node)) - - for neighbor in neighbors: - if neighbor in visited_nodes: - continue - - # 获取连接强度 - edge_data = self.memory_graph.G[current_node][neighbor] - strength = edge_data.get("strength", 1) - - # 计算新的激活值 - new_activation = current_activation - (1 / strength) - - if new_activation > 0: - activation_values[neighbor] = new_activation - visited_nodes.add(neighbor) - nodes_to_process.append((neighbor, new_activation, current_depth + 1)) - # logger.debug( - # f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})") # noqa: E501 - - # 更新激活映射 - for node, activation_value in activation_values.items(): - if activation_value > 0: - if node in activate_map: - activate_map[node] += activation_value - else: - activate_map[node] = activation_value - - # 输出激活映射 - # logger.info("激活映射统计:") - # for node, total_activation in sorted(activate_map.items(), key=lambda x: x[1], reverse=True): - # logger.info(f"节点 '{node}': 累计激活值 = {total_activation:.2f}") - - # 计算激活节点数与总节点数的比值 - total_activation = sum(activate_map.values()) - logger.info(f"总激活值: {total_activation:.2f}") - total_nodes = len(self.memory_graph.G.nodes()) - # activated_nodes = len(activate_map) - activation_ratio = total_activation / total_nodes if total_nodes > 0 else 0 - activation_ratio = activation_ratio * 60 - logger.info(f"总激活值: {total_activation:.2f}, 总节点数: {total_nodes}, 激活: {activation_ratio}") - - return activation_ratio - - -# 负责海马体与其他部分的交互 -class EntorhinalCortex: - def __init__(self, hippocampus: Hippocampus): - self.hippocampus = hippocampus - self.memory_graph = hippocampus.memory_graph - self.config = hippocampus.config - - def get_memory_sample(self): - """从数据库获取记忆样本""" - # 硬编码:每条消息最大记忆次数 - max_memorized_time_per_msg = 3 - - # 创建双峰分布的记忆调度器 - sample_scheduler = MemoryBuildScheduler( - n_hours1=self.config.memory_build_distribution[0], - std_hours1=self.config.memory_build_distribution[1], - weight1=self.config.memory_build_distribution[2], - n_hours2=self.config.memory_build_distribution[3], - std_hours2=self.config.memory_build_distribution[4], - weight2=self.config.memory_build_distribution[5], - total_samples=self.config.build_memory_sample_num, - ) - - timestamps = sample_scheduler.get_timestamp_array() - logger.info(f"回忆往事: {[time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts)) for ts in timestamps]}") - chat_samples = [] - for timestamp in timestamps: - messages = self.random_get_msg_snippet( - timestamp, self.config.build_memory_sample_length, max_memorized_time_per_msg - ) - if messages: - time_diff = (datetime.datetime.now().timestamp() - timestamp) / 3600 - logger.debug(f"成功抽取 {time_diff:.1f} 小时前的消息样本,共{len(messages)}条") - chat_samples.append(messages) - else: - logger.debug(f"时间戳 {timestamp} 的消息样本抽取失败") - - return chat_samples - - @staticmethod - def random_get_msg_snippet(target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list: - """从数据库中随机获取指定时间戳附近的消息片段""" - try_count = 0 - while try_count < 3: - messages = get_closest_chat_from_db(length=chat_size, timestamp=target_timestamp) - if messages: - for message in messages: - if message["memorized_times"] >= max_memorized_time_per_msg: - messages = None - break - if messages: - for message in messages: - db.messages.update_one( - {"_id": message["_id"]}, {"$set": {"memorized_times": message["memorized_times"] + 1}} - ) - return messages - try_count += 1 - return None - - async def sync_memory_to_db(self): - """将记忆图同步到数据库""" - # 获取数据库中所有节点和内存中所有节点 - db_nodes = list(db.graph_data.nodes.find()) - memory_nodes = list(self.memory_graph.G.nodes(data=True)) - - # 转换数据库节点为字典格式,方便查找 - db_nodes_dict = {node["concept"]: node for node in db_nodes} - - # 检查并更新节点 - for concept, data in memory_nodes: - memory_items = data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 计算内存中节点的特征值 - memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items) - - # 获取时间信息 - created_time = data.get("created_time", datetime.datetime.now().timestamp()) - last_modified = data.get("last_modified", datetime.datetime.now().timestamp()) - - if concept not in db_nodes_dict: - # 数据库中缺少的节点,添加 - node_data = { - "concept": concept, - "memory_items": memory_items, - "hash": memory_hash, - "created_time": created_time, - "last_modified": last_modified, - } - db.graph_data.nodes.insert_one(node_data) - else: - # 获取数据库中节点的特征值 - db_node = db_nodes_dict[concept] - db_hash = db_node.get("hash", None) - - # 如果特征值不同,则更新节点 - if db_hash != memory_hash: - db.graph_data.nodes.update_one( - {"concept": concept}, - { - "$set": { - "memory_items": memory_items, - "hash": memory_hash, - "created_time": created_time, - "last_modified": last_modified, - } - }, - ) - - # 处理边的信息 - db_edges = list(db.graph_data.edges.find()) - memory_edges = list(self.memory_graph.G.edges(data=True)) - - # 创建边的哈希值字典 - db_edge_dict = {} - for edge in db_edges: - edge_hash = self.hippocampus.calculate_edge_hash(edge["source"], edge["target"]) - db_edge_dict[(edge["source"], edge["target"])] = {"hash": edge_hash, "strength": edge.get("strength", 1)} - - # 检查并更新边 - for source, target, data in memory_edges: - edge_hash = self.hippocampus.calculate_edge_hash(source, target) - edge_key = (source, target) - strength = data.get("strength", 1) - - # 获取边的时间信息 - created_time = data.get("created_time", datetime.datetime.now().timestamp()) - last_modified = data.get("last_modified", datetime.datetime.now().timestamp()) - - if edge_key not in db_edge_dict: - # 添加新边 - edge_data = { - "source": source, - "target": target, - "strength": strength, - "hash": edge_hash, - "created_time": created_time, - "last_modified": last_modified, - } - db.graph_data.edges.insert_one(edge_data) - else: - # 检查边的特征值是否变化 - if db_edge_dict[edge_key]["hash"] != edge_hash: - db.graph_data.edges.update_one( - {"source": source, "target": target}, - { - "$set": { - "hash": edge_hash, - "strength": strength, - "created_time": created_time, - "last_modified": last_modified, - } - }, - ) - - def sync_memory_from_db(self): - """从数据库同步数据到内存中的图结构""" - current_time = datetime.datetime.now().timestamp() - need_update = False - - # 清空当前图 - self.memory_graph.G.clear() - - # 从数据库加载所有节点 - nodes = list(db.graph_data.nodes.find()) - for node in nodes: - concept = node["concept"] - memory_items = node.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - # 检查时间字段是否存在 - if "created_time" not in node or "last_modified" not in node: - need_update = True - # 更新数据库中的节点 - update_data = {} - if "created_time" not in node: - update_data["created_time"] = current_time - if "last_modified" not in node: - update_data["last_modified"] = current_time - - db.graph_data.nodes.update_one({"concept": concept}, {"$set": update_data}) - logger.info(f"[时间更新] 节点 {concept} 添加缺失的时间字段") - - # 获取时间信息(如果不存在则使用当前时间) - created_time = node.get("created_time", current_time) - last_modified = node.get("last_modified", current_time) - - # 添加节点到图中 - self.memory_graph.G.add_node( - concept, memory_items=memory_items, created_time=created_time, last_modified=last_modified - ) - - # 从数据库加载所有边 - edges = list(db.graph_data.edges.find()) - for edge in edges: - source = edge["source"] - target = edge["target"] - strength = edge.get("strength", 1) - - # 检查时间字段是否存在 - if "created_time" not in edge or "last_modified" not in edge: - need_update = True - # 更新数据库中的边 - update_data = {} - if "created_time" not in edge: - update_data["created_time"] = current_time - if "last_modified" not in edge: - update_data["last_modified"] = current_time - - db.graph_data.edges.update_one({"source": source, "target": target}, {"$set": update_data}) - logger.info(f"[时间更新] 边 {source} - {target} 添加缺失的时间字段") - - # 获取时间信息(如果不存在则使用当前时间) - created_time = edge.get("created_time", current_time) - last_modified = edge.get("last_modified", current_time) - - # 只有当源节点和目标节点都存在时才添加边 - if source in self.memory_graph.G and target in self.memory_graph.G: - self.memory_graph.G.add_edge( - source, target, strength=strength, created_time=created_time, last_modified=last_modified - ) - - if need_update: - logger.success("[数据库] 已为缺失的时间字段进行补充") - - async def resync_memory_to_db(self): - """清空数据库并重新同步所有记忆数据""" - start_time = time.time() - logger.info("[数据库] 开始重新同步所有记忆数据...") - - # 清空数据库 - clear_start = time.time() - db.graph_data.nodes.delete_many({}) - db.graph_data.edges.delete_many({}) - clear_end = time.time() - logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}秒") - - # 获取所有节点和边 - memory_nodes = list(self.memory_graph.G.nodes(data=True)) - memory_edges = list(self.memory_graph.G.edges(data=True)) - - # 重新写入节点 - node_start = time.time() - for concept, data in memory_nodes: - memory_items = data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - node_data = { - "concept": concept, - "memory_items": memory_items, - "hash": self.hippocampus.calculate_node_hash(concept, memory_items), - "created_time": data.get("created_time", datetime.datetime.now().timestamp()), - "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), - } - db.graph_data.nodes.insert_one(node_data) - node_end = time.time() - logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}秒") - - # 重新写入边 - edge_start = time.time() - for source, target, data in memory_edges: - edge_data = { - "source": source, - "target": target, - "strength": data.get("strength", 1), - "hash": self.hippocampus.calculate_edge_hash(source, target), - "created_time": data.get("created_time", datetime.datetime.now().timestamp()), - "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), - } - db.graph_data.edges.insert_one(edge_data) - edge_end = time.time() - logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}秒") - - end_time = time.time() - logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒") - logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边") - - -# 海马体 -class Hippocampus: - def __init__(self): - self.memory_graph = MemoryGraph() - self.llm_topic_judge = None - self.llm_summary_by_topic = None - self.entorhinal_cortex = None - self.parahippocampal_gyrus = None - self.config = None - - def initialize(self, global_config): - self.config = MemoryConfig.from_global_config(global_config) - # 初始化子组件 - self.entorhinal_cortex = EntorhinalCortex(self) - self.parahippocampal_gyrus = ParahippocampalGyrus(self) - # 从数据库加载记忆图 - self.entorhinal_cortex.sync_memory_from_db() - self.llm_topic_judge = LLMRequest(self.config.llm_topic_judge, request_type="memory") - self.llm_summary_by_topic = LLMRequest(self.config.llm_summary_by_topic, request_type="memory") - - def get_all_node_names(self) -> list: - """获取记忆图中所有节点的名字列表""" - return list(self.memory_graph.G.nodes()) - - @staticmethod - def calculate_node_hash(concept, memory_items) -> int: - """计算节点的特征值""" - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - sorted_items = sorted(memory_items) - content = f"{concept}:{'|'.join(sorted_items)}" - return hash(content) - - @staticmethod - def calculate_edge_hash(source, target) -> int: - """计算边的特征值""" - nodes = sorted([source, target]) - return hash(f"{nodes[0]}:{nodes[1]}") - - @staticmethod - def find_topic_llm(text, topic_num): - prompt = ( - f"这是一段文字:{text}。请你从这段话中总结出最多{topic_num}个关键的概念,可以是名词,动词,或者特定人物,帮我列出来," - f"将主题用逗号隔开,并加上<>,例如<主题1>,<主题2>......尽可能精简。只需要列举最多{topic_num}个话题就好,不要有序号,不要告诉我其他内容。" - f"如果确定找不出主题或者没有明显主题,返回。" - ) - return prompt - - @staticmethod - def topic_what(text, topic, time_info): - prompt = ( - f'这是一段文字,{time_info}:{text}。我想让你基于这段文字来概括"{topic}"这个概念,帮我总结成一句自然的话,' - f"可以包含时间和人物,以及具体的观点。只输出这句话就好" - ) - return prompt - - @staticmethod - def calculate_topic_num(text, compress_rate): - """计算文本的话题数量""" - information_content = calculate_information_content(text) - topic_by_length = text.count("\n") * compress_rate - topic_by_information_content = max(1, min(5, int((information_content - 3) * 2))) - topic_num = int((topic_by_length + topic_by_information_content) / 2) - logger.debug( - f"topic_by_length: {topic_by_length}, topic_by_information_content: {topic_by_information_content}, " - f"topic_num: {topic_num}" - ) - return topic_num - - def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list: - """从关键词获取相关记忆。 - - Args: - keyword (str): 关键词 - max_depth (int, optional): 记忆检索深度,默认为2。1表示只获取直接相关的记忆,2表示获取间接相关的记忆。 - - Returns: - list: 记忆列表,每个元素是一个元组 (topic, memory_items, similarity) - - topic: str, 记忆主题 - - memory_items: list, 该主题下的记忆项列表 - - similarity: float, 与关键词的相似度 - """ - if not keyword: - return [] - - # 获取所有节点 - all_nodes = list(self.memory_graph.G.nodes()) - memories = [] - - # 计算关键词的词集合 - keyword_words = set(jieba.cut(keyword)) - - # 遍历所有节点,计算相似度 - for node in all_nodes: - node_words = set(jieba.cut(node)) - all_words = keyword_words | node_words - v1 = [1 if word in keyword_words else 0 for word in all_words] - v2 = [1 if word in node_words else 0 for word in all_words] - similarity = cosine_similarity(v1, v2) - - # 如果相似度超过阈值,获取该节点的记忆 - if similarity >= 0.3: # 可以调整这个阈值 - node_data = self.memory_graph.G.nodes[node] - memory_items = node_data.get("memory_items", []) - if not isinstance(memory_items, list): - memory_items = [memory_items] if memory_items else [] - - memories.append((node, memory_items, similarity)) - - # 按相似度降序排序 - memories.sort(key=lambda x: x[2], reverse=True) - return memories - async def get_memory_from_text( self, text: str, @@ -1543,6 +829,287 @@ class Hippocampus: return activation_ratio +# 负责海马体与其他部分的交互 +class EntorhinalCortex: + def __init__(self, hippocampus: Hippocampus): + self.hippocampus = hippocampus + self.memory_graph = hippocampus.memory_graph + self.config = hippocampus.config + + def get_memory_sample(self): + """从数据库获取记忆样本""" + # 硬编码:每条消息最大记忆次数 + max_memorized_time_per_msg = 3 + + # 创建双峰分布的记忆调度器 + sample_scheduler = MemoryBuildScheduler( + n_hours1=self.config.memory_build_distribution[0], + std_hours1=self.config.memory_build_distribution[1], + weight1=self.config.memory_build_distribution[2], + n_hours2=self.config.memory_build_distribution[3], + std_hours2=self.config.memory_build_distribution[4], + weight2=self.config.memory_build_distribution[5], + total_samples=self.config.build_memory_sample_num, + ) + + timestamps = sample_scheduler.get_timestamp_array() + logger.info(f"回忆往事: {[time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts)) for ts in timestamps]}") + chat_samples = [] + for timestamp in timestamps: + messages = self.random_get_msg_snippet( + timestamp, self.config.build_memory_sample_length, max_memorized_time_per_msg + ) + if messages: + time_diff = (datetime.datetime.now().timestamp() - timestamp) / 3600 + logger.debug(f"成功抽取 {time_diff:.1f} 小时前的消息样本,共{len(messages)}条") + chat_samples.append(messages) + else: + logger.debug(f"时间戳 {timestamp} 的消息样本抽取失败") + + return chat_samples + + @staticmethod + def random_get_msg_snippet(target_timestamp: float, chat_size: int, max_memorized_time_per_msg: int) -> list: + """从数据库中随机获取指定时间戳附近的消息片段""" + try_count = 0 + while try_count < 3: + messages = get_closest_chat_from_db(length=chat_size, timestamp=target_timestamp) + if messages: + for message in messages: + if message["memorized_times"] >= max_memorized_time_per_msg: + messages = None + break + if messages: + for message in messages: + db.messages.update_one( + {"_id": message["_id"]}, {"$set": {"memorized_times": message["memorized_times"] + 1}} + ) + return messages + try_count += 1 + return None + + async def sync_memory_to_db(self): + """将记忆图同步到数据库""" + # 获取数据库中所有节点和内存中所有节点 + db_nodes = list(db.graph_data.nodes.find()) + memory_nodes = list(self.memory_graph.G.nodes(data=True)) + + # 转换数据库节点为字典格式,方便查找 + db_nodes_dict = {node["concept"]: node for node in db_nodes} + + # 检查并更新节点 + for concept, data in memory_nodes: + memory_items = data.get("memory_items", []) + if not isinstance(memory_items, list): + memory_items = [memory_items] if memory_items else [] + + # 计算内存中节点的特征值 + memory_hash = self.hippocampus.calculate_node_hash(concept, memory_items) + + # 获取时间信息 + created_time = data.get("created_time", datetime.datetime.now().timestamp()) + last_modified = data.get("last_modified", datetime.datetime.now().timestamp()) + + if concept not in db_nodes_dict: + # 数据库中缺少的节点,添加 + node_data = { + "concept": concept, + "memory_items": memory_items, + "hash": memory_hash, + "created_time": created_time, + "last_modified": last_modified, + } + db.graph_data.nodes.insert_one(node_data) + else: + # 获取数据库中节点的特征值 + db_node = db_nodes_dict[concept] + db_hash = db_node.get("hash", None) + + # 如果特征值不同,则更新节点 + if db_hash != memory_hash: + db.graph_data.nodes.update_one( + {"concept": concept}, + { + "$set": { + "memory_items": memory_items, + "hash": memory_hash, + "created_time": created_time, + "last_modified": last_modified, + } + }, + ) + + # 处理边的信息 + db_edges = list(db.graph_data.edges.find()) + memory_edges = list(self.memory_graph.G.edges(data=True)) + + # 创建边的哈希值字典 + db_edge_dict = {} + for edge in db_edges: + edge_hash = self.hippocampus.calculate_edge_hash(edge["source"], edge["target"]) + db_edge_dict[(edge["source"], edge["target"])] = {"hash": edge_hash, "strength": edge.get("strength", 1)} + + # 检查并更新边 + for source, target, data in memory_edges: + edge_hash = self.hippocampus.calculate_edge_hash(source, target) + edge_key = (source, target) + strength = data.get("strength", 1) + + # 获取边的时间信息 + created_time = data.get("created_time", datetime.datetime.now().timestamp()) + last_modified = data.get("last_modified", datetime.datetime.now().timestamp()) + + if edge_key not in db_edge_dict: + # 添加新边 + edge_data = { + "source": source, + "target": target, + "strength": strength, + "hash": edge_hash, + "created_time": created_time, + "last_modified": last_modified, + } + db.graph_data.edges.insert_one(edge_data) + else: + # 检查边的特征值是否变化 + if db_edge_dict[edge_key]["hash"] != edge_hash: + db.graph_data.edges.update_one( + {"source": source, "target": target}, + { + "$set": { + "hash": edge_hash, + "strength": strength, + "created_time": created_time, + "last_modified": last_modified, + } + }, + ) + + def sync_memory_from_db(self): + """从数据库同步数据到内存中的图结构""" + current_time = datetime.datetime.now().timestamp() + need_update = False + + # 清空当前图 + self.memory_graph.G.clear() + + # 从数据库加载所有节点 + nodes = list(db.graph_data.nodes.find()) + for node in nodes: + concept = node["concept"] + memory_items = node.get("memory_items", []) + if not isinstance(memory_items, list): + memory_items = [memory_items] if memory_items else [] + + # 检查时间字段是否存在 + if "created_time" not in node or "last_modified" not in node: + need_update = True + # 更新数据库中的节点 + update_data = {} + if "created_time" not in node: + update_data["created_time"] = current_time + if "last_modified" not in node: + update_data["last_modified"] = current_time + + db.graph_data.nodes.update_one({"concept": concept}, {"$set": update_data}) + logger.info(f"[时间更新] 节点 {concept} 添加缺失的时间字段") + + # 获取时间信息(如果不存在则使用当前时间) + created_time = node.get("created_time", current_time) + last_modified = node.get("last_modified", current_time) + + # 添加节点到图中 + self.memory_graph.G.add_node( + concept, memory_items=memory_items, created_time=created_time, last_modified=last_modified + ) + + # 从数据库加载所有边 + edges = list(db.graph_data.edges.find()) + for edge in edges: + source = edge["source"] + target = edge["target"] + strength = edge.get("strength", 1) + + # 检查时间字段是否存在 + if "created_time" not in edge or "last_modified" not in edge: + need_update = True + # 更新数据库中的边 + update_data = {} + if "created_time" not in edge: + update_data["created_time"] = current_time + if "last_modified" not in edge: + update_data["last_modified"] = current_time + + db.graph_data.edges.update_one({"source": source, "target": target}, {"$set": update_data}) + logger.info(f"[时间更新] 边 {source} - {target} 添加缺失的时间字段") + + # 获取时间信息(如果不存在则使用当前时间) + created_time = edge.get("created_time", current_time) + last_modified = edge.get("last_modified", current_time) + + # 只有当源节点和目标节点都存在时才添加边 + if source in self.memory_graph.G and target in self.memory_graph.G: + self.memory_graph.G.add_edge( + source, target, strength=strength, created_time=created_time, last_modified=last_modified + ) + + if need_update: + logger.success("[数据库] 已为缺失的时间字段进行补充") + + async def resync_memory_to_db(self): + """清空数据库并重新同步所有记忆数据""" + start_time = time.time() + logger.info("[数据库] 开始重新同步所有记忆数据...") + + # 清空数据库 + clear_start = time.time() + db.graph_data.nodes.delete_many({}) + db.graph_data.edges.delete_many({}) + clear_end = time.time() + logger.info(f"[数据库] 清空数据库耗时: {clear_end - clear_start:.2f}秒") + + # 获取所有节点和边 + memory_nodes = list(self.memory_graph.G.nodes(data=True)) + memory_edges = list(self.memory_graph.G.edges(data=True)) + + # 重新写入节点 + node_start = time.time() + for concept, data in memory_nodes: + memory_items = data.get("memory_items", []) + if not isinstance(memory_items, list): + memory_items = [memory_items] if memory_items else [] + + node_data = { + "concept": concept, + "memory_items": memory_items, + "hash": self.hippocampus.calculate_node_hash(concept, memory_items), + "created_time": data.get("created_time", datetime.datetime.now().timestamp()), + "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), + } + db.graph_data.nodes.insert_one(node_data) + node_end = time.time() + logger.info(f"[数据库] 写入 {len(memory_nodes)} 个节点耗时: {node_end - node_start:.2f}秒") + + # 重新写入边 + edge_start = time.time() + for source, target, data in memory_edges: + edge_data = { + "source": source, + "target": target, + "strength": data.get("strength", 1), + "hash": self.hippocampus.calculate_edge_hash(source, target), + "created_time": data.get("created_time", datetime.datetime.now().timestamp()), + "last_modified": data.get("last_modified", datetime.datetime.now().timestamp()), + } + db.graph_data.edges.insert_one(edge_data) + edge_end = time.time() + logger.info(f"[数据库] 写入 {len(memory_edges)} 条边耗时: {edge_end - edge_start:.2f}秒") + + end_time = time.time() + logger.success(f"[数据库] 重新同步完成,总耗时: {end_time - start_time:.2f}秒") + logger.success(f"[数据库] 同步了 {len(memory_nodes)} 个节点和 {len(memory_edges)} 条边") + + # 负责整合,遗忘,合并记忆 class ParahippocampalGyrus: def __init__(self, hippocampus: Hippocampus): From 5d663347766c631e04d513d090cce4260d57e8e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=A5=E6=B2=B3=E6=99=B4?= Date: Mon, 21 Apr 2025 14:21:08 +0900 Subject: [PATCH 2/6] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=9C=AA=E8=A7=A3?= =?UTF-8?q?=E6=9E=90=E7=9A=84=E9=A5=AE=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../memory_system/manually_alter_memory.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/plugins/memory_system/manually_alter_memory.py b/src/plugins/memory_system/manually_alter_memory.py index 818742113..1452d3d56 100644 --- a/src/plugins/memory_system/manually_alter_memory.py +++ b/src/plugins/memory_system/manually_alter_memory.py @@ -5,7 +5,8 @@ import time from pathlib import Path import datetime from rich.console import Console -from memory_manual_build import Memory_graph, Hippocampus # 海马体和记忆图 +from Hippocampus import Hippocampus # 海马体和记忆图 + from dotenv import load_dotenv @@ -45,13 +46,13 @@ else: # 查询节点信息 -def query_mem_info(memory_graph: Memory_graph): +def query_mem_info(hippocampus: Hippocampus): while True: query = input("\n请输入新的查询概念(输入'退出'以结束):") if query.lower() == "退出": break - items_list = memory_graph.get_related_item(query) + items_list = hippocampus.memory_graph.get_related_item(query) if items_list: have_memory = False first_layer, second_layer = items_list @@ -312,14 +313,11 @@ def alter_mem_edge(hippocampus: Hippocampus): async def main(): start_time = time.time() - # 创建记忆图 - memory_graph = Memory_graph() - # 创建海马体 - hippocampus = Hippocampus(memory_graph) + hippocampus = Hippocampus() # 从数据库同步数据 - hippocampus.sync_memory_from_db() + hippocampus.entorhinal_cortex.sync_memory_from_db() end_time = time.time() logger.info(f"\033[32m[加载海马体耗时: {end_time - start_time:.2f} 秒]\033[0m") @@ -338,7 +336,7 @@ async def main(): query = -1 if query == 0: - query_mem_info(memory_graph) + query_mem_info(hippocampus.memory_graph) elif query == 1: add_mem_node(hippocampus) elif query == 2: @@ -355,7 +353,7 @@ async def main(): print("已结束操作") break - hippocampus.sync_memory_to_db() + hippocampus.entorhinal_cortex.sync_memory_to_db() if __name__ == "__main__": From 1dd3c62c553df60d52137f6942ff2e60a015314e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=A5=E6=B2=B3=E6=99=B4?= Date: Mon, 21 Apr 2025 14:32:32 +0900 Subject: [PATCH 3/6] chore: PEP8 naming --- src/main.py | 6 +++--- src/plugins/chat/bot.py | 4 ++-- .../heartFC_chat/heartFC_controler.py | 16 ++++++++-------- .../heartFC_chat/heartFC_processor.py | 14 +++++++------- .../chat_module/heartFC_chat/pf_chatting.py | 6 +++--- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/main.py b/src/main.py index aad08b906..929cff7dd 100644 --- a/src/main.py +++ b/src/main.py @@ -18,7 +18,7 @@ from .plugins.remote import heartbeat_thread # noqa: F401 from .individuality.individuality import Individuality from .common.server import global_server from .plugins.chat_module.heartFC_chat.interest import InterestManager -from .plugins.chat_module.heartFC_chat.heartFC_controler import HeartFC_Controller +from .plugins.chat_module.heartFC_chat.heartFC_controler import HeartFCController logger = get_module_logger("main") @@ -118,8 +118,8 @@ class MainSystem: logger.success("兴趣管理器后台任务启动成功") # 初始化并独立启动 HeartFC_Chat - HeartFC_Controller() - heartfc_chat_instance = HeartFC_Controller.get_instance() + HeartFCController() + heartfc_chat_instance = HeartFCController.get_instance() if heartfc_chat_instance: await heartfc_chat_instance.start() logger.success("HeartFC_Chat 模块独立启动成功") diff --git a/src/plugins/chat/bot.py b/src/plugins/chat/bot.py index 314d20ff0..cfe4238ea 100644 --- a/src/plugins/chat/bot.py +++ b/src/plugins/chat/bot.py @@ -7,7 +7,7 @@ from ..chat_module.only_process.only_message_process import MessageProcessor from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig from ..chat_module.reasoning_chat.reasoning_chat import ReasoningChat -from ..chat_module.heartFC_chat.heartFC_processor import HeartFC_Processor +from ..chat_module.heartFC_chat.heartFC_processor import HeartFCProcessor from ..utils.prompt_builder import Prompt, global_prompt_manager import traceback @@ -29,7 +29,7 @@ class ChatBot: self.mood_manager = MoodManager.get_instance() # 获取情绪管理器单例 self.mood_manager.start_mood_update() # 启动情绪更新 self.reasoning_chat = ReasoningChat() - self.heartFC_processor = HeartFC_Processor() # 新增 + self.heartFC_processor = HeartFCProcessor() # 新增 # 创建初始化PFC管理器的任务,会在_ensure_started时执行 self.only_process_chat = MessageProcessor() diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_controler.py b/src/plugins/chat_module/heartFC_chat/heartFC_controler.py index 389e030a4..a217f9785 100644 --- a/src/plugins/chat_module/heartFC_chat/heartFC_controler.py +++ b/src/plugins/chat_module/heartFC_chat/heartFC_controler.py @@ -20,18 +20,18 @@ chat_config = LogConfig( file_format=CHAT_STYLE_CONFIG["file_format"], ) -logger = get_module_logger("HeartFC_Controller", config=chat_config) +logger = get_module_logger("HeartFCController", config=chat_config) # 检测群聊兴趣的间隔时间 INTEREST_MONITOR_INTERVAL_SECONDS = 1 -class HeartFC_Controller: +class HeartFCController: _instance = None # For potential singleton access if needed by MessageManager def __init__(self): # --- Updated Init --- - if HeartFC_Controller._instance is not None: + if HeartFCController._instance is not None: # Prevent re-initialization if used as a singleton return self.gpt = ResponseGenerator() @@ -44,7 +44,7 @@ class HeartFC_Controller: self.pf_chatting_instances: Dict[str, PFChatting] = {} self._pf_chatting_lock = Lock() # --- End New PFChatting Management --- - HeartFC_Controller._instance = self # Register instance + HeartFCController._instance = self # Register instance # --- End Updated Init --- # --- Make dependencies accessible for PFChatting --- # These are accessed via the passed instance in PFChatting @@ -58,7 +58,7 @@ class HeartFC_Controller: def get_instance(cls): if cls._instance is None: # This might indicate an issue if called before initialization - logger.warning("HeartFC_Controller get_instance called before initialization.") + logger.warning("HeartFCController get_instance called before initialization.") # Optionally, initialize here if a strict singleton pattern is desired # cls._instance = cls() return cls._instance @@ -67,9 +67,9 @@ class HeartFC_Controller: async def start(self): """启动异步任务,如回复启动器""" - logger.debug("HeartFC_Controller 正在启动异步任务...") + logger.debug("HeartFCController 正在启动异步任务...") self._initialize_monitor_task() - logger.info("HeartFC_Controller 异步任务启动完成") + logger.info("HeartFCController 异步任务启动完成") def _initialize_monitor_task(self): """启动后台兴趣监控任务,可以检查兴趣是否足以开启心流对话""" @@ -89,7 +89,7 @@ class HeartFC_Controller: async with self._pf_chatting_lock: if stream_id not in self.pf_chatting_instances: logger.info(f"为流 {stream_id} 创建新的PFChatting实例") - # 传递 self (HeartFC_Controller 实例) 进行依赖注入 + # 传递 self (HeartFCController 实例) 进行依赖注入 instance = PFChatting(stream_id, self) # 执行异步初始化 if not await instance._initialize(): diff --git a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py index 37708a94f..44849f821 100644 --- a/src/plugins/chat_module/heartFC_chat/heartFC_processor.py +++ b/src/plugins/chat_module/heartFC_chat/heartFC_processor.py @@ -25,7 +25,7 @@ logger = get_module_logger("heartFC_processor", config=processor_config) # INTEREST_INCREASE_THRESHOLD = 0.5 -class HeartFC_Processor: +class HeartFCProcessor: def __init__(self): self.storage = MessageStorage() self.interest_manager = InterestManager() @@ -97,21 +97,21 @@ class HeartFC_Processor: # 处理缓冲器结果 (Bombing logic) if not buffer_result: - F_type = "seglist" + f_type = "seglist" if message.message_segment.type != "seglist": - F_type = message.message_segment.type + f_type = message.message_segment.type else: if ( isinstance(message.message_segment.data, list) and all(isinstance(x, Seg) for x in message.message_segment.data) and len(message.message_segment.data) == 1 ): - F_type = message.message_segment.data[0].type - if F_type == "text": + f_type = message.message_segment.data[0].type + if f_type == "text": logger.debug(f"触发缓冲,消息:{message.processed_plain_text}") - elif F_type == "image": + elif f_type == "image": logger.debug("触发缓冲,表情包/图片等待中") - elif F_type == "seglist": + elif f_type == "seglist": logger.debug("触发缓冲,消息列表等待中") return # 被缓冲器拦截,不生成回复 diff --git a/src/plugins/chat_module/heartFC_chat/pf_chatting.py b/src/plugins/chat_module/heartFC_chat/pf_chatting.py index 59472fd14..620a9eead 100644 --- a/src/plugins/chat_module/heartFC_chat/pf_chatting.py +++ b/src/plugins/chat_module/heartFC_chat/pf_chatting.py @@ -25,7 +25,7 @@ logger = get_module_logger("PFCLoop", config=interest_log_config) # Logger Name # Forward declaration for type hinting if TYPE_CHECKING: - from .heartFC_controler import HeartFC_Controller + from .heartFC_controler import HeartFCController PLANNER_TOOL_DEFINITION = [ { @@ -61,7 +61,7 @@ class PFChatting: 只要计时器>0,循环就会继续。 """ - def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFC_Controller"): + def __init__(self, chat_id: str, heartfc_controller_instance: "HeartFCController"): """ 初始化PFChatting实例。 @@ -771,7 +771,7 @@ class PFChatting: logger.error(traceback.format_exc()) return None - # --- Methods moved from HeartFC_Controller start --- + # --- Methods moved from HeartFCController start --- async def _create_thinking_message(self, anchor_message: Optional[MessageRecv]) -> Optional[str]: """创建思考消息 (尝试锚定到 anchor_message)""" if not anchor_message or not anchor_message.chat_stream: From 13e05adf806806f90edac86f8dbe294f89113991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=A5=E6=B2=B3=E6=99=B4?= Date: Mon, 21 Apr 2025 14:42:33 +0900 Subject: [PATCH 4/6] =?UTF-8?q?chore:=20=E6=98=BE=E5=BC=8F=E8=BF=94?= =?UTF-8?q?=E5=9B=9ENone?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/individuality/individuality.py | 1 + src/plugins/PFC/conversation.py | 8 +++++ src/plugins/chat/message.py | 12 ++++---- src/plugins/chat/utils.py | 5 ++-- .../reasoning_chat/reasoning_chat.py | 29 ++++++++++--------- src/plugins/message/message_base.py | 1 - src/plugins/person_info/person_info.py | 2 +- src/plugins/remote/remote.py | 1 + 8 files changed, 35 insertions(+), 24 deletions(-) diff --git a/src/individuality/individuality.py b/src/individuality/individuality.py index e7616ec27..2a489338a 100644 --- a/src/individuality/individuality.py +++ b/src/individuality/individuality.py @@ -105,3 +105,4 @@ class Individuality: return self.personality.agreeableness elif factor == "neuroticism": return self.personality.neuroticism + return None diff --git a/src/plugins/PFC/conversation.py b/src/plugins/PFC/conversation.py index 598468e89..9502b755c 100644 --- a/src/plugins/PFC/conversation.py +++ b/src/plugins/PFC/conversation.py @@ -180,6 +180,7 @@ class Conversation: "time": datetime.datetime.now().strftime("%H:%M:%S"), } ) + return None elif action == "fetch_knowledge": self.waiter.wait_accumulated_time = 0 @@ -193,28 +194,35 @@ class Conversation: if knowledge: if topic not in self.conversation_info.knowledge_list: self.conversation_info.knowledge_list.append({"topic": topic, "knowledge": knowledge}) + return None else: self.conversation_info.knowledge_list[topic] += knowledge + return None + return None elif action == "rethink_goal": self.waiter.wait_accumulated_time = 0 self.state = ConversationState.RETHINKING await self.goal_analyzer.analyze_goal(conversation_info, observation_info) + return None elif action == "listening": self.state = ConversationState.LISTENING logger.info("倾听对方发言...") await self.waiter.wait_listening(conversation_info) + return None elif action == "end_conversation": self.should_continue = False logger.info("决定结束对话...") + return None else: # wait self.state = ConversationState.WAITING logger.info("等待更多信息...") await self.waiter.wait(self.conversation_info) + return None async def _send_timeout_message(self): """发送超时结束消息""" diff --git a/src/plugins/chat/message.py b/src/plugins/chat/message.py index cbea1fd92..87380e7c0 100644 --- a/src/plugins/chat/message.py +++ b/src/plugins/chat/message.py @@ -1,14 +1,13 @@ import time from dataclasses import dataclass -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union import urllib3 -from .utils_image import image_manager - -from ..message.message_base import Seg, UserInfo, BaseMessageInfo, MessageBase -from .chat_stream import ChatStream from src.common.logger import get_module_logger +from .chat_stream import ChatStream +from .utils_image import image_manager +from ..message.message_base import Seg, UserInfo, BaseMessageInfo, MessageBase logger = get_module_logger("chat_message") @@ -207,7 +206,7 @@ class MessageProcessBase(Message): # 处理单个消息段 return await self._process_single_segment(segment) - async def _process_single_segment(self, seg: Seg) -> str: + async def _process_single_segment(self, seg: Seg) -> Union[str, None]: """处理单个消息段 Args: @@ -233,6 +232,7 @@ class MessageProcessBase(Message): elif seg.type == "reply": if self.reply and hasattr(self.reply, "processed_plain_text"): return f"[回复:{self.reply.processed_plain_text}]" + return None else: return f"[{seg.type}:{str(seg.data)}]" except Exception as e: diff --git a/src/plugins/chat/utils.py b/src/plugins/chat/utils.py index 9c98a16a5..3e4cfa52d 100644 --- a/src/plugins/chat/utils.py +++ b/src/plugins/chat/utils.py @@ -2,7 +2,7 @@ import random import time import re from collections import Counter -from typing import Dict, List +from typing import Dict, List, Optional import jieba import numpy as np @@ -688,7 +688,7 @@ def count_messages_between(start_time: float, end_time: float, stream_id: str) - return 0, 0 -def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> str: +def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal") -> Optional[str]: """将时间戳转换为人类可读的时间格式 Args: @@ -716,6 +716,7 @@ def translate_timestamp_to_human_readable(timestamp: float, mode: str = "normal" return f"{int(diff / 86400)}天前:\n" else: return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + ":\n" + return None def parse_text_timestamps(text: str, mode: str = "normal") -> str: diff --git a/src/plugins/chat_module/reasoning_chat/reasoning_chat.py b/src/plugins/chat_module/reasoning_chat/reasoning_chat.py index d149f68b0..2eb56c83b 100644 --- a/src/plugins/chat_module/reasoning_chat/reasoning_chat.py +++ b/src/plugins/chat_module/reasoning_chat/reasoning_chat.py @@ -1,25 +1,26 @@ import time -from random import random import traceback -from typing import List -from ...memory_system.Hippocampus import HippocampusManager -from ...moods.moods import MoodManager -from ....config.config import global_config -from ...chat.emoji_manager import emoji_manager +from random import random +from typing import List, Optional + +from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig +from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager from .reasoning_generator import ResponseGenerator +from ...chat.chat_stream import chat_manager +from ...chat.emoji_manager import emoji_manager from ...chat.message import MessageSending, MessageRecv, MessageThinking, MessageSet +from ...chat.message_buffer import message_buffer from ...chat.messagesender import message_manager -from ...storage.storage import MessageStorage from ...chat.utils import is_mentioned_bot_in_message from ...chat.utils_image import image_path_to_base64 -from ...willing.willing_manager import willing_manager +from ...memory_system.Hippocampus import HippocampusManager from ...message import UserInfo, Seg -from src.common.logger import get_module_logger, CHAT_STYLE_CONFIG, LogConfig -from ...chat.chat_stream import chat_manager +from ...moods.moods import MoodManager from ...person_info.relationship_manager import relationship_manager -from ...chat.message_buffer import message_buffer -from src.plugins.respon_info_catcher.info_catcher import info_catcher_manager +from ...storage.storage import MessageStorage from ...utils.timer_calculater import Timer +from ...willing.willing_manager import willing_manager +from ....config.config import global_config # 定义日志配置 chat_config = LogConfig( @@ -61,7 +62,7 @@ class ReasoningChat: return thinking_id @staticmethod - async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> MessageSending: + async def _send_response_messages(message, chat, response_set: List[str], thinking_id) -> Optional[MessageSending]: """发送回复消息""" container = message_manager.get_container(chat.stream_id) thinking_message = None @@ -74,7 +75,7 @@ class ReasoningChat: if not thinking_message: logger.warning("未找到对应的思考消息,可能已超时被移除") - return + return None thinking_start_time = thinking_message.thinking_start_time message_set = MessageSet(chat, thinking_id) diff --git a/src/plugins/message/message_base.py b/src/plugins/message/message_base.py index 2f1776702..b853d469a 100644 --- a/src/plugins/message/message_base.py +++ b/src/plugins/message/message_base.py @@ -12,7 +12,6 @@ class Seg: - 对于 text 类型,data 是字符串 - 对于 image 类型,data 是 base64 字符串 - 对于 seglist 类型,data 是 Seg 列表 - translated_data: 经过翻译处理的数据(可选) """ type: str diff --git a/src/plugins/person_info/person_info.py b/src/plugins/person_info/person_info.py index 8105b330f..b4404988e 100644 --- a/src/plugins/person_info/person_info.py +++ b/src/plugins/person_info/person_info.py @@ -169,7 +169,7 @@ class PersonInfoManager: """给某个用户取名""" if not person_id: logger.debug("取名失败:person_id不能为空") - return + return None old_name = await self.get_value(person_id, "person_name") old_reason = await self.get_value(person_id, "name_reason") diff --git a/src/plugins/remote/remote.py b/src/plugins/remote/remote.py index 0d119a3ec..5bc4dab14 100644 --- a/src/plugins/remote/remote.py +++ b/src/plugins/remote/remote.py @@ -134,3 +134,4 @@ def main(): heartbeat_thread.start() return heartbeat_thread # 返回线程对象,便于外部控制 + return None From 1e481a7af1d3ca85a532efd8517d4b4c486a694d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=A5=E6=B2=B3=E6=99=B4?= Date: Mon, 21 Apr 2025 14:49:14 +0900 Subject: [PATCH 5/6] fix: unreachable? --- .../chat_module/heartFC_chat/pf_chatting.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/src/plugins/chat_module/heartFC_chat/pf_chatting.py b/src/plugins/chat_module/heartFC_chat/pf_chatting.py index 620a9eead..2bb89987a 100644 --- a/src/plugins/chat_module/heartFC_chat/pf_chatting.py +++ b/src/plugins/chat_module/heartFC_chat/pf_chatting.py @@ -374,6 +374,22 @@ class PFChatting: ) action_taken_this_cycle = False + # --- Print Timer Results --- # + if cycle_timers: # 先检查cycle_timers是否非空 + timer_strings = [] + for name, elapsed in cycle_timers.items(): + # 直接格式化存储在字典中的浮点数 elapsed + formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" + timer_strings.append(f"{name}: {formatted_time}") + + if timer_strings: # 如果有有效计时器数据才打印 + logger.debug( + f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}" + ) + + # --- Timer Decrement --- # + cycle_duration = time.monotonic() - loop_cycle_start_time + except Exception as e_cycle: logger.error(f"{log_prefix} 循环周期执行时发生错误: {e_cycle}") logger.error(traceback.format_exc()) @@ -387,21 +403,6 @@ class PFChatting: self._processing_lock.release() logger.trace(f"{log_prefix} 循环释放了处理锁.") - # --- Print Timer Results --- # - if cycle_timers: # 先检查cycle_timers是否非空 - timer_strings = [] - for name, elapsed in cycle_timers.items(): - # 直接格式化存储在字典中的浮点数 elapsed - formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}秒" - timer_strings.append(f"{name}: {formatted_time}") - - if timer_strings: # 如果有有效计时器数据才打印 - logger.debug( - f"{log_prefix} test testtesttesttesttesttesttesttesttesttest Cycle Timers: {'; '.join(timer_strings)}" - ) - - # --- Timer Decrement --- # - cycle_duration = time.monotonic() - loop_cycle_start_time async with self._timer_lock: self._loop_timer -= cycle_duration # Log timer decrement less aggressively From cbfa8508c6a32fcfd4bec1a19284d6a108456090 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=A5=E6=B2=B3=E6=99=B4?= Date: Mon, 21 Apr 2025 15:56:17 +0900 Subject: [PATCH 6/6] chore: update README and template.env for better formatting and clarity --- README.md | 7 +++---- template/template.env | 20 +++++++++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 656f536ad..26cd30f61 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@

- Logo + Logo
@@ -34,7 +34,6 @@ · 提出新特性

-

## 新版0.6.x部署前先阅读:https://docs.mai-mai.org/manual/usage/mmc_q_a @@ -53,7 +52,7 @@
- 麦麦演示视频 + 麦麦演示视频
👆 点击观看麦麦演示视频 👆
@@ -186,7 +185,7 @@ MaiCore是一个开源项目,我们非常欢迎你的参与。你的贡献, 感谢各位大佬! - + contributors **也感谢每一位给麦麦发展提出宝贵意见与建议的用户,感谢陪伴麦麦走到现在的你们** diff --git a/template/template.env b/template/template.env index 06e9b07ec..c1a6dd0dc 100644 --- a/template/template.env +++ b/template/template.env @@ -29,8 +29,18 @@ CHAT_ANY_WHERE_KEY= SILICONFLOW_KEY= # 定义日志相关配置 -SIMPLE_OUTPUT=true # 精简控制台输出格式 -CONSOLE_LOG_LEVEL=INFO # 自定义日志的默认控制台输出日志级别 -FILE_LOG_LEVEL=DEBUG # 自定义日志的默认文件输出日志级别 -DEFAULT_CONSOLE_LOG_LEVEL=SUCCESS # 原生日志的控制台输出日志级别(nonebot就是这一类) -DEFAULT_FILE_LOG_LEVEL=DEBUG # 原生日志的默认文件输出日志级别(nonebot就是这一类) \ No newline at end of file + +# 精简控制台输出格式 +SIMPLE_OUTPUT=true + +# 自定义日志的默认控制台输出日志级别 +CONSOLE_LOG_LEVEL=INFO + +# 自定义日志的默认文件输出日志级别 +FILE_LOG_LEVEL=DEBUG + +# 原生日志的控制台输出日志级别(nonebot就是这一类) +DEFAULT_CONSOLE_LOG_LEVEL=SUCCESS + +# 原生日志的默认文件输出日志级别(nonebot就是这一类) +DEFAULT_FILE_LOG_LEVEL=DEBUG