better: optimize working memory

SengokuCola
2025-06-03 00:03:38 +08:00
parent bc2c7c5c1e
commit 1ef124bf5e
3 changed files with 91 additions and 136 deletions


@@ -162,7 +162,7 @@ class WorkingMemoryProcessor(BaseProcessor):
memory_brief = memory_summary.get("brief")
memory_points = memory_summary.get("points", [])
for point in memory_points:
memory_str += f"记忆要点:{point}\n"
memory_str += f"{point}\n"
working_memory_info = WorkingMemoryInfo()
if memory_str:
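The first hunk drops the per-line "Memory point:" prefix when the processor flattens a memory summary into prompt text. A minimal, self-contained sketch of the resulting assembly is below; the memory_summary shape and the memory_str/WorkingMemoryInfo names come from the hunk above, while the sample data and the final print are illustrative stand-ins.

```python
# Hypothetical sample summary; only the brief/points shape mirrors the hunk above.
memory_summary = {
    "brief": "Weekend trip planning",
    "points": ["Leaves Friday evening", "Prefers the train over driving"],
}

memory_str = ""
memory_brief = memory_summary.get("brief")
memory_points = memory_summary.get("points", [])
for point in memory_points:
    memory_str += f"{point}\n"  # new behavior: the bare point, no "Memory point:" prefix

if memory_str:
    # Stand-in for wrapping the text in WorkingMemoryInfo, which is not shown here.
    print(f"{memory_brief}\n{memory_str}")
```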


@@ -286,110 +286,110 @@ class MemoryManager:
logger.error(f"生成总结时出错: {str(e)}")
return default_summary
async def refine_memory(self, memory_id: str, requirements: str = "") -> Dict[str, Any]:
"""
Refine a memory item: revise its key points, summary, and brief according to the given requirements
# async def refine_memory(self, memory_id: str, requirements: str = "") -> Dict[str, Any]:
# """
# Refine a memory item: revise its key points, summary, and brief according to the given requirements
Args:
memory_id: memory ID
requirements: refinement requirements describing how to modify the memory, including points that may need to be removed
# Args:
# memory_id: memory ID
# requirements: refinement requirements describing how to modify the memory, including points that may need to be removed
Returns:
The modified memory summary dict
"""
# Fetch the memory item with the given ID
logger.info(f"Refining memory: {memory_id}")
memory_item = self.get_by_id(memory_id)
if not memory_item:
raise ValueError(f"No memory item found with ID {memory_id}")
# Returns:
# The modified memory summary dict
# """
# # Fetch the memory item with the given ID
# logger.info(f"Refining memory: {memory_id}")
# memory_item = self.get_by_id(memory_id)
# if not memory_item:
# raise ValueError(f"No memory item found with ID {memory_id}")
# Increment the refinement (compression) count
memory_item.increase_compress_count()
# # Increment the refinement (compression) count
# memory_item.increase_compress_count()
summary = memory_item.summary
# summary = memory_item.summary
# Use the LLM to refine the summary, brief, and key points according to the requirements
prompt = f"""
According to the following requirements, condense the memory's brief and key points to simulate the process of forgetting:
Requirements: {requirements}
You may randomly compress, blur, or drop key points; after changing them, update the brief accordingly
# # Use the LLM to refine the summary, brief, and key points according to the requirements
# prompt = f"""
# According to the following requirements, condense the memory's brief and key points to simulate the process of forgetting:
# Requirements: {requirements}
# You may randomly compress, blur, or drop key points; after changing them, update the brief accordingly
Current brief: {summary["brief"]}
# Current brief: {summary["brief"]}
Current key points:
{chr(10).join([f"- {point}" for point in summary.get("points", [])])}
# Current key points:
# {chr(10).join([f"- {point}" for point in summary.get("points", [])])}
Generate the modified brief and key points in the following format:
```json
{{
"brief": "the modified brief, within 20 characters",
"points": [
"a modified point",
"a modified point"
]
}}
```
Make sure the output is valid JSON; do not add any extra explanation or commentary.
"""
# Default refinement result used as a fallback
default_refined = {
"brief": summary["brief"],
"points": summary.get("points", ["Unknown point"])[:1],  # By default keep only the first point
}
# Generate the modified brief and key points in the following format:
# ```json
# {{
# "brief": "the modified brief, within 20 characters",
# "points": [
# "a modified point",
# "a modified point"
# ]
# }}
# ```
# Make sure the output is valid JSON; do not add any extra explanation or commentary.
# """
# # Default refinement result used as a fallback
# default_refined = {
# "brief": summary["brief"],
# "points": summary.get("points", ["Unknown point"])[:1],  # By default keep only the first point
# }
try:
# Call the LLM to revise the summary, brief, and key points
response, _ = await self.llm_summarizer.generate_response_async(prompt)
logger.debug(f"Refine memory response: {response}")
# Process the response with repair_json
try:
# Repair the JSON format
fixed_json_string = repair_json(response)
# try:
# # Call the LLM to revise the summary, brief, and key points
# response, _ = await self.llm_summarizer.generate_response_async(prompt)
# logger.debug(f"Refine memory response: {response}")
# # Process the response with repair_json
# try:
# # Repair the JSON format
# fixed_json_string = repair_json(response)
# Parse the repaired string into a Python object
if isinstance(fixed_json_string, str):
try:
refined_data = json.loads(fixed_json_string)
except json.JSONDecodeError as decode_error:
logger.error(f"JSON decode error: {str(decode_error)}")
refined_data = default_refined
else:
# If repair_json already returned a dict object, use it directly
refined_data = fixed_json_string
# # Parse the repaired string into a Python object
# if isinstance(fixed_json_string, str):
# try:
# refined_data = json.loads(fixed_json_string)
# except json.JSONDecodeError as decode_error:
# logger.error(f"JSON decode error: {str(decode_error)}")
# refined_data = default_refined
# else:
# # If repair_json already returned a dict object, use it directly
# refined_data = fixed_json_string
# Make sure it is a dict
if not isinstance(refined_data, dict):
logger.error(f"Repaired JSON is not a dict: {type(refined_data)}")
refined_data = default_refined
# # Make sure it is a dict
# if not isinstance(refined_data, dict):
# logger.error(f"Repaired JSON is not a dict: {type(refined_data)}")
# refined_data = default_refined
# Update the brief
summary["brief"] = refined_data.get("brief", "Memory with unknown topic")
# # Update the brief
# summary["brief"] = refined_data.get("brief", "Memory with unknown topic")
# Update the key points
points = refined_data.get("points", [])
if isinstance(points, list) and points:
# Make sure every point is a string
summary["points"] = [str(point) for point in points if point is not None]
else:
# If points is not a list or is empty, fall back to a default
summary["points"] = ["The main points have been forgotten"]
# # Update the key points
# points = refined_data.get("points", [])
# if isinstance(points, list) and points:
# # Make sure every point is a string
# summary["points"] = [str(point) for point in points if point is not None]
# else:
# # If points is not a list or is empty, fall back to a default
# summary["points"] = ["The main points have been forgotten"]
except Exception as e:
logger.error(f"Error while refining memory: {str(e)}")
traceback.print_exc()
# except Exception as e:
# logger.error(f"Error while refining memory: {str(e)}")
# traceback.print_exc()
# On error, fall back to a simplified default refinement
summary["brief"] = summary["brief"] + " (simplified)"
summary["points"] = summary.get("points", ["Unknown point"])[:1]
# # On error, fall back to a simplified default refinement
# summary["brief"] = summary["brief"] + " (simplified)"
# summary["points"] = summary.get("points", ["Unknown point"])[:1]
except Exception as e:
logger.error(f"LLM call failed while refining memory: {str(e)}")
traceback.print_exc()
# except Exception as e:
# logger.error(f"LLM call failed while refining memory: {str(e)}")
# traceback.print_exc()
# Update the original memory item's summary
memory_item.set_summary(summary)
# # Update the original memory item's summary
# memory_item.set_summary(summary)
return memory_item
# return memory_item
def decay_memory(self, memory_id: str, decay_factor: float = 0.8) -> bool:
"""


@@ -112,10 +112,10 @@ class WorkingMemory:
self.memory_manager.delete(memory_id)
continue
# Compute the decay amount
if memory_item.memory_strength < 5:
await self.memory_manager.refine_memory(
memory_id, f"Since {self.auto_decay_interval} seconds have passed, the memory has become blurred and needs to be compressed"
)
# if memory_item.memory_strength < 5:
# await self.memory_manager.refine_memory(
# memory_id, f"Since {self.auto_decay_interval} seconds have passed, the memory has become blurred and needs to be compressed"
# )
async def merge_memory(self, memory_id1: str, memory_id2: str) -> MemoryItem:
"""合并记忆
@@ -127,51 +127,6 @@ class WorkingMemory:
memory_id1=memory_id1, memory_id2=memory_id2, reason="The two memories contain duplicate content"
)
# Not used for now; keeping it around
async def simulate_memory_blur(self, chat_id: str, blur_rate: float = 0.2):
"""
Simulate the memory-blurring process: randomly pick a portion of memories and refine them
Args:
chat_id: chat ID
blur_rate: blur ratio (between 0 and 1), the fraction of memories that will be refined
"""
memory = self.get_memory(chat_id)
# Collect all string-type memories that have a summary
all_summarized_memories = []
for type_items in memory._memory.values():
for item in type_items:
if isinstance(item.data, str) and hasattr(item, "summary") and item.summary:
all_summarized_memories.append(item)
if not all_summarized_memories:
return
# Compute how many memories to blur
blur_count = max(1, int(len(all_summarized_memories) * blur_rate))
# Randomly pick the memories to blur
memories_to_blur = random.sample(all_summarized_memories, min(blur_count, len(all_summarized_memories)))
# Refine each selected memory
for memory_item in memories_to_blur:
try:
# Decide how much to blur based on memory strength
if memory_item.memory_strength > 7:
requirement = "Keep all important information, trim only slightly"
elif memory_item.memory_strength > 4:
requirement = "Keep the core points, moderately trim the details"
else:
requirement = "Keep only the 1-2 most critical points, trim the content heavily"
# Perform the refinement
await memory.refine_memory(memory_item.id, requirement)
print(f"Blurred memory {memory_item.id}, strength: {memory_item.memory_strength}, requirement: {requirement}")
except Exception as e:
print(f"Error while blurring memory {memory_item.id}: {str(e)}")
async def shutdown(self) -> None:
"""关闭管理器,停止所有任务"""
if self.decay_task and not self.decay_task.done():
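The removed simulate_memory_blur combined two simple ideas: sample a blur_rate fraction of summarized memories, then map each memory's strength to a refinement requirement. A minimal sketch of just that selection logic follows; pick_requirement, select_memories_to_blur, and the stub dictionaries are illustrative names, while the thresholds and requirement wording are taken from the removed code.

```python
import random


def pick_requirement(strength: float) -> str:
    """Map memory strength to a refinement requirement (thresholds from the removed code)."""
    if strength > 7:
        return "Keep all important information, trim only slightly"
    if strength > 4:
        return "Keep the core points, moderately trim the details"
    return "Keep only the 1-2 most critical points, trim the content heavily"


def select_memories_to_blur(memories: list, blur_rate: float = 0.2) -> list:
    """Pick at least one, and at most len(memories), random memories to blur."""
    if not memories:
        return []
    blur_count = max(1, int(len(memories) * blur_rate))
    return random.sample(memories, min(blur_count, len(memories)))


if __name__ == "__main__":
    stub = [{"id": f"m{i}", "strength": random.uniform(0, 10)} for i in range(10)]
    for item in select_memories_to_blur(stub):
        print(item["id"], pick_requirement(item["strength"]))
```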