feat(memory): remove the legacy memory system and optimize the memory graph

- Remove the entire legacy memory system, including the memory system module and all related components
- Delete the deprecated memory components: enhanced memory activator, hippocampus sampler, memory builder, memory chunk, memory forgetting engine, memory formatter, memory fusion engine, memory manager, memory metadata index, memory query planner, memory system, message collection processor, message collection storage, and vector memory storage v2
- Update the memory graph configuration to use the enhanced retrieval settings
- Optimize the memory manager query so that it analyzes the full conversation context
- Bump the bot configuration template to version 7.6.1 and add the new memory graph retrieval parameters

BREAKING CHANGE: the legacy memory system has been removed entirely. All memory functionality now relies on the memory graph system. Update your configuration to include the new memory graph retrieval parameters (an illustrative sketch follows below).
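For migration, here is a minimal sketch of how a consumer might read the new retrieval settings. The field names below (enable, top_k, min_relevance) are assumptions made purely for illustration; this commit does not list the actual template keys in the diffs shown here.

# Hypothetical sketch only; the real field names in bot config template 7.6.1 are not shown in this diff.
from dataclasses import dataclass

@dataclass
class MemoryGraphRetrievalConfig:
    enable: bool = True          # assumed on/off switch for memory-graph retrieval
    top_k: int = 5               # assumed number of memories returned per query
    min_relevance: float = 0.5   # assumed relevance cutoff in [0, 1]

def load_retrieval_config(raw: dict) -> MemoryGraphRetrievalConfig:
    # Fall back to defaults when a key is missing, so older configs keep working.
    keys = ("enable", "top_k", "min_relevance")
    return MemoryGraphRetrievalConfig(**{k: raw[k] for k in keys if k in raw})

print(load_retrieval_config({"top_k": 8}))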
Windpicker-owo
2025-11-06 09:18:59 +08:00
parent b6a693895b
commit 59081848e2
22 changed files with 77 additions and 6428 deletions

View File

@@ -1,73 +0,0 @@
"""
简化记忆系统模块
移除即时记忆和长期记忆分类,实现统一记忆架构和智能遗忘机制
"""
# 核心数据结构
# 激活器
from .enhanced_memory_activator import MemoryActivator, enhanced_memory_activator, memory_activator
from .memory_chunk import (
ConfidenceLevel,
ContentStructure,
ImportanceLevel,
MemoryChunk,
MemoryMetadata,
MemoryType,
create_memory_chunk,
)
# 兼容性别名
from .memory_chunk import MemoryChunk as Memory
# 遗忘引擎
from .memory_forgetting_engine import ForgettingConfig, MemoryForgettingEngine, get_memory_forgetting_engine
from .memory_formatter import format_memories_bracket_style
# 记忆管理器
from .memory_manager import MemoryManager, MemoryResult, memory_manager
# 记忆核心系统
from .memory_system import MemorySystem, MemorySystemConfig, get_memory_system, initialize_memory_system
# Vector DB存储系统
from .vector_memory_storage_v2 import VectorMemoryStorage, VectorStorageConfig, get_vector_memory_storage
__all__ = [
"ConfidenceLevel",
"ContentStructure",
"ForgettingConfig",
"ImportanceLevel",
"Memory", # 兼容性别名
# 激活器
"MemoryActivator",
# 核心数据结构
"MemoryChunk",
# 遗忘引擎
"MemoryForgettingEngine",
# 记忆管理器
"MemoryManager",
"MemoryMetadata",
"MemoryResult",
# 记忆系统
"MemorySystem",
"MemorySystemConfig",
"MemoryType",
# Vector DB存储
"VectorMemoryStorage",
"VectorStorageConfig",
"create_memory_chunk",
"enhanced_memory_activator", # 兼容性别名
# 格式化工具
"format_memories_bracket_style",
"get_memory_forgetting_engine",
"get_memory_system",
"get_vector_memory_storage",
"initialize_memory_system",
"memory_activator",
"memory_manager",
]
# 版本信息
__version__ = "3.0.0"
__author__ = "MoFox Team"
__description__ = "简化记忆系统 - 统一记忆架构与智能遗忘机制"
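For context on the breaking change: once this package is deleted, any downstream import of the names exported above stops working. Two representative import lines (illustrative, not taken from a specific caller) that this commit invalidates:

# These imports were valid against the package above and now fail with ImportError after this commit.
from src.chat.memory_system import MemoryManager, memory_manager   # removed
from src.chat.memory_system import Memory                          # compatibility alias, also removed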

View File

@@ -1,240 +0,0 @@
"""
记忆激活器
记忆系统的激活器组件
"""
import difflib
from datetime import datetime
import orjson
from json_repair import repair_json
from src.chat.memory_system.memory_manager import MemoryResult
from src.chat.utils.prompt import Prompt, global_prompt_manager
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
logger = get_logger("memory_activator")
def get_keywords_from_json(json_str) -> list:
"""
从JSON字符串中提取关键词列表
Args:
json_str: JSON格式的字符串
Returns:
List[str]: 关键词列表
"""
try:
# 使用repair_json修复JSON格式
fixed_json = repair_json(json_str)
# 如果repair_json返回的是字符串需要解析为Python对象
result = orjson.loads(fixed_json) if isinstance(fixed_json, str) else fixed_json
return result.get("keywords", [])
except Exception as e:
logger.error(f"解析关键词JSON失败: {e}")
return []
def init_prompt():
# --- Memory Activator Prompt ---
memory_activator_prompt = """
你是一个记忆分析器,你需要根据以下信息来进行记忆检索
以下是一段聊天记录,请根据这些信息,总结出几个关键词作为记忆检索的触发词
聊天记录:
{obs_info_text}
用户想要回复的消息:
{target_message}
历史关键词(请避免重复提取这些关键词):
{cached_keywords}
请输出一个json格式包含以下字段
{{
"keywords": ["关键词1", "关键词2", "关键词3",......]
}}
不要输出其他多余内容只输出json格式就好
"""
Prompt(memory_activator_prompt, "memory_activator_prompt")
class MemoryActivator:
"""记忆激活器"""
def __init__(self):
self.key_words_model = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="memory.activator",
)
self.running_memory = []
self.cached_keywords = set() # 用于缓存历史关键词
self.last_memory_query_time = 0 # 上次查询记忆的时间
async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> list[dict]:
"""
激活记忆
"""
# 如果记忆系统被禁用,直接返回空列表
if not global_config.memory.enable_memory:
return []
# 将缓存的关键词转换为字符串用于prompt
cached_keywords_str = ", ".join(self.cached_keywords) if self.cached_keywords else "暂无历史关键词"
prompt = await global_prompt_manager.format_prompt(
"memory_activator_prompt",
obs_info_text=chat_history_prompt,
target_message=target_message,
cached_keywords=cached_keywords_str,
)
# 生成关键词
response, (reasoning_content, model_name, _) = await self.key_words_model.generate_response_async(
prompt, temperature=0.5
)
keywords = list(get_keywords_from_json(response))
# 更新关键词缓存
if keywords:
# 限制缓存大小最多保留10个关键词
if len(self.cached_keywords) > 10:
# 转换为列表,移除最早的关键词
cached_list = list(self.cached_keywords)
self.cached_keywords = set(cached_list[-8:])
# 添加新的关键词到缓存
self.cached_keywords.update(keywords)
logger.debug(f"记忆关键词: {self.cached_keywords}")
# 使用记忆系统获取相关记忆
memory_results = await self._query_unified_memory(keywords, target_message)
# 处理和记忆结果
if memory_results:
for result in memory_results:
# 检查是否已存在相似内容的记忆
exists = any(
m["content"] == result.content
or difflib.SequenceMatcher(None, m["content"], result.content).ratio() >= 0.7
for m in self.running_memory
)
if not exists:
memory_entry = {
"topic": result.memory_type,
"content": result.content,
"timestamp": datetime.fromtimestamp(result.timestamp).isoformat(),
"duration": 1,
"confidence": result.confidence,
"importance": result.importance,
"source": result.source,
"relevance_score": result.relevance_score, # 添加相关度评分
}
self.running_memory.append(memory_entry)
logger.debug(f"添加新记忆: {result.memory_type} - {result.content}")
# 激活时所有已有记忆的duration+1达到3则移除
for m in self.running_memory[:]:
m["duration"] = m.get("duration", 1) + 1
self.running_memory = [m for m in self.running_memory if m["duration"] < 3]
# 限制同时加载的记忆条数最多保留最后5条
if len(self.running_memory) > 5:
self.running_memory = self.running_memory[-5:]
return self.running_memory
async def _query_unified_memory(self, keywords: list[str], query_text: str) -> list[MemoryResult]:
"""查询统一记忆系统"""
try:
# 使用记忆系统
from src.chat.memory_system.memory_system import get_memory_system
memory_system = get_memory_system()
if not memory_system or memory_system.status.value != "ready":
logger.warning("记忆系统未就绪")
return []
# 构建查询上下文
context = {"keywords": keywords, "query_intent": "conversation_response"}
# 查询记忆
memories = await memory_system.retrieve_relevant_memories(
query_text=query_text,
user_id="global", # 使用全局作用域
context=context,
limit=5,
)
# 转换为 MemoryResult 格式
memory_results = []
for memory in memories:
result = MemoryResult(
content=memory.display,
memory_type=memory.memory_type.value,
confidence=memory.metadata.confidence.value,
importance=memory.metadata.importance.value,
timestamp=memory.metadata.created_at,
source="unified_memory",
relevance_score=memory.metadata.relevance_score,
)
memory_results.append(result)
logger.debug(f"统一记忆查询返回 {len(memory_results)} 条结果")
return memory_results
except Exception as e:
logger.error(f"查询统一记忆失败: {e}")
return []
async def get_instant_memory(self, target_message: str, chat_id: str) -> str | None:
"""
获取即时记忆 - 兼容原有接口(使用统一存储)
"""
try:
# 使用统一存储系统获取相关记忆
from src.chat.memory_system.memory_system import get_memory_system
memory_system = get_memory_system()
if not memory_system or memory_system.status.value != "ready":
return None
context = {"query_intent": "instant_response", "chat_id": chat_id}
memories = await memory_system.retrieve_relevant_memories(
query_text=target_message, user_id="global", context=context, limit=1
)
if memories:
return memories[0].display
return None
except Exception as e:
logger.error(f"获取即时记忆失败: {e}")
return None
def clear_cache(self):
"""清除缓存"""
self.cached_keywords.clear()
self.running_memory.clear()
logger.debug("记忆激活器缓存已清除")
# 创建全局实例
memory_activator = MemoryActivator()
# 兼容性别名
enhanced_memory_activator = memory_activator
init_prompt()
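A small illustration (not part of the deleted file) of why get_keywords_from_json routes the model output through repair_json first: truncated or slightly malformed JSON from the LLM can usually still be parsed.

# Assumes json_repair and orjson are installed and get_keywords_from_json from the module above is in scope.
broken = '{"keywords": ["coffee", "weekend plans"'   # LLM output cut off before the closing brackets
print(get_keywords_from_json(broken))                # repair_json typically closes them -> ['coffee', 'weekend plans']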

View File

@@ -1,721 +0,0 @@
"""
海马体双峰分布采样器
基于旧版海马体的采样策略,适配新版记忆系统
实现低消耗、高效率的记忆采样模式
"""
import asyncio
import random
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
import numpy as np
from src.chat.utils.chat_message_builder import (
build_readable_messages,
get_raw_msg_by_timestamp,
get_raw_msg_by_timestamp_with_chat,
)
from src.chat.utils.utils import translate_timestamp_to_human_readable
from src.common.logger import get_logger
from src.config.config import global_config
from src.llm_models.utils_model import LLMRequest
logger = get_logger(__name__)
# 全局背景任务集合
_background_tasks = set()
@dataclass
class HippocampusSampleConfig:
"""海马体采样配置"""
# 双峰分布参数
recent_mean_hours: float = 12.0 # 近期分布均值(小时)
recent_std_hours: float = 8.0 # 近期分布标准差(小时)
recent_weight: float = 0.7 # 近期分布权重
distant_mean_hours: float = 48.0 # 远期分布均值(小时)
distant_std_hours: float = 24.0 # 远期分布标准差(小时)
distant_weight: float = 0.3 # 远期分布权重
# 采样参数
total_samples: int = 50 # 总采样数
sample_interval: int = 1800 # 采样间隔(秒)
max_sample_length: int = 30 # 每次采样的最大消息数量
batch_size: int = 5 # 批处理大小
@classmethod
def from_global_config(cls) -> "HippocampusSampleConfig":
"""从全局配置创建海马体采样配置"""
config = global_config.memory.hippocampus_distribution_config
return cls(
recent_mean_hours=config[0],
recent_std_hours=config[1],
recent_weight=config[2],
distant_mean_hours=config[3],
distant_std_hours=config[4],
distant_weight=config[5],
total_samples=global_config.memory.hippocampus_sample_size,
sample_interval=global_config.memory.hippocampus_sample_interval,
max_sample_length=global_config.memory.hippocampus_batch_size,
batch_size=global_config.memory.hippocampus_batch_size,
)
class HippocampusSampler:
"""海马体双峰分布采样器"""
def __init__(self, memory_system=None):
self.memory_system = memory_system
self.config = HippocampusSampleConfig.from_global_config()
self.last_sample_time = 0
self.is_running = False
# 记忆构建模型
self.memory_builder_model: LLMRequest | None = None
# 统计信息
self.sample_count = 0
self.success_count = 0
self.last_sample_results: list[dict[str, Any]] = []
async def initialize(self):
"""初始化采样器"""
try:
# 初始化LLM模型
from src.config.config import model_config
task_config = getattr(model_config.model_task_config, "utils", None)
if task_config:
self.memory_builder_model = LLMRequest(model_set=task_config, request_type="memory.hippocampus_build")
task = asyncio.create_task(self.start_background_sampling())
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
logger.info("✅ 海马体采样器初始化成功")
else:
raise RuntimeError("未找到记忆构建模型配置")
except Exception as e:
logger.error(f"❌ 海马体采样器初始化失败: {e}")
raise
def generate_time_samples(self) -> list[datetime]:
"""生成双峰分布的时间采样点"""
# 计算每个分布的样本数
recent_samples = max(1, int(self.config.total_samples * self.config.recent_weight))
distant_samples = max(1, self.config.total_samples - recent_samples)
# 生成两个正态分布的小时偏移
recent_offsets = np.random.normal(
loc=self.config.recent_mean_hours, scale=self.config.recent_std_hours, size=recent_samples
)
distant_offsets = np.random.normal(
loc=self.config.distant_mean_hours, scale=self.config.distant_std_hours, size=distant_samples
)
# 合并两个分布的偏移
all_offsets = np.concatenate([recent_offsets, distant_offsets])
# 转换为时间戳(使用绝对值确保时间点在过去)
base_time = datetime.now()
timestamps = [base_time - timedelta(hours=abs(offset)) for offset in all_offsets]
# 按时间排序(从最早到最近)
return sorted(timestamps)
async def collect_message_samples(self, target_timestamp: float) -> list[dict[str, Any]] | None:
"""收集指定时间戳附近的消息样本"""
try:
# 随机时间窗口5-30分钟
time_window_seconds = random.randint(300, 1800)
# 尝试3次获取消息
for attempt in range(3):
timestamp_start = target_timestamp
timestamp_end = target_timestamp + time_window_seconds
# 获取单条消息作为锚点
anchor_messages = await get_raw_msg_by_timestamp(
timestamp_start=timestamp_start,
timestamp_end=timestamp_end,
limit=1,
limit_mode="earliest",
)
if not anchor_messages:
target_timestamp -= 120 # 向前调整2分钟
continue
anchor_message = anchor_messages[0]
chat_id = anchor_message.get("chat_id")
if not chat_id:
continue
# 获取同聊天的多条消息
messages = await get_raw_msg_by_timestamp_with_chat(
timestamp_start=timestamp_start,
timestamp_end=timestamp_end,
limit=self.config.max_sample_length,
limit_mode="earliest",
chat_id=chat_id,
)
if messages and len(messages) >= 2: # 至少需要2条消息
# 过滤掉已经记忆过的消息
filtered_messages = [
msg
for msg in messages
if msg.get("memorized_times", 0) < 2 # 最多记忆2次
]
if filtered_messages:
logger.debug(f"成功收集 {len(filtered_messages)} 条消息样本")
return filtered_messages
target_timestamp -= 120 # 向前调整再试
logger.debug(f"时间戳 {target_timestamp} 附近未找到有效消息样本")
return None
except Exception as e:
logger.error(f"收集消息样本失败: {e}")
return None
async def build_memory_from_samples(self, messages: list[dict[str, Any]], target_timestamp: float) -> str | None:
"""从消息样本构建记忆"""
if not messages or not self.memory_system or not self.memory_builder_model:
return None
try:
# 构建可读消息文本
readable_text = await build_readable_messages(
messages,
merge_messages=True,
timestamp_mode="normal_no_YMD",
replace_bot_name=False,
)
if not readable_text:
logger.warning("无法从消息样本生成可读文本")
return None
# 直接使用对话文本,不添加系统标识符
input_text = readable_text
logger.debug(f"开始构建记忆,文本长度: {len(input_text)}")
# 构建上下文
context = {
"user_id": "hippocampus_sampler",
"timestamp": time.time(),
"source": "hippocampus_sampling",
"message_count": len(messages),
"sample_mode": "bimodal_distribution",
"is_hippocampus_sample": True, # 标识为海马体样本
"bypass_value_threshold": True, # 绕过价值阈值检查
"hippocampus_sample_time": target_timestamp, # 记录样本时间
}
# 使用记忆系统构建记忆(绕过构建间隔检查)
memories = await self.memory_system.build_memory_from_conversation(
conversation_text=input_text,
context=context,
timestamp=time.time(),
bypass_interval=True, # 海马体采样器绕过构建间隔限制
)
if memories:
memory_count = len(memories)
self.success_count += 1
# 记录采样结果
result = {
"timestamp": time.time(),
"memory_count": memory_count,
"message_count": len(messages),
"text_preview": readable_text[:100] + "..." if len(readable_text) > 100 else readable_text,
"memory_types": [m.memory_type.value for m in memories],
}
self.last_sample_results.append(result)
# 限制结果历史长度
if len(self.last_sample_results) > 10:
self.last_sample_results.pop(0)
logger.info(f"✅ 海马体采样成功构建 {memory_count} 条记忆")
return f"构建{memory_count}条记忆"
else:
logger.debug("海马体采样未生成有效记忆")
return None
except Exception as e:
logger.error(f"海马体采样构建记忆失败: {e}")
return None
async def perform_sampling_cycle(self) -> dict[str, Any]:
"""执行一次完整的采样周期(优化版:批量融合构建)"""
if not self.should_sample():
return {"status": "skipped", "reason": "interval_not_met"}
start_time = time.time()
self.sample_count += 1
try:
# 生成时间采样点
time_samples = self.generate_time_samples()
logger.debug(f"生成 {len(time_samples)} 个时间采样点")
# 记录时间采样点(调试用)
readable_timestamps = [
translate_timestamp_to_human_readable(int(ts.timestamp()), mode="normal")
for ts in time_samples[:5] # 只显示前5个
]
logger.debug(f"时间采样点示例: {readable_timestamps}")
# 第一步:批量收集所有消息样本
logger.debug("开始批量收集消息样本...")
collected_messages = await self._collect_all_message_samples(time_samples)
if not collected_messages:
logger.info("未收集到有效消息样本,跳过本次采样")
self.last_sample_time = time.time()
return {
"status": "success",
"sample_count": self.sample_count,
"success_count": self.success_count,
"processed_samples": len(time_samples),
"successful_builds": 0,
"duration": time.time() - start_time,
"samples_generated": len(time_samples),
"message": "未收集到有效消息样本",
}
logger.info(f"收集到 {len(collected_messages)} 组消息样本")
# 第二步:融合和去重消息
logger.debug("开始融合和去重消息...")
fused_messages = await self._fuse_and_deduplicate_messages(collected_messages)
if not fused_messages:
logger.info("消息融合后为空,跳过记忆构建")
self.last_sample_time = time.time()
return {
"status": "success",
"sample_count": self.sample_count,
"success_count": self.success_count,
"processed_samples": len(time_samples),
"successful_builds": 0,
"duration": time.time() - start_time,
"samples_generated": len(time_samples),
"message": "消息融合后为空",
}
logger.info(f"融合后得到 {len(fused_messages)} 组有效消息")
# 第三步:一次性构建记忆
logger.debug("开始批量构建记忆...")
build_result = await self._build_batch_memory(fused_messages, time_samples)
# 更新最后采样时间
self.last_sample_time = time.time()
duration = time.time() - start_time
result = {
"status": "success",
"sample_count": self.sample_count,
"success_count": self.success_count,
"processed_samples": len(time_samples),
"successful_builds": build_result.get("memory_count", 0),
"duration": duration,
"samples_generated": len(time_samples),
"messages_collected": len(collected_messages),
"messages_fused": len(fused_messages),
"optimization_mode": "batch_fusion",
}
logger.info(
f"✅ 海马体采样周期完成(批量融合模式) | "
f"采样点: {len(time_samples)} | "
f"收集消息: {len(collected_messages)} | "
f"融合消息: {len(fused_messages)} | "
f"构建记忆: {build_result.get('memory_count', 0)} | "
f"耗时: {duration:.2f}s"
)
return result
except Exception as e:
logger.error(f"❌ 海马体采样周期失败: {e}")
return {
"status": "error",
"error": str(e),
"sample_count": self.sample_count,
"duration": time.time() - start_time,
}
async def _collect_all_message_samples(self, time_samples: list[datetime]) -> list[list[dict[str, Any]]]:
"""批量收集所有时间点的消息样本"""
collected_messages = []
max_concurrent = min(5, len(time_samples)) # 提高并发数到5
for i in range(0, len(time_samples), max_concurrent):
batch = time_samples[i : i + max_concurrent]
tasks = []
# 创建并发收集任务
for timestamp in batch:
target_ts = timestamp.timestamp()
task = self.collect_message_samples(target_ts)
tasks.append(task)
# 执行并发收集
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理收集结果
for result in results:
if isinstance(result, list) and result:
collected_messages.append(result)
elif isinstance(result, Exception):
logger.debug(f"消息收集异常: {result}")
# 批次间短暂延迟
if i + max_concurrent < len(time_samples):
await asyncio.sleep(0.5)
return collected_messages
async def _fuse_and_deduplicate_messages(
self, collected_messages: list[list[dict[str, Any]]]
) -> list[list[dict[str, Any]]]:
"""融合和去重消息样本"""
if not collected_messages:
return []
try:
# 展平所有消息
all_messages = []
for message_group in collected_messages:
all_messages.extend(message_group)
logger.debug(f"展开后总消息数: {len(all_messages)}")
# 去重逻辑:基于消息内容和时间戳
unique_messages = []
seen_hashes = set()
for message in all_messages:
# 创建消息哈希用于去重
content = message.get("processed_plain_text", "") or message.get("display_message", "")
timestamp = message.get("time", 0)
chat_id = message.get("chat_id", "")
# 简单哈希内容前50字符 + 时间戳(精确到分钟) + 聊天ID
hash_key = f"{content[:50]}_{int(timestamp // 60)}_{chat_id}"
if hash_key not in seen_hashes and len(content.strip()) > 10:
seen_hashes.add(hash_key)
unique_messages.append(message)
logger.debug(f"去重后消息数: {len(unique_messages)}")
# 按时间排序
unique_messages.sort(key=lambda x: x.get("time", 0))
# 按聊天ID分组重新组织
chat_groups = {}
for message in unique_messages:
chat_id = message.get("chat_id", "unknown")
if chat_id not in chat_groups:
chat_groups[chat_id] = []
chat_groups[chat_id].append(message)
# 合并相邻时间范围内的消息
fused_groups = []
for chat_id, messages in chat_groups.items():
fused_groups.extend(self._merge_adjacent_messages(messages))
logger.debug(f"融合后消息组数: {len(fused_groups)}")
return fused_groups
except Exception as e:
logger.error(f"消息融合失败: {e}")
# 返回原始消息组作为备选
return collected_messages[:5] # 限制返回数量
def _merge_adjacent_messages(
self, messages: list[dict[str, Any]], time_gap: int = 1800
) -> list[list[dict[str, Any]]]:
"""合并时间间隔内的消息"""
if not messages:
return []
merged_groups = []
current_group = [messages[0]]
for i in range(1, len(messages)):
current_time = messages[i].get("time", 0)
prev_time = current_group[-1].get("time", 0)
# 如果时间间隔小于阈值,合并到当前组
if current_time - prev_time <= time_gap:
current_group.append(messages[i])
else:
# 否则开始新组
merged_groups.append(current_group)
current_group = [messages[i]]
# 添加最后一组
merged_groups.append(current_group)
# 过滤掉只有一条消息的组(除非内容较长)
result_groups = [
group for group in merged_groups
if len(group) > 1 or any(len(msg.get("processed_plain_text", "")) > 100 for msg in group)
]
return result_groups
async def _build_batch_memory(
self, fused_messages: list[list[dict[str, Any]]], time_samples: list[datetime]
) -> dict[str, Any]:
"""批量构建记忆"""
if not fused_messages:
return {"memory_count": 0, "memories": []}
try:
total_memories = []
total_memory_count = 0
# 构建融合后的文本
batch_input_text = await self._build_fused_conversation_text(fused_messages)
if not batch_input_text:
logger.warning("无法构建融合文本,尝试单独构建")
# 备选方案:分别构建
return await self._fallback_individual_build(fused_messages)
# 创建批量上下文
batch_context = {
"user_id": "hippocampus_batch_sampler",
"timestamp": time.time(),
"source": "hippocampus_batch_sampling",
"message_groups_count": len(fused_messages),
"total_messages": sum(len(group) for group in fused_messages),
"sample_count": len(time_samples),
"is_hippocampus_sample": True,
"bypass_value_threshold": True,
"optimization_mode": "batch_fusion",
}
logger.debug(f"批量构建记忆,文本长度: {len(batch_input_text)}")
# 一次性构建记忆
memories = await self.memory_system.build_memory_from_conversation(
conversation_text=batch_input_text, context=batch_context, timestamp=time.time(), bypass_interval=True
)
if memories:
memory_count = len(memories)
self.success_count += 1
total_memory_count += memory_count
total_memories.extend(memories)
logger.info(f"✅ 批量海马体采样成功构建 {memory_count} 条记忆")
else:
logger.debug("批量海马体采样未生成有效记忆")
# 记录采样结果
result = {
"timestamp": time.time(),
"memory_count": total_memory_count,
"message_groups_count": len(fused_messages),
"total_messages": sum(len(group) for group in fused_messages),
"text_preview": batch_input_text[:200] + "..." if len(batch_input_text) > 200 else batch_input_text,
"memory_types": [m.memory_type.value for m in total_memories],
}
self.last_sample_results.append(result)
# 限制结果历史长度
if len(self.last_sample_results) > 10:
self.last_sample_results.pop(0)
return {"memory_count": total_memory_count, "memories": total_memories, "result": result}
except Exception as e:
logger.error(f"批量构建记忆失败: {e}")
return {"memory_count": 0, "error": str(e)}
async def _build_fused_conversation_text(self, fused_messages: list[list[dict[str, Any]]]) -> str:
"""构建融合后的对话文本"""
try:
conversation_parts = []
for group_idx, message_group in enumerate(fused_messages):
if not message_group:
continue
# 为每个消息组添加分隔符
group_header = f"\n=== 对话片段 {group_idx + 1} ==="
conversation_parts.append(group_header)
# 构建可读消息
group_text = await build_readable_messages(
message_group,
merge_messages=True,
timestamp_mode="normal_no_YMD",
replace_bot_name=False,
)
if group_text and len(group_text.strip()) > 10:
conversation_parts.append(group_text.strip())
return "\n".join(conversation_parts)
except Exception as e:
logger.error(f"构建融合文本失败: {e}")
return ""
async def _fallback_individual_build(self, fused_messages: list[list[dict[str, Any]]]) -> dict[str, Any]:
"""备选方案:单独构建每个消息组"""
total_memories = []
total_count = 0
for group in fused_messages[:5]: # 限制最多5组
try:
memories = await self.build_memory_from_samples(group, time.time())
if memories:
total_memories.extend(memories)
total_count += len(memories)
except Exception as e:
logger.debug(f"单独构建失败: {e}")
return {"memory_count": total_count, "memories": total_memories, "fallback_mode": True}
async def process_sample_timestamp(self, target_timestamp: float) -> str | None:
"""处理单个时间戳采样(保留作为备选方法)"""
try:
# 收集消息样本
messages = await self.collect_message_samples(target_timestamp)
if not messages:
return None
# 构建记忆
result = await self.build_memory_from_samples(messages, target_timestamp)
return result
except Exception as e:
logger.debug(f"处理时间戳采样失败 {target_timestamp}: {e}")
return None
def should_sample(self) -> bool:
"""检查是否应该进行采样"""
current_time = time.time()
# 检查时间间隔
if current_time - self.last_sample_time < self.config.sample_interval:
return False
# 检查是否已初始化
if not self.memory_builder_model:
logger.warning("海马体采样器未初始化")
return False
return True
async def start_background_sampling(self):
"""启动后台采样"""
if self.is_running:
logger.warning("海马体后台采样已在运行")
return
self.is_running = True
logger.info("🚀 启动海马体后台采样任务")
try:
while self.is_running:
try:
# 执行采样周期
result = await self.perform_sampling_cycle()
# 如果是跳过状态,短暂睡眠
if result.get("status") == "skipped":
await asyncio.sleep(60) # 1分钟后重试
else:
# 正常等待下一个采样间隔
await asyncio.sleep(self.config.sample_interval)
except Exception as e:
logger.error(f"海马体后台采样异常: {e}")
await asyncio.sleep(300) # 异常时等待5分钟
except asyncio.CancelledError:
logger.info("海马体后台采样任务被取消")
finally:
self.is_running = False
def stop_background_sampling(self):
"""停止后台采样"""
self.is_running = False
logger.info("🛑 停止海马体后台采样任务")
def get_sampling_stats(self) -> dict[str, Any]:
"""获取采样统计信息"""
success_rate = (self.success_count / self.sample_count * 100) if self.sample_count > 0 else 0
# 计算最近的平均数据
recent_avg_messages = 0
recent_avg_memory_count = 0
if self.last_sample_results:
recent_results = self.last_sample_results[-5:] # 最近5次
recent_avg_messages = sum(r.get("total_messages", 0) for r in recent_results) / len(recent_results)
recent_avg_memory_count = sum(r.get("memory_count", 0) for r in recent_results) / len(recent_results)
return {
"is_running": self.is_running,
"sample_count": self.sample_count,
"success_count": self.success_count,
"success_rate": f"{success_rate:.1f}%",
"last_sample_time": self.last_sample_time,
"optimization_mode": "batch_fusion", # 显示优化模式
"performance_metrics": {
"avg_messages_per_sample": f"{recent_avg_messages:.1f}",
"avg_memories_per_sample": f"{recent_avg_memory_count:.1f}",
"fusion_efficiency": f"{(recent_avg_messages / max(recent_avg_memory_count, 1)):.1f}x"
if recent_avg_messages > 0
else "N/A",
},
"config": {
"sample_interval": self.config.sample_interval,
"total_samples": self.config.total_samples,
"recent_weight": f"{self.config.recent_weight:.1%}",
"distant_weight": f"{self.config.distant_weight:.1%}",
"max_concurrent": 5, # 批量模式并发数
"fusion_time_gap": "30分钟", # 消息融合时间间隔
},
"recent_results": self.last_sample_results[-5:], # 最近5次结果
}
# 全局海马体采样器实例
_hippocampus_sampler: HippocampusSampler | None = None
def get_hippocampus_sampler(memory_system=None) -> HippocampusSampler:
"""获取全局海马体采样器实例"""
global _hippocampus_sampler
if _hippocampus_sampler is None:
_hippocampus_sampler = HippocampusSampler(memory_system)
return _hippocampus_sampler
async def initialize_hippocampus_sampler(memory_system=None) -> HippocampusSampler:
"""初始化全局海马体采样器"""
sampler = get_hippocampus_sampler(memory_system)
await sampler.initialize()
return sampler
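A standalone sketch of the bimodal time sampling that generate_time_samples implements, using the default HippocampusSampleConfig values above (this snippet is illustrative and was not part of the file):

import numpy as np
from datetime import datetime, timedelta

total, recent_weight = 50, 0.7
recent_n = max(1, int(total * recent_weight))
recent = np.random.normal(loc=12.0, scale=8.0, size=recent_n)                     # recent peak, hours in the past
distant = np.random.normal(loc=48.0, scale=24.0, size=max(1, total - recent_n))   # distant peak
offsets = np.concatenate([recent, distant])
samples = sorted(datetime.now() - timedelta(hours=abs(h)) for h in offsets)
print(samples[0], samples[-1])  # earliest and most recent sampled timestamps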

View File

@@ -1,238 +0,0 @@
"""
记忆激活器
记忆系统的激活器组件
"""
import difflib
from datetime import datetime
import orjson
from json_repair import repair_json
from src.chat.memory_system.memory_manager import MemoryResult
from src.chat.utils.prompt import Prompt, global_prompt_manager
from src.common.logger import get_logger
from src.config.config import global_config, model_config
from src.llm_models.utils_model import LLMRequest
logger = get_logger("memory_activator")
def get_keywords_from_json(json_str) -> list:
"""
从JSON字符串中提取关键词列表
Args:
json_str: JSON格式的字符串
Returns:
List[str]: 关键词列表
"""
try:
# 使用repair_json修复JSON格式
fixed_json = repair_json(json_str)
# 如果repair_json返回的是字符串需要解析为Python对象
result = orjson.loads(fixed_json) if isinstance(fixed_json, str) else fixed_json
return result.get("keywords", [])
except Exception as e:
logger.error(f"解析关键词JSON失败: {e}")
return []
def init_prompt():
# --- Memory Activator Prompt ---
memory_activator_prompt = """
你是一个记忆分析器,你需要根据以下信息来进行记忆检索
以下是一段聊天记录,请根据这些信息,总结出几个关键词作为记忆检索的触发词
聊天记录:
{obs_info_text}
用户想要回复的消息:
{target_message}
历史关键词(请避免重复提取这些关键词):
{cached_keywords}
请输出一个json格式包含以下字段
{{
"keywords": ["关键词1", "关键词2", "关键词3",......]
}}
不要输出其他多余内容只输出json格式就好
"""
Prompt(memory_activator_prompt, "memory_activator_prompt")
class MemoryActivator:
"""记忆激活器"""
def __init__(self):
self.key_words_model = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="memory.activator",
)
self.running_memory = []
self.cached_keywords = set() # 用于缓存历史关键词
self.last_memory_query_time = 0 # 上次查询记忆的时间
async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> list[dict]:
"""
激活记忆
"""
# 如果记忆系统被禁用,直接返回空列表
if not global_config.memory.enable_memory:
return []
# 将缓存的关键词转换为字符串用于prompt
cached_keywords_str = ", ".join(self.cached_keywords) if self.cached_keywords else "暂无历史关键词"
prompt = await global_prompt_manager.format_prompt(
"memory_activator_prompt",
obs_info_text=chat_history_prompt,
target_message=target_message,
cached_keywords=cached_keywords_str,
)
# 生成关键词
response, (reasoning_content, model_name, _) = await self.key_words_model.generate_response_async(
prompt, temperature=0.5
)
keywords = list(get_keywords_from_json(response))
# 更新关键词缓存
if keywords:
# 限制缓存大小最多保留10个关键词
if len(self.cached_keywords) > 10:
# 转换为列表,移除最早的关键词
cached_list = list(self.cached_keywords)
self.cached_keywords = set(cached_list[-8:])
# 添加新的关键词到缓存
self.cached_keywords.update(keywords)
logger.debug(f"记忆关键词: {self.cached_keywords}")
# 使用记忆系统获取相关记忆
memory_results = await self._query_unified_memory(keywords, target_message)
# 处理和记忆结果
if memory_results:
for result in memory_results:
# 检查是否已存在相似内容的记忆
exists = any(
m["content"] == result.content
or difflib.SequenceMatcher(None, m["content"], result.content).ratio() >= 0.7
for m in self.running_memory
)
if not exists:
memory_entry = {
"topic": result.memory_type,
"content": result.content,
"timestamp": datetime.fromtimestamp(result.timestamp).isoformat(),
"duration": 1,
"confidence": result.confidence,
"importance": result.importance,
"source": result.source,
"relevance_score": result.relevance_score, # 添加相关度评分
}
self.running_memory.append(memory_entry)
logger.debug(f"添加新记忆: {result.memory_type} - {result.content}")
# 激活时所有已有记忆的duration+1达到3则移除
for m in self.running_memory[:]:
m["duration"] = m.get("duration", 1) + 1
self.running_memory = [m for m in self.running_memory if m["duration"] < 3]
# 限制同时加载的记忆条数最多保留最后5条
if len(self.running_memory) > 5:
self.running_memory = self.running_memory[-5:]
return self.running_memory
async def _query_unified_memory(self, keywords: list[str], query_text: str) -> list[MemoryResult]:
"""查询统一记忆系统"""
try:
# 使用记忆系统
from src.chat.memory_system.memory_system import get_memory_system
memory_system = get_memory_system()
if not memory_system or memory_system.status.value != "ready":
logger.warning("记忆系统未就绪")
return []
# 构建查询上下文
context = {"keywords": keywords, "query_intent": "conversation_response"}
# 查询记忆
memories = await memory_system.retrieve_relevant_memories(
query_text=query_text,
user_id="global", # 使用全局作用域
context=context,
limit=5,
)
# 转换为 MemoryResult 格式
memory_results = []
for memory in memories:
result = MemoryResult(
content=memory.display,
memory_type=memory.memory_type.value,
confidence=memory.metadata.confidence.value,
importance=memory.metadata.importance.value,
timestamp=memory.metadata.created_at,
source="unified_memory",
relevance_score=memory.metadata.relevance_score,
)
memory_results.append(result)
logger.debug(f"统一记忆查询返回 {len(memory_results)} 条结果")
return memory_results
except Exception as e:
logger.error(f"查询统一记忆失败: {e}")
return []
async def get_instant_memory(self, target_message: str, chat_id: str) -> str | None:
"""
获取即时记忆 - 兼容原有接口(使用统一存储)
"""
try:
# 使用统一存储系统获取相关记忆
from src.chat.memory_system.memory_system import get_memory_system
memory_system = get_memory_system()
if not memory_system or memory_system.status.value != "ready":
return None
context = {"query_intent": "instant_response", "chat_id": chat_id}
memories = await memory_system.retrieve_relevant_memories(
query_text=target_message, user_id="global", context=context, limit=1
)
if memories:
return memories[0].display
return None
except Exception as e:
logger.error(f"获取即时记忆失败: {e}")
return None
def clear_cache(self):
"""清除缓存"""
self.cached_keywords.clear()
self.running_memory.clear()
logger.debug("记忆激活器缓存已清除")
# 创建全局实例
memory_activator = MemoryActivator()
init_prompt()

File diff suppressed because it is too large.

View File

@@ -1,647 +0,0 @@
"""
结构化记忆单元设计
实现高质量、结构化的记忆单元,符合文档设计规范
"""
import hashlib
import time
import uuid
from collections.abc import Iterable
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import numpy as np
import orjson
from src.common.logger import get_logger
logger = get_logger(__name__)
class MemoryType(Enum):
"""记忆类型分类"""
PERSONAL_FACT = "personal_fact" # 个人事实(姓名、职业、住址等)
EVENT = "event" # 事件(重要经历、约会等)
PREFERENCE = "preference" # 偏好(喜好、习惯等)
OPINION = "opinion" # 观点(对事物的看法)
RELATIONSHIP = "relationship" # 关系(与他人的关系)
EMOTION = "emotion" # 情感状态
KNOWLEDGE = "knowledge" # 知识信息
SKILL = "skill" # 技能能力
GOAL = "goal" # 目标计划
EXPERIENCE = "experience" # 经验教训
CONTEXTUAL = "contextual" # 上下文信息
class ConfidenceLevel(Enum):
"""置信度等级"""
LOW = 1 # 低置信度,可能不准确
MEDIUM = 2 # 中等置信度,有一定依据
HIGH = 3 # 高置信度,有明确来源
VERIFIED = 4 # 已验证,非常可靠
class ImportanceLevel(Enum):
"""重要性等级"""
LOW = 1 # 低重要性,普通信息
NORMAL = 2 # 一般重要性,日常信息
HIGH = 3 # 高重要性,重要信息
CRITICAL = 4 # 关键重要性,核心信息
@dataclass
class ContentStructure:
"""主谓宾结构,包含自然语言描述"""
subject: str | list[str]
predicate: str
object: str | dict
display: str = ""
def to_dict(self) -> dict[str, Any]:
"""转换为字典格式"""
return {"subject": self.subject, "predicate": self.predicate, "object": self.object, "display": self.display}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ContentStructure":
"""从字典创建实例"""
return cls(
subject=data.get("subject", ""),
predicate=data.get("predicate", ""),
object=data.get("object", ""),
display=data.get("display", ""),
)
def to_subject_list(self) -> list[str]:
"""将主语转换为列表形式"""
if isinstance(self.subject, list):
return [s for s in self.subject if isinstance(s, str) and s.strip()]
if isinstance(self.subject, str) and self.subject.strip():
return [self.subject.strip()]
return []
def __str__(self) -> str:
"""字符串表示"""
if self.display:
return self.display
subjects = "".join(self.to_subject_list()) or str(self.subject)
object_str = self.object if isinstance(self.object, str) else str(self.object)
return f"{subjects} {self.predicate} {object_str}".strip()
@dataclass
class MemoryMetadata:
"""记忆元数据 - 简化版本"""
# 基础信息
memory_id: str # 唯一标识符
user_id: str # 用户ID
chat_id: str | None = None # 聊天ID群聊或私聊
# 时间信息
created_at: float = 0.0 # 创建时间戳
last_accessed: float = 0.0 # 最后访问时间
last_modified: float = 0.0 # 最后修改时间
# 激活频率管理
last_activation_time: float = 0.0 # 最后激活时间
activation_frequency: int = 0 # 激活频率(单位时间内的激活次数)
total_activations: int = 0 # 总激活次数
# 统计信息
access_count: int = 0 # 访问次数
relevance_score: float = 0.0 # 相关度评分
# 信心和重要性(核心字段)
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM
importance: ImportanceLevel = ImportanceLevel.NORMAL
# 遗忘机制相关
forgetting_threshold: float = 0.0 # 遗忘阈值(动态计算)
last_forgetting_check: float = 0.0 # 上次遗忘检查时间
# 来源信息
source_context: str | None = None # 来源上下文片段
# 兼容旧字段: 一些代码或旧版本可能直接访问 metadata.source
source: str | None = None
def __post_init__(self):
"""后初始化处理"""
if not self.memory_id:
self.memory_id = str(uuid.uuid4())
current_time = time.time()
if self.created_at == 0:
self.created_at = current_time
if self.last_accessed == 0:
self.last_accessed = current_time
if self.last_modified == 0:
self.last_modified = current_time
if self.last_activation_time == 0:
self.last_activation_time = current_time
if self.last_forgetting_check == 0:
self.last_forgetting_check = current_time
# 兼容性:如果旧字段 source 被使用,保证 source 与 source_context 同步
if not getattr(self, "source", None) and getattr(self, "source_context", None):
try:
self.source = str(self.source_context)
except Exception:
self.source = None
# 如果有 source 字段但 source_context 为空,也同步回去
if not getattr(self, "source_context", None) and getattr(self, "source", None):
try:
self.source_context = str(self.source)
except Exception:
self.source_context = None
def update_access(self):
"""更新访问信息"""
current_time = time.time()
self.last_accessed = current_time
self.access_count += 1
self.total_activations += 1
# 更新激活频率
self._update_activation_frequency(current_time)
def _update_activation_frequency(self, current_time: float):
"""更新激活频率24小时内的激活次数"""
# 如果超过24小时重置激活频率
if current_time - self.last_activation_time > 86400: # 24小时 = 86400秒
self.activation_frequency = 1
else:
self.activation_frequency += 1
self.last_activation_time = current_time
def update_relevance(self, new_score: float):
"""更新相关度评分"""
self.relevance_score = max(0.0, min(1.0, new_score))
self.last_modified = time.time()
def calculate_forgetting_threshold(self) -> float:
"""计算遗忘阈值(天数)"""
# 基础天数
base_days = 30.0
# 重要性权重 (1-4 -> 0-3)
importance_weight = (self.importance.value - 1) * 15 # 0, 15, 30, 45
# 置信度权重 (1-4 -> 0-3)
confidence_weight = (self.confidence.value - 1) * 10 # 0, 10, 20, 30
# 激活频率权重每5次激活增加1天
frequency_weight = min(self.activation_frequency, 20) * 0.5 # 最多10天
# 计算最终阈值
threshold = base_days + importance_weight + confidence_weight + frequency_weight
# 设置最小和最大阈值
return max(7.0, min(threshold, 365.0)) # 7天到1年之间
def should_forget(self, current_time: float | None = None) -> bool:
"""判断是否应该遗忘"""
if current_time is None:
current_time = time.time()
# 计算遗忘阈值
self.forgetting_threshold = self.calculate_forgetting_threshold()
# 计算距离最后激活的时间
days_since_activation = (current_time - self.last_activation_time) / 86400
return days_since_activation > self.forgetting_threshold
def is_dormant(self, current_time: float | None = None, inactive_days: int = 90) -> bool:
"""判断是否处于休眠状态(长期未激活)"""
if current_time is None:
current_time = time.time()
days_since_last_access = (current_time - self.last_accessed) / 86400
return days_since_last_access > inactive_days
def to_dict(self) -> dict[str, Any]:
"""转换为字典格式"""
return {
"memory_id": self.memory_id,
"user_id": self.user_id,
"chat_id": self.chat_id,
"created_at": self.created_at,
"last_accessed": self.last_accessed,
"last_modified": self.last_modified,
"last_activation_time": self.last_activation_time,
"activation_frequency": self.activation_frequency,
"total_activations": self.total_activations,
"access_count": self.access_count,
"relevance_score": self.relevance_score,
"confidence": self.confidence.value,
"importance": self.importance.value,
"forgetting_threshold": self.forgetting_threshold,
"last_forgetting_check": self.last_forgetting_check,
"source_context": self.source_context,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MemoryMetadata":
"""从字典创建实例"""
return cls(
memory_id=data.get("memory_id", ""),
user_id=data.get("user_id", ""),
chat_id=data.get("chat_id"),
created_at=data.get("created_at", 0),
last_accessed=data.get("last_accessed", 0),
last_modified=data.get("last_modified", 0),
last_activation_time=data.get("last_activation_time", 0),
activation_frequency=data.get("activation_frequency", 0),
total_activations=data.get("total_activations", 0),
access_count=data.get("access_count", 0),
relevance_score=data.get("relevance_score", 0.0),
confidence=ConfidenceLevel(data.get("confidence", ConfidenceLevel.MEDIUM.value)),
importance=ImportanceLevel(data.get("importance", ImportanceLevel.NORMAL.value)),
forgetting_threshold=data.get("forgetting_threshold", 0.0),
last_forgetting_check=data.get("last_forgetting_check", 0),
source_context=data.get("source_context"),
)
@dataclass
class MemoryChunk:
"""结构化记忆单元 - 核心数据结构"""
# 元数据
metadata: MemoryMetadata
# 内容结构
content: ContentStructure # 主谓宾结构
memory_type: MemoryType # 记忆类型
# 扩展信息
keywords: list[str] = field(default_factory=list) # 关键词列表
tags: list[str] = field(default_factory=list) # 标签列表
categories: list[str] = field(default_factory=list) # 分类列表
# 语义信息
embedding: list[float] | None = None # 语义向量
semantic_hash: str | None = None # 语义哈希值
# 关联信息
related_memories: list[str] = field(default_factory=list) # 关联记忆ID列表
temporal_context: dict[str, Any] | None = None # 时间上下文
def __post_init__(self):
"""后初始化处理"""
if self.embedding and len(self.embedding) > 0:
self._generate_semantic_hash()
def _generate_semantic_hash(self):
"""生成语义哈希值"""
if not self.embedding:
return
try:
# 使用向量和内容生成稳定的哈希
content_str = f"{self.content.subject}:{self.content.predicate}:{self.content.object!s}"
embedding_str = ",".join(map(str, [round(x, 6) for x in self.embedding]))
hash_input = f"{content_str}|{embedding_str}"
hash_object = hashlib.sha256(hash_input.encode("utf-8"))
self.semantic_hash = hash_object.hexdigest()[:16]
except Exception as e:
logger.warning(f"生成语义哈希失败: {e}")
self.semantic_hash = str(uuid.uuid4())[:16]
@property
def memory_id(self) -> str:
"""获取记忆ID"""
return self.metadata.memory_id
@property
def user_id(self) -> str:
"""获取用户ID"""
return self.metadata.user_id
@property
def text_content(self) -> str:
"""获取文本内容优先使用display"""
return str(self.content)
@property
def display(self) -> str:
"""获取展示文本"""
return self.content.display or str(self.content)
@property
def subjects(self) -> list[str]:
"""获取主语列表"""
return self.content.to_subject_list()
def update_access(self):
"""更新访问信息"""
self.metadata.update_access()
def update_relevance(self, new_score: float):
"""更新相关度评分"""
self.metadata.update_relevance(new_score)
def should_forget(self, current_time: float | None = None) -> bool:
"""判断是否应该遗忘"""
return self.metadata.should_forget(current_time)
def is_dormant(self, current_time: float | None = None, inactive_days: int = 90) -> bool:
"""判断是否处于休眠状态(长期未激活)"""
return self.metadata.is_dormant(current_time, inactive_days)
def calculate_forgetting_threshold(self) -> float:
"""计算遗忘阈值(天数)"""
return self.metadata.calculate_forgetting_threshold()
def add_keyword(self, keyword: str):
"""添加关键词"""
if keyword and keyword not in self.keywords:
self.keywords.append(keyword.strip())
def add_tag(self, tag: str):
"""添加标签"""
if tag and tag not in self.tags:
self.tags.append(tag.strip())
def add_category(self, category: str):
"""添加分类"""
if category and category not in self.categories:
self.categories.append(category.strip())
def add_related_memory(self, memory_id: str):
"""添加关联记忆"""
if memory_id and memory_id not in self.related_memories:
self.related_memories.append(memory_id)
def set_embedding(self, embedding: list[float]):
"""设置语义向量"""
self.embedding = embedding
self._generate_semantic_hash()
def calculate_similarity(self, other: "MemoryChunk") -> float:
"""计算与另一个记忆块的相似度"""
if not self.embedding or not other.embedding:
return 0.0
try:
# 计算余弦相似度
v1 = np.array(self.embedding)
v2 = np.array(other.embedding)
dot_product = np.dot(v1, v2)
norm1 = np.linalg.norm(v1)
norm2 = np.linalg.norm(v2)
if norm1 == 0 or norm2 == 0:
return 0.0
similarity = dot_product / (norm1 * norm2)
return max(0.0, min(1.0, similarity))
except Exception as e:
logger.warning(f"计算记忆相似度失败: {e}")
return 0.0
def to_dict(self) -> dict[str, Any]:
"""转换为完整的字典格式"""
return {
"metadata": self.metadata.to_dict(),
"content": self.content.to_dict(),
"memory_type": self.memory_type.value,
"keywords": self.keywords,
"tags": self.tags,
"categories": self.categories,
"embedding": self.embedding,
"semantic_hash": self.semantic_hash,
"related_memories": self.related_memories,
"temporal_context": self.temporal_context,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MemoryChunk":
"""从字典创建实例"""
metadata = MemoryMetadata.from_dict(data.get("metadata", {}))
content = ContentStructure.from_dict(data.get("content", {}))
chunk = cls(
metadata=metadata,
content=content,
memory_type=MemoryType(data.get("memory_type", MemoryType.CONTEXTUAL.value)),
keywords=data.get("keywords", []),
tags=data.get("tags", []),
categories=data.get("categories", []),
embedding=data.get("embedding"),
semantic_hash=data.get("semantic_hash"),
related_memories=data.get("related_memories", []),
temporal_context=data.get("temporal_context"),
)
return chunk
def to_json(self) -> str:
"""转换为JSON字符串"""
return orjson.dumps(self.to_dict()).decode("utf-8")
@classmethod
def from_json(cls, json_str: str) -> "MemoryChunk":
"""从JSON字符串创建实例"""
try:
data = orjson.loads(json_str)
return cls.from_dict(data)
except Exception as e:
logger.error(f"从JSON创建记忆块失败: {e}")
raise
def is_similar_to(self, other: "MemoryChunk", threshold: float = 0.8) -> bool:
"""判断是否与另一个记忆块相似"""
if self.semantic_hash and other.semantic_hash:
return self.semantic_hash == other.semantic_hash
return self.calculate_similarity(other) >= threshold
def merge_with(self, other: "MemoryChunk") -> bool:
"""与另一个记忆块合并(如果相似)"""
if not self.is_similar_to(other):
return False
try:
# 合并关键词
for keyword in other.keywords:
self.add_keyword(keyword)
# 合并标签
for tag in other.tags:
self.add_tag(tag)
# 合并分类
for category in other.categories:
self.add_category(category)
# 合并关联记忆
for memory_id in other.related_memories:
self.add_related_memory(memory_id)
# 更新元数据
self.metadata.last_modified = time.time()
self.metadata.access_count += other.metadata.access_count
self.metadata.relevance_score = max(self.metadata.relevance_score, other.metadata.relevance_score)
# 更新置信度
if other.metadata.confidence.value > self.metadata.confidence.value:
self.metadata.confidence = other.metadata.confidence
# 更新重要性
if other.metadata.importance.value > self.metadata.importance.value:
self.metadata.importance = other.metadata.importance
logger.debug(f"记忆块 {self.memory_id} 合并了记忆块 {other.memory_id}")
return True
except Exception as e:
logger.error(f"合并记忆块失败: {e}")
return False
def __str__(self) -> str:
"""字符串表示"""
type_emoji = {
MemoryType.PERSONAL_FACT: "👤",
MemoryType.EVENT: "📅",
MemoryType.PREFERENCE: "❤️",
MemoryType.OPINION: "💭",
MemoryType.RELATIONSHIP: "👥",
MemoryType.EMOTION: "😊",
MemoryType.KNOWLEDGE: "📚",
MemoryType.SKILL: "🛠️",
MemoryType.GOAL: "🎯",
MemoryType.EXPERIENCE: "💡",
MemoryType.CONTEXTUAL: "📝",
}
emoji = type_emoji.get(self.memory_type, "📝")
confidence_icon = "" * self.metadata.confidence.value
importance_icon = "" * self.metadata.importance.value
return f"{emoji} [{self.memory_type.value}] {self.display} {confidence_icon} {importance_icon}"
def __repr__(self) -> str:
"""调试表示"""
return f"MemoryChunk(id={self.memory_id[:8]}..., type={self.memory_type.value}, user={self.user_id})"
def _build_display_text(subjects: Iterable[str], predicate: str, obj: str | dict) -> str:
"""根据主谓宾生成自然语言描述"""
subjects_clean = [s.strip() for s in subjects if s and isinstance(s, str)]
subject_part = "".join(subjects_clean) if subjects_clean else "对话参与者"
if isinstance(obj, dict):
object_candidates = []
for key, value in obj.items():
if isinstance(value, str | int | float):
object_candidates.append(f"{key}:{value}")
elif isinstance(value, list):
compact = "".join(str(item) for item in value[:3])
object_candidates.append(f"{key}:{compact}")
object_part = "".join(object_candidates) if object_candidates else str(obj)
else:
object_part = str(obj).strip()
predicate_clean = predicate.strip()
if not predicate_clean:
return f"{subject_part} {object_part}".strip()
if object_part:
return f"{subject_part}{predicate_clean}{object_part}".strip()
return f"{subject_part}{predicate_clean}".strip()
def create_memory_chunk(
user_id: str,
subject: str | list[str],
predicate: str,
obj: str | dict,
memory_type: MemoryType,
chat_id: str | None = None,
source_context: str | None = None,
importance: ImportanceLevel = ImportanceLevel.NORMAL,
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM,
display: str | None = None,
**kwargs,
) -> MemoryChunk:
"""便捷的内存块创建函数"""
metadata = MemoryMetadata(
memory_id="",
user_id=user_id,
chat_id=chat_id,
created_at=time.time(),
last_accessed=0,
last_modified=0,
confidence=confidence,
importance=importance,
source_context=source_context,
)
subjects: list[str]
if isinstance(subject, list):
subjects = [s for s in subject if isinstance(s, str) and s.strip()]
subject_payload: str | list[str] = subjects
else:
cleaned = subject.strip() if isinstance(subject, str) else ""
subjects = [cleaned] if cleaned else []
subject_payload = cleaned
display_text = display or _build_display_text(subjects, predicate, obj)
content = ContentStructure(subject=subject_payload, predicate=predicate, object=obj, display=display_text)
chunk = MemoryChunk(metadata=metadata, content=content, memory_type=memory_type, **kwargs)
return chunk
@dataclass
class MessageCollection:
"""消息集合数据结构"""
collection_id: str = field(default_factory=lambda: str(uuid.uuid4()))
chat_id: str | None = None # 聊天ID群聊或私聊
messages: list[str] = field(default_factory=list)
combined_text: str = ""
created_at: float = field(default_factory=time.time)
embedding: list[float] | None = None
def to_dict(self) -> dict[str, Any]:
"""转换为字典格式"""
return {
"collection_id": self.collection_id,
"chat_id": self.chat_id,
"messages": self.messages,
"combined_text": self.combined_text,
"created_at": self.created_at,
"embedding": self.embedding,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MessageCollection":
"""从字典创建实例"""
return cls(
collection_id=data.get("collection_id", str(uuid.uuid4())),
chat_id=data.get("chat_id"),
messages=data.get("messages", []),
combined_text=data.get("combined_text", ""),
created_at=data.get("created_at", time.time()),
embedding=data.get("embedding"),
)
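For reference, the cosine similarity that MemoryChunk.calculate_similarity computes, spelled out on two toy embeddings (illustrative only):

import numpy as np

a = np.array([0.2, 0.9, 0.1])     # embedding of one memory chunk
b = np.array([0.25, 0.85, 0.05])  # embedding of a near-duplicate
similarity = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
print(round(similarity, 3))       # ~0.996, so is_similar_to(threshold=0.8) would treat them as the same memory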

View File

@@ -1,355 +0,0 @@
"""
智能记忆遗忘引擎
基于重要程度、置信度和激活频率的智能遗忘机制
"""
import asyncio
import time
from dataclasses import dataclass
from datetime import datetime
from src.chat.memory_system.memory_chunk import ConfidenceLevel, ImportanceLevel, MemoryChunk
from src.common.logger import get_logger
logger = get_logger(__name__)
@dataclass
class ForgettingStats:
"""遗忘统计信息"""
total_checked: int = 0
marked_for_forgetting: int = 0
actually_forgotten: int = 0
dormant_memories: int = 0
last_check_time: float = 0.0
check_duration: float = 0.0
@dataclass
class ForgettingConfig:
"""遗忘引擎配置"""
# 检查频率配置
check_interval_hours: int = 24 # 定期检查间隔(小时)
batch_size: int = 100 # 批处理大小
# 遗忘阈值配置
base_forgetting_days: float = 30.0 # 基础遗忘天数
min_forgetting_days: float = 7.0 # 最小遗忘天数
max_forgetting_days: float = 365.0 # 最大遗忘天数
# 重要程度权重
critical_importance_bonus: float = 45.0 # 关键重要性额外天数
high_importance_bonus: float = 30.0 # 高重要性额外天数
normal_importance_bonus: float = 15.0 # 一般重要性额外天数
low_importance_bonus: float = 0.0 # 低重要性额外天数
# 置信度权重
verified_confidence_bonus: float = 30.0 # 已验证置信度额外天数
high_confidence_bonus: float = 20.0 # 高置信度额外天数
medium_confidence_bonus: float = 10.0 # 中等置信度额外天数
low_confidence_bonus: float = 0.0 # 低置信度额外天数
# 激活频率权重
activation_frequency_weight: float = 0.5 # 每次激活增加的天数权重
max_frequency_bonus: float = 10.0 # 最大激活频率奖励天数
# 休眠配置
dormant_threshold_days: int = 90 # 休眠状态判定天数
force_forget_dormant_days: int = 180 # 强制遗忘休眠记忆的天数
class MemoryForgettingEngine:
"""智能记忆遗忘引擎"""
def __init__(self, config: ForgettingConfig | None = None):
self.config = config or ForgettingConfig()
self.stats = ForgettingStats()
self._last_forgetting_check = 0.0
self._forgetting_lock = asyncio.Lock()
logger.info("MemoryForgettingEngine 初始化完成")
def calculate_forgetting_threshold(self, memory: MemoryChunk) -> float:
"""
计算记忆的遗忘阈值(天数)
Args:
memory: 记忆块
Returns:
遗忘阈值(天数)
"""
# 基础天数
threshold = self.config.base_forgetting_days
# 重要性权重
importance = memory.metadata.importance
if importance == ImportanceLevel.CRITICAL:
threshold += self.config.critical_importance_bonus
elif importance == ImportanceLevel.HIGH:
threshold += self.config.high_importance_bonus
elif importance == ImportanceLevel.NORMAL:
threshold += self.config.normal_importance_bonus
# LOW 级别不增加额外天数
# 置信度权重
confidence = memory.metadata.confidence
if confidence == ConfidenceLevel.VERIFIED:
threshold += self.config.verified_confidence_bonus
elif confidence == ConfidenceLevel.HIGH:
threshold += self.config.high_confidence_bonus
elif confidence == ConfidenceLevel.MEDIUM:
threshold += self.config.medium_confidence_bonus
# LOW 级别不增加额外天数
# 激活频率权重
frequency_bonus = min(
memory.metadata.activation_frequency * self.config.activation_frequency_weight,
self.config.max_frequency_bonus,
)
threshold += frequency_bonus
# 确保在合理范围内
return max(self.config.min_forgetting_days, min(threshold, self.config.max_forgetting_days))
def should_forget_memory(self, memory: MemoryChunk, current_time: float | None = None) -> bool:
"""
判断记忆是否应该被遗忘
Args:
memory: 记忆块
current_time: 当前时间戳
Returns:
是否应该遗忘
"""
if current_time is None:
current_time = time.time()
# 关键重要性的记忆永不遗忘
if memory.metadata.importance == ImportanceLevel.CRITICAL:
return False
# 计算遗忘阈值
forgetting_threshold = self.calculate_forgetting_threshold(memory)
# 计算距离最后激活的时间
days_since_activation = (current_time - memory.metadata.last_activation_time) / 86400
# 判断是否超过阈值
should_forget = days_since_activation > forgetting_threshold
if should_forget:
logger.debug(
f"记忆 {memory.memory_id[:8]} 触发遗忘条件: "
f"重要性={memory.metadata.importance.name}, "
f"置信度={memory.metadata.confidence.name}, "
f"激活频率={memory.metadata.activation_frequency}, "
f"阈值={forgetting_threshold:.1f}天, "
f"未激活天数={days_since_activation:.1f}"
)
return should_forget
def is_dormant_memory(self, memory: MemoryChunk, current_time: float | None = None) -> bool:
"""
判断记忆是否处于休眠状态
Args:
memory: 记忆块
current_time: 当前时间戳
Returns:
是否处于休眠状态
"""
return memory.is_dormant(current_time, self.config.dormant_threshold_days)
def should_force_forget_dormant(self, memory: MemoryChunk, current_time: float | None = None) -> bool:
"""
判断是否应该强制遗忘休眠记忆
Args:
memory: 记忆块
current_time: 当前时间戳
Returns:
是否应该强制遗忘
"""
if current_time is None:
current_time = time.time()
# 只有非关键重要性的记忆才会被强制遗忘
if memory.metadata.importance == ImportanceLevel.CRITICAL:
return False
days_since_last_access = (current_time - memory.metadata.last_accessed) / 86400
return days_since_last_access > self.config.force_forget_dormant_days
async def check_memories_for_forgetting(self, memories: list[MemoryChunk]) -> tuple[list[str], list[str]]:
"""
检查记忆列表,识别需要遗忘的记忆
Args:
memories: 记忆块列表
Returns:
(普通遗忘列表, 强制遗忘列表)
"""
start_time = time.time()
current_time = start_time
normal_forgetting_ids = []
force_forgetting_ids = []
self.stats.total_checked = len(memories)
self.stats.last_check_time = current_time
for memory in memories:
try:
# 检查休眠状态
if self.is_dormant_memory(memory, current_time):
self.stats.dormant_memories += 1
# 检查是否应该强制遗忘休眠记忆
if self.should_force_forget_dormant(memory, current_time):
force_forgetting_ids.append(memory.memory_id)
logger.debug(f"休眠记忆 {memory.memory_id[:8]} 被标记为强制遗忘")
continue
# 检查普通遗忘条件
if self.should_forget_memory(memory, current_time):
normal_forgetting_ids.append(memory.memory_id)
self.stats.marked_for_forgetting += 1
except Exception as e:
logger.warning(f"检查记忆 {memory.memory_id[:8]} 遗忘状态失败: {e}")
continue
self.stats.check_duration = time.time() - start_time
logger.info(
f"遗忘检查完成 | 总数={self.stats.total_checked}, "
f"标记遗忘={len(normal_forgetting_ids)}, "
f"强制遗忘={len(force_forgetting_ids)}, "
f"休眠={self.stats.dormant_memories}, "
f"耗时={self.stats.check_duration:.3f}s"
)
return normal_forgetting_ids, force_forgetting_ids
async def perform_forgetting_check(self, memories: list[MemoryChunk]) -> dict[str, any]:
"""
执行完整的遗忘检查流程
Args:
memories: 记忆块列表
Returns:
检查结果统计
"""
async with self._forgetting_lock:
normal_forgetting, force_forgetting = await self.check_memories_for_forgetting(memories)
# 更新统计
self.stats.actually_forgotten = len(normal_forgetting) + len(force_forgetting)
return {
"normal_forgetting": normal_forgetting,
"force_forgetting": force_forgetting,
"stats": {
"total_checked": self.stats.total_checked,
"marked_for_forgetting": self.stats.marked_for_forgetting,
"actually_forgotten": self.stats.actually_forgotten,
"dormant_memories": self.stats.dormant_memories,
"check_duration": self.stats.check_duration,
"last_check_time": self.stats.last_check_time,
},
}
def is_forgetting_check_needed(self) -> bool:
"""检查是否需要进行遗忘检查"""
current_time = time.time()
hours_since_last_check = (current_time - self._last_forgetting_check) / 3600
return hours_since_last_check >= self.config.check_interval_hours
async def schedule_periodic_check(self, memories_provider, enable_auto_cleanup: bool = True):
"""
定期执行遗忘检查(可以在后台任务中调用)
Args:
memories_provider: 提供记忆列表的函数
enable_auto_cleanup: 是否启用自动清理
"""
if not self.is_forgetting_check_needed():
return
try:
logger.info("开始执行定期遗忘检查...")
# 获取记忆列表
memories = await memories_provider()
if not memories:
logger.debug("无记忆数据需要检查")
return
# 执行遗忘检查
result = await self.perform_forgetting_check(memories)
# 如果启用自动清理,执行实际的遗忘操作
if enable_auto_cleanup and (result["normal_forgetting"] or result["force_forgetting"]):
logger.info(
f"检测到 {len(result['normal_forgetting'])} 条普通遗忘和 {len(result['force_forgetting'])} 条强制遗忘记忆"
)
# 这里可以调用实际的删除逻辑
# await self.cleanup_forgotten_memories(result["normal_forgetting"] + result["force_forgetting"])
self._last_forgetting_check = time.time()
except Exception as e:
logger.error(f"定期遗忘检查失败: {e}", exc_info=True)
def get_forgetting_stats(self) -> dict[str, any]:
"""获取遗忘统计信息"""
return {
"total_checked": self.stats.total_checked,
"marked_for_forgetting": self.stats.marked_for_forgetting,
"actually_forgotten": self.stats.actually_forgotten,
"dormant_memories": self.stats.dormant_memories,
"last_check_time": datetime.fromtimestamp(self.stats.last_check_time).isoformat()
if self.stats.last_check_time
else None,
"last_check_duration": self.stats.check_duration,
"config": {
"check_interval_hours": self.config.check_interval_hours,
"base_forgetting_days": self.config.base_forgetting_days,
"min_forgetting_days": self.config.min_forgetting_days,
"max_forgetting_days": self.config.max_forgetting_days,
},
}
def reset_stats(self):
"""重置统计信息"""
self.stats = ForgettingStats()
logger.debug("遗忘统计信息已重置")
def update_config(self, **kwargs):
"""更新配置"""
for key, value in kwargs.items():
if hasattr(self.config, key):
setattr(self.config, key, value)
logger.debug(f"遗忘配置更新: {key} = {value}")
else:
logger.warning(f"未知的配置项: {key}")
# 创建全局遗忘引擎实例
memory_forgetting_engine = MemoryForgettingEngine()
def get_memory_forgetting_engine() -> MemoryForgettingEngine:
"""获取全局遗忘引擎实例"""
return memory_forgetting_engine
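To make the threshold arithmetic concrete, here is the calculation for a HIGH-importance, MEDIUM-confidence memory activated 8 times in the last day, using the default ForgettingConfig values above (a worked example, not part of the deleted file):

base = 30.0                           # base_forgetting_days
importance_bonus = 30.0               # ImportanceLevel.HIGH -> high_importance_bonus
confidence_bonus = 10.0               # ConfidenceLevel.MEDIUM -> medium_confidence_bonus
frequency_bonus = min(8 * 0.5, 10.0)  # activation_frequency * activation_frequency_weight, capped at max_frequency_bonus
threshold = max(7.0, min(base + importance_bonus + confidence_bonus + frequency_bonus, 365.0))
print(threshold)  # 74.0 days without activation before this memory becomes a forgetting candidate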

View File

@@ -1,120 +0,0 @@
"""记忆格式化工具
提供统一的记忆块格式化函数,供构建 Prompt 时使用。
当前使用的函数: format_memories_bracket_style
输入: list[dict] 其中每个元素包含:
- display: str 记忆可读内容
- memory_type: str 记忆类型 (personal_fact/opinion/preference/event 等)
- metadata: dict 可选,包括
- confidence: 置信度 (str|float)
- importance: 重要度 (str|float)
- timestamp: 时间戳 (float|str)
- source: 来源 (str)
- relevance_score: 相关度 (float)
返回: 适合直接嵌入提示词的大段文本;若无有效记忆返回空串。
"""
from __future__ import annotations
import time
from collections.abc import Iterable
from typing import Any
def _format_timestamp(ts: Any) -> str:
try:
if ts in (None, ""):
return ""
if isinstance(ts, int | float) and ts > 0:
return time.strftime("%Y-%m-%d %H:%M", time.localtime(float(ts)))
return str(ts)
except Exception:
return ""
def _coerce_str(v: Any) -> str:
if v is None:
return ""
return str(v)
def format_memories_bracket_style(
memories: Iterable[dict[str, Any]] | None,
query_context: str | None = None,
max_items: int = 15,
) -> str:
"""以方括号 + 标注字段的方式格式化记忆列表。
例子输出:
## 相关记忆回顾
- [类型:personal_fact|重要:高|置信:0.83|相关:0.72] 他喜欢黑咖啡 (来源: chat, 2025-10-05 09:30)
Args:
memories: 记忆字典迭代器
query_context: 当前查询/用户的消息,用于在首行提示(可选)
max_items: 最多输出的记忆条数
Returns:
str: 格式化文本;若无内容返回空串
"""
if not memories:
return ""
lines: list[str] = ["## 相关记忆回顾"]
if query_context:
        lines.append(f"(与当前消息相关:{query_context[:60]}{'...' if len(query_context) > 60 else ''})")
lines.append("")
count = 0
for mem in memories:
if count >= max_items:
break
if not isinstance(mem, dict):
continue
display = _coerce_str(mem.get("display", "")).strip()
if not display:
continue
mtype = _coerce_str(mem.get("memory_type", "fact")) or "fact"
meta = mem.get("metadata", {}) if isinstance(mem.get("metadata"), dict) else {}
confidence = _coerce_str(meta.get("confidence", ""))
importance = _coerce_str(meta.get("importance", ""))
source = _coerce_str(meta.get("source", ""))
rel = meta.get("relevance_score")
try:
rel_str = f"{float(rel):.2f}" if rel is not None else ""
except Exception:
rel_str = ""
ts = _format_timestamp(meta.get("timestamp"))
# 构建标签段
tags: list[str] = [f"类型:{mtype}"]
if importance:
tags.append(f"重要:{importance}")
if confidence:
tags.append(f"置信:{confidence}")
if rel_str:
tags.append(f"相关:{rel_str}")
tag_block = "|".join(tags)
suffix_parts = []
if source:
suffix_parts.append(source)
if ts:
suffix_parts.append(ts)
suffix = (" (" + ", ".join(suffix_parts) + ")") if suffix_parts else ""
lines.append(f"- [{tag_block}] {display}{suffix}")
count += 1
if count == 0:
return ""
if count >= max_items:
lines.append(f"\n(已截断,仅显示前 {max_items} 条相关记忆)")
return "\n".join(lines)
__all__ = ["format_memories_bracket_style"]
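
A minimal usage sketch of the formatter above; the memory dict below is illustrative and only mirrors the documented input shape.

# Illustrative input only; not taken from the diff.
from src.chat.memory_system.memory_formatter import format_memories_bracket_style

sample_memories = [
    {
        "display": "他喜欢黑咖啡",
        "memory_type": "preference",
        "metadata": {
            "confidence": 0.83,
            "importance": "高",
            "timestamp": 1759627800.0,  # rendered via time.localtime, so the date depends on the local timezone
            "source": "chat",
            "relevance_score": 0.72,
        },
    }
]

block = format_memories_bracket_style(sample_memories, query_context="他平时喝什么")
print(block)  # "## 相关记忆回顾" header plus one bracketed line per memory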

View File

@@ -1,505 +0,0 @@
"""
记忆融合与去重机制
避免记忆碎片化,确保长期记忆库的高质量
"""
import time
from dataclasses import dataclass
from typing import Any
from src.chat.memory_system.memory_chunk import ConfidenceLevel, ImportanceLevel, MemoryChunk
from src.common.logger import get_logger
logger = get_logger(__name__)
@dataclass
class FusionResult:
"""融合结果"""
original_count: int
fused_count: int
removed_duplicates: int
merged_memories: list[MemoryChunk]
fusion_time: float
details: list[str]
@dataclass
class DuplicateGroup:
"""重复记忆组"""
group_id: str
memories: list[MemoryChunk]
similarity_matrix: list[list[float]]
representative_memory: MemoryChunk | None = None
class MemoryFusionEngine:
"""记忆融合引擎"""
def __init__(self, similarity_threshold: float = 0.85):
self.similarity_threshold = similarity_threshold
self.fusion_stats = {
"total_fusions": 0,
"memories_fused": 0,
"duplicates_removed": 0,
"average_similarity": 0.0,
}
# 融合策略配置
self.fusion_strategies = {
"semantic_similarity": True, # 语义相似性融合
"temporal_proximity": True, # 时间接近性融合
"logical_consistency": True, # 逻辑一致性融合
"confidence_boosting": True, # 置信度提升
"importance_preservation": True, # 重要性保持
}
async def fuse_memories(
self, new_memories: list[MemoryChunk], existing_memories: list[MemoryChunk] | None = None
) -> list[MemoryChunk]:
"""融合记忆列表"""
start_time = time.time()
try:
if not new_memories:
return []
logger.info(f"开始记忆融合,新记忆: {len(new_memories)},现有记忆: {len(existing_memories or [])}")
# 1. 检测重复记忆组
duplicate_groups = await self._detect_duplicate_groups(new_memories, existing_memories or [])
if not duplicate_groups:
fusion_time = time.time() - start_time
self._update_fusion_stats(len(new_memories), 0, fusion_time)
logger.info("✅ 记忆融合完成: %d 条记忆,移除 0 条重复", len(new_memories))
return new_memories
# 2. 对每个重复组进行融合
fused_memories = []
removed_count = 0
for group in duplicate_groups:
if len(group.memories) == 1:
# 单个记忆,直接添加
fused_memories.append(group.memories[0])
else:
# 多个记忆,进行融合
fused_memory = await self._fuse_memory_group(group)
if fused_memory:
fused_memories.append(fused_memory)
removed_count += len(group.memories) - 1
# 3. 更新统计
fusion_time = time.time() - start_time
self._update_fusion_stats(len(new_memories), removed_count, fusion_time)
logger.info(f"✅ 记忆融合完成: {len(fused_memories)} 条记忆,移除 {removed_count} 条重复")
return fused_memories
except Exception as e:
logger.error(f"❌ 记忆融合失败: {e}", exc_info=True)
return new_memories # 失败时返回原始记忆
async def _detect_duplicate_groups(
self, new_memories: list[MemoryChunk], existing_memories: list[MemoryChunk]
) -> list[DuplicateGroup]:
"""检测重复记忆组"""
all_memories = new_memories + existing_memories
new_memory_ids = {memory.memory_id for memory in new_memories}
groups = []
processed_ids = set()
for i, memory1 in enumerate(all_memories):
if memory1.memory_id in processed_ids:
continue
# 创建新的重复组
group = DuplicateGroup(group_id=f"group_{len(groups)}", memories=[memory1], similarity_matrix=[[1.0]])
processed_ids.add(memory1.memory_id)
# 寻找相似记忆
for j, memory2 in enumerate(all_memories[i + 1 :], i + 1):
if memory2.memory_id in processed_ids:
continue
similarity = self._calculate_comprehensive_similarity(memory1, memory2)
if similarity >= self.similarity_threshold:
group.memories.append(memory2)
processed_ids.add(memory2.memory_id)
# 更新相似度矩阵
self._update_similarity_matrix(group, memory2, similarity)
if len(group.memories) > 1:
# 选择代表性记忆
group.representative_memory = self._select_representative_memory(group)
groups.append(group)
else:
# 仅包含单条记忆,只有当其来自新记忆列表时保留
if memory1.memory_id in new_memory_ids:
groups.append(group)
logger.debug(f"检测到 {len(groups)} 个重复记忆组")
return groups
def _calculate_comprehensive_similarity(self, mem1: MemoryChunk, mem2: MemoryChunk) -> float:
"""计算综合相似度"""
similarity_scores = []
# 1. 语义向量相似度
if self.fusion_strategies["semantic_similarity"]:
semantic_sim = mem1.calculate_similarity(mem2)
similarity_scores.append(("semantic", semantic_sim))
# 2. 文本相似度
text_sim = self._calculate_text_similarity(mem1.text_content, mem2.text_content)
similarity_scores.append(("text", text_sim))
# 3. 关键词重叠度
keyword_sim = self._calculate_keyword_similarity(mem1.keywords, mem2.keywords)
similarity_scores.append(("keyword", keyword_sim))
# 4. 类型一致性
type_consistency = 1.0 if mem1.memory_type == mem2.memory_type else 0.0
similarity_scores.append(("type", type_consistency))
# 5. 时间接近性
if self.fusion_strategies["temporal_proximity"]:
temporal_sim = self._calculate_temporal_similarity(mem1.metadata.created_at, mem2.metadata.created_at)
similarity_scores.append(("temporal", temporal_sim))
# 6. 逻辑一致性
if self.fusion_strategies["logical_consistency"]:
logical_sim = self._calculate_logical_similarity(mem1, mem2)
similarity_scores.append(("logical", logical_sim))
# 计算加权平均相似度
weights = {"semantic": 0.35, "text": 0.25, "keyword": 0.15, "type": 0.10, "temporal": 0.10, "logical": 0.05}
weighted_sum = 0.0
total_weight = 0.0
for score_type, score in similarity_scores:
weight = weights.get(score_type, 0.1)
weighted_sum += weight * score
total_weight += weight
final_similarity = weighted_sum / total_weight if total_weight > 0 else 0.0
logger.debug(f"综合相似度计算: {final_similarity:.3f} - {[(t, f'{s:.3f}') for t, s in similarity_scores]}")
return final_similarity
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""计算文本相似度"""
# 简单的词汇重叠度计算
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = words1 & words2
union = words1 | words2
jaccard_similarity = len(intersection) / len(union)
return jaccard_similarity
def _calculate_keyword_similarity(self, keywords1: list[str], keywords2: list[str]) -> float:
"""计算关键词相似度"""
if not keywords1 or not keywords2:
return 0.0
set1 = set(k.lower() for k in keywords1) # noqa: C401
set2 = set(k.lower() for k in keywords2) # noqa: C401
intersection = set1 & set2
union = set1 | set2
return len(intersection) / len(union) if union else 0.0
def _calculate_temporal_similarity(self, time1: float, time2: float) -> float:
"""计算时间相似度"""
time_diff = abs(time1 - time2)
hours_diff = time_diff / 3600
# 24小时内相似度较高
if hours_diff <= 24:
return 1.0 - (hours_diff / 24)
elif hours_diff <= 168: # 一周内
return 0.7 - ((hours_diff - 24) / 168) * 0.5
else:
return 0.2
def _calculate_logical_similarity(self, mem1: MemoryChunk, mem2: MemoryChunk) -> float:
"""计算逻辑一致性"""
# 检查主谓宾结构的逻辑一致性
consistency_score = 0.0
# 主语一致性
subjects1 = set(mem1.subjects)
subjects2 = set(mem2.subjects)
if subjects1 or subjects2:
overlap = len(subjects1 & subjects2)
union_count = max(len(subjects1 | subjects2), 1)
consistency_score += (overlap / union_count) * 0.4
# 谓语相似性
predicate_sim = self._calculate_text_similarity(mem1.content.predicate, mem2.content.predicate)
consistency_score += predicate_sim * 0.3
# 宾语相似性
if isinstance(mem1.content.object, str) and isinstance(mem2.content.object, str):
object_sim = self._calculate_text_similarity(str(mem1.content.object), str(mem2.content.object))
consistency_score += object_sim * 0.3
return consistency_score
def _update_similarity_matrix(self, group: DuplicateGroup, new_memory: MemoryChunk, similarity: float):
"""更新组的相似度矩阵"""
# 为新记忆添加行和列
for i in range(len(group.similarity_matrix)):
group.similarity_matrix[i].append(similarity)
# 添加新行
new_row = [similarity] + [1.0] * len(group.similarity_matrix)
group.similarity_matrix.append(new_row)
def _select_representative_memory(self, group: DuplicateGroup) -> MemoryChunk:
"""选择代表性记忆"""
if not group.memories:
return None
# 评分标准
best_memory = None
best_score = -1.0
for memory in group.memories:
score = 0.0
# 置信度权重
score += memory.metadata.confidence.value * 0.3
# 重要性权重
score += memory.metadata.importance.value * 0.3
# 访问次数权重
score += min(memory.metadata.access_count * 0.1, 0.2)
# 相关度权重
score += memory.metadata.relevance_score * 0.2
if score > best_score:
best_score = score
best_memory = memory
return best_memory
async def _fuse_memory_group(self, group: DuplicateGroup) -> MemoryChunk | None:
"""融合记忆组"""
if not group.memories:
return None
if len(group.memories) == 1:
return group.memories[0]
try:
# 选择基础记忆(通常是代表性记忆)
base_memory = group.representative_memory or group.memories[0]
# 融合其他记忆的属性
fused_memory = await self._merge_memory_attributes(base_memory, group.memories)
# 更新元数据
self._update_fused_metadata(fused_memory, group)
logger.debug(f"成功融合记忆组,包含 {len(group.memories)} 条原始记忆")
return fused_memory
except Exception as e:
logger.error(f"融合记忆组失败: {e}")
# 返回置信度最高的记忆
return max(group.memories, key=lambda m: m.metadata.confidence.value)
async def _merge_memory_attributes(self, base_memory: MemoryChunk, memories: list[MemoryChunk]) -> MemoryChunk:
"""合并记忆属性"""
# 创建基础记忆的深拷贝
fused_memory = MemoryChunk.from_dict(base_memory.to_dict())
# 合并关键词
all_keywords = set()
for memory in memories:
all_keywords.update(memory.keywords)
fused_memory.keywords = sorted(all_keywords)
# 合并标签
all_tags = set()
for memory in memories:
all_tags.update(memory.tags)
fused_memory.tags = sorted(all_tags)
# 合并分类
all_categories = set()
for memory in memories:
all_categories.update(memory.categories)
fused_memory.categories = sorted(all_categories)
# 合并关联记忆
all_related = set()
for memory in memories:
all_related.update(memory.related_memories)
# 移除对自身和组内记忆的引用
all_related = {rid for rid in all_related if rid not in [m.memory_id for m in memories]}
fused_memory.related_memories = sorted(all_related)
# 合并时间上下文
if self.fusion_strategies["temporal_proximity"]:
fused_memory.temporal_context = self._merge_temporal_context(memories)
return fused_memory
def _update_fused_metadata(self, fused_memory: MemoryChunk, group: DuplicateGroup):
"""更新融合记忆的元数据"""
# 更新修改时间
fused_memory.metadata.last_modified = time.time()
# 计算平均访问次数
total_access = sum(m.metadata.access_count for m in group.memories)
fused_memory.metadata.access_count = total_access
# 提升置信度(如果有多个来源支持)
if self.fusion_strategies["confidence_boosting"] and len(group.memories) > 1:
max_confidence = max(m.metadata.confidence.value for m in group.memories)
if max_confidence < ConfidenceLevel.VERIFIED.value:
fused_memory.metadata.confidence = ConfidenceLevel(
min(max_confidence + 1, ConfidenceLevel.VERIFIED.value)
)
# 保持最高重要性
if self.fusion_strategies["importance_preservation"]:
max_importance = max(m.metadata.importance.value for m in group.memories)
fused_memory.metadata.importance = ImportanceLevel(max_importance)
# 计算平均相关度
avg_relevance = sum(m.metadata.relevance_score for m in group.memories) / len(group.memories)
fused_memory.metadata.relevance_score = min(avg_relevance * 1.1, 1.0) # 稍微提升相关度
# 设置来源信息
source_ids = [m.memory_id[:8] for m in group.memories]
fused_memory.metadata.source_context = f"Fused from {len(group.memories)} memories: {', '.join(source_ids)}"
def _merge_temporal_context(self, memories: list[MemoryChunk]) -> dict[str, Any]:
"""合并时间上下文"""
contexts = [m.temporal_context for m in memories if m.temporal_context]
if not contexts:
return {}
# 计算时间范围
timestamps = [m.metadata.created_at for m in memories]
earliest_time = min(timestamps)
latest_time = max(timestamps)
merged_context = {
"earliest_timestamp": earliest_time,
"latest_timestamp": latest_time,
"time_span_hours": (latest_time - earliest_time) / 3600,
"source_memories": len(memories),
}
# 合并其他上下文信息
for context in contexts:
for key, value in context.items():
if key not in ["timestamp", "earliest_timestamp", "latest_timestamp"]:
if key not in merged_context:
merged_context[key] = value
elif merged_context[key] != value:
merged_context[key] = f"multiple: {value}"
return merged_context
async def incremental_fusion(
self, new_memory: MemoryChunk, existing_memories: list[MemoryChunk]
) -> tuple[MemoryChunk, list[MemoryChunk]]:
"""增量融合(单个新记忆与现有记忆融合)"""
# 寻找相似记忆
similar_memories = []
for existing in existing_memories:
similarity = self._calculate_comprehensive_similarity(new_memory, existing)
if similarity >= self.similarity_threshold:
similar_memories.append((existing, similarity))
if not similar_memories:
# 没有相似记忆,直接返回
return new_memory, existing_memories
# 按相似度排序
similar_memories.sort(key=lambda x: x[1], reverse=True)
# 与最相似的记忆融合
best_match, similarity = similar_memories[0]
# 创建融合组
group = DuplicateGroup(
group_id=f"incremental_{int(time.time())}",
memories=[new_memory, best_match],
similarity_matrix=[[1.0, similarity], [similarity, 1.0]],
)
# 执行融合
fused_memory = await self._fuse_memory_group(group)
# 从现有记忆中移除被融合的记忆
updated_existing = [m for m in existing_memories if m.memory_id != best_match.memory_id]
updated_existing.append(fused_memory)
logger.debug(f"增量融合完成,相似度: {similarity:.3f}")
return fused_memory, updated_existing
def _update_fusion_stats(self, original_count: int, removed_count: int, fusion_time: float):
"""更新融合统计"""
self.fusion_stats["total_fusions"] += 1
self.fusion_stats["memories_fused"] += original_count
self.fusion_stats["duplicates_removed"] += removed_count
# 更新平均相似度(估算)
if removed_count > 0:
avg_similarity = 0.9 # 假设平均相似度较高
total_similarity = self.fusion_stats["average_similarity"] * (self.fusion_stats["total_fusions"] - 1)
total_similarity += avg_similarity
self.fusion_stats["average_similarity"] = total_similarity / self.fusion_stats["total_fusions"]
async def maintenance(self):
"""维护操作"""
try:
logger.info("开始记忆融合引擎维护...")
# 可以在这里添加定期维护任务,如:
# - 重新评估低置信度记忆
# - 清理孤立记忆引用
# - 优化融合策略参数
logger.info("✅ 记忆融合引擎维护完成")
except Exception as e:
logger.error(f"❌ 记忆融合引擎维护失败: {e}", exc_info=True)
def get_fusion_stats(self) -> dict[str, Any]:
"""获取融合统计信息"""
return self.fusion_stats.copy()
def reset_stats(self):
"""重置统计信息"""
self.fusion_stats = {
"total_fusions": 0,
"memories_fused": 0,
"duplicates_removed": 0,
"average_similarity": 0.0,
}
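
A sketch of how the fusion engine above is driven; the MemoryChunk lists are assumed to come from the extraction pipeline, which lives outside this file.

# Sketch only; MemoryFusionEngine and logger are the names defined above.
fusion_engine = MemoryFusionEngine(similarity_threshold=0.85)


async def dedupe(new_chunks, existing_chunks):
    # Near-duplicate chunks are grouped by the weighted similarity above and collapsed into one each.
    fused = await fusion_engine.fuse_memories(new_chunks, existing_chunks)
    logger.info(f"融合统计: {fusion_engine.get_fusion_stats()}")
    return fused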

View File

@@ -1,512 +0,0 @@
"""
记忆系统管理器
替代原有的 Hippocampus 和 instant_memory 系统
"""
import re
from dataclasses import dataclass
from typing import Any
from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
from src.chat.memory_system.memory_system import MemorySystem
from src.chat.memory_system.message_collection_processor import MessageCollectionProcessor
from src.chat.memory_system.message_collection_storage import MessageCollectionStorage
from src.common.logger import get_logger
logger = get_logger(__name__)
@dataclass
class MemoryResult:
"""记忆查询结果"""
content: str
memory_type: str
confidence: float
importance: float
timestamp: float
source: str = "memory"
relevance_score: float = 0.0
structure: dict[str, Any] | None = None
class MemoryManager:
"""记忆系统管理器 - 替代原有的 HippocampusManager"""
def __init__(self):
self.memory_system: MemorySystem | None = None
self.message_collection_storage: MessageCollectionStorage | None = None
self.message_collection_processor: MessageCollectionProcessor | None = None
self.is_initialized = False
self.user_cache = {} # 用户记忆缓存
def _clean_text(self, text: Any) -> str:
if text is None:
return ""
cleaned = re.sub(r"[\s\u3000]+", " ", str(text)).strip()
cleaned = re.sub(r"[、,,;]+$", "", cleaned)
return cleaned
async def initialize(self):
"""初始化记忆系统"""
if self.is_initialized:
return
try:
from src.config.config import global_config
# 检查是否启用记忆系统
if not global_config.memory.enable_memory:
logger.info("记忆系统已禁用,跳过初始化")
self.is_initialized = True
return
logger.info("正在初始化记忆系统...")
# 初始化记忆系统
from src.chat.memory_system.memory_system import get_memory_system
self.memory_system = get_memory_system()
# 初始化消息集合系统
self.message_collection_storage = MessageCollectionStorage()
self.message_collection_processor = MessageCollectionProcessor(self.message_collection_storage)
self.is_initialized = True
logger.info(" 记忆系统初始化完成")
except Exception as e:
logger.error(f"记忆系统初始化失败: {e}")
# 如果系统初始化失败,创建一个空的管理器避免系统崩溃
self.memory_system = None
self.message_collection_storage = None
self.message_collection_processor = None
self.is_initialized = True # 标记为已初始化但系统不可用
def get_hippocampus(self):
"""兼容原有接口 - 返回空"""
logger.debug("get_hippocampus 调用 - 记忆系统不使用此方法")
return {}
async def build_memory(self):
"""兼容原有接口 - 构建记忆"""
if not self.is_initialized or not self.memory_system:
return
try:
# 记忆系统使用实时构建,不需要定时构建
logger.debug("build_memory 调用 - 记忆系统使用实时构建")
except Exception as e:
logger.error(f"build_memory 失败: {e}")
async def forget_memory(self, percentage: float = 0.005):
"""兼容原有接口 - 遗忘机制"""
if not self.is_initialized or not self.memory_system:
return
try:
# 增强记忆系统有内置的遗忘机制
logger.debug(f"forget_memory 调用 - 参数: {percentage}")
# 可以在这里调用增强系统的维护功能
await self.memory_system.maintenance()
except Exception as e:
logger.error(f"forget_memory 失败: {e}")
async def get_memory_from_text(
self,
text: str,
chat_id: str,
user_id: str,
max_memory_num: int = 3,
max_memory_length: int = 2,
time_weight: float = 1.0,
keyword_weight: float = 1.0,
) -> list[tuple[str, str]]:
"""从文本获取相关记忆 - 兼容原有接口"""
if not self.is_initialized or not self.memory_system:
return []
try:
# 使用增强记忆系统检索
context = {
"chat_id": chat_id,
"expected_memory_types": [MemoryType.PERSONAL_FACT, MemoryType.EVENT, MemoryType.PREFERENCE],
}
relevant_memories = await self.memory_system.retrieve_relevant_memories(
query=text, user_id=user_id, context=context, limit=max_memory_num
)
# 转换为原有格式 (topic, content)
results = []
for memory in relevant_memories:
topic = memory.memory_type.value
content = memory.text_content
results.append((topic, content))
logger.debug(f"从文本检索到 {len(results)} 条相关记忆")
# 如果检索到有效记忆,打印详细信息
if results:
logger.info(f"📚 从文本 '{text[:50]}...' 检索到 {len(results)} 条有效记忆:")
for i, (topic, content) in enumerate(results, 1):
# 处理长内容如果超过150字符则截断
display_content = content
if len(content) > 150:
display_content = content[:150] + "..."
logger.info(f" 记忆#{i} [{topic}]: {display_content}")
return results
except Exception as e:
logger.error(f"get_memory_from_text 失败: {e}")
return []
async def get_memory_from_topic(
self, valid_keywords: list[str], max_memory_num: int = 3, max_memory_length: int = 2, max_depth: int = 3
) -> list[tuple[str, str]]:
"""从关键词获取记忆 - 兼容原有接口"""
if not self.is_initialized or not self.memory_system:
return []
try:
# 将关键词转换为查询文本
query_text = " ".join(valid_keywords)
# 使用增强记忆系统检索
context = {
"keywords": valid_keywords,
"expected_memory_types": [
MemoryType.PERSONAL_FACT,
MemoryType.EVENT,
MemoryType.PREFERENCE,
MemoryType.OPINION,
],
}
relevant_memories = await self.memory_system.retrieve_relevant_memories(
query_text=query_text,
user_id="default_user", # 可以根据实际需要传递
context=context,
limit=max_memory_num,
)
# 转换为原有格式 (topic, content)
results = []
for memory in relevant_memories:
topic = memory.memory_type.value
content = memory.text_content
results.append((topic, content))
logger.debug(f"从关键词 {valid_keywords} 检索到 {len(results)} 条相关记忆")
# 如果检索到有效记忆,打印详细信息
if results:
keywords_str = ", ".join(valid_keywords[:5]) # 最多显示5个关键词
if len(valid_keywords) > 5:
keywords_str += f" ... (共{len(valid_keywords)}个关键词)"
logger.info(f"🔍 从关键词 [{keywords_str}] 检索到 {len(results)} 条有效记忆:")
for i, (topic, content) in enumerate(results, 1):
# 处理长内容如果超过150字符则截断
display_content = content
if len(content) > 150:
display_content = content[:150] + "..."
logger.info(f" 记忆#{i} [{topic}]: {display_content}")
return results
except Exception as e:
logger.error(f"get_memory_from_topic 失败: {e}")
return []
def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list:
"""从单个关键词获取记忆 - 兼容原有接口"""
if not self.is_initialized or not self.memory_system:
return []
try:
# 同步方法,返回空列表
logger.debug(f"get_memory_from_keyword 调用 - 关键词: {keyword}")
return []
except Exception as e:
logger.error(f"get_memory_from_keyword 失败: {e}")
return []
async def process_conversation(
self, conversation_text: str, context: dict[str, Any], user_id: str, timestamp: float | None = None
) -> list[MemoryChunk]:
"""处理对话并构建记忆 - 新增功能"""
if not self.is_initialized or not self.memory_system:
return []
try:
# 将消息添加到消息集合处理器
chat_id = context.get("chat_id")
if self.message_collection_processor and chat_id:
await self.message_collection_processor.add_message(conversation_text, chat_id)
payload_context = dict(context or {})
payload_context.setdefault("conversation_text", conversation_text)
if timestamp is not None:
payload_context.setdefault("timestamp", timestamp)
result = await self.memory_system.process_conversation_memory(payload_context)
# 从结果中提取记忆块
memory_chunks = []
if result.get("success"):
memory_chunks = result.get("created_memories", [])
logger.info(f"从对话构建了 {len(memory_chunks)} 条记忆")
return memory_chunks
except Exception as e:
logger.error(f"process_conversation 失败: {e}")
return []
async def get_enhanced_memory_context(
self, query_text: str, user_id: str, context: dict[str, Any] | None = None, limit: int = 5
) -> list[MemoryResult]:
"""获取增强记忆上下文 - 新增功能"""
if not self.is_initialized or not self.memory_system:
return []
try:
relevant_memories = await self.memory_system.retrieve_relevant_memories(
query=query_text, user_id=None, context=context or {}, limit=limit
)
results = []
for memory in relevant_memories:
formatted_content, structure = self._format_memory_chunk(memory)
result = MemoryResult(
content=formatted_content,
memory_type=memory.memory_type.value,
confidence=memory.metadata.confidence.value,
importance=memory.metadata.importance.value,
timestamp=memory.metadata.created_at,
source="enhanced_memory",
relevance_score=memory.metadata.relevance_score,
structure=structure,
)
results.append(result)
return results
except Exception as e:
logger.error(f"get_enhanced_memory_context 失败: {e}")
return []
def _format_memory_chunk(self, memory: MemoryChunk) -> tuple[str, dict[str, Any]]:
"""将记忆块转换为更易读的文本描述"""
structure = memory.content.to_dict()
if memory.display:
return self._clean_text(memory.display), structure
subject = structure.get("subject")
predicate = structure.get("predicate") or ""
obj = structure.get("object")
subject_display = self._format_subject(subject, memory)
formatted = self._apply_predicate_format(subject_display, predicate, obj)
if not formatted:
predicate_display = self._format_predicate(predicate)
object_display = self._format_object(obj)
formatted = f"{subject_display}{predicate_display}{object_display}".strip()
formatted = self._clean_text(formatted)
return formatted, structure
def _format_subject(self, subject: str | None, memory: MemoryChunk) -> str:
if not subject:
return "该用户"
if subject == memory.metadata.user_id:
return "该用户"
if memory.metadata.chat_id and subject == memory.metadata.chat_id:
return "该聊天"
return self._clean_text(subject)
def _apply_predicate_format(self, subject: str, predicate: str, obj: Any) -> str | None:
predicate = (predicate or "").strip()
obj_value = obj
if predicate == "is_named":
name = self._extract_from_object(obj_value, ["name", "nickname"]) or self._format_object(obj_value)
name = self._clean_text(name)
if not name:
return None
name_display = name if (name.startswith("") and name.endswith("")) else f"{name}"
return f"{subject}的昵称是{name_display}"
if predicate == "is_age":
age = self._extract_from_object(obj_value, ["age"]) or self._format_object(obj_value)
age = self._clean_text(age)
if not age:
return None
return f"{subject}今年{age}"
if predicate == "is_profession":
profession = self._extract_from_object(obj_value, ["profession", "job"]) or self._format_object(obj_value)
profession = self._clean_text(profession)
if not profession:
return None
return f"{subject}的职业是{profession}"
if predicate == "lives_in":
location = self._extract_from_object(obj_value, ["location", "city", "place"]) or self._format_object(
obj_value
)
location = self._clean_text(location)
if not location:
return None
return f"{subject}居住在{location}"
if predicate == "has_phone":
phone = self._extract_from_object(obj_value, ["phone", "number"]) or self._format_object(obj_value)
phone = self._clean_text(phone)
if not phone:
return None
return f"{subject}的电话号码是{phone}"
if predicate == "has_email":
email = self._extract_from_object(obj_value, ["email"]) or self._format_object(obj_value)
email = self._clean_text(email)
if not email:
return None
return f"{subject}的邮箱是{email}"
if predicate == "likes":
liked = self._format_object(obj_value)
if not liked:
return None
return f"{subject}喜欢{liked}"
if predicate == "likes_food":
food = self._format_object(obj_value)
if not food:
return None
return f"{subject}爱吃{food}"
if predicate == "dislikes":
disliked = self._format_object(obj_value)
if not disliked:
return None
return f"{subject}不喜欢{disliked}"
if predicate == "hates":
hated = self._format_object(obj_value)
if not hated:
return None
return f"{subject}讨厌{hated}"
if predicate == "favorite_is":
favorite = self._format_object(obj_value)
if not favorite:
return None
return f"{subject}最喜欢{favorite}"
if predicate == "mentioned_event":
event_text = self._extract_from_object(obj_value, ["event_text", "description"]) or self._format_object(
obj_value
)
event_text = self._clean_text(self._truncate(event_text))
if not event_text:
return None
return f"{subject}提到了计划或事件:{event_text}"
if predicate in {"正在", "", "正在进行"}:
action = self._format_object(obj_value)
if not action:
return None
return f"{subject}{predicate}{action}"
if predicate in {"感到", "觉得", "表示", "提到", "说道", ""}:
feeling = self._format_object(obj_value)
if not feeling:
return None
return f"{subject}{predicate}{feeling}"
if predicate in {"", "", ""}:
counterpart = self._format_object(obj_value)
if counterpart:
return f"{subject}{predicate}{counterpart}"
return f"{subject}{predicate}"
return None
def _format_predicate(self, predicate: str) -> str:
if not predicate:
return ""
predicate_map = {
"is_named": "的昵称是",
"is_profession": "的职业是",
"lives_in": "居住在",
"has_phone": "的电话是",
"has_email": "的邮箱是",
"likes": "喜欢",
"dislikes": "不喜欢",
"likes_food": "爱吃",
"hates": "讨厌",
"favorite_is": "最喜欢",
"mentioned_event": "提到的事件",
}
if predicate in predicate_map:
connector = predicate_map[predicate]
if connector.startswith(""):
return connector
return f" {connector} "
cleaned = predicate.replace("_", " ").strip()
if re.search(r"[\u4e00-\u9fff]", cleaned):
return cleaned
return f" {cleaned} "
def _format_object(self, obj: Any) -> str:
if obj is None:
return ""
if isinstance(obj, dict):
parts = []
for key, value in obj.items():
formatted_value = self._format_object(value)
if not formatted_value:
continue
pretty_key = {
"name": "名字",
"profession": "职业",
"location": "位置",
"event_text": "内容",
"timestamp": "时间",
}.get(key, key)
parts.append(f"{pretty_key}: {formatted_value}")
return self._clean_text("".join(parts))
if isinstance(obj, list):
formatted_items = [self._format_object(item) for item in obj]
filtered = [item for item in formatted_items if item]
return self._clean_text("".join(filtered)) if filtered else ""
if isinstance(obj, int | float):
return str(obj)
text = self._truncate(str(obj).strip())
return self._clean_text(text)
def _extract_from_object(self, obj: Any, keys: list[str]) -> str | None:
if isinstance(obj, dict):
for key in keys:
if obj.get(key):
value = obj[key]
if isinstance(value, dict | list):
return self._clean_text(self._format_object(value))
return self._clean_text(value)
if isinstance(obj, list) and obj:
return self._clean_text(self._format_object(obj[0]))
if isinstance(obj, str | int | float):
return self._clean_text(obj)
return None
def _truncate(self, text: str, max_length: int = 80) -> str:
if len(text) <= max_length:
return text
        return text[: max_length - 1] + "…"
async def shutdown(self):
"""关闭增强记忆系统"""
if not self.is_initialized:
return
try:
if self.memory_system:
await self.memory_system.shutdown()
logger.info(" 记忆系统已关闭")
except Exception as e:
logger.error(f"关闭记忆系统失败: {e}")
# 全局记忆管理器实例
memory_manager = MemoryManager()
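
A sketch of the call pattern this commit removes; the import path follows the removed module layout shown elsewhere in this diff.

# Sketch only: how callers consumed the compatibility manager above.
import asyncio

from src.chat.memory_system.memory_manager import memory_manager


async def recall(text: str, chat_id: str, user_id: str):
    await memory_manager.initialize()  # idempotent; no-op after the first call
    # Returns legacy-format (topic, content) pairs, mirroring the old HippocampusManager API.
    return await memory_manager.get_memory_from_text(text, chat_id=chat_id, user_id=user_id, max_memory_num=3)


print(asyncio.run(recall("他喜欢什么咖啡", "chat-1", "user-42")))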

View File

@@ -1,122 +0,0 @@
"""
记忆元数据索引。
"""
from dataclasses import asdict, dataclass
from typing import Any
from src.common.logger import get_logger
logger = get_logger(__name__)
from inkfox.memory import PyMetadataIndex as _RustIndex # type: ignore
@dataclass
class MemoryMetadataIndexEntry:
memory_id: str
user_id: str
memory_type: str
subjects: list[str]
objects: list[str]
keywords: list[str]
tags: list[str]
importance: int
confidence: int
created_at: float
access_count: int
chat_id: str | None = None
content_preview: str | None = None
class MemoryMetadataIndex:
"""Rust 加速版本唯一实现。"""
def __init__(self, index_file: str = "data/memory_metadata_index.json"):
self._rust = _RustIndex(index_file)
        # 仅为向量层和调试提供最小缓存(长度判断、get_entry 返回)
self.index: dict[str, MemoryMetadataIndexEntry] = {}
logger.info("✅ MemoryMetadataIndex (Rust) 初始化完成,仅支持加速实现")
    # 向后兼容:代码仍调用的接口 batch_add_or_update / add_or_update
def batch_add_or_update(self, entries: list[MemoryMetadataIndexEntry]):
if not entries:
return
payload = []
for e in entries:
if not e.memory_id:
continue
self.index[e.memory_id] = e
payload.append(asdict(e))
if payload:
try:
self._rust.batch_add(payload)
except Exception as ex:
logger.error(f"Rust 元数据批量添加失败: {ex}")
def add_or_update(self, entry: MemoryMetadataIndexEntry):
self.batch_add_or_update([entry])
def search(
self,
memory_types: list[str] | None = None,
subjects: list[str] | None = None,
keywords: list[str] | None = None,
tags: list[str] | None = None,
importance_min: int | None = None,
importance_max: int | None = None,
created_after: float | None = None,
created_before: float | None = None,
user_id: str | None = None,
limit: int | None = None,
flexible_mode: bool = True,
) -> list[str]:
params: dict[str, Any] = {
"user_id": user_id,
"memory_types": memory_types,
"subjects": subjects,
"keywords": keywords,
"tags": tags,
"importance_min": importance_min,
"importance_max": importance_max,
"created_after": created_after,
"created_before": created_before,
"limit": limit,
}
params = {k: v for k, v in params.items() if v is not None}
try:
if flexible_mode:
return list(self._rust.search_flexible(params))
return list(self._rust.search_strict(params))
except Exception as ex:
logger.error(f"Rust 搜索失败返回空: {ex}")
return []
def get_entry(self, memory_id: str) -> MemoryMetadataIndexEntry | None:
return self.index.get(memory_id)
def get_stats(self) -> dict[str, Any]:
try:
raw = self._rust.stats()
return {
"total_memories": raw.get("total", 0),
"types": raw.get("types_dist", {}),
"subjects_count": raw.get("subjects_indexed", 0),
"keywords_count": raw.get("keywords_indexed", 0),
"tags_count": raw.get("tags_indexed", 0),
}
except Exception as ex:
logger.warning(f"读取 Rust stats 失败: {ex}")
return {"total_memories": 0}
def save(self): # 仅调用 rust save
try:
self._rust.save()
except Exception as ex:
logger.warning(f"Rust save 失败: {ex}")
__all__ = [
"MemoryMetadataIndex",
"MemoryMetadataIndexEntry",
]
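
A usage sketch of the index above; constructing it requires the inkfox Rust extension that this module wraps, and the field values below are illustrative.

# Sketch only; classes as defined above.
index = MemoryMetadataIndex("data/memory_metadata_index.json")
index.add_or_update(
    MemoryMetadataIndexEntry(
        memory_id="m-001", user_id="u-42", memory_type="preference",
        subjects=["u-42"], objects=["黑咖啡"], keywords=["咖啡"], tags=["饮食"],
        importance=3, confidence=2, created_at=1759627800.0, access_count=0,
    )
)
matching_ids = index.search(memory_types=["preference"], keywords=["咖啡"], limit=5)
index.save()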

View File

@@ -1,219 +0,0 @@
"""记忆检索查询规划器"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
import orjson
from src.chat.memory_system.memory_chunk import MemoryType
from src.common.logger import get_logger
from src.llm_models.utils_model import LLMRequest
from src.utils.json_parser import extract_and_parse_json
logger = get_logger(__name__)
@dataclass
class MemoryQueryPlan:
"""查询规划结果"""
semantic_query: str
memory_types: list[MemoryType] = field(default_factory=list)
subject_includes: list[str] = field(default_factory=list)
object_includes: list[str] = field(default_factory=list)
required_keywords: list[str] = field(default_factory=list)
optional_keywords: list[str] = field(default_factory=list)
owner_filters: list[str] = field(default_factory=list)
recency_preference: str = "any"
limit: int = 10
emphasis: str | None = None
raw_plan: dict[str, Any] = field(default_factory=dict)
def ensure_defaults(self, fallback_query: str, default_limit: int) -> None:
if not self.semantic_query:
self.semantic_query = fallback_query
if self.limit <= 0:
self.limit = default_limit
self.recency_preference = (self.recency_preference or "any").lower()
if self.recency_preference not in {"any", "recent", "historical"}:
self.recency_preference = "any"
self.emphasis = (self.emphasis or "balanced").lower()
class MemoryQueryPlanner:
"""基于小模型的记忆检索查询规划器"""
def __init__(self, planner_model: LLMRequest | None, default_limit: int = 10):
self.model = planner_model
self.default_limit = default_limit
async def plan_query(self, query_text: str, context: dict[str, Any]) -> MemoryQueryPlan:
if not self.model:
logger.debug("未提供查询规划模型,使用默认规划")
return self._default_plan(query_text)
prompt = self._build_prompt(query_text, context)
try:
response, _ = await self.model.generate_response_async(prompt, temperature=0.2)
# 使用统一的 JSON 解析工具
data = extract_and_parse_json(response, strict=False)
if not data or not isinstance(data, dict):
logger.debug("查询规划模型未返回有效的结构化结果,使用默认规划")
return self._default_plan(query_text)
plan = self._parse_plan_dict(data, query_text)
plan.ensure_defaults(query_text, self.default_limit)
return plan
except Exception as exc:
logger.error("查询规划模型调用失败: %s", exc, exc_info=True)
return self._default_plan(query_text)
def _default_plan(self, query_text: str) -> MemoryQueryPlan:
return MemoryQueryPlan(semantic_query=query_text, limit=self.default_limit)
def _parse_plan_dict(self, data: dict[str, Any], fallback_query: str) -> MemoryQueryPlan:
semantic_query = self._safe_str(data.get("semantic_query")) or fallback_query
def _collect_list(key: str) -> list[str]:
value = data.get(key)
if isinstance(value, str):
return [value]
if isinstance(value, list):
return [self._safe_str(item) for item in value if self._safe_str(item)]
return []
memory_type_values = _collect_list("memory_types")
memory_types: list[MemoryType] = []
for item in memory_type_values:
if not item:
continue
try:
memory_types.append(MemoryType(item))
except ValueError:
# 尝试匹配value值
normalized = item.lower()
for mt in MemoryType:
if mt.value == normalized:
memory_types.append(mt)
break
plan = MemoryQueryPlan(
semantic_query=semantic_query,
memory_types=memory_types,
subject_includes=_collect_list("subject_includes"),
object_includes=_collect_list("object_includes"),
required_keywords=_collect_list("required_keywords"),
optional_keywords=_collect_list("optional_keywords"),
owner_filters=_collect_list("owner_filters"),
recency_preference=self._safe_str(data.get("recency")) or "any",
limit=self._safe_int(data.get("limit"), self.default_limit),
emphasis=self._safe_str(data.get("emphasis")) or "balanced",
raw_plan=data,
)
return plan
def _build_prompt(self, query_text: str, context: dict[str, Any]) -> str:
participants = context.get("participants") or context.get("speaker_names") or []
if isinstance(participants, str):
participants = [participants]
participants = [p for p in participants if isinstance(p, str) and p.strip()]
participant_preview = "".join(participants[:5]) or "未知"
persona = context.get("bot_personality") or context.get("bot_identity") or "未知"
# 构建未读消息上下文信息
context_section = ""
if context.get("has_unread_context") and context.get("unread_messages_context"):
unread_context = context["unread_messages_context"]
unread_messages = unread_context.get("messages", [])
unread_keywords = unread_context.get("keywords", [])
unread_participants = unread_context.get("participants", [])
context_summary = unread_context.get("context_summary", "")
if unread_messages:
# 构建未读消息摘要
message_previews = []
for msg in unread_messages[:5]: # 最多显示5条
sender = msg.get("sender", "未知")
content = msg.get("content", "")[:100] # 限制每条消息长度
message_previews.append(f"{sender}: {content}")
context_section = f"""
## 📋 未读消息上下文 (共{unread_context.get("total_count", 0)}条未读消息)
### 最近消息预览:
{chr(10).join(message_previews)}
### 上下文关键词:
{", ".join(unread_keywords[:15]) if unread_keywords else ""}
### 对话参与者:
{", ".join(unread_participants) if unread_participants else ""}
### 上下文摘要:
{context_summary[:300] if context_summary else ""}
"""
else:
context_section = """
## 📋 未读消息上下文:
无未读消息或上下文信息不可用
"""
return f"""
你是一名记忆检索规划助手,请基于输入生成一个简洁的 JSON 检索计划。
你的任务是分析当前查询并结合未读消息的上下文,生成更精准的记忆检索策略。
仅需提供以下字段:
- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询和上下文;
- memory_types: 建议检索的记忆类型列表,取值范围来自 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual)
- subject_includes: 建议出现在记忆主语中的人物或角色;
- object_includes: 建议关注的对象、主题或关键信息;
- required_keywords: 建议必须包含的关键词(从上下文中提取);
- recency: 推荐的时间偏好,可选 recent/any/historical
- limit: 推荐的最大返回数量 (1-15)
- emphasis: 检索重点,可选 balanced/contextual/recent/comprehensive。
请不要生成谓语字段,也不要额外补充其它参数。
## 当前查询:
"{query_text}"
## 已知对话参与者:
{participant_preview}
## 机器人设定:
{persona}{context_section}
## 🎯 指导原则:
1. **上下文关联**: 优先分析与当前查询相关的未读消息内容和关键词
2. **语义理解**: 结合上下文理解查询的真实意图,而非字面意思
3. **参与者感知**: 考虑未读消息中的参与者,检索与他们相关的记忆
4. **主题延续**: 关注未读消息中讨论的主题,检索相关的历史记忆
5. **时间相关性**: 如果未读消息讨论最近的事件,偏向检索相关时期的记忆
请直接输出符合要求的 JSON 对象,禁止添加额外文本或 Markdown 代码块。
"""
@staticmethod
def _safe_str(value: Any) -> str:
if isinstance(value, str):
return value.strip()
if value is None:
return ""
return str(value).strip()
@staticmethod
def _safe_int(value: Any, default: int) -> int:
try:
number = int(value)
if number <= 0:
return default
return number
except (TypeError, ValueError):
return default
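
A minimal sketch of the planner above; with `planner_model=None` it never calls an LLM and simply returns the default plan.

# Sketch only; MemoryQueryPlanner as defined above.
import asyncio

planner = MemoryQueryPlanner(planner_model=None, default_limit=10)
plan = asyncio.run(planner.plan_query("他最近在忙什么", context={"participants": ["小明"]}))
print(plan.semantic_query, plan.limit, plan.recency_preference)  # -> 他最近在忙什么 10 any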

View File

@@ -1,75 +0,0 @@
"""
消息集合处理器
负责收集消息、创建集合并将其存入向量存储。
"""
import asyncio
from collections import deque
from typing import Any
from src.chat.memory_system.memory_chunk import MessageCollection
from src.chat.memory_system.message_collection_storage import MessageCollectionStorage
from src.common.logger import get_logger
logger = get_logger(__name__)
class MessageCollectionProcessor:
"""处理消息集合的创建和存储"""
def __init__(self, storage: MessageCollectionStorage, buffer_size: int = 5):
self.storage = storage
self.buffer_size = buffer_size
self.message_buffers: dict[str, deque[str]] = {}
self._lock = asyncio.Lock()
async def add_message(self, message_text: str, chat_id: str):
"""添加一条新消息到指定聊天的缓冲区,并在满时触发处理"""
async with self._lock:
if not isinstance(message_text, str) or not message_text.strip():
return
if chat_id not in self.message_buffers:
self.message_buffers[chat_id] = deque(maxlen=self.buffer_size)
buffer = self.message_buffers[chat_id]
buffer.append(message_text)
logger.debug(f"消息已添加到聊天 '{chat_id}' 的缓冲区,当前数量: {len(buffer)}/{self.buffer_size}")
if len(buffer) == self.buffer_size:
await self._process_buffer(chat_id)
async def _process_buffer(self, chat_id: str):
"""处理指定聊天缓冲区中的消息,创建并存储一个集合"""
buffer = self.message_buffers.get(chat_id)
if not buffer or len(buffer) < self.buffer_size:
return
messages_to_process = list(buffer)
buffer.clear()
logger.info(f"聊天 '{chat_id}' 的消息缓冲区已满,开始创建消息集合...")
try:
combined_text = "\n".join(messages_to_process)
collection = MessageCollection(
chat_id=chat_id,
messages=messages_to_process,
combined_text=combined_text,
)
await self.storage.add_collection(collection)
logger.info(f"成功为聊天 '{chat_id}' 创建并存储了新的消息集合: {collection.collection_id}")
except Exception as e:
logger.error(f"处理聊天 '{chat_id}' 的消息缓冲区失败: {e}", exc_info=True)
def get_stats(self) -> dict[str, Any]:
"""获取处理器统计信息"""
total_buffered_messages = sum(len(buf) for buf in self.message_buffers.values())
return {
"active_buffers": len(self.message_buffers),
"total_buffered_messages": total_buffered_messages,
"buffer_capacity_per_chat": self.buffer_size,
}
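
A sketch of the buffering behaviour above; `storage` is assumed to be an initialized MessageCollectionStorage, shown in the next removed file.

# Sketch only; processor as defined above.
async def demo(storage):
    processor = MessageCollectionProcessor(storage, buffer_size=5)
    for i in range(5):
        # The 5th message fills the per-chat buffer and triggers storage.add_collection(...)
        await processor.add_message(f"消息 {i}", chat_id="chat-1")
    print(processor.get_stats())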

View File

@@ -1,193 +0,0 @@
"""
消息集合向量存储系统
专用于存储和检索消息集合,以提供即时上下文。
"""
import time
from typing import Any
from src.chat.memory_system.memory_chunk import MessageCollection
from src.chat.utils.utils import get_embedding
from src.common.logger import get_logger
from src.common.vector_db import vector_db_service
from src.config.config import global_config
logger = get_logger(__name__)
class MessageCollectionStorage:
"""消息集合向量存储"""
def __init__(self):
self.config = global_config.memory
self.vector_db_service = vector_db_service
self.collection_name = "message_collections"
self._initialize_storage()
def _initialize_storage(self):
"""初始化存储"""
try:
self.vector_db_service.get_or_create_collection(
name=self.collection_name,
metadata={"description": "短期消息集合记忆", "hnsw:space": "cosine"},
)
logger.info(f"消息集合存储初始化完成,集合: '{self.collection_name}'")
except Exception as e:
logger.error(f"消息集合存储初始化失败: {e}", exc_info=True)
raise
async def add_collection(self, collection: MessageCollection):
"""添加一个新的消息集合,并处理容量和时间限制"""
try:
# 清理过期和超额的集合
await self._cleanup_collections()
# 向量化并存储
embedding = await get_embedding(collection.combined_text)
if not embedding:
logger.warning(f"无法为消息集合 {collection.collection_id} 生成向量,跳过存储。")
return
collection.embedding = embedding
self.vector_db_service.add(
collection_name=self.collection_name,
embeddings=[embedding],
ids=[collection.collection_id],
documents=[collection.combined_text],
metadatas=[collection.to_dict()],
)
logger.debug(f"成功存储消息集合: {collection.collection_id}")
except Exception as e:
logger.error(f"存储消息集合失败: {e}", exc_info=True)
async def _cleanup_collections(self):
"""清理超额和过期的消息集合"""
try:
# 基于时间清理
if self.config.instant_memory_retention_hours > 0:
expiration_time = time.time() - self.config.instant_memory_retention_hours * 3600
expired_docs = self.vector_db_service.get(
collection_name=self.collection_name,
where={"created_at": {"$lt": expiration_time}},
include=[], # 只获取ID
)
if expired_docs and expired_docs.get("ids"):
self.vector_db_service.delete(collection_name=self.collection_name, ids=expired_docs["ids"])
logger.info(f"删除了 {len(expired_docs['ids'])} 个过期的瞬时记忆")
# 基于数量清理
current_count = self.vector_db_service.count(self.collection_name)
if current_count > self.config.instant_memory_max_collections:
num_to_delete = current_count - self.config.instant_memory_max_collections
# 获取所有文档的元数据以进行排序
all_docs = self.vector_db_service.get(
collection_name=self.collection_name,
include=["metadatas"]
)
if all_docs and all_docs.get("ids"):
# 在内存中排序找到最旧的文档
sorted_docs = sorted(
zip(all_docs["ids"], all_docs["metadatas"]),
key=lambda item: item[1].get("created_at", 0),
)
ids_to_delete = [doc[0] for doc in sorted_docs[:num_to_delete]]
if ids_to_delete:
self.vector_db_service.delete(collection_name=self.collection_name, ids=ids_to_delete)
logger.info(f"消息集合已满,删除最旧的 {len(ids_to_delete)} 个集合")
except Exception as e:
logger.error(f"清理消息集合失败: {e}", exc_info=True)
async def get_relevant_collection(self, query_text: str, n_results: int = 1) -> list[MessageCollection]:
"""根据查询文本检索最相关的消息集合"""
if not query_text.strip():
return []
try:
query_embedding = await get_embedding(query_text)
if not query_embedding:
return []
results = self.vector_db_service.query(
collection_name=self.collection_name,
query_embeddings=[query_embedding],
n_results=n_results,
)
collections = []
if results and results.get("ids") and results["ids"][0]:
collections.extend(MessageCollection.from_dict(metadata) for metadata in results["metadatas"][0])
return collections
except Exception as e:
logger.error(f"检索相关消息集合失败: {e}", exc_info=True)
return []
async def get_message_collection_context(self, query_text: str, chat_id: str) -> str:
"""获取消息集合上下文,用于添加到 prompt 中。优先展示当前聊天的上下文。"""
try:
collections = await self.get_relevant_collection(query_text, n_results=5)
if not collections:
return ""
# 根据传入的 chat_id 对集合进行排序
collections.sort(key=lambda c: c.chat_id == chat_id, reverse=True)
context_parts = []
for collection in collections:
if not collection.combined_text:
continue
header = "## 📝 相关对话上下文\n"
if collection.chat_id == chat_id:
# 匹配的ID使用更明显的标识
context_parts.append(
f"{header} [🔥 来自当前聊天的上下文]\n```\n{collection.combined_text}\n```"
)
else:
# 不匹配的ID
context_parts.append(
f"{header} [💡 来自其他聊天的相关上下文 (ID: {collection.chat_id})]\n```\n{collection.combined_text}\n```"
)
if not context_parts:
return ""
# 格式化消息集合为 prompt 上下文
final_context = "\n\n---\n\n".join(context_parts) + "\n\n---"
logger.info(f"🔗 为查询 '{query_text[:50]}...' 在聊天 '{chat_id}' 中找到 {len(collections)} 个相关消息集合上下文")
return f"\n{final_context}\n"
except Exception as e:
logger.error(f"get_message_collection_context 失败: {e}")
return ""
def clear_all(self):
"""清空所有消息集合"""
try:
# In ChromaDB, the easiest way to clear a collection is to delete and recreate it.
self.vector_db_service.delete_collection(name=self.collection_name)
self._initialize_storage()
logger.info(f"已清空所有消息集合: '{self.collection_name}'")
except Exception as e:
logger.error(f"清空消息集合失败: {e}", exc_info=True)
def get_stats(self) -> dict[str, Any]:
"""获取存储统计信息"""
try:
count = self.vector_db_service.count(self.collection_name)
return {
"collection_name": self.collection_name,
"total_collections": count,
"storage_limit": self.config.instant_memory_max_collections,
}
except Exception as e:
logger.error(f"获取消息集合存储统计失败: {e}")
return {}

File diff suppressed because it is too large

View File

@@ -256,8 +256,6 @@ class DefaultReplyer:
self._chat_info_initialized = False
self.heart_fc_sender = HeartFCSender()
# 使用新的增强记忆系统
# from src.chat.memory_system.enhanced_memory_activator import EnhancedMemoryActivator
self._chat_info_initialized = False
async def _initialize_chat_info(self):
@@ -401,19 +399,9 @@ class DefaultReplyer:
f"插件{result.get_summary().get('stopped_handlers', '')}于请求后取消了内容生成"
)
# 回复生成成功后,异步存储聊天记忆(不阻塞返回)
try:
# 将记忆存储作为子任务创建,可以被取消
memory_task = asyncio.create_task(
self._store_chat_memory_async(reply_to, reply_message),
name=f"store_memory_{self.chat_stream.stream_id}"
)
# 不等待完成,让它在后台运行
# 如果父任务被取消,这个子任务也会被垃圾回收
logger.debug(f"创建记忆存储子任务: {memory_task.get_name()}")
except Exception as memory_e:
# 记忆存储失败不应该影响回复生成的成功返回
logger.warning(f"记忆存储失败,但不影响回复生成: {memory_e}")
# 旧的自动记忆存储已移除,现在使用记忆图系统通过工具创建记忆
# 记忆由LLM在对话过程中通过CreateMemoryTool主动创建而非自动存储
pass
return True, llm_response, prompt
@@ -545,19 +533,6 @@ class DefaultReplyer:
Returns:
str: 记忆信息字符串
"""
chat_talking_prompt_short = build_readable_messages(
chat_history,
replace_bot_name=True,
merge_messages=False,
timestamp_mode="relative",
read_mark=0.0,
show_actions=True,
)
if not global_config.memory.enable_memory:
return ""
# 使用新的记忆图系统检索记忆(带智能查询优化)
all_memories = []
try:
@@ -1886,14 +1861,22 @@ class DefaultReplyer:
return f"你与{sender}是普通朋友关系。"
# 已废弃:旧的自动记忆存储逻辑
# 新的记忆图系统通过LLM工具(CreateMemoryTool)主动创建记忆,而非自动存储
async def _store_chat_memory_async(self, reply_to: str, reply_message: DatabaseMessages | dict[str, Any] | None = None):
"""
异步存储聊天记忆从build_memory_block迁移而来
[已废弃] 异步存储聊天记忆从build_memory_block迁移而来
此函数已被记忆图系统的工具调用方式替代。
记忆现在由LLM在对话过程中通过CreateMemoryTool主动创建。
Args:
reply_to: 回复对象
reply_message: 回复的原始消息
"""
return # 已禁用,保留函数签名以防其他地方有引用
# 以下代码已废弃,不再执行
try:
if not global_config.memory.enable_memory:
return

View File

@@ -641,150 +641,6 @@ class Prompt:
logger.error(f"构建表达习惯失败: {e}")
return {"expression_habits_block": ""}
# _build_memory_block 和 _build_memory_block_fast 已移除
# 记忆构建现在完全由 default_generator.py 的 build_memory_block 方法处理
# 使用新的记忆图系统,通过 pre_built_params 传入
async def _REMOVED_build_memory_block(self) -> dict[str, Any]:
"""已废弃:构建与当前对话相关的记忆上下文块(完整版)."""
if not global_config.memory.enable_memory:
return {"memory_block": ""}
try:
from src.chat.memory_system.enhanced_memory_activator import enhanced_memory_activator
# 准备用于记忆激活的聊天历史
chat_history = ""
if self.parameters.message_list_before_now_long:
recent_messages = self.parameters.message_list_before_now_long[-20:]
chat_history = await build_readable_messages(
recent_messages, replace_bot_name=True, timestamp_mode="normal", truncate=True
)
# 并行查询长期记忆和即时记忆以提高性能
import asyncio
memory_tasks = [
enhanced_memory_activator.activate_memory_with_chat_history(
target_message=self.parameters.target, chat_history_prompt=chat_history
),
enhanced_memory_activator.get_instant_memory(
target_message=self.parameters.target, chat_id=self.parameters.chat_id
),
]
try:
# 使用 `return_exceptions=True` 来防止一个任务的失败导致所有任务失败
running_memories, instant_memory = await asyncio.gather(*memory_tasks, return_exceptions=True)
# 单独处理每个任务的结果,如果是异常则记录并使用默认值
if isinstance(running_memories, BaseException):
logger.warning(f"长期记忆查询失败: {running_memories}")
running_memories = []
if isinstance(instant_memory, BaseException):
logger.warning(f"即时记忆查询失败: {instant_memory}")
instant_memory = None
except asyncio.TimeoutError:
logger.warning("记忆查询超时,使用部分结果")
running_memories = []
instant_memory = None
# 将检索到的记忆格式化为提示词
if running_memories:
try:
from src.chat.memory_system.memory_formatter import format_memories_bracket_style
# 将原始记忆数据转换为格式化器所需的标准格式
formatted_memories = []
for memory in running_memories:
content = memory.get("content", "")
display_text = content
# 清理内容,移除元数据括号
if "(类型:" in content and "" in content:
display_text = content.split("(类型:")[0].strip()
# 映射记忆主题到标准类型
topic = memory.get("topic", "personal_fact")
memory_type_mapping = {
"relationship": "personal_fact",
"opinion": "opinion",
"personal_fact": "personal_fact",
"preference": "preference",
"event": "event",
}
mapped_type = memory_type_mapping.get(topic, "personal_fact")
formatted_memories.append(
{
"display": display_text,
"memory_type": mapped_type,
"metadata": {
"confidence": memory.get("confidence", "未知"),
"importance": memory.get("importance", "一般"),
"timestamp": memory.get("timestamp", ""),
"source": memory.get("source", "unknown"),
"relevance_score": memory.get("relevance_score", 0.0),
},
}
)
# 使用指定的风格进行格式化
memory_block = format_memories_bracket_style(
formatted_memories, query_context=self.parameters.target
)
except Exception as e:
# 如果格式化失败,提供一个简化的、健壮的备用格式
logger.warning(f"记忆格式化失败,使用简化格式: {e}")
memory_parts = ["## 相关记忆回顾", ""]
for memory in running_memories:
content = memory.get("content", "")
if "(类型:" in content and "" in content:
clean_content = content.split("(类型:")[0].strip()
memory_parts.append(f"- {clean_content}")
else:
memory_parts.append(f"- {content}")
memory_block = "\n".join(memory_parts)
else:
memory_block = ""
# 将即时记忆附加到记忆块的末尾
if instant_memory:
if memory_block:
memory_block += f"\n- 最相关记忆:{instant_memory}"
else:
memory_block = f"- 最相关记忆:{instant_memory}"
return {"memory_block": memory_block}
except Exception as e:
logger.error(f"构建记忆块失败: {e}")
return {"memory_block": ""}
async def _REMOVED_build_memory_block_fast(self) -> dict[str, Any]:
"""已废弃:快速构建记忆块(简化版),作为未预构建时的后备方案."""
if not global_config.memory.enable_memory:
return {"memory_block": ""}
try:
from src.chat.memory_system.enhanced_memory_activator import enhanced_memory_activator
# 这个快速版本只查询最高优先级的“即时记忆”,速度更快
instant_memory = await enhanced_memory_activator.get_instant_memory(
target_message=self.parameters.target, chat_id=self.parameters.chat_id
)
if instant_memory:
memory_block = f"- 相关记忆:{instant_memory}"
else:
memory_block = ""
return {"memory_block": memory_block}
except Exception as e:
logger.warning(f"快速构建记忆块失败: {e}")
return {"memory_block": ""}
async def _build_relation_info(self) -> dict[str, Any]:
"""构建与对话目标相关的关系信息."""
try:

View File

@@ -13,7 +13,6 @@ from maim_message import MessageServer
from rich.traceback import install
from src.chat.emoji_system.emoji_manager import get_emoji_manager
from src.chat.memory_system.memory_manager import memory_manager
from src.chat.message_receive.bot import chat_bot
from src.chat.message_receive.chat_stream import get_chat_manager
from src.chat.utils.statistic import OnlineTimeRecordTask, StatisticOutputTask
@@ -76,8 +75,6 @@ class MainSystem:
"""主系统类,负责协调所有组件"""
def __init__(self) -> None:
# 使用增强记忆系统
self.memory_manager = memory_manager
self.individuality: Individuality = get_individuality()
# 使用消息API替代直接的FastAPI实例
@@ -250,12 +247,6 @@ class MainSystem:
logger.error(f"准备停止消息重组器时出错: {e}")
# 停止增强记忆系统
try:
if global_config.memory and getattr(global_config.memory, 'enable', False):
cleanup_tasks.append(("增强记忆系统", self.memory_manager.shutdown()))
except Exception as e:
logger.error(f"准备停止增强记忆系统时出错: {e}")
# 停止统一调度器
try:
from src.schedule.unified_scheduler import shutdown_scheduler
@@ -468,13 +459,12 @@ MoFox_Bot(第三方修改版)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
# 初始化增强记忆系统
if global_config.memory and getattr(global_config.memory, 'enable', False):
from src.chat.memory_system.memory_system import initialize_memory_system
await self._safe_init("增强记忆系统", initialize_memory_system)()
await self._safe_init("记忆管理器", self.memory_manager.initialize)()
else:
logger.info("记忆系统已禁用,跳过初始化")
# 初始化记忆系统
try:
from src.memory_graph.manager_singleton import initialize_memory_manager
await self._safe_init("记忆系统", initialize_memory_manager)()
except Exception as e:
logger.error(f"记忆图系统初始化失败: {e}")
# 初始化消息兴趣值计算组件
await self._initialize_interest_calculator()

View File

@@ -163,6 +163,14 @@ class MemoryGraphConfig:
activation_propagation_depth=getattr(mg_config, 'activation_propagation_depth', 1),
max_memory_nodes_per_memory=getattr(mg_config, 'max_memory_nodes_per_memory', 10),
max_related_memories=getattr(mg_config, 'max_related_memories', 5),
# 检索配置
retrieval=RetrievalConfig(
max_expand_depth=getattr(mg_config, 'search_max_expand_depth', 2),
vector_weight=getattr(mg_config, 'search_vector_weight', 0.4),
graph_distance_weight=getattr(mg_config, 'search_graph_distance_weight', 0.2),
importance_weight=getattr(mg_config, 'search_importance_weight', 0.2),
recency_weight=getattr(mg_config, 'search_recency_weight', 0.2),
),
)
return config
@@ -206,7 +214,14 @@ class MemoryGraphConfig:
max_related_memories=config_dict.get("max_related_memories", 5),
# 旧配置字段(向后兼容)
consolidation=ConsolidationConfig(**config_dict.get("consolidation", {})),
retrieval=RetrievalConfig(**config_dict.get("retrieval", {})),
retrieval=RetrievalConfig(
max_expand_depth=config_dict.get("search_max_expand_depth", 2),
vector_weight=config_dict.get("search_vector_weight", 0.4),
graph_distance_weight=config_dict.get("search_graph_distance_weight", 0.2),
importance_weight=config_dict.get("search_importance_weight", 0.2),
recency_weight=config_dict.get("search_recency_weight", 0.2),
**config_dict.get("retrieval", {})
),
node_merger=NodeMergerConfig(**config_dict.get("node_merger", {})),
storage=StorageConfig(**config_dict.get("storage", {})),
decay_rates=config_dict.get("decay_rates", cls().decay_rates),
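
For reference, a hedged sketch of how the four retrieval weights mapped above are presumably combined into a ranking score; the actual formula lives in the memory_graph search code, which this diff does not show.

# Assumption only: illustrative linear combination of the retrieval weights; not the project's code.
def combined_score(vector_sim: float, graph_proximity: float, importance: float, recency: float,
                   vector_weight: float = 0.4, graph_distance_weight: float = 0.2,
                   importance_weight: float = 0.2, recency_weight: float = 0.2) -> float:
    # Inputs are assumed normalized to [0, 1]; graph_proximity is higher for nodes fewer hops away.
    return (vector_weight * vector_sim
            + graph_distance_weight * graph_proximity
            + importance_weight * importance
            + recency_weight * recency)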

View File

@@ -365,20 +365,26 @@ class MemoryManager:
chat_history = context.get("chat_history", "")
sender = context.get("sender", "")
prompt = f"""你是一个记忆检索查询优化助手。请将用户的查询转换为更适合语义搜索的表述
prompt = f"""你是一个记忆检索查询优化助手。你的任务是分析对话历史,生成一个综合性的搜索查询
要求:
1. 提取查询的核心意图和关键信息
2. 使用更具体、描述性的语言
3. 如果查询涉及人物,明确指出是谁
4. 保持简洁,只输出优化后的查询文本
**任务说明:**
不要只优化单个消息,而是要综合分析整个对话上下文,提取出最核心的检索意图。
当前查询: {query}
**要求:**
1. 仔细阅读对话历史,理解对话的主题和脉络
2. 识别关键人物、事件、关系和话题
3. 提取最值得检索的核心信息点
4. 生成一个简洁但信息丰富的搜索查询15-30字
5. 如果涉及特定人物,必须明确指出人名
6. 只输出查询文本,不要解释
{f"发言人: {sender}" if sender else ""}
{f"最近对话: {chat_history[-200:]}" if chat_history else ""}
**对话上下文:**
{chat_history[-500:] if chat_history else "(无历史对话)"}
优化后的查询:"""
**当前消息:**
{sender}: {query}
**生成综合查询:**"""
optimized_query, _ = await llm.generate_response_async(
prompt,

View File

@@ -648,18 +648,20 @@ class ChatterPlanFilter:
else:
keywords.append("晚上")
# 使用新的统一记忆系统检索记忆
# 使用记忆系统检索记忆
try:
from src.chat.memory_system import get_memory_system
from src.memory_graph.manager_singleton import get_memory_manager
memory_manager = get_memory_manager()
if not memory_manager:
return "记忆系统未初始化。"
memory_system = get_memory_system()
# 将关键词转换为查询字符串
query = " ".join(keywords)
enhanced_memories = await memory_system.retrieve_relevant_memories(
query_text=query,
user_id="system", # 系统查询
scope_id="system",
limit=5,
enhanced_memories = await memory_manager.search_memories(
query=query,
top_k=5,
optimize_query=False, # 直接使用关键词查询
)
if not enhanced_memories:
@@ -667,9 +669,14 @@ class ChatterPlanFilter:
# 转换格式以兼容现有代码
retrieved_memories = []
for memory_chunk in enhanced_memories:
content = memory_chunk.display or memory_chunk.text_content or ""
memory_type = memory_chunk.memory_type.value if memory_chunk.memory_type else "unknown"
for memory in enhanced_memories:
# 从记忆图的节点中提取内容
content_parts = []
for node in memory.nodes:
if node.content:
content_parts.append(node.content)
content = " ".join(content_parts) if content_parts else "无内容"
memory_type = memory.memory_type.value
retrieved_memories.append((memory_type, content))
memory_statements = [

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.6.0"
version = "7.6.1"
#----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读----
#如果你想要修改配置文件,请递增version的值
@@ -254,7 +254,7 @@ search_min_importance = 0.3 # 最小重要性阈值 (0.0-1.0)
search_similarity_threshold = 0.5 # 向量相似度阈值
# 智能查询优化
enable_query_optimization = true # 启用查询优化(使用小模型优化搜索查询)
enable_query_optimization = true # 启用查询优化(使用小模型分析对话历史,生成综合性搜索查询)
# === 记忆整合配置 ===
consolidation_enabled = true # 是否启用记忆整合
activation_decay_rate = 0.9 # 激活度衰减率(每天衰减10%)
activation_propagation_strength = 0.5 # 激活传播强度(传播到相关记忆的激活度比例)
activation_propagation_depth = 1 # 激活传播深度(最多传播几层)
# === 记忆检索配置 ===
search_max_expand_depth = 2 # 检索时图扩展深度(0=仅直接匹配,1=扩展1跳,2=扩展2跳,推荐1-2)
search_vector_weight = 0.4 # 向量相似度权重
search_graph_distance_weight = 0.2 # 图距离权重
search_importance_weight = 0.2 # 重要性权重
search_recency_weight = 0.2 # 时效性权重
# === 性能配置 ===
max_memory_nodes_per_memory = 10 # 每条记忆最多包含的节点数
max_related_memories = 5 # 激活传播时最多影响的相关记忆数
# ==================== 旧记忆系统配置 (已弃用) ====================
# 注意:以下配置仅用于向后兼容,新系统不使用这些配置
# 旧的 enhanced memory 系统已被 memory_graph 系统取代
[memory_legacy]
# 旧系统已禁用,所有配置保留仅供参考
enable_legacy_memory = false # 旧记忆系统已禁用
# Vector DB配置 (ChromaDB) - 保留用于其他系统
[vector_db]
type = "chromadb" # Vector DB类型
path = "data/chroma_db" # Vector DB数据路径(用于其他系统,非memory_graph)
[vector_db.settings]
anonymized_telemetry = false # 禁用匿名遥测
allow_reset = true # 允许重置
[voice]
enable_asr = true # 是否启用语音识别,启用后MoFox-Bot可以识别语音消息,启用该功能需要配置语音识别模型[model.voice]
# [语音识别提供商] 可选值: "api", "local". 默认使用 "api".