记忆系统优化

优化了记忆的连接建立
重启了遗忘功能
This commit is contained in:
SengokuCola
2025-03-11 01:13:17 +08:00
parent 62523409d1
commit 354d6d0deb
7 changed files with 1422 additions and 104 deletions

View File

@@ -121,9 +121,9 @@ async def build_memory_task():
@scheduler.scheduled_job("interval", seconds=global_config.forget_memory_interval, id="forget_memory")
async def forget_memory_task():
"""每30秒执行一次记忆构建"""
# print("\033[1;32m[记忆遗忘]\033[0m 开始遗忘记忆...")
# await hippocampus.operation_forget_topic(percentage=0.1)
# print("\033[1;32m[记忆遗忘]\033[0m 记忆遗忘完成")
print("\033[1;32m[记忆遗忘]\033[0m 开始遗忘记忆...")
await hippocampus.operation_forget_topic(percentage=0.1)
print("\033[1;32m[记忆遗忘]\033[0m 记忆遗忘完成")
@scheduler.scheduled_job("interval", seconds=global_config.build_memory_interval + 10, id="merge_memory")

View File

@@ -203,7 +203,7 @@ class EmojiManager:
try:
prompt = f'这是{global_config.BOT_NICKNAME}将要发送的消息内容:\n{text}\n若要为其配上表情包,请你输出这个表情包应该表达怎样的情感,应该给人什么样的感觉,不要太简洁也不要太长,注意不要输出任何对消息内容的分析内容,只输出\"一种什么样的感觉\"中间的形容词部分。'
content, _ = await self.llm_emotion_judge.generate_response_async(prompt)
content, _ = await self.llm_emotion_judge.generate_response_async(prompt,temperature=1.5)
logger.info(f"输出描述: {content}")
return content

View File

@@ -79,7 +79,7 @@ class KnowledgeLibrary:
content = f.read()
# 按1024字符分段
segments = [content[i:i+600] for i in range(0, len(content), 600)]
segments = [content[i:i+600] for i in range(0, len(content), 300)]
# 处理每个分段
for segment in segments:

View File

@@ -25,26 +25,46 @@ class Memory_graph:
self.db = Database.get_instance()
def connect_dot(self, concept1, concept2):
# 如果边已存在,增加 strength
# 避免自连接
if concept1 == concept2:
return
current_time = datetime.datetime.now().timestamp()
# 如果边已存在,增加 strength
if self.G.has_edge(concept1, concept2):
self.G[concept1][concept2]['strength'] = self.G[concept1][concept2].get('strength', 1) + 1
# 更新最后修改时间
self.G[concept1][concept2]['last_modified'] = current_time
else:
# 如果是新边初始化 strength 为 1
self.G.add_edge(concept1, concept2, strength=1)
# 如果是新边,初始化 strength 为 1
self.G.add_edge(concept1, concept2,
strength=1,
created_time=current_time, # 添加创建时间
last_modified=current_time) # 添加最后修改时间
def add_dot(self, concept, memory):
current_time = datetime.datetime.now().timestamp()
if concept in self.G:
# 如果节点已存在,将新记忆添加到现有列表中
if 'memory_items' in self.G.nodes[concept]:
if not isinstance(self.G.nodes[concept]['memory_items'], list):
# 如果当前不是列表,将其转换为列表
self.G.nodes[concept]['memory_items'] = [self.G.nodes[concept]['memory_items']]
self.G.nodes[concept]['memory_items'].append(memory)
# 更新最后修改时间
self.G.nodes[concept]['last_modified'] = current_time
else:
self.G.nodes[concept]['memory_items'] = [memory]
# 如果节点存在但没有memory_items,说明是第一次添加memory,设置created_time
if 'created_time' not in self.G.nodes[concept]:
self.G.nodes[concept]['created_time'] = current_time
self.G.nodes[concept]['last_modified'] = current_time
else:
# 如果是新节点创建新的记忆列表
self.G.add_node(concept, memory_items=[memory])
# 如果是新节点,创建新的记忆列表
self.G.add_node(concept,
memory_items=[memory],
created_time=current_time, # 添加创建时间
last_modified=current_time) # 添加最后修改时间
def get_dot(self, concept):
# 检查节点是否存在于图中
@@ -191,15 +211,11 @@ class Hippocampus:
async def memory_compress(self, messages: list, compress_rate=0.1):
"""压缩消息记录为记忆
Args:
messages: 消息记录字典列表每个字典包含text和time字段
compress_rate: 压缩率
Returns:
set: (话题, 记忆) 元组集合
tuple: (压缩记忆集合, 相似主题字典)
"""
if not messages:
return set()
return set(), {}
# 合并消息文本,同时保留时间信息
input_text = ""
@@ -246,12 +262,33 @@ class Hippocampus:
# 等待所有任务完成
compressed_memory = set()
similar_topics_dict = {} # 存储每个话题的相似主题列表
for topic, task in tasks:
response = await task
if response:
compressed_memory.add((topic, response[0]))
# 为每个话题查找相似的已存在主题
existing_topics = list(self.memory_graph.G.nodes())
similar_topics = []
for existing_topic in existing_topics:
topic_words = set(jieba.cut(topic))
existing_words = set(jieba.cut(existing_topic))
all_words = topic_words | existing_words
v1 = [1 if word in topic_words else 0 for word in all_words]
v2 = [1 if word in existing_words else 0 for word in all_words]
similarity = cosine_similarity(v1, v2)
if similarity >= 0.6:
similar_topics.append((existing_topic, similarity))
similar_topics.sort(key=lambda x: x[1], reverse=True)
similar_topics = similar_topics[:5]
similar_topics_dict[topic] = similar_topics
return compressed_memory
return compressed_memory, similar_topics_dict
def calculate_topic_num(self, text, compress_rate):
"""计算文本的话题数量"""
@@ -265,33 +302,40 @@ class Hippocampus:
return topic_num
async def operation_build_memory(self, chat_size=20):
# 最近消息获取频率
time_frequency = {'near': 2, 'mid': 4, 'far': 2}
memory_sample = self.get_memory_sample(chat_size, time_frequency)
for i, input_text in enumerate(memory_sample, 1):
# 加载进度可视化
time_frequency = {'near': 3, 'mid': 8, 'far': 5}
memory_samples = self.get_memory_sample(chat_size, time_frequency)
for i, messages in enumerate(memory_samples, 1):
all_topics = []
progress = (i / len(memory_sample)) * 100
# 加载进度可视化
progress = (i / len(memory_samples)) * 100
bar_length = 30
filled_length = int(bar_length * i // len(memory_sample))
filled_length = int(bar_length * i // len(memory_samples))
bar = '' * filled_length + '-' * (bar_length - filled_length)
logger.debug(f"进度: [{bar}] {progress:.1f}% ({i}/{len(memory_sample)})")
logger.debug(f"进度: [{bar}] {progress:.1f}% ({i}/{len(memory_samples)})")
# 生成压缩后记忆 ,表现为 (话题,记忆) 的元组
compressed_memory = set()
compress_rate = 0.1
compressed_memory = await self.memory_compress(input_text, compress_rate)
logger.info(f"压缩后记忆数量: {len(compressed_memory)}")
# 将记忆加入到图谱中
compressed_memory, similar_topics_dict = await self.memory_compress(messages, compress_rate)
logger.info(f"压缩后记忆数量: {len(compressed_memory)},似曾相识的话题: {len(similar_topics_dict)}")
for topic, memory in compressed_memory:
logger.info(f"添加节点: {topic}")
self.memory_graph.add_dot(topic, memory)
all_topics.append(topic) # 收集所有话题
all_topics.append(topic)
# 连接相似的已存在主题
if topic in similar_topics_dict:
similar_topics = similar_topics_dict[topic]
for similar_topic, similarity in similar_topics:
if topic != similar_topic:
strength = int(similarity * 10)
logger.info(f"连接相似节点: {topic}{similar_topic} (强度: {strength})")
self.memory_graph.G.add_edge(topic, similar_topic, strength=strength)
# 连接同批次的相关话题
for i in range(len(all_topics)):
for j in range(i + 1, len(all_topics)):
logger.info(f"连接节点: {all_topics[i]}{all_topics[j]}")
logger.info(f"连接同批次节点: {all_topics[i]}{all_topics[j]}")
self.memory_graph.connect_dot(all_topics[i], all_topics[j])
self.sync_memory_to_db()
@@ -302,7 +346,7 @@ class Hippocampus:
db_nodes = list(self.memory_graph.db.db.graph_data.nodes.find())
memory_nodes = list(self.memory_graph.G.nodes(data=True))
# 转换数据库节点为字典格式方便查找
# 转换数据库节点为字典格式,方便查找
db_nodes_dict = {node['concept']: node for node in db_nodes}
# 检查并更新节点
@@ -313,13 +357,19 @@ class Hippocampus:
# 计算内存中节点的特征值
memory_hash = self.calculate_node_hash(concept, memory_items)
# 获取时间信息
created_time = data.get('created_time', datetime.datetime.now().timestamp())
last_modified = data.get('last_modified', datetime.datetime.now().timestamp())
if concept not in db_nodes_dict:
# 数据库中缺少的节点添加
# 数据库中缺少的节点,添加
node_data = {
'concept': concept,
'memory_items': memory_items,
'hash': memory_hash
'hash': memory_hash,
'created_time': created_time,
'last_modified': last_modified
}
self.memory_graph.db.db.graph_data.nodes.insert_one(node_data)
else:
@@ -327,25 +377,21 @@ class Hippocampus:
db_node = db_nodes_dict[concept]
db_hash = db_node.get('hash', None)
# 如果特征值不同则更新节点
# 如果特征值不同,则更新节点
if db_hash != memory_hash:
self.memory_graph.db.db.graph_data.nodes.update_one(
{'concept': concept},
{'$set': {
'memory_items': memory_items,
'hash': memory_hash
'hash': memory_hash,
'created_time': created_time,
'last_modified': last_modified
}}
)
# 检查并删除数据库中多余的节点
memory_concepts = set(node[0] for node in memory_nodes)
for db_node in db_nodes:
if db_node['concept'] not in memory_concepts:
self.memory_graph.db.db.graph_data.nodes.delete_one({'concept': db_node['concept']})
# 处理边的信息
db_edges = list(self.memory_graph.db.db.graph_data.edges.find())
memory_edges = list(self.memory_graph.G.edges())
memory_edges = list(self.memory_graph.G.edges(data=True))
# 创建边的哈希值字典
db_edge_dict = {}
@@ -357,10 +403,14 @@ class Hippocampus:
}
# 检查并更新边
for source, target in memory_edges:
for source, target, data in memory_edges:
edge_hash = self.calculate_edge_hash(source, target)
edge_key = (source, target)
strength = self.memory_graph.G[source][target].get('strength', 1)
strength = data.get('strength', 1)
# 获取边的时间信息
created_time = data.get('created_time', datetime.datetime.now().timestamp())
last_modified = data.get('last_modified', datetime.datetime.now().timestamp())
if edge_key not in db_edge_dict:
# 添加新边
@@ -368,7 +418,9 @@ class Hippocampus:
'source': source,
'target': target,
'strength': strength,
'hash': edge_hash
'hash': edge_hash,
'created_time': created_time,
'last_modified': last_modified
}
self.memory_graph.db.db.graph_data.edges.insert_one(edge_data)
else:
@@ -378,20 +430,12 @@ class Hippocampus:
{'source': source, 'target': target},
{'$set': {
'hash': edge_hash,
'strength': strength
'strength': strength,
'created_time': created_time,
'last_modified': last_modified
}}
)
# 删除多余的边
memory_edge_set = set(memory_edges)
for edge_key in db_edge_dict:
if edge_key not in memory_edge_set:
source, target = edge_key
self.memory_graph.db.db.graph_data.edges.delete_one({
'source': source,
'target': target
})
def sync_memory_from_db(self):
"""从数据库同步数据到内存中的图结构"""
# 清空当前图
@@ -405,61 +449,107 @@ class Hippocampus:
# 确保memory_items是列表
if not isinstance(memory_items, list):
memory_items = [memory_items] if memory_items else []
# 获取时间信息
created_time = node.get('created_time', datetime.datetime.now().timestamp())
last_modified = node.get('last_modified', datetime.datetime.now().timestamp())
# 添加节点到图中
self.memory_graph.G.add_node(concept, memory_items=memory_items)
self.memory_graph.G.add_node(concept,
memory_items=memory_items,
created_time=created_time,
last_modified=last_modified)
# 从数据库加载所有边
edges = self.memory_graph.db.db.graph_data.edges.find()
for edge in edges:
source = edge['source']
target = edge['target']
strength = edge.get('strength', 1) # 获取 strength默认为 1
strength = edge.get('strength', 1) # 获取 strength,默认为 1
# 获取时间信息
created_time = edge.get('created_time', datetime.datetime.now().timestamp())
last_modified = edge.get('last_modified', datetime.datetime.now().timestamp())
# 只有当源节点和目标节点都存在时才添加边
if source in self.memory_graph.G and target in self.memory_graph.G:
self.memory_graph.G.add_edge(source, target, strength=strength)
self.memory_graph.G.add_edge(source, target,
strength=strength,
created_time=created_time,
last_modified=last_modified)
async def operation_forget_topic(self, percentage=0.1):
"""随机选择图中一定比例的节点进行检查根据条件决定是否遗忘"""
# 获取所有节点
"""随机选择图中一定比例的节点和边进行检查,根据时间条件决定是否遗忘"""
all_nodes = list(self.memory_graph.G.nodes())
# 计算要检查的节点数量
check_count = max(1, int(len(all_nodes) * percentage))
# 随机选择节点
nodes_to_check = random.sample(all_nodes, check_count)
forgotten_nodes = []
all_edges = list(self.memory_graph.G.edges())
check_nodes_count = max(1, int(len(all_nodes) * percentage))
check_edges_count = max(1, int(len(all_edges) * percentage))
nodes_to_check = random.sample(all_nodes, check_nodes_count)
edges_to_check = random.sample(all_edges, check_edges_count)
edge_changes = {'weakened': 0, 'removed': 0}
node_changes = {'reduced': 0, 'removed': 0}
current_time = datetime.datetime.now().timestamp()
# 检查并遗忘连接
logger.info("开始检查连接...")
for source, target in edges_to_check:
edge_data = self.memory_graph.G[source][target]
last_modified = edge_data.get('last_modified')
# print(source,target)
# print(f"float(last_modified):{float(last_modified)}" )
# print(f"current_time:{current_time}")
# print(f"current_time - last_modified:{current_time - last_modified}")
if current_time - last_modified > 3600*24: # test
current_strength = edge_data.get('strength', 1)
new_strength = current_strength - 1
if new_strength <= 0:
self.memory_graph.G.remove_edge(source, target)
edge_changes['removed'] += 1
logger.info(f"\033[1;31m[连接移除]\033[0m {source} - {target}")
else:
edge_data['strength'] = new_strength
edge_data['last_modified'] = current_time
edge_changes['weakened'] += 1
logger.info(f"\033[1;34m[连接减弱]\033[0m {source} - {target} (强度: {current_strength} -> {new_strength})")
# 检查并遗忘话题
logger.info("开始检查节点...")
for node in nodes_to_check:
# 获取节点的连接数
connections = self.memory_graph.G.degree(node)
# 获取节点的内容条数
memory_items = self.memory_graph.G.nodes[node].get('memory_items', [])
if not isinstance(memory_items, list):
memory_items = [memory_items] if memory_items else []
content_count = len(memory_items)
# 检查连接强度
weak_connections = True
if connections > 1: # 只有当连接数大于1时才检查强度
for neighbor in self.memory_graph.G.neighbors(node):
strength = self.memory_graph.G[node][neighbor].get('strength', 1)
if strength > 2:
weak_connections = False
break
# 如果满足遗忘条件
if (connections <= 1 and weak_connections) or content_count <= 2:
removed_item = self.memory_graph.forget_topic(node)
if removed_item:
forgotten_nodes.append((node, removed_item))
logger.debug(f"遗忘节点 {node} 的记忆: {removed_item}")
# 同步到数据库
if forgotten_nodes:
node_data = self.memory_graph.G.nodes[node]
last_modified = node_data.get('last_modified', current_time)
if current_time - last_modified > 3600*24: # test
memory_items = node_data.get('memory_items', [])
if not isinstance(memory_items, list):
memory_items = [memory_items] if memory_items else []
if memory_items:
current_count = len(memory_items)
removed_item = random.choice(memory_items)
memory_items.remove(removed_item)
if memory_items:
self.memory_graph.G.nodes[node]['memory_items'] = memory_items
self.memory_graph.G.nodes[node]['last_modified'] = current_time
node_changes['reduced'] += 1
logger.info(f"\033[1;33m[记忆减少]\033[0m {node} (记忆数量: {current_count} -> {len(memory_items)})")
else:
self.memory_graph.G.remove_node(node)
node_changes['removed'] += 1
logger.info(f"\033[1;31m[节点移除]\033[0m {node}")
if any(count > 0 for count in edge_changes.values()) or any(count > 0 for count in node_changes.values()):
self.sync_memory_to_db()
logger.debug(f"完成遗忘操作,共遗忘 {len(forgotten_nodes)} 个节点的记忆")
logger.info("\n遗忘操作统计:")
logger.info(f"连接变化: {edge_changes['weakened']} 个减弱, {edge_changes['removed']} 个移除")
logger.info(f"节点变化: {node_changes['reduced']} 个减少记忆, {node_changes['removed']} 个移除")
else:
logger.debug("本次检查没有节点满足遗忘条件")
logger.info("\n本次检查没有节点或连接满足遗忘条件")
async def merge_memory(self, topic):
"""
@@ -486,7 +576,7 @@ class Hippocampus:
logger.debug(f"选择的记忆:\n{merged_text}")
# 使用memory_compress生成新的压缩记忆
compressed_memories = await self.memory_compress(selected_memories, 0.1)
compressed_memories, _ = await self.memory_compress(selected_memories, 0.1)
# 从原记忆列表中移除被选中的记忆
for memory in selected_memories:

File diff suppressed because it is too large Load Diff