better: optimize the memory system; it can now consolidate memories
@@ -221,9 +221,6 @@ class PromptBuilder:
            moderation_prompt=await global_prompt_manager.get_prompt_async("moderation_prompt"),
        )
-
-        prompt = await relationship_manager.convert_all_person_sign_to_person_name(prompt)
-        prompt = parse_text_timestamps(prompt, mode="lite")

        return prompt

    async def _build_prompt_normal(self, chat_stream, message_txt: str, sender_name: str = "某人") -> tuple[str, str]:

@@ -195,6 +195,7 @@ class Hippocampus:
        self.config = None

    def initialize(self, global_config):
+        # Use the imported MemoryConfig dataclass and its from_global_config method
        self.config = MemoryConfig.from_global_config(global_config)
        # Initialize the subcomponents
        self.entorhinal_cortex = EntorhinalCortex(self)
@@ -237,7 +238,7 @@ class Hippocampus:
        # The time_info parameter is no longer needed
        prompt = (
            f'这是一段文字:\n{text}\n\n我想让你基于这段文字来概括"{topic}"这个概念,帮我总结成一句自然的话,'
-            f"可以包含时间和人物,以及具体的观点。只输出这句话就好"
+            f"要求包含对这个概念的定义,内容,知识,可以包含时间和人物。只输出这句话就好"
        )
        return prompt

@@ -795,7 +796,7 @@ class EntorhinalCortex:
    def get_memory_sample(self):
        """Fetch memory samples from the database"""
        # Hard-coded: maximum number of times a single message can be memorized
-        max_memorized_time_per_msg = 3
+        max_memorized_time_per_msg = 2

        # Create a memory-build scheduler with a bimodal time distribution
        sample_scheduler = MemoryBuildScheduler(
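The scheduler drawn on here samples message timestamps from two peaks (recent chat vs. older history), matching the `memory_build_distribution` tuple `(mean1, std1, weight1, mean2, std2, weight2)` whose default appears later in `MemoryConfig`. The real `MemoryBuildScheduler` signature is not shown in this diff; a minimal sketch of such a bimodal sampler, with an assumed constructor, could look like:

```python
import random

class BimodalSampler:
    """Sketch only: draw 'hours ago' offsets from a two-peak normal mixture."""

    def __init__(self, mean1, std1, weight1, mean2, std2, weight2):
        self.peaks = [(mean1, std1, weight1), (mean2, std2, weight2)]

    def sample_hours_ago(self, n: int) -> list[float]:
        weights = [p[2] for p in self.peaks]
        samples = []
        for _ in range(n):
            # Pick one of the two peaks by weight, then draw from its normal
            mean, std, _ = random.choices(self.peaks, weights=weights)[0]
            samples.append(max(0.0, random.gauss(mean, std)))  # clamp to the past
        return samples

# With the default (24, 12, 0.5, 168, 72, 0.5), half the samples cluster around
# one day ago and half around one week ago.
sampler = BimodalSampler(24, 12, 0.5, 168, 72, 0.5)
print(sampler.sample_hours_ago(5))
```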
@@ -1337,26 +1338,56 @@ class ParahippocampalGyrus:
        logger.info("[遗忘] 开始检查节点...")
        node_check_start = time.time()
        for node in nodes_to_check:
            # Make sure the node still exists, in case it was removed mid-iteration (e.g. by an edge removal)
            if node not in self.memory_graph.G:
                continue

            node_data = self.memory_graph.G.nodes[node]

+            # First fetch the memory items
+            memory_items = node_data.get("memory_items", [])
+            if not isinstance(memory_items, list):
+                memory_items = [memory_items] if memory_items else []
+
+            # New: check whether the node is empty
+            if not memory_items:
+                try:
+                    self.memory_graph.G.remove_node(node)
+                    node_changes["removed"].append(f"{node}(空节点)")  # mark as removed empty node
+                    logger.debug(f"[遗忘] 移除了空的节点: {node}")
+                except nx.NetworkXError as e:
+                    logger.warning(f"[遗忘] 移除空节点 {node} 时发生错误(可能已被移除): {e}")
+                continue  # move on to the next node
+
+            # --- If the node is not empty, run the original inactivity check and random-removal logic ---
            last_modified = node_data.get("last_modified", current_time)

            # Condition 1: check for long inactivity (more than 24 hours since last modification)
            if current_time - last_modified > 3600 * 24:
                memory_items = node_data.get("memory_items", [])
                if not isinstance(memory_items, list):
                    memory_items = [memory_items] if memory_items else []

+                # Condition 2: confirm again that the node has memory items (already guaranteed above, kept as a safeguard)
                if memory_items:
                    current_count = len(memory_items)
-                    removed_item = random.choice(memory_items)
-                    memory_items.remove(removed_item)
+                    # Only pick at random if the list is non-empty
+                    if current_count > 0:
+                        removed_item = random.choice(memory_items)
+                        try:
+                            memory_items.remove(removed_item)

-                    if memory_items:
-                        self.memory_graph.G.nodes[node]["memory_items"] = memory_items
-                        self.memory_graph.G.nodes[node]["last_modified"] = current_time
-                        node_changes["reduced"].append(f"{node} (数量: {current_count} -> {len(memory_items)})")
-                    else:
-                        self.memory_graph.G.remove_node(node)
-                        node_changes["removed"].append(node)
+                            # Condition 3: check whether memory_items became empty after the removal
+                            if memory_items:  # the list is still non-empty after removal
+                                # self.memory_graph.G.nodes[node]["memory_items"] = memory_items  # mutating the list in place is enough
+                                self.memory_graph.G.nodes[node]["last_modified"] = current_time  # update the modification time
+                                node_changes["reduced"].append(f"{node} (数量: {current_count} -> {len(memory_items)})")
+                            else:  # the list is empty after removal
+                                # Try to remove the node, handling possible errors
+                                try:
+                                    self.memory_graph.G.remove_node(node)
+                                    node_changes["removed"].append(f"{node}(遗忘清空)")  # mark as emptied by forgetting
+                                    logger.debug(f"[遗忘] 节点 {node} 因移除最后一项而被清空。")
+                                except nx.NetworkXError as e:
+                                    logger.warning(f"[遗忘] 尝试移除节点 {node} 时发生错误(可能已被移除):{e}")
+                        except ValueError:
+                            # This should never happen, since removed_item comes from memory_items
+                            logger.warning(f"[遗忘] 尝试从节点 '{node}' 移除不存在的项目 '{removed_item[:30]}...'")
        node_check_end = time.time()
        logger.info(f"[遗忘] 节点检查耗时: {node_check_end - node_check_start:.2f}秒")

@@ -1395,6 +1426,116 @@ class ParahippocampalGyrus:
        end_time = time.time()
        logger.info(f"[遗忘] 总耗时: {end_time - start_time:.2f}秒")

+    async def operation_consolidate_memory(self):
+        """Consolidate memories: merge similar memory items within a node"""
+        start_time = time.time()
+        percentage = self.config.consolidate_memory_percentage
+        similarity_threshold = self.config.consolidation_similarity_threshold
+        logger.info(f"[整合] 开始检查记忆节点... 检查比例: {percentage:.2%}, 合并阈值: {similarity_threshold}")
+
+        # Collect every node that holds at least two memory items
+        eligible_nodes = []
+        for node, data in self.memory_graph.G.nodes(data=True):
+            memory_items = data.get("memory_items", [])
+            if isinstance(memory_items, list) and len(memory_items) >= 2:
+                eligible_nodes.append(node)
+
+        if not eligible_nodes:
+            logger.info("[整合] 没有找到包含多个记忆项的节点,无需整合。")
+            return
+
+        # Work out how many nodes to check
+        check_nodes_count = max(1, min(len(eligible_nodes), int(len(eligible_nodes) * percentage)))
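+        # e.g. with 200 eligible nodes and percentage 0.01: max(1, min(200, int(200 * 0.01))) = 2 nodes are checked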
+
+        # Randomly sample the nodes to check
+        try:
+            nodes_to_check = random.sample(eligible_nodes, check_nodes_count)
+        except ValueError as e:
+            logger.error(f"[整合] 抽样节点时出错: {e}")
+            return
+
+        logger.info(f"[整合] 将检查 {len(nodes_to_check)} / {len(eligible_nodes)} 个符合条件的节点。")
+
+        merged_count = 0
+        nodes_modified = set()
+        current_timestamp = datetime.datetime.now().timestamp()
+
+        for node in nodes_to_check:
+            node_data = self.memory_graph.G.nodes[node]
+            memory_items = node_data.get("memory_items", [])
+            if not isinstance(memory_items, list) or len(memory_items) < 2:
+                continue  # double check; in theory never taken
+
+            items_copy = list(memory_items)  # copy so we can iterate safely while mutating the original
+
+            # Walk every pair of memory items
+            for item1, item2 in combinations(items_copy, 2):
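+                # e.g. combinations(["a", "b", "c"], 2) yields ("a", "b"), ("a", "c"), ("b", "c"): each unordered pair exactly once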
+                # Make sure item1 and item2 are still in the original list (an earlier merge may have removed one)
+                if item1 not in memory_items or item2 not in memory_items:
+                    continue
+
+                similarity = self._calculate_item_similarity(item1, item2)
+
+                if similarity >= similarity_threshold:
+                    logger.debug(f"[整合] 节点 '{node}' 中发现相似项 (相似度: {similarity:.2f}):")
+                    logger.trace(f" - '{item1}'")
+                    logger.trace(f" - '{item2}'")
+
+                    # Compare the information content of the two items
+                    info1 = calculate_information_content(item1)
+                    info2 = calculate_information_content(item2)
+
+                    if info1 >= info2:
+                        item_to_keep = item1
+                        item_to_remove = item2
+                    else:
+                        item_to_keep = item2
+                        item_to_remove = item1
+
+                    # Drop the lower-information item from the original list
+                    try:
+                        memory_items.remove(item_to_remove)
+                        logger.info(f"[整合] 已合并节点 '{node}' 中的记忆,保留: '{item_to_keep[:60]}...', 移除: '{item_to_remove[:60]}...'")
+                        merged_count += 1
+                        nodes_modified.add(node)
+                        node_data['last_modified'] = current_timestamp  # update the modification time
+                        _merged_in_this_node = True
+                        break  # merge at most one pair per node per pass
+                    except ValueError:
+                        # The item is already gone (e.g. kept/removed in an earlier iteration), so skip it
+                        logger.warning(f"[整合] 尝试移除节点 '{node}' 中不存在的项 '{item_to_remove[:30]}...',可能已被合并。")
+                        continue
+            # # If merges happened in this node, write the node data back (unsafe: this would drop the other attributes)
+            # if merged_in_this_node:
+            #     self.memory_graph.G.nodes[node]["memory_items"] = memory_items
+
+
+        if merged_count > 0:
+            logger.info(f"[整合] 共合并了 {merged_count} 对相似记忆项,分布在 {len(nodes_modified)} 个节点中。")
+            sync_start = time.time()
+            logger.info("[整合] 开始将变更同步到数据库...")
+            # Use resync to handle deletions and additions more safely
+            await self.hippocampus.entorhinal_cortex.resync_memory_to_db()
+            sync_end = time.time()
+            logger.info(f"[整合] 数据库同步耗时: {sync_end - sync_start:.2f}秒")
+        else:
+            logger.info("[整合] 本次检查未发现需要合并的记忆项。")
+
+        end_time = time.time()
+        logger.info(f"[整合] 整合检查完成,总耗时: {end_time - start_time:.2f}秒")
+
+    @staticmethod
+    def _calculate_item_similarity(item1: str, item2: str) -> float:
+        """Compute the cosine similarity between two memory-item texts"""
+        words1 = set(jieba.cut(item1))
+        words2 = set(jieba.cut(item2))
+        all_words = words1 | words2
+        if not all_words:
+            return 0.0
+        v1 = [1 if word in words1 else 0 for word in all_words]
+        v2 = [1 if word in words2 else 0 for word in all_words]
+        return cosine_similarity(v1, v2)


class HippocampusManager:
    _instance = None

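Neither `calculate_information_content` nor `cosine_similarity` is defined in this diff. A plausible reading of how they are called above (information content as word-level Shannon entropy, cosine similarity over the 0/1 word vectors built in `_calculate_item_similarity`) is sketched below; both bodies are assumptions, not the project's actual implementations:

```python
import math
from collections import Counter

import jieba

def calculate_information_content(text: str) -> float:
    """Sketch: Shannon entropy of the word distribution; richer wording scores higher."""
    words = list(jieba.cut(text))
    if not words:
        return 0.0
    total = len(words)
    return -sum((c / total) * math.log2(c / total) for c in Counter(words).values())

def cosine_similarity(v1: list[int], v2: list[int]) -> float:
    """Sketch: cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(v1, v2))
    norm1 = math.sqrt(sum(a * a for a in v1))
    norm2 = math.sqrt(sum(b * b for b in v2))
    return dot / (norm1 * norm2) if norm1 and norm2 else 0.0
```

With the 0/1 vectors used above, two items with identical word sets score exactly 1.0, so a threshold such as the 0.7 default catches near-paraphrases while leaving distinct memories alone.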
@@ -1433,12 +1574,12 @@ class HippocampusManager:
        edge_count = len(memory_graph.edges())

        logger.success(f"""--------------------------------
-记忆系统参数配置:
-构建间隔: {global_config.build_memory_interval}秒|样本数: {config.build_memory_sample_num},长度: {config.build_memory_sample_length}|压缩率: {config.memory_compress_rate}
-记忆构建分布: {config.memory_build_distribution}
-遗忘间隔: {global_config.forget_memory_interval}秒|遗忘比例: {global_config.memory_forget_percentage}|遗忘: {config.memory_forget_time}小时之后
-记忆图统计信息: 节点数量: {node_count}, 连接数量: {edge_count}
---------------------------------""")  # noqa: E501
+记忆系统参数配置:
+构建间隔: {global_config.build_memory_interval}秒|样本数: {config.build_memory_sample_num},长度: {config.build_memory_sample_length}|压缩率: {config.memory_compress_rate}
+记忆构建分布: {config.memory_build_distribution}
+遗忘间隔: {global_config.forget_memory_interval}秒|遗忘比例: {global_config.memory_forget_percentage}|遗忘: {config.memory_forget_time}小时之后
+记忆图统计信息: 节点数量: {node_count}, 连接数量: {edge_count}
+--------------------------------""")  # noqa: E501

        return self._hippocampus

@@ -1453,6 +1594,14 @@ class HippocampusManager:
        if not self._initialized:
            raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
        return await self._hippocampus.parahippocampal_gyrus.operation_forget_topic(percentage)

+    async def consolidate_memory(self):
+        """Public entry point for memory consolidation"""
+        if not self._initialized:
+            raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
+        # Note: operation_consolidate_memory currently reads the config directly, so a percentage parameter would have no effect for now
+        # If the ratio needs to be controlled externally, operation_consolidate_memory must be modified
+        return await self._hippocampus.parahippocampal_gyrus.operation_consolidate_memory()

    async def get_memory_from_text(
        self,

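The new `consolidate_memory_interval` setting suggests consolidation is meant to fire periodically, like the existing build and forget cycles, but the wiring is not part of this diff. One minimal sketch of such a driver (the `hippocampus_manager` name in the usage line is illustrative):

```python
import asyncio

async def consolidation_loop(manager, interval_seconds: int):
    """Sketch: periodically call the public consolidation interface."""
    while True:
        await asyncio.sleep(interval_seconds)
        try:
            await manager.consolidate_memory()
        except RuntimeError as e:
            # consolidate_memory raises RuntimeError until initialize() has run
            print(f"consolidation skipped: {e}")

# e.g. asyncio.create_task(consolidation_loop(hippocampus_manager, config.consolidate_memory_interval))
```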
@@ -18,19 +18,29 @@ class MemoryConfig:
    # Memory-filtering configuration
    memory_ban_words: List[str]  # list of words filtered out of memories

+    # New: memory consolidation configuration
+    consolidation_similarity_threshold: float  # similarity threshold for merging
+    consolidate_memory_percentage: float  # fraction of nodes to check
+    consolidate_memory_interval: int  # consolidation interval
+
    llm_topic_judge: str  # topic-judging model
    llm_summary_by_topic: str  # topic-summary model

    @classmethod
    def from_global_config(cls, global_config):
        """Create the memory-system config from the global config"""
+        # Use getattr to supply defaults in case the global config is missing these entries
        return cls(
-            memory_build_distribution=global_config.memory_build_distribution,
-            build_memory_sample_num=global_config.build_memory_sample_num,
-            build_memory_sample_length=global_config.build_memory_sample_length,
-            memory_compress_rate=global_config.memory_compress_rate,
-            memory_forget_time=global_config.memory_forget_time,
-            memory_ban_words=global_config.memory_ban_words,
-            llm_topic_judge=global_config.llm_topic_judge,
-            llm_summary_by_topic=global_config.llm_summary_by_topic,
+            memory_build_distribution=getattr(global_config, "memory_build_distribution", (24, 12, 0.5, 168, 72, 0.5)),  # default added
+            build_memory_sample_num=getattr(global_config, "build_memory_sample_num", 5),
+            build_memory_sample_length=getattr(global_config, "build_memory_sample_length", 30),
+            memory_compress_rate=getattr(global_config, "memory_compress_rate", 0.1),
+            memory_forget_time=getattr(global_config, "memory_forget_time", 24 * 7),
+            memory_ban_words=getattr(global_config, "memory_ban_words", []),
+            # New: load the consolidation settings, with defaults
+            consolidation_similarity_threshold=getattr(global_config, "consolidation_similarity_threshold", 0.7),
+            consolidate_memory_percentage=getattr(global_config, "consolidate_memory_percentage", 0.01),
+            consolidate_memory_interval=getattr(global_config, "consolidate_memory_interval", 1000),
+            llm_topic_judge=getattr(global_config, "llm_topic_judge", "default_judge_model"),  # default model name added
+            llm_summary_by_topic=getattr(global_config, "llm_summary_by_topic", "default_summary_model"),  # default model name added
        )

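Because every field now falls back through `getattr`, a sparse global config still produces a complete `MemoryConfig`. A quick illustration with a hypothetical stub object (not a real config class from the project):

```python
class StubGlobalConfig:
    """Hypothetical stub: overrides one field, leaves the rest to the defaults above."""
    consolidation_similarity_threshold = 0.8

config = MemoryConfig.from_global_config(StubGlobalConfig())
print(config.consolidation_similarity_threshold)  # 0.8  (overridden)
print(config.consolidate_memory_percentage)       # 0.01 (default)
print(config.memory_forget_time)                  # 168  (default: 24 * 7 hours)
```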
@@ -101,36 +101,6 @@ class RelationshipManager:
        # await person_info_manager.update_one_field(person_id, "user_avatar", user_avatar)
        await person_info_manager.qv_person_name(person_id, user_nickname, user_cardname, user_avatar)

-    @staticmethod
-    async def convert_all_person_sign_to_person_name(input_text: str):
-        """Convert every <platform:user_id:nickname:cardname> tag into the person_name"""
-        try:
-            # Match the <platform:user_id:nickname:cardname> format with a regular expression
-            all_person = person_info_manager.person_name_list
-
-            pattern = r"<([^:]+):(\d+):([^:]+):([^>]+)>"
-            matches = re.findall(pattern, input_text)
-
-            # Walk the matches and replace each <platform:user_id:nickname:cardname> with the person_name
-            result_text = input_text
-            for platform, user_id, nickname, cardname in matches:
-                person_id = person_info_manager.get_person_id(platform, user_id)
-                # Default to the nickname as the person name
-                person_name = nickname.strip() if nickname.strip() else cardname.strip()
-
-                if person_id in all_person:
-                    if all_person[person_id] is not None:
-                        person_name = all_person[person_id]
-
-                # print(f"将<{platform}:{user_id}:{nickname}:{cardname}>替换为{person_name}")
-
-                result_text = result_text.replace(f"<{platform}:{user_id}:{nickname}:{cardname}>", person_name)
-
-            return result_text
-        except Exception:
-            logger.error(traceback.format_exc())
-            return input_text
-
    async def calculate_update_relationship_value(self, chat_stream: ChatStream, label: str, stance: str) -> tuple:
        """Compute and update the relationship value
        The new way the relationship-value change is calculated: