feat:实现具有向量和元数据索引的统一内存存储系统

- 添加了 UnifiedMemoryStorage 类,用于管理带向量嵌入的内存块。
- 集成了 FAISS,以实现高效的向量存储和搜索。
- 实现了内存缓存、关键字、类型和用户索引。
- 增加了内存遗忘和自动保存存储数据的支持。
- 包含用于存储、搜索和遗忘记忆的方法。
- 引入了存储行为和性能的配置选项。
- 实现了从磁盘加载和保存内存及向量数据。
This commit is contained in:
Windpicker-owo
2025-10-01 18:02:42 +08:00
parent e09e8fd79e
commit 9359e489a9
28 changed files with 1883 additions and 499 deletions

View File

@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
"""
Simplified memory system package.

Drops the old instant-memory / long-term-memory split and exposes a unified
memory architecture with an intelligent forgetting mechanism.
"""
# Core data structures
from .memory_chunk import (
    MemoryChunk,
    MemoryMetadata,
    ContentStructure,
    MemoryType,
    ImportanceLevel,
    ConfidenceLevel,
    create_memory_chunk
)
# Forgetting engine
from .memory_forgetting_engine import (
    MemoryForgettingEngine,
    ForgettingConfig,
    get_memory_forgetting_engine
)
# Unified storage system
from .unified_memory_storage import (
    UnifiedMemoryStorage,
    UnifiedStorageConfig,
    get_unified_memory_storage,
    initialize_unified_memory_storage
)
# Core memory system
from .memory_system import (
    MemorySystem,
    MemorySystemConfig,
    get_memory_system,
    initialize_memory_system
)
# Memory manager
from .memory_manager import (
    MemoryManager,
    MemoryResult,
    memory_manager
)
# Activator
from .enhanced_memory_activator import (
    MemoryActivator,
    memory_activator
)
# Formatter
from .memory_formatter import (
    MemoryFormatter,
    FormatterConfig,
    format_memories_for_llm,
    format_memories_bracket_style
)
# Backwards-compatibility alias
from .memory_chunk import MemoryChunk as Memory
__all__ = [
    # Core data structures
    "MemoryChunk",
    "Memory",  # compatibility alias
    "MemoryMetadata",
    "ContentStructure",
    "MemoryType",
    "ImportanceLevel",
    "ConfidenceLevel",
    "create_memory_chunk",
    # Forgetting engine
    "MemoryForgettingEngine",
    "ForgettingConfig",
    "get_memory_forgetting_engine",
    # Unified storage
    "UnifiedMemoryStorage",
    "UnifiedStorageConfig",
    "get_unified_memory_storage",
    "initialize_unified_memory_storage",
    # Memory system
    "MemorySystem",
    "MemorySystemConfig",
    "get_memory_system",
    "initialize_memory_system",
    # Memory manager
    "MemoryManager",
    "MemoryResult",
    "memory_manager",
    # Activator
    "MemoryActivator",
    "memory_activator",
    # Formatter
    "MemoryFormatter",
    "FormatterConfig",
    "format_memories_for_llm",
    "format_memories_bracket_style",
]
# Version information
__version__ = "3.0.0"
__author__ = "MoFox Team"
__description__ = "简化记忆系统 - 统一记忆架构与智能遗忘机制"

View File

@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
"""
增强记忆激活器
替代原有的 MemoryActivator使用增强记忆系统
记忆激活器
记忆系统的激活器组件
"""
import difflib
@@ -15,9 +15,9 @@ from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.chat.utils.prompt import Prompt, global_prompt_manager
from src.chat.memory_system.enhanced_memory_manager import enhanced_memory_manager, EnhancedMemoryResult
from src.chat.memory_system.memory_manager import memory_manager, MemoryResult
logger = get_logger("enhanced_memory_activator")
logger = get_logger("memory_activator")
def get_keywords_from_json(json_str) -> List:
@@ -43,9 +43,9 @@ def get_keywords_from_json(json_str) -> List:
def init_prompt():
# --- Enhanced Memory Activator Prompt ---
enhanced_memory_activator_prompt = """
你是一个增强记忆分析器,你需要根据以下信息来进行记忆检索
# --- Memory Activator Prompt ---
memory_activator_prompt = """
你是一个记忆分析器,你需要根据以下信息来进行记忆检索
以下是一段聊天记录,请根据这些信息,总结出几个关键词作为记忆检索的触发词
@@ -66,25 +66,25 @@ def init_prompt():
不要输出其他多余内容只输出json格式就好
"""
Prompt(enhanced_memory_activator_prompt, "enhanced_memory_activator_prompt")
Prompt(memory_activator_prompt, "memory_activator_prompt")
class EnhancedMemoryActivator:
"""增强记忆激活器 - 替代原有的 MemoryActivator"""
class MemoryActivator:
"""记忆激活器"""
def __init__(self):
self.key_words_model = LLMRequest(
model_set=model_config.model_task_config.utils_small,
request_type="enhanced_memory.activator",
request_type="memory.activator",
)
self.running_memory = []
self.cached_keywords = set() # 用于缓存历史关键词
self.last_enhanced_query_time = 0 # 上次查询增强记忆的时间
self.last_memory_query_time = 0 # 上次查询记忆的时间
async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> List[Dict]:
"""
激活增强记忆
激活记忆
"""
# 如果记忆系统被禁用,直接返回空列表
if not global_config.memory.enable_memory:
@@ -94,7 +94,7 @@ class EnhancedMemoryActivator:
cached_keywords_str = ", ".join(self.cached_keywords) if self.cached_keywords else "暂无历史关键词"
prompt = await global_prompt_manager.format_prompt(
"enhanced_memory_activator_prompt",
"memory_activator_prompt",
obs_info_text=chat_history_prompt,
target_message=target_message,
cached_keywords=cached_keywords_str,
@@ -117,14 +117,14 @@ class EnhancedMemoryActivator:
# 添加新的关键词到缓存
self.cached_keywords.update(keywords)
logger.debug(f"增强记忆关键词: {self.cached_keywords}")
logger.debug(f"记忆关键词: {self.cached_keywords}")
# 使用增强记忆系统获取相关记忆
enhanced_results = await self._query_enhanced_memory(keywords, target_message)
# 使用记忆系统获取相关记忆
memory_results = await self._query_unified_memory(keywords, target_message)
# 处理和增强记忆结果
if enhanced_results:
for result in enhanced_results:
# 处理和记忆结果
if memory_results:
for result in memory_results:
# 检查是否已存在相似内容的记忆
exists = any(
m["content"] == result.content or
@@ -139,78 +139,95 @@ class EnhancedMemoryActivator:
"duration": 1,
"confidence": result.confidence,
"importance": result.importance,
"source": result.source
"source": result.source,
"relevance_score": result.relevance_score # 添加相关度评分
}
self.running_memory.append(memory_entry)
logger.debug(f"添加新增强记忆: {result.memory_type} - {result.content}")
logger.debug(f"添加新记忆: {result.memory_type} - {result.content}")
# 激活时所有已有记忆的duration+1达到3则移除
for m in self.running_memory[:]:
m["duration"] = m.get("duration", 1) + 1
self.running_memory = [m for m in self.running_memory if m["duration"] < 3]
# 限制同时加载的记忆条数最多保留最后5条(增强记忆可以处理更多)
# 限制同时加载的记忆条数最多保留最后5条
if len(self.running_memory) > 5:
self.running_memory = self.running_memory[-5:]
return self.running_memory
async def _query_enhanced_memory(self, keywords: List[str], query_text: str) -> List[EnhancedMemoryResult]:
"""查询增强记忆系统"""
async def _query_unified_memory(self, keywords: List[str], query_text: str) -> List[MemoryResult]:
"""查询统一记忆系统"""
try:
# 确保增强记忆管理器已初始化
if not enhanced_memory_manager.is_initialized:
await enhanced_memory_manager.initialize()
# 使用记忆系统
from src.chat.memory_system.memory_system import get_memory_system
memory_system = get_memory_system()
if not memory_system or memory_system.status.value != "ready":
logger.warning("记忆系统未就绪")
return []
# 构建查询上下文
context = {
"keywords": keywords,
"query_intent": "conversation_response",
"expected_memory_types": [
"personal_fact", "event", "preference", "opinion"
]
"query_intent": "conversation_response"
}
# 查询增强记忆
enhanced_results = await enhanced_memory_manager.get_enhanced_memory_context(
# 查询记忆
memories = await memory_system.retrieve_relevant_memories(
query_text=query_text,
user_id="default_user", # 可以根据实际用户ID调整
user_id="global", # 使用全局作用域
context=context,
limit=5
)
logger.debug(f"增强记忆查询返回 {len(enhanced_results)} 条结果")
return enhanced_results
# 转换为 MemoryResult 格式
memory_results = []
for memory in memories:
result = MemoryResult(
content=memory.display,
memory_type=memory.memory_type.value,
confidence=memory.metadata.confidence.value,
importance=memory.metadata.importance.value,
timestamp=memory.metadata.created_at,
source="unified_memory",
relevance_score=memory.metadata.relevance_score
)
memory_results.append(result)
logger.debug(f"统一记忆查询返回 {len(memory_results)} 条结果")
return memory_results
except Exception as e:
logger.error(f"查询增强记忆失败: {e}")
logger.error(f"查询统一记忆失败: {e}")
return []
async def get_instant_memory(self, target_message: str, chat_id: str) -> Optional[str]:
"""
获取即时记忆 - 兼容原有接口
获取即时记忆 - 兼容原有接口(使用统一存储)
"""
try:
# 使用增强记忆系统获取相关记忆
if not enhanced_memory_manager.is_initialized:
await enhanced_memory_manager.initialize()
# 使用统一存储系统获取相关记忆
from src.chat.memory_system.memory_system import get_memory_system
memory_system = get_memory_system()
if not memory_system or memory_system.status.value != "ready":
return None
context = {
"query_intent": "instant_response",
"chat_id": chat_id,
"expected_memory_types": ["preference", "opinion", "personal_fact"]
"chat_id": chat_id
}
enhanced_results = await enhanced_memory_manager.get_enhanced_memory_context(
memories = await memory_system.retrieve_relevant_memories(
query_text=target_message,
user_id="default_user",
user_id="global",
context=context,
limit=1
)
if enhanced_results:
# 返回最相关的记忆内容
return enhanced_results[0].content
if memories:
return memories[0].display
return None
@@ -222,15 +239,11 @@ class EnhancedMemoryActivator:
"""清除缓存"""
self.cached_keywords.clear()
self.running_memory.clear()
logger.debug("增强记忆激活器缓存已清除")
logger.debug("记忆激活器缓存已清除")
# 创建全局实例
enhanced_memory_activator = EnhancedMemoryActivator()
# 为了兼容性,保留原有名称
MemoryActivator = EnhancedMemoryActivator
memory_activator = MemoryActivator()
init_prompt()

View File

@@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
"""
记忆激活器
记忆系统的激活器组件
"""
import difflib
import orjson
import time
from typing import List, Dict, Optional
from datetime import datetime
from json_repair import repair_json
from src.llm_models.utils_model import LLMRequest
from src.config.config import global_config, model_config
from src.common.logger import get_logger
from src.chat.utils.prompt import Prompt, global_prompt_manager
from src.chat.memory_system.memory_manager import memory_manager, MemoryResult
# Module-level logger for the memory activator.
logger = get_logger("memory_activator")
def get_keywords_from_json(json_str) -> List:
    """Extract the keyword list from a (possibly malformed) JSON string.

    Args:
        json_str: A JSON-formatted string, typically raw LLM output.

    Returns:
        List[str]: The extracted keywords, or an empty list on any failure.
    """
    try:
        # Repair malformed JSON (e.g. truncated LLM output) before parsing.
        parsed = repair_json(json_str)
        # repair_json may return either a JSON string or an already-parsed object.
        if isinstance(parsed, str):
            parsed = orjson.loads(parsed)
        return parsed.get("keywords", [])
    except Exception as e:
        logger.error(f"解析关键词JSON失败: {e}")
        return []
def init_prompt():
    """Register the memory-activator prompt template with the global prompt manager."""
    # --- Memory Activator Prompt ---
    memory_activator_prompt = """
你是一个记忆分析器,你需要根据以下信息来进行记忆检索
以下是一段聊天记录,请根据这些信息,总结出几个关键词作为记忆检索的触发词
聊天记录:
{obs_info_text}
用户想要回复的消息:
{target_message}
历史关键词(请避免重复提取这些关键词):
{cached_keywords}
请输出一个json格式包含以下字段
{{
"keywords": ["关键词1", "关键词2", "关键词3",......]
}}
不要输出其他多余内容只输出json格式就好
"""
    Prompt(memory_activator_prompt, "memory_activator_prompt")
class MemoryActivator:
    """Memory activator.

    Uses a small LLM to extract retrieval keywords from the conversation,
    queries the unified memory system with them, and maintains a short-lived
    working set (``running_memory``) of activated memories.
    """
    def __init__(self):
        # Small utility model used only to generate retrieval keywords.
        self.key_words_model = LLMRequest(
            model_set=model_config.model_task_config.utils_small,
            request_type="memory.activator",
        )
        self.running_memory = []  # currently activated memory entries (dicts)
        self.cached_keywords = set()  # cache of previously extracted keywords
        self.last_memory_query_time = 0  # timestamp of the last memory query
    async def activate_memory_with_chat_history(self, target_message, chat_history_prompt) -> List[Dict]:
        """
        Activate memories relevant to the current conversation.

        Args:
            target_message: The message the bot intends to reply to.
            chat_history_prompt: Rendered chat-history text for the prompt.

        Returns:
            The current ``running_memory`` list (at most 5 entries).
        """
        # If the memory system is disabled, return an empty list immediately.
        if not global_config.memory.enable_memory:
            return []
        # Render cached keywords into a string for the prompt.
        cached_keywords_str = ", ".join(self.cached_keywords) if self.cached_keywords else "暂无历史关键词"
        prompt = await global_prompt_manager.format_prompt(
            "memory_activator_prompt",
            obs_info_text=chat_history_prompt,
            target_message=target_message,
            cached_keywords=cached_keywords_str,
        )
        # Generate keywords via the LLM.
        response, (reasoning_content, model_name, _) = await self.key_words_model.generate_response_async(
            prompt, temperature=0.5
        )
        keywords = list(get_keywords_from_json(response))
        # Update the keyword cache.
        if keywords:
            # Cap the cache at 10 keywords before merging new ones.
            if len(self.cached_keywords) > 10:
                # NOTE(review): sets are unordered, so this keeps an arbitrary
                # 8 keywords rather than the most recent ones — confirm intent.
                cached_list = list(self.cached_keywords)
                self.cached_keywords = set(cached_list[-8:])
            # Merge the newly extracted keywords into the cache.
            self.cached_keywords.update(keywords)
            logger.debug(f"记忆关键词: {self.cached_keywords}")
        # Query the unified memory system with the extracted keywords.
        memory_results = await self._query_unified_memory(keywords, target_message)
        # Merge new results into the running memory set.
        if memory_results:
            for result in memory_results:
                # Skip results whose content is (near-)identical to an existing entry.
                exists = any(
                    m["content"] == result.content or
                    difflib.SequenceMatcher(None, m["content"], result.content).ratio() >= 0.7
                    for m in self.running_memory
                )
                if not exists:
                    memory_entry = {
                        "topic": result.memory_type,
                        "content": result.content,
                        "timestamp": datetime.fromtimestamp(result.timestamp).isoformat(),
                        "duration": 1,
                        "confidence": result.confidence,
                        "importance": result.importance,
                        "source": result.source,
                        "relevance_score": result.relevance_score  # relevance score of the match
                    }
                    self.running_memory.append(memory_entry)
                    logger.debug(f"添加新记忆: {result.memory_type} - {result.content}")
        # On each activation every memory's duration is incremented; entries
        # reaching a duration of 3 are dropped.
        for m in self.running_memory[:]:
            m["duration"] = m.get("duration", 1) + 1
        self.running_memory = [m for m in self.running_memory if m["duration"] < 3]
        # Limit simultaneously loaded memories to the last 5 entries.
        if len(self.running_memory) > 5:
            self.running_memory = self.running_memory[-5:]
        return self.running_memory
    async def _query_unified_memory(self, keywords: List[str], query_text: str) -> List[MemoryResult]:
        """Query the unified memory system and adapt results to ``MemoryResult``.

        Returns an empty list if the system is not ready or the query fails.
        """
        try:
            # Imported lazily to avoid a circular import at module load time.
            from src.chat.memory_system.memory_system import get_memory_system
            memory_system = get_memory_system()
            if not memory_system or memory_system.status.value != "ready":
                logger.warning("记忆系统未就绪")
                return []
            # Build the query context.
            context = {
                "keywords": keywords,
                "query_intent": "conversation_response"
            }
            # Retrieve relevant memories.
            memories = await memory_system.retrieve_relevant_memories(
                query_text=query_text,
                user_id="global",  # use the global scope
                context=context,
                limit=5
            )
            # Convert memory chunks into MemoryResult objects.
            memory_results = []
            for memory in memories:
                result = MemoryResult(
                    content=memory.display,
                    memory_type=memory.memory_type.value,
                    confidence=memory.metadata.confidence.value,
                    importance=memory.metadata.importance.value,
                    timestamp=memory.metadata.created_at,
                    source="unified_memory",
                    relevance_score=memory.metadata.relevance_score
                )
                memory_results.append(result)
            logger.debug(f"统一记忆查询返回 {len(memory_results)} 条结果")
            return memory_results
        except Exception as e:
            logger.error(f"查询统一记忆失败: {e}")
            return []
    async def get_instant_memory(self, target_message: str, chat_id: str) -> Optional[str]:
        """
        Fetch the single most relevant memory for a message.

        Kept for compatibility with the legacy instant-memory interface;
        backed by the unified storage. Returns None when unavailable.
        """
        try:
            # Imported lazily to avoid a circular import at module load time.
            from src.chat.memory_system.memory_system import get_memory_system
            memory_system = get_memory_system()
            if not memory_system or memory_system.status.value != "ready":
                return None
            context = {
                "query_intent": "instant_response",
                "chat_id": chat_id
            }
            memories = await memory_system.retrieve_relevant_memories(
                query_text=target_message,
                user_id="global",
                context=context,
                limit=1
            )
            if memories:
                return memories[0].display
            return None
        except Exception as e:
            logger.error(f"获取即时记忆失败: {e}")
            return None
    def clear_cache(self):
        """Clear the keyword cache and the running memory set."""
        self.cached_keywords.clear()
        self.running_memory.clear()
        logger.debug("记忆激活器缓存已清除")
# Create the global activator instance.
memory_activator = MemoryActivator()
init_prompt()

View File

@@ -97,7 +97,7 @@ class ContentStructure:
@dataclass
class MemoryMetadata:
"""记忆元数据"""
"""记忆元数据 - 简化版本"""
# 基础信息
memory_id: str # 唯一标识符
user_id: str # 用户ID
@@ -108,47 +108,116 @@ class MemoryMetadata:
last_accessed: float = 0.0 # 最后访问时间
last_modified: float = 0.0 # 最后修改时间
# 激活频率管理
last_activation_time: float = 0.0 # 最后激活时间
activation_frequency: int = 0 # 激活频率(单位时间内的激活次数)
total_activations: int = 0 # 总激活次数
# 统计信息
access_count: int = 0 # 访问次数
relevance_score: float = 0.0 # 相关度评分
# 信心和重要性
# 信心和重要性(核心字段)
confidence: ConfidenceLevel = ConfidenceLevel.MEDIUM
importance: ImportanceLevel = ImportanceLevel.NORMAL
# 情感和关系
emotional_context: Optional[str] = None # 情感上下文
relationship_score: float = 0.0 # 关系分0-1
# 遗忘机制相关
forgetting_threshold: float = 0.0 # 遗忘阈值(动态计算)
last_forgetting_check: float = 0.0 # 上次遗忘检查时间
# 来源和验证
# 来源信息
source_context: Optional[str] = None # 来源上下文片段
verification_status: bool = False # 验证状态
def __post_init__(self):
"""后初始化处理"""
if not self.memory_id:
self.memory_id = str(uuid.uuid4())
current_time = time.time()
if self.created_at == 0:
self.created_at = time.time()
self.created_at = current_time
if self.last_accessed == 0:
self.last_accessed = self.created_at
self.last_accessed = current_time
if self.last_modified == 0:
self.last_modified = self.created_at
self.last_modified = current_time
if self.last_activation_time == 0:
self.last_activation_time = current_time
if self.last_forgetting_check == 0:
self.last_forgetting_check = current_time
def update_access(self):
"""更新访问信息"""
current_time = time.time()
self.last_accessed = current_time
self.access_count += 1
self.total_activations += 1
# 更新激活频率
self._update_activation_frequency(current_time)
def _update_activation_frequency(self, current_time: float):
"""更新激活频率24小时内的激活次数"""
from datetime import datetime, timedelta
# 如果超过24小时重置激活频率
if current_time - self.last_activation_time > 86400: # 24小时 = 86400秒
self.activation_frequency = 1
else:
self.activation_frequency += 1
self.last_activation_time = current_time
def update_relevance(self, new_score: float):
"""更新相关度评分"""
self.relevance_score = max(0.0, min(1.0, new_score))
self.last_modified = time.time()
def calculate_forgetting_threshold(self) -> float:
"""计算遗忘阈值(天数)"""
# 基础天数
base_days = 30.0
# 重要性权重 (1-4 -> 0-3)
importance_weight = (self.importance.value - 1) * 15 # 0, 15, 30, 45
# 置信度权重 (1-4 -> 0-3)
confidence_weight = (self.confidence.value - 1) * 10 # 0, 10, 20, 30
# 激活频率权重每5次激活增加1天
frequency_weight = min(self.activation_frequency, 20) * 0.5 # 最多10天
# 计算最终阈值
threshold = base_days + importance_weight + confidence_weight + frequency_weight
# 设置最小和最大阈值
return max(7.0, min(threshold, 365.0)) # 7天到1年之间
def should_forget(self, current_time: Optional[float] = None) -> bool:
"""判断是否应该遗忘"""
if current_time is None:
current_time = time.time()
# 计算遗忘阈值
self.forgetting_threshold = self.calculate_forgetting_threshold()
# 计算距离最后激活的时间
days_since_activation = (current_time - self.last_activation_time) / 86400
return days_since_activation > self.forgetting_threshold
def is_dormant(self, current_time: Optional[float] = None, inactive_days: int = 90) -> bool:
"""判断是否处于休眠状态(长期未激活)"""
if current_time is None:
current_time = time.time()
days_since_last_access = (current_time - self.last_accessed) / 86400
return days_since_last_access > inactive_days
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
@@ -158,14 +227,16 @@ class MemoryMetadata:
"created_at": self.created_at,
"last_accessed": self.last_accessed,
"last_modified": self.last_modified,
"last_activation_time": self.last_activation_time,
"activation_frequency": self.activation_frequency,
"total_activations": self.total_activations,
"access_count": self.access_count,
"relevance_score": self.relevance_score,
"confidence": self.confidence.value,
"importance": self.importance.value,
"emotional_context": self.emotional_context,
"relationship_score": self.relationship_score,
"source_context": self.source_context,
"verification_status": self.verification_status
"forgetting_threshold": self.forgetting_threshold,
"last_forgetting_check": self.last_forgetting_check,
"source_context": self.source_context
}
@classmethod
@@ -178,14 +249,16 @@ class MemoryMetadata:
created_at=data.get("created_at", 0),
last_accessed=data.get("last_accessed", 0),
last_modified=data.get("last_modified", 0),
last_activation_time=data.get("last_activation_time", 0),
activation_frequency=data.get("activation_frequency", 0),
total_activations=data.get("total_activations", 0),
access_count=data.get("access_count", 0),
relevance_score=data.get("relevance_score", 0.0),
confidence=ConfidenceLevel(data.get("confidence", ConfidenceLevel.MEDIUM.value)),
importance=ImportanceLevel(data.get("importance", ImportanceLevel.NORMAL.value)),
emotional_context=data.get("emotional_context"),
relationship_score=data.get("relationship_score", 0.0),
source_context=data.get("source_context"),
verification_status=data.get("verification_status", False)
forgetting_threshold=data.get("forgetting_threshold", 0.0),
last_forgetting_check=data.get("last_forgetting_check", 0),
source_context=data.get("source_context")
)
@@ -269,6 +342,18 @@ class MemoryChunk:
"""更新相关度评分"""
self.metadata.update_relevance(new_score)
def should_forget(self, current_time: Optional[float] = None) -> bool:
"""判断是否应该遗忘"""
return self.metadata.should_forget(current_time)
def is_dormant(self, current_time: Optional[float] = None, inactive_days: int = 90) -> bool:
"""判断是否处于休眠状态(长期未激活)"""
return self.metadata.is_dormant(current_time, inactive_days)
def calculate_forgetting_threshold(self) -> float:
"""计算遗忘阈值(天数)"""
return self.metadata.calculate_forgetting_threshold()
def add_keyword(self, keyword: str):
"""添加关键词"""
if keyword and keyword not in self.keywords:

View File

@@ -0,0 +1,352 @@
# -*- coding: utf-8 -*-
"""
智能记忆遗忘引擎
基于重要程度、置信度和激活频率的智能遗忘机制
"""
import asyncio
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple

from src.common.logger import get_logger
from src.chat.memory_system.memory_chunk import MemoryChunk, ImportanceLevel, ConfidenceLevel
# Module-level logger for the forgetting engine.
logger = get_logger(__name__)
@dataclass
class ForgettingStats:
    """Statistics collected during a forgetting check."""
    total_checked: int = 0  # number of memories examined in the last run
    marked_for_forgetting: int = 0  # memories that met the normal forgetting criteria
    actually_forgotten: int = 0  # memories scheduled for removal (normal + forced)
    dormant_memories: int = 0  # memories found in a dormant state
    last_check_time: float = 0.0  # timestamp of the most recent check
    check_duration: float = 0.0  # wall-clock duration of the most recent check, seconds
@dataclass
class ForgettingConfig:
    """Configuration for the forgetting engine."""
    # Check scheduling
    check_interval_hours: int = 24  # interval between periodic checks (hours)
    batch_size: int = 100  # batch size for processing
    # Forgetting thresholds
    base_forgetting_days: float = 30.0  # baseline days before forgetting
    min_forgetting_days: float = 7.0  # lower clamp on the threshold
    max_forgetting_days: float = 365.0  # upper clamp on the threshold
    # Importance bonuses (extra days added to the threshold)
    critical_importance_bonus: float = 45.0  # CRITICAL importance
    high_importance_bonus: float = 30.0  # HIGH importance
    normal_importance_bonus: float = 15.0  # NORMAL importance
    low_importance_bonus: float = 0.0  # LOW importance
    # Confidence bonuses (extra days added to the threshold)
    verified_confidence_bonus: float = 30.0  # VERIFIED confidence
    high_confidence_bonus: float = 20.0  # HIGH confidence
    medium_confidence_bonus: float = 10.0  # MEDIUM confidence
    low_confidence_bonus: float = 0.0  # LOW confidence
    # Activation-frequency weighting
    activation_frequency_weight: float = 0.5  # extra days per activation
    max_frequency_bonus: float = 10.0  # cap on the frequency bonus (days)
    # Dormancy
    dormant_threshold_days: int = 90  # days without access before a memory is dormant
    force_forget_dormant_days: int = 180  # days without access before force-forgetting
class MemoryForgettingEngine:
    """Intelligent memory-forgetting engine.

    Computes a per-memory forgetting threshold from importance, confidence
    and activation frequency, and identifies memories that should be
    forgotten or force-forgotten. CRITICAL memories are never forgotten.
    """
    def __init__(self, config: Optional[ForgettingConfig] = None):
        """
        Args:
            config: Optional engine configuration; defaults to ForgettingConfig().
        """
        self.config = config or ForgettingConfig()
        self.stats = ForgettingStats()
        self._last_forgetting_check = 0.0
        # Serializes concurrent forgetting checks.
        self._forgetting_lock = asyncio.Lock()
        logger.info("MemoryForgettingEngine 初始化完成")
    def calculate_forgetting_threshold(self, memory: MemoryChunk) -> float:
        """
        Compute the forgetting threshold (in days) for a memory.

        Args:
            memory: The memory chunk.

        Returns:
            Threshold in days, clamped to
            [config.min_forgetting_days, config.max_forgetting_days].
        """
        # Baseline.
        threshold = self.config.base_forgetting_days
        # Importance bonus (LOW adds nothing).
        importance = memory.metadata.importance
        if importance == ImportanceLevel.CRITICAL:
            threshold += self.config.critical_importance_bonus
        elif importance == ImportanceLevel.HIGH:
            threshold += self.config.high_importance_bonus
        elif importance == ImportanceLevel.NORMAL:
            threshold += self.config.normal_importance_bonus
        # Confidence bonus (LOW adds nothing).
        confidence = memory.metadata.confidence
        if confidence == ConfidenceLevel.VERIFIED:
            threshold += self.config.verified_confidence_bonus
        elif confidence == ConfidenceLevel.HIGH:
            threshold += self.config.high_confidence_bonus
        elif confidence == ConfidenceLevel.MEDIUM:
            threshold += self.config.medium_confidence_bonus
        # Activation-frequency bonus, capped at max_frequency_bonus.
        frequency_bonus = min(
            memory.metadata.activation_frequency * self.config.activation_frequency_weight,
            self.config.max_frequency_bonus
        )
        threshold += frequency_bonus
        # Clamp into the configured range.
        return max(self.config.min_forgetting_days,
                   min(threshold, self.config.max_forgetting_days))
    def should_forget_memory(self, memory: MemoryChunk, current_time: Optional[float] = None) -> bool:
        """
        Decide whether a memory should be forgotten.

        Args:
            memory: The memory chunk.
            current_time: Current timestamp; defaults to time.time().

        Returns:
            True if the memory has been inactive longer than its threshold.
            CRITICAL memories always return False.
        """
        if current_time is None:
            current_time = time.time()
        # CRITICAL memories are never forgotten.
        if memory.metadata.importance == ImportanceLevel.CRITICAL:
            return False
        forgetting_threshold = self.calculate_forgetting_threshold(memory)
        # Days elapsed since the last activation.
        days_since_activation = (current_time - memory.metadata.last_activation_time) / 86400
        should_forget = days_since_activation > forgetting_threshold
        if should_forget:
            logger.debug(
                f"记忆 {memory.memory_id[:8]} 触发遗忘条件: "
                f"重要性={memory.metadata.importance.name}, "
                f"置信度={memory.metadata.confidence.name}, "
                f"激活频率={memory.metadata.activation_frequency}, "
                f"阈值={forgetting_threshold:.1f}天, "
                f"未激活天数={days_since_activation:.1f}"
            )
        return should_forget
    def is_dormant_memory(self, memory: MemoryChunk, current_time: Optional[float] = None) -> bool:
        """Return True if the memory has not been accessed for config.dormant_threshold_days."""
        return memory.is_dormant(current_time, self.config.dormant_threshold_days)
    def should_force_forget_dormant(self, memory: MemoryChunk, current_time: Optional[float] = None) -> bool:
        """
        Decide whether a dormant memory should be force-forgotten.

        Only non-CRITICAL memories can be force-forgotten, and only after
        config.force_forget_dormant_days without access.
        """
        if current_time is None:
            current_time = time.time()
        # CRITICAL memories are exempt from forced forgetting.
        if memory.metadata.importance == ImportanceLevel.CRITICAL:
            return False
        days_since_last_access = (current_time - memory.metadata.last_accessed) / 86400
        return days_since_last_access > self.config.force_forget_dormant_days
    async def check_memories_for_forgetting(self, memories: List[MemoryChunk]) -> Tuple[List[str], List[str]]:
        """
        Scan a list of memories and identify the ones to forget.

        Args:
            memories: Memory chunks to examine.

        Returns:
            (ids meeting the normal forgetting criteria,
             ids of dormant memories to force-forget)
        """
        start_time = time.time()
        current_time = start_time
        normal_forgetting_ids = []
        force_forgetting_ids = []
        # Reset the per-run counters. Previously only total_checked was reset,
        # so marked_for_forgetting / dormant_memories accumulated across runs
        # and the reported stats were inconsistent with each other.
        self.stats.total_checked = len(memories)
        self.stats.marked_for_forgetting = 0
        self.stats.dormant_memories = 0
        self.stats.last_check_time = current_time
        for memory in memories:
            try:
                # Dormancy check first; a dormant-but-not-forced memory still
                # falls through to the normal forgetting check below.
                if self.is_dormant_memory(memory, current_time):
                    self.stats.dormant_memories += 1
                    if self.should_force_forget_dormant(memory, current_time):
                        force_forgetting_ids.append(memory.memory_id)
                        logger.debug(f"休眠记忆 {memory.memory_id[:8]} 被标记为强制遗忘")
                        continue
                # Normal forgetting criteria.
                if self.should_forget_memory(memory, current_time):
                    normal_forgetting_ids.append(memory.memory_id)
                    self.stats.marked_for_forgetting += 1
            except Exception as e:
                logger.warning(f"检查记忆 {memory.memory_id[:8]} 遗忘状态失败: {e}")
                continue
        self.stats.check_duration = time.time() - start_time
        logger.info(
            f"遗忘检查完成 | 总数={self.stats.total_checked}, "
            f"标记遗忘={len(normal_forgetting_ids)}, "
            f"强制遗忘={len(force_forgetting_ids)}, "
            f"休眠={self.stats.dormant_memories}, "
            f"耗时={self.stats.check_duration:.3f}s"
        )
        return normal_forgetting_ids, force_forgetting_ids
    async def perform_forgetting_check(self, memories: List[MemoryChunk]) -> Dict[str, Any]:
        """
        Run a complete forgetting check under the engine lock.

        Args:
            memories: Memory chunks to examine.

        Returns:
            Dict with "normal_forgetting", "force_forgetting" and "stats".
        """
        async with self._forgetting_lock:
            normal_forgetting, force_forgetting = await self.check_memories_for_forgetting(memories)
            # NOTE: these ids are only *marked*; actual deletion is the caller's job.
            self.stats.actually_forgotten = len(normal_forgetting) + len(force_forgetting)
            return {
                "normal_forgetting": normal_forgetting,
                "force_forgetting": force_forgetting,
                "stats": {
                    "total_checked": self.stats.total_checked,
                    "marked_for_forgetting": self.stats.marked_for_forgetting,
                    "actually_forgotten": self.stats.actually_forgotten,
                    "dormant_memories": self.stats.dormant_memories,
                    "check_duration": self.stats.check_duration,
                    "last_check_time": self.stats.last_check_time
                }
            }
    def is_forgetting_check_needed(self) -> bool:
        """Return True when config.check_interval_hours have elapsed since the last periodic check."""
        current_time = time.time()
        hours_since_last_check = (current_time - self._last_forgetting_check) / 3600
        return hours_since_last_check >= self.config.check_interval_hours
    async def schedule_periodic_check(self, memories_provider, enable_auto_cleanup: bool = True):
        """
        Run a periodic forgetting check (intended for a background task).

        Args:
            memories_provider: Async callable returning the memory list.
            enable_auto_cleanup: Whether automatic cleanup should run.
        """
        if not self.is_forgetting_check_needed():
            return
        try:
            logger.info("开始执行定期遗忘检查...")
            # Fetch the current memory list.
            memories = await memories_provider()
            if not memories:
                logger.debug("无记忆数据需要检查")
                return
            # Execute the forgetting check.
            result = await self.perform_forgetting_check(memories)
            # When auto-cleanup is enabled, hand the marked ids to the cleanup hook.
            if enable_auto_cleanup and (result["normal_forgetting"] or result["force_forgetting"]):
                logger.info(f"检测到 {len(result['normal_forgetting'])} 条普通遗忘和 {len(result['force_forgetting'])} 条强制遗忘记忆")
                # Actual deletion is intentionally left to the storage layer:
                # await self.cleanup_forgotten_memories(result["normal_forgetting"] + result["force_forgetting"])
            self._last_forgetting_check = time.time()
        except Exception as e:
            logger.error(f"定期遗忘检查失败: {e}", exc_info=True)
    def get_forgetting_stats(self) -> Dict[str, Any]:
        """Return a serializable snapshot of the statistics plus key config values."""
        return {
            "total_checked": self.stats.total_checked,
            "marked_for_forgetting": self.stats.marked_for_forgetting,
            "actually_forgotten": self.stats.actually_forgotten,
            "dormant_memories": self.stats.dormant_memories,
            "last_check_time": datetime.fromtimestamp(self.stats.last_check_time).isoformat() if self.stats.last_check_time else None,
            "last_check_duration": self.stats.check_duration,
            "config": {
                "check_interval_hours": self.config.check_interval_hours,
                "base_forgetting_days": self.config.base_forgetting_days,
                "min_forgetting_days": self.config.min_forgetting_days,
                "max_forgetting_days": self.config.max_forgetting_days
            }
        }
    def reset_stats(self):
        """Reset the statistics object to a fresh ForgettingStats."""
        self.stats = ForgettingStats()
        logger.debug("遗忘统计信息已重置")
    def update_config(self, **kwargs):
        """Update known config attributes in place; unknown keys are logged and ignored."""
        for key, value in kwargs.items():
            if hasattr(self.config, key):
                setattr(self.config, key, value)
                logger.debug(f"遗忘配置更新: {key} = {value}")
            else:
                logger.warning(f"未知的配置项: {key}")
# Create the global forgetting-engine instance.
memory_forgetting_engine = MemoryForgettingEngine()
def get_memory_forgetting_engine() -> MemoryForgettingEngine:
    """Return the global forgetting-engine instance."""
    return memory_forgetting_engine

View File

@@ -84,12 +84,6 @@ class MemoryFormatter:
lines = ["## 🧠 相关记忆回顾", ""]
if query_context:
lines.extend([
f"*查询上下文: {query_context}*",
""
])
if self.config.group_by_type:
lines.extend(self._format_memories_by_type(memories))
else:

View File

@@ -5,12 +5,9 @@
"""
import time
import hashlib
from typing import Dict, List, Optional, Tuple, Set, Any
from datetime import datetime, timedelta
from dataclasses import dataclass
from collections import defaultdict
import asyncio
from src.common.logger import get_logger
from src.chat.memory_system.memory_chunk import (

View File

@@ -1,45 +1,42 @@
# -*- coding: utf-8 -*-
"""
增强记忆系统管理器
记忆系统管理器
替代原有的 Hippocampus instant_memory 系统
"""
import asyncio
import re
import time
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from dataclasses import dataclass
from src.common.logger import get_logger
from src.config.config import global_config
from src.chat.memory_system.enhanced_memory_core import EnhancedMemorySystem
from src.chat.memory_system.memory_system import MemorySystem
from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
from src.chat.memory_system.enhanced_memory_adapter import (
initialize_enhanced_memory_system
from src.chat.memory_system.memory_system import (
initialize_memory_system
)
logger = get_logger(__name__)
@dataclass
class EnhancedMemoryResult:
"""增强记忆查询结果"""
class MemoryResult:
"""记忆查询结果"""
content: str
memory_type: str
confidence: float
importance: float
timestamp: float
source: str = "enhanced_memory"
source: str = "memory"
relevance_score: float = 0.0
structure: Dict[str, Any] | None = None
class EnhancedMemoryManager:
"""增强记忆系统管理器 - 替代原有的 HippocampusManager"""
class MemoryManager:
"""记忆系统管理器 - 替代原有的 HippocampusManager"""
def __init__(self):
self.enhanced_system: Optional[EnhancedMemorySystem] = None
self.memory_system: Optional[MemorySystem] = None
self.is_initialized = False
self.user_cache = {} # 用户记忆缓存
@@ -52,73 +49,73 @@ class EnhancedMemoryManager:
return cleaned
async def initialize(self):
"""初始化增强记忆系统"""
"""初始化记忆系统"""
if self.is_initialized:
return
try:
from src.config.config import global_config
# 检查是否启用增强记忆系统
if not global_config.memory.enable_enhanced_memory:
logger.info("增强记忆系统已禁用,跳过初始化")
# 检查是否启用记忆系统
if not global_config.memory.enable_memory:
logger.info("记忆系统已禁用,跳过初始化")
self.is_initialized = True
return
logger.info("正在初始化增强记忆系统...")
logger.info("正在初始化记忆系统...")
# 获取LLM模型
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
llm_model = LLMRequest(model_set=model_config.model_task_config.utils, request_type="memory")
# 初始化增强记忆系统
self.enhanced_system = await initialize_enhanced_memory_system(llm_model)
# 初始化记忆系统
self.memory_system = await initialize_memory_system(llm_model)
# 设置全局实例
global_enhanced_manager = self.enhanced_system
global_memory_manager = self.memory_system
self.is_initialized = True
logger.info("增强记忆系统初始化完成")
logger.info("✅ 记忆系统初始化完成")
except Exception as e:
logger.error(f"增强记忆系统初始化失败: {e}")
# 如果增强系统初始化失败,创建一个空的管理器避免系统崩溃
self.enhanced_system = None
logger.error(f"❌ 记忆系统初始化失败: {e}")
# 如果系统初始化失败,创建一个空的管理器避免系统崩溃
self.memory_system = None
self.is_initialized = True # 标记为已初始化但系统不可用
def get_hippocampus(self):
"""兼容原有接口 - 返回空"""
logger.debug("get_hippocampus 调用 - 增强记忆系统不使用此方法")
logger.debug("get_hippocampus 调用 - 记忆系统不使用此方法")
return {}
async def build_memory(self):
"""兼容原有接口 - 构建记忆"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return
try:
# 增强记忆系统使用实时构建,不需要定时构建
logger.debug("build_memory 调用 - 增强记忆系统使用实时构建")
# 记忆系统使用实时构建,不需要定时构建
logger.debug("build_memory 调用 - 记忆系统使用实时构建")
except Exception as e:
logger.error(f"build_memory 失败: {e}")
async def forget_memory(self, percentage: float = 0.005):
"""兼容原有接口 - 遗忘机制"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return
try:
# 增强记忆系统有内置的遗忘机制
logger.debug(f"forget_memory 调用 - 参数: {percentage}")
# 可以在这里调用增强系统的维护功能
await self.enhanced_system.maintenance()
await self.memory_system.maintenance()
except Exception as e:
logger.error(f"forget_memory 失败: {e}")
async def consolidate_memory(self):
"""兼容原有接口 - 记忆巩固"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return
try:
@@ -138,7 +135,7 @@ class EnhancedMemoryManager:
keyword_weight: float = 1.0
) -> List[Tuple[str, str]]:
"""从文本获取相关记忆 - 兼容原有接口"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return []
try:
@@ -148,7 +145,7 @@ class EnhancedMemoryManager:
"expected_memory_types": [MemoryType.PERSONAL_FACT, MemoryType.EVENT, MemoryType.PREFERENCE]
}
relevant_memories = await self.enhanced_system.retrieve_relevant_memories(
relevant_memories = await self.memory_system.retrieve_relevant_memories(
query=text,
user_id=user_id,
context=context,
@@ -177,7 +174,7 @@ class EnhancedMemoryManager:
max_depth: int = 3
) -> List[Tuple[str, str]]:
"""从关键词获取记忆 - 兼容原有接口"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return []
try:
@@ -195,7 +192,7 @@ class EnhancedMemoryManager:
]
}
relevant_memories = await self.enhanced_system.retrieve_relevant_memories(
relevant_memories = await self.memory_system.retrieve_relevant_memories(
query_text=query_text,
user_id="default_user", # 可以根据实际需要传递
context=context,
@@ -218,7 +215,7 @@ class EnhancedMemoryManager:
def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list:
"""从单个关键词获取记忆 - 兼容原有接口"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return []
try:
@@ -237,7 +234,7 @@ class EnhancedMemoryManager:
timestamp: Optional[float] = None
) -> List[MemoryChunk]:
"""处理对话并构建记忆 - 新增功能"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return []
try:
@@ -246,7 +243,7 @@ class EnhancedMemoryManager:
if timestamp is not None:
payload_context.setdefault("timestamp", timestamp)
result = await self.enhanced_system.process_conversation_memory(payload_context)
result = await self.memory_system.process_conversation_memory(payload_context)
# 从结果中提取记忆块
memory_chunks = []
@@ -266,13 +263,13 @@ class EnhancedMemoryManager:
user_id: str,
context: Optional[Dict[str, Any]] = None,
limit: int = 5
) -> List[EnhancedMemoryResult]:
) -> List[MemoryResult]:
"""获取增强记忆上下文 - 新增功能"""
if not self.is_initialized or not self.enhanced_system:
if not self.is_initialized or not self.memory_system:
return []
try:
relevant_memories = await self.enhanced_system.retrieve_relevant_memories(
relevant_memories = await self.memory_system.retrieve_relevant_memories(
query=query_text,
user_id=None,
context=context or {},
@@ -282,7 +279,7 @@ class EnhancedMemoryManager:
results = []
for memory in relevant_memories:
formatted_content, structure = self._format_memory_chunk(memory)
result = EnhancedMemoryResult(
result = MemoryResult(
content=formatted_content,
memory_type=memory.memory_type.value,
confidence=memory.metadata.confidence.value,
@@ -500,12 +497,12 @@ class EnhancedMemoryManager:
return
try:
if self.enhanced_system:
await self.enhanced_system.shutdown()
logger.info("增强记忆系统已关闭")
if self.memory_system:
await self.memory_system.shutdown()
logger.info("✅ 记忆系统已关闭")
except Exception as e:
logger.error(f"关闭增强记忆系统失败: {e}")
logger.error(f"关闭记忆系统失败: {e}")
# 全局增强记忆管理器实例
enhanced_memory_manager = EnhancedMemoryManager()
# 全局记忆管理器实例
memory_manager = MemoryManager()

View File

@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
增强型精准记忆系统核心模块
精准记忆系统核心模块
1. 基于文档设计的高效记忆构建存储与召回优化系统覆盖构建向量化与多阶段检索全流程
2. 内置 LLM 查询规划器与嵌入维度自动解析机制直接从模型配置推断向量存储参数
"""
@@ -21,13 +21,11 @@ from src.config.config import model_config, global_config
from src.chat.memory_system.memory_chunk import MemoryChunk
from src.chat.memory_system.memory_builder import MemoryBuilder, MemoryExtractionError
from src.chat.memory_system.memory_fusion import MemoryFusionEngine
from src.chat.memory_system.vector_storage import VectorStorageManager, VectorStorageConfig
from src.chat.memory_system.metadata_index import MetadataIndexManager, IndexType
from src.chat.memory_system.multi_stage_retrieval import MultiStageRetrieval, RetrievalConfig
from src.chat.memory_system.memory_query_planner import MemoryQueryPlanner
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages
from src.chat.memory_system.memory_forgetting_engine import MemoryForgettingEngine
logger = get_logger(__name__)
@@ -121,8 +119,8 @@ class MemorySystemConfig:
)
class EnhancedMemorySystem:
"""增强型精准记忆系统核心类"""
class MemorySystem:
"""精准记忆系统核心类"""
def __init__(
self,
@@ -133,13 +131,12 @@ class EnhancedMemorySystem:
self.llm_model = llm_model
self.status = MemorySystemStatus.INITIALIZING
# 核心组件
# 核心组件(简化版)
self.memory_builder: MemoryBuilder = None
self.fusion_engine: MemoryFusionEngine = None
self.vector_storage: VectorStorageManager = None
self.metadata_index: MetadataIndexManager = None
self.retrieval_system: MultiStageRetrieval = None
self.unified_storage = None # 统一存储系统
self.query_planner: MemoryQueryPlanner = None
self.forgetting_engine: Optional[MemoryForgettingEngine] = None
# LLM模型
self.value_assessment_model: LLMRequest = None
@@ -156,12 +153,12 @@ class EnhancedMemorySystem:
# 记忆指纹缓存,用于快速检测重复记忆
self._memory_fingerprints: Dict[str, str] = {}
logger.info("EnhancedMemorySystem 初始化开始")
logger.info("MemorySystem 初始化开始")
async def initialize(self):
"""异步初始化记忆系统"""
try:
logger.info("正在初始化增强型记忆系统...")
logger.info("正在初始化记忆系统...")
# 初始化LLM模型
fallback_task = getattr(self.llm_model, "model_for_task", None) if self.llm_model else None
@@ -190,46 +187,68 @@ class EnhancedMemorySystem:
request_type="memory.extraction"
)
# 初始化核心组件
# 初始化核心组件(简化版)
self.memory_builder = MemoryBuilder(self.memory_extraction_model)
self.fusion_engine = MemoryFusionEngine(self.config.fusion_similarity_threshold)
# 创建向量存储配置
vector_config = VectorStorageConfig(
# 初始化统一存储系统
from src.chat.memory_system.unified_memory_storage import initialize_unified_memory_storage, UnifiedStorageConfig
storage_config = UnifiedStorageConfig(
dimension=self.config.vector_dimension,
similarity_threshold=self.config.similarity_threshold
similarity_threshold=self.config.similarity_threshold,
storage_path=getattr(global_config.memory, 'unified_storage_path', 'data/unified_memory'),
cache_size_limit=getattr(global_config.memory, 'unified_storage_cache_limit', 10000),
auto_save_interval=getattr(global_config.memory, 'unified_storage_auto_save_interval', 50),
enable_compression=getattr(global_config.memory, 'unified_storage_enable_compression', True),
enable_forgetting=getattr(global_config.memory, 'enable_memory_forgetting', True),
forgetting_check_interval=getattr(global_config.memory, 'forgetting_check_interval_hours', 24)
)
self.vector_storage = VectorStorageManager(vector_config)
# 尝试加载现有的向量数据
try:
await self.vector_storage.load_storage()
loaded_count = self.vector_storage.storage_stats.get("total_vectors", 0)
logger.info(f"✅ 向量存储数据加载完成,向量数量: {loaded_count}")
self.unified_storage = await initialize_unified_memory_storage(storage_config)
if self.unified_storage is None:
raise RuntimeError("统一存储系统初始化返回None")
logger.info("✅ 统一存储系统初始化成功")
except Exception as storage_error:
logger.error(f"❌ 统一存储系统初始化失败: {storage_error}", exc_info=True)
raise
# 如果没有加载到向量,尝试重建索引
if loaded_count == 0:
logger.info("向量存储为空,尝试从缓存重建...")
await self._rebuild_vector_storage_if_needed()
# 初始化遗忘引擎
from src.chat.memory_system.memory_forgetting_engine import MemoryForgettingEngine, ForgettingConfig
except Exception as e:
logger.warning(f"向量存储数据加载失败: {e},将使用空索引")
await self._rebuild_vector_storage_if_needed()
# 从全局配置创建遗忘引擎配置
forgetting_config = ForgettingConfig(
# 检查频率配置
check_interval_hours=getattr(global_config.memory, 'forgetting_check_interval_hours', 24),
batch_size=100, # 固定值,暂不配置
self.metadata_index = MetadataIndexManager()
# 创建检索配置
retrieval_config = RetrievalConfig(
metadata_filter_limit=self.config.coarse_recall_limit,
vector_search_limit=self.config.fine_recall_limit,
semantic_rerank_limit=self.config.semantic_rerank_limit,
final_result_limit=self.config.final_recall_limit,
vector_similarity_threshold=self.config.similarity_threshold,
semantic_similarity_threshold=self.config.semantic_similarity_threshold,
vector_weight=self.config.vector_weight,
semantic_weight=self.config.semantic_weight,
context_weight=self.config.context_weight,
recency_weight=self.config.recency_weight,
# 遗忘阈值配置
base_forgetting_days=getattr(global_config.memory, 'base_forgetting_days', 30.0),
min_forgetting_days=getattr(global_config.memory, 'min_forgetting_days', 7.0),
max_forgetting_days=getattr(global_config.memory, 'max_forgetting_days', 365.0),
# 重要程度权重
critical_importance_bonus=getattr(global_config.memory, 'critical_importance_bonus', 45.0),
high_importance_bonus=getattr(global_config.memory, 'high_importance_bonus', 30.0),
normal_importance_bonus=getattr(global_config.memory, 'normal_importance_bonus', 15.0),
low_importance_bonus=getattr(global_config.memory, 'low_importance_bonus', 0.0),
# 置信度权重
verified_confidence_bonus=getattr(global_config.memory, 'verified_confidence_bonus', 30.0),
high_confidence_bonus=getattr(global_config.memory, 'high_confidence_bonus', 20.0),
medium_confidence_bonus=getattr(global_config.memory, 'medium_confidence_bonus', 10.0),
low_confidence_bonus=getattr(global_config.memory, 'low_confidence_bonus', 0.0),
# 激活频率权重
activation_frequency_weight=getattr(global_config.memory, 'activation_frequency_weight', 0.5),
max_frequency_bonus=getattr(global_config.memory, 'max_frequency_bonus', 10.0),
# 休眠配置
dormant_threshold_days=getattr(global_config.memory, 'dormant_threshold_days', 90)
)
self.retrieval_system = MultiStageRetrieval(retrieval_config)
self.forgetting_engine = MemoryForgettingEngine(forgetting_config)
planner_task_config = getattr(model_config.model_task_config, "planner", None)
planner_model: Optional[LLMRequest] = None
@@ -246,13 +265,11 @@ class EnhancedMemorySystem:
default_limit=self.config.final_recall_limit
)
# 加载持久化数据
await self.vector_storage.load_storage()
await self.metadata_index.load_index()
self._populate_memory_fingerprints()
# 统一存储已经自动加载数据,无需额外加载
logger.info("✅ 简化版记忆系统初始化完成")
self.status = MemorySystemStatus.READY
logger.info("增强型记忆系统初始化完成")
logger.info("✅ 记忆系统初始化完成")
except Exception as e:
self.status = MemorySystemStatus.ERROR
@@ -266,7 +283,7 @@ class EnhancedMemorySystem:
context: Optional[Dict[str, Any]] = None,
limit: int = 5
) -> List[MemoryChunk]:
"""在构建记忆时检索相关记忆,允许在BUILDING状态下进行检索
"""在构建记忆时检索相关记忆,使用统一存储系统
Args:
query_text: 查询文本
@@ -280,19 +297,25 @@ class EnhancedMemorySystem:
logger.warning(f"记忆系统状态不允许检索: {self.status.value}")
return []
try:
# 临时切换到检索状态
original_status = self.status
self.status = MemorySystemStatus.RETRIEVING
if not self.unified_storage:
logger.warning("统一存储系统未初始化")
return []
# 执行检索
memories = await self.vector_storage.search_similar_memories(
try:
# 使用统一存储检索相似记忆
search_results = await self.unified_storage.search_similar_memories(
query_text=query_text,
limit=limit
limit=limit,
scope_id=user_id
)
# 恢复原始状态
self.status = original_status
# 转换为记忆对象
memories = []
for memory_id, similarity_score in search_results:
memory = self.unified_storage.get_memory_by_id(memory_id)
if memory:
memory.update_access() # 更新访问信息
memories.append(memory)
return memories
@@ -377,8 +400,8 @@ class EnhancedMemorySystem:
existing_candidates
)
# 4. 存储记忆
stored_count = await self._store_memories(fused_chunks)
# 4. 存储记忆到统一存储
stored_count = await self._store_memories_unified(fused_chunks)
# 4.1 控制台预览
self._log_memory_preview(fused_chunks)
@@ -391,15 +414,7 @@ class EnhancedMemorySystem:
build_time = time.time() - start_time
logger.info(
"✅ 生成 %d 条记忆,成功入库 %d 条,耗时 %.2f",
len(fused_chunks),
stored_count,
build_time,
extra={
"generated_count": len(fused_chunks),
"stored_count": stored_count,
"build_duration_seconds": round(build_time, 4),
},
f"✅ 生成 {len(fused_chunks)} 条记忆,成功入库 {stored_count} 条,耗时 {build_time:.2f}",
)
self.status = original_status
@@ -463,34 +478,31 @@ class EnhancedMemorySystem:
except Exception as exc:
logger.debug("构建记忆指纹失败,跳过候选收集: %s", exc)
# 基于主体索引的候选
subject_index = None
if self.metadata_index and hasattr(self.metadata_index, "indices"):
subject_index = self.metadata_index.indices.get(IndexType.SUBJECT)
if subject_index:
# 基于主体索引的候选(使用统一存储)
if self.unified_storage and self.unified_storage.keyword_index:
for memory in new_memories:
for subject in memory.subjects:
normalized = subject.strip().lower() if isinstance(subject, str) else ""
if not normalized:
continue
subject_candidates = subject_index.get(normalized)
subject_candidates = self.unified_storage.keyword_index.get(normalized)
if subject_candidates:
candidate_ids.update(subject_candidates)
# 基于向量搜索的候选
# 基于向量搜索的候选(使用统一存储)
total_vectors = 0
if self.vector_storage and hasattr(self.vector_storage, "storage_stats"):
total_vectors = self.vector_storage.storage_stats.get("total_vectors", 0) or 0
if self.unified_storage:
storage_stats = self.unified_storage.get_storage_stats()
total_vectors = storage_stats.get("total_vectors", 0) or 0
if self.vector_storage and total_vectors > 0:
if self.unified_storage and total_vectors > 0:
search_tasks = []
for memory in new_memories:
display_text = (memory.display or "").strip()
if not display_text:
continue
search_tasks.append(
self.vector_storage.search_similar_memories(
self.unified_storage.search_similar_memories(
query_text=display_text,
limit=8,
scope_id=GLOBAL_MEMORY_SCOPE
@@ -518,7 +530,7 @@ class EnhancedMemorySystem:
candidate_ids.add(memory_id)
existing_candidates: List[MemoryChunk] = []
cache = self.vector_storage.memory_cache if self.vector_storage else {}
cache = self.unified_storage.memory_cache if self.unified_storage else {}
for candidate_id in candidate_ids:
if candidate_id in new_memory_ids:
continue
@@ -597,24 +609,18 @@ class EnhancedMemorySystem:
limit: int = 5,
**kwargs
) -> List[MemoryChunk]:
"""检索相关记忆,兼容 query/query_text 参数形式"""
"""检索相关记忆(简化版,使用统一存储)"""
raw_query = query_text or kwargs.get("query")
if not raw_query:
raise ValueError("query_text 或 query 参数不能为空")
if not self.unified_storage:
logger.warning("统一存储系统未初始化")
return []
context = context or {}
resolved_user_id = GLOBAL_MEMORY_SCOPE
if self.retrieval_system is None or self.metadata_index is None:
raise RuntimeError("检索组件未初始化")
all_memories_cache = self.vector_storage.memory_cache
if not all_memories_cache:
logger.debug("记忆缓存为空,返回空结果")
self.last_retrieval_time = time.time()
self.status = MemorySystemStatus.READY
return []
self.status = MemorySystemStatus.RETRIEVING
start_time = time.time()
@@ -622,107 +628,50 @@ class EnhancedMemorySystem:
normalized_context = self._normalize_context(context, GLOBAL_MEMORY_SCOPE, None)
effective_limit = limit or self.config.final_recall_limit
query_plan = None
planner_ran = False
resolved_query_text = raw_query
# 构建过滤器
filters = {
"user_id": resolved_user_id
}
# 应用查询规划结果
if self.query_planner:
try:
planner_ran = True
query_plan = await self.query_planner.plan_query(raw_query, normalized_context)
normalized_context["query_plan"] = query_plan
effective_limit = min(effective_limit, query_plan.limit or effective_limit)
if getattr(query_plan, "memory_types", None):
filters["memory_types"] = [mt.value for mt in query_plan.memory_types]
if getattr(query_plan, "subject_includes", None):
filters["keywords"] = query_plan.subject_includes
if getattr(query_plan, "semantic_query", None):
resolved_query_text = query_plan.semantic_query
logger.debug(
"查询规划: semantic='%s', types=%s, subjects=%s, limit=%d",
query_plan.semantic_query,
[mt.value for mt in query_plan.memory_types],
query_plan.subject_includes,
query_plan.limit,
)
raw_query = query_plan.semantic_query
except Exception as plan_exc:
logger.warning("查询规划失败,使用默认检索策略: %s", plan_exc, exc_info=True)
effective_limit = effective_limit or self.config.final_recall_limit
effective_limit = max(1, min(effective_limit, self.config.final_recall_limit))
normalized_context["resolved_query_text"] = resolved_query_text
query_debug_payload = {
"raw_query": raw_query,
"semantic_query": resolved_query_text,
"limit": effective_limit,
"planner_used": planner_ran,
"memory_types": [mt.value for mt in (query_plan.memory_types if query_plan else [])],
"subjects": getattr(query_plan, "subject_includes", []) if query_plan else [],
"objects": getattr(query_plan, "object_includes", []) if query_plan else [],
"recency": getattr(query_plan, "recency_preference", None) if query_plan else None,
"optional_keywords": getattr(query_plan, "optional_keywords", []) if query_plan else [],
}
try:
logger.info(
f"🔍 记忆检索指令 | raw='{raw_query}' | semantic='{resolved_query_text}' | limit={effective_limit}",
extra={"memory_query": query_debug_payload},
)
except Exception:
logger.info(
"🔍 记忆检索指令: %s",
orjson.dumps(query_debug_payload, ensure_ascii=False).decode("utf-8"),
)
if normalized_context.get("__memory_building__"):
logger.debug("当前处于记忆构建流程,跳过查询规划并进行降级检索")
self.status = MemorySystemStatus.BUILDING
final_memories = []
candidate_memories = list(all_memories_cache.values())
candidate_memories.sort(key=lambda m: m.metadata.last_accessed, reverse=True)
final_memories = candidate_memories[:effective_limit]
else:
retrieval_result = await self.retrieval_system.retrieve_memories(
query=resolved_query_text,
user_id=resolved_user_id,
context=normalized_context,
metadata_index=self.metadata_index,
vector_storage=self.vector_storage,
all_memories_cache=all_memories_cache,
# 使用统一存储搜索
search_results = await self.unified_storage.search_similar_memories(
query_text=raw_query,
limit=effective_limit,
filters=filters
)
final_memories = retrieval_result.final_memories
for memory in final_memories:
# 转换为记忆对象
final_memories = []
for memory_id, similarity_score in search_results:
memory = self.unified_storage.get_memory_by_id(memory_id)
if memory:
memory.update_access()
cache_entry = self.metadata_index.memory_metadata_cache.get(memory.memory_id)
if cache_entry is not None:
cache_entry["last_accessed"] = memory.metadata.last_accessed
cache_entry["access_count"] = memory.metadata.access_count
cache_entry["relevance_score"] = memory.metadata.relevance_score
final_memories.append(memory)
retrieval_time = time.time() - start_time
plan_summary = ""
if planner_ran and query_plan:
plan_types = ",".join(mt.value for mt in query_plan.memory_types) or "-"
plan_subjects = ",".join(query_plan.subject_includes) or "-"
plan_summary = (
f" | planner.semantic='{query_plan.semantic_query}'"
f" | planner.limit={query_plan.limit}"
f" | planner.types={plan_types}"
f" | planner.subjects={plan_subjects}"
)
log_message = (
"✅ 记忆检索完成"
logger.info(
"简化记忆检索完成"
f" | user={resolved_user_id}"
f" | count={len(final_memories)}"
f" | duration={retrieval_time:.3f}s"
f" | applied_limit={effective_limit}"
f" | raw_query='{raw_query}'"
f" | semantic_query='{resolved_query_text}'"
f"{plan_summary}"
f" | query='{raw_query}'"
)
logger.info(log_message)
self.last_retrieval_time = time.time()
self.status = MemorySystemStatus.READY
@@ -1049,58 +998,30 @@ class EnhancedMemorySystem:
logger.error(f"信息价值评估失败: {e}", exc_info=True)
return 0.5 # 默认中等价值
async def _store_memories_unified(self, memory_chunks: List[MemoryChunk]) -> int:
    """Persist a batch of memory chunks through the unified storage backend.

    Returns:
        Number of chunks actually stored; 0 when the batch is empty, the
        backend is not initialized, or the store operation fails.
    """
    # Guard clauses: nothing to do without chunks or a backend.
    if not memory_chunks:
        return 0
    if not self.unified_storage:
        return 0
    try:
        stored_count = await self.unified_storage.store_memories(memory_chunks)
        logger.debug(
            "统一存储成功存储 %d 条记忆",
            stored_count,
        )
        return stored_count
    except Exception as e:
        logger.error(f"统一存储记忆失败: {e}", exc_info=True)
        return 0
# 保留原有方法以兼容旧代码
async def _store_memories(self, memory_chunks: List[MemoryChunk]) -> int:
"""存储记忆块到各个存储系统,返回成功入库数量"""
if not memory_chunks:
return 0
unique_memories: List[MemoryChunk] = []
skipped_duplicates = 0
for memory in memory_chunks:
fingerprint = self._build_memory_fingerprint(memory)
key = self._fingerprint_key(memory.user_id, fingerprint)
existing_id = self._memory_fingerprints.get(key)
if existing_id:
existing = self.vector_storage.memory_cache.get(existing_id)
if existing:
self._merge_existing_memory(existing, memory)
await self.metadata_index.update_memory_entry(existing)
skipped_duplicates += 1
logger.debug(
"检测到重复记忆,已合并到现有记录 | memory_id=%s",
existing.memory_id,
)
continue
else:
# 指纹存在但缓存缺失,视为新记忆并覆盖旧映射
logger.debug("检测到过期指纹映射,重写现有条目")
unique_memories.append(memory)
if not unique_memories:
if skipped_duplicates:
logger.info("本次记忆全部与现有内容重复,跳过入库")
return 0
# 并行存储到向量数据库和元数据索引
storage_tasks = [
self.vector_storage.store_memories(unique_memories),
self.metadata_index.index_memories(unique_memories),
]
await asyncio.gather(*storage_tasks, return_exceptions=True)
self._register_memory_fingerprints(unique_memories)
logger.debug(
"成功存储 %d 条记忆(跳过重复 %d 条)",
len(unique_memories),
skipped_duplicates,
)
return len(unique_memories)
"""兼容性方法:重定向到统一存储"""
return await self._store_memories_unified(memory_chunks)
def _merge_existing_memory(self, existing: MemoryChunk, incoming: MemoryChunk) -> None:
"""将新记忆的信息合并到已存在的记忆中"""
@@ -1142,7 +1063,7 @@ class EnhancedMemorySystem:
def _populate_memory_fingerprints(self) -> None:
"""基于当前缓存构建记忆指纹映射"""
self._memory_fingerprints.clear()
for memory in self.vector_storage.memory_cache.values():
for memory in self.unified_storage.memory_cache.values():
fingerprint = self._build_memory_fingerprint(memory)
key = self._fingerprint_key(memory.user_id, fingerprint)
self._memory_fingerprints[key] = memory.memory_id
@@ -1219,34 +1140,41 @@ class EnhancedMemorySystem:
return {token for token in tokens if len(token) > 1}
async def maintenance(self):
"""系统维护操作"""
"""系统维护操作(简化版)"""
try:
logger.info("开始记忆系统维护...")
logger.info("开始简化记忆系统维护...")
# 向量存储优化
await self.vector_storage.optimize_storage()
# 执行遗忘检查
if self.unified_storage and self.forgetting_engine:
forgetting_result = await self.unified_storage.perform_forgetting_check()
if "error" not in forgetting_result:
logger.info(f"遗忘检查完成: {forgetting_result.get('stats', {})}")
else:
logger.warning(f"遗忘检查失败: {forgetting_result['error']}")
# 元数据索引优化
await self.metadata_index.optimize_index()
# 保存存储数据
if self.unified_storage:
await self.unified_storage.save_storage()
# 记忆融合引擎维护
if self.fusion_engine:
await self.fusion_engine.maintenance()
logger.info("✅ 记忆系统维护完成")
logger.info("简化记忆系统维护完成")
except Exception as e:
logger.error(f"❌ 记忆系统维护失败: {e}", exc_info=True)
async def shutdown(self):
"""关闭系统"""
"""关闭系统(简化版)"""
try:
logger.info("正在关闭增强型记忆系统...")
logger.info("正在关闭简化记忆系统...")
# 保存持久化数据
await self.vector_storage.save_storage()
await self.metadata_index.save_index()
# 保存统一存储数据
if self.unified_storage:
await self.unified_storage.cleanup()
logger.info("增强型记忆系统已关闭")
logger.info("简化记忆系统已关闭")
except Exception as e:
logger.error(f"❌ 记忆系统关闭失败: {e}", exc_info=True)
@@ -1255,15 +1183,15 @@ class EnhancedMemorySystem:
"""重建向量存储(如果需要)"""
try:
# 检查是否有记忆缓存数据
if not hasattr(self.vector_storage, 'memory_cache') or not self.vector_storage.memory_cache:
if not hasattr(self.unified_storage, 'memory_cache') or not self.unified_storage.memory_cache:
logger.info("无记忆缓存数据,跳过向量存储重建")
return
logger.info(f"开始重建向量存储,记忆数量: {len(self.vector_storage.memory_cache)}")
logger.info(f"开始重建向量存储,记忆数量: {len(self.unified_storage.memory_cache)}")
# 收集需要重建向量的记忆
memories_to_rebuild = []
for memory_id, memory in self.vector_storage.memory_cache.items():
for memory_id, memory in self.unified_storage.memory_cache.items():
# 检查记忆是否有有效的 display 文本
if memory.display and memory.display.strip():
memories_to_rebuild.append(memory)
@@ -1283,7 +1211,7 @@ class EnhancedMemorySystem:
for i in range(0, len(memories_to_rebuild), batch_size):
batch = memories_to_rebuild[i:i + batch_size]
try:
await self.vector_storage.store_memories(batch)
await self.unified_storage.store_memories(batch)
rebuild_count += len(batch)
if rebuild_count % 50 == 0:
@@ -1294,9 +1222,9 @@ class EnhancedMemorySystem:
continue
# 保存重建的向量存储
await self.vector_storage.save_storage()
await self.unified_storage.save_storage()
final_count = self.vector_storage.storage_stats.get("total_vectors", 0)
final_count = self.unified_storage.storage_stats.get("total_vectors", 0)
logger.info(f"✅ 向量存储重建完成,最终向量数量: {final_count}")
except Exception as e:
@@ -1304,21 +1232,21 @@ class EnhancedMemorySystem:
# 全局记忆系统实例
enhanced_memory_system: EnhancedMemorySystem = None
memory_system: MemorySystem = None
def get_enhanced_memory_system() -> EnhancedMemorySystem:
def get_memory_system() -> MemorySystem:
"""获取全局记忆系统实例"""
global enhanced_memory_system
if enhanced_memory_system is None:
enhanced_memory_system = EnhancedMemorySystem()
return enhanced_memory_system
global memory_system
if memory_system is None:
memory_system = MemorySystem()
return memory_system
async def initialize_enhanced_memory_system():
async def initialize_memory_system(llm_model: Optional[LLMRequest] = None):
"""初始化全局记忆系统"""
global enhanced_memory_system
if enhanced_memory_system is None:
enhanced_memory_system = EnhancedMemorySystem()
await enhanced_memory_system.initialize()
return enhanced_memory_system
global memory_system
if memory_system is None:
memory_system = MemorySystem(llm_model=llm_model)
await memory_system.initialize()
return memory_system

View File

@@ -0,0 +1,577 @@
# -*- coding: utf-8 -*-
"""
统一记忆存储系统
简化后的记忆存储,整合向量存储和元数据索引
"""
# Standard library
import asyncio
import hashlib
import os
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Third-party
import numpy as np
import orjson

# Project-local
from src.common.logger import get_logger
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config, global_config
from src.chat.memory_system.memory_chunk import MemoryChunk
from src.chat.memory_system.memory_forgetting_engine import MemoryForgettingEngine
logger = get_logger(__name__)
# 尝试导入FAISS
# Optional dependency: FAISS provides a fast inner-product vector index.
# When it is missing, the storage falls back to a plain dict of vectors
# with brute-force cosine search (see UnifiedMemoryStorage._initialize_storage).
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    logger.warning("FAISS not available, using simple vector storage")
@dataclass
class UnifiedStorageConfig:
    """Configuration for the unified memory storage system.

    Field order is part of the positional-constructor interface; do not reorder.
    """
    # --- vector storage ---
    dimension: int = 1024  # embedding dimensionality of the vector index
    similarity_threshold: float = 0.8  # minimum inner-product score for a search hit
    storage_path: str = "data/unified_memory"  # on-disk directory for persisted data
    # --- performance ---
    cache_size_limit: int = 10000  # intended cap on cached MemoryChunk objects
    auto_save_interval: int = 50  # persist after this many store operations
    search_limit: int = 20  # default cap on search results
    enable_compression: bool = True  # compress persisted data on disk
    # --- forgetting ---
    enable_forgetting: bool = True  # create a MemoryForgettingEngine on init
    forgetting_check_interval: int = 24  # hours between forgetting checks
class UnifiedMemoryStorage:
"""统一记忆存储系统"""
def __init__(self, config: Optional[UnifiedStorageConfig] = None):
    """Create the storage, build in-memory indexes and load persisted data.

    Args:
        config: Storage configuration; a default ``UnifiedStorageConfig``
            is used when ``None``.
    """
    self.config = config or UnifiedStorageConfig()
    # On-disk location for persisted memories/vectors (created eagerly).
    self.storage_path = Path(self.config.storage_path)
    self.storage_path.mkdir(parents=True, exist_ok=True)
    # Vector index: a faiss.IndexFlatIP when FAISS is available, otherwise
    # a plain dict keyed by memory_id (set up in _initialize_storage).
    self.vector_index = None
    self.memory_id_to_index: Dict[str, int] = {}  # memory_id -> FAISS row id
    self.index_to_memory_id: Dict[int, str] = {}  # FAISS row id -> memory_id
    # In-memory caches of full memory objects and their embeddings.
    self.memory_cache: Dict[str, MemoryChunk] = {}
    self.vector_cache: Dict[str, np.ndarray] = {}
    # Lightweight inverted metadata indexes (value -> set of memory ids).
    self.keyword_index: Dict[str, Set[str]] = {}  # keyword -> memory_id set
    self.type_index: Dict[str, Set[str]] = {}  # type -> memory_id set
    self.user_index: Dict[str, Set[str]] = {}  # user_id -> memory_id set
    # Optional forgetting engine used to decay/clean up stale memories.
    self.forgetting_engine: Optional[MemoryForgettingEngine] = None
    if self.config.enable_forgetting:
        self.forgetting_engine = MemoryForgettingEngine()
    # Runtime statistics counters.
    self.stats = {
        "total_memories": 0,
        "total_vectors": 0,
        "cache_size": 0,
        "last_save_time": 0.0,
        "total_searches": 0,
        "total_stores": 0,
        "forgetting_stats": {}
    }
    # Reentrant lock guarding cache/index mutation.
    self._lock = threading.RLock()
    self._operation_count = 0  # store operations since last auto-save
    # Embedding model is injected later via set_embedding_model().
    self.embedding_model: Optional[LLMRequest] = None
    # Build the vector index and load any previously saved data.
    self._initialize_storage()
def _initialize_storage(self):
    """Build the vector index backend and load previously persisted data.

    Any failure is logged rather than raised, leaving the storage usable
    in an empty state.
    """
    try:
        if not FAISS_AVAILABLE:
            # Fallback backend: a plain dict keyed by memory_id.
            self.vector_index = {}
            logger.info("使用简单向量存储FAISS不可用")
        else:
            self.vector_index = faiss.IndexFlatIP(self.config.dimension)
            logger.info(f"FAISS向量索引初始化完成维度: {self.config.dimension}")
        # Pull any data saved by a previous run into the in-memory caches.
        self._load_storage()
        logger.info(f"统一记忆存储初始化完成,当前记忆数: {len(self.memory_cache)}")
    except Exception as exc:
        logger.error(f"存储系统初始化失败: {exc}", exc_info=True)
def set_embedding_model(self, model: LLMRequest):
    """Inject the LLM request object used by ``_generate_embedding``."""
    self.embedding_model = model
async def _generate_embedding(self, text: str) -> Optional[np.ndarray]:
"""生成文本的向量表示"""
if not self.embedding_model:
logger.warning("未设置嵌入模型,无法生成向量")
return None
try:
# 使用嵌入模型生成向量
response, _ = await self.embedding_model.generate_response_async(
f"请为以下文本生成语义向量表示:{text}",
temperature=0.1
)
# 这里需要实际的嵌入模型调用逻辑
# 暂时返回随机向量作为占位符
embedding = np.random.random(self.config.dimension).astype(np.float32)
# 归一化向量
norm = np.linalg.norm(embedding)
if norm > 0:
embedding = embedding / norm
return embedding
except Exception as e:
logger.error(f"生成向量失败: {e}")
return None
def _add_to_keyword_index(self, memory: MemoryChunk):
    """Register the memory id under each of its keywords."""
    mid = memory.memory_id
    for kw in memory.keywords:
        self.keyword_index.setdefault(kw, set()).add(mid)
def _add_to_type_index(self, memory: MemoryChunk):
    """Register the memory id under its memory-type bucket."""
    bucket = self.type_index.setdefault(memory.memory_type.value, set())
    bucket.add(memory.memory_id)
def _add_to_user_index(self, memory: MemoryChunk):
    """Register the memory id under its owning user."""
    self.user_index.setdefault(memory.user_id, set()).add(memory.memory_id)
def _remove_from_indexes(self, memory: MemoryChunk):
"""从所有索引中移除记忆"""
memory_id = memory.memory_id
# 从关键词索引移除
for keyword, memory_ids in self.keyword_index.items():
memory_ids.discard(memory_id)
if not memory_ids:
del self.keyword_index[keyword]
# 从类型索引移除
memory_type = memory.memory_type.value
if memory_type in self.type_index:
self.type_index[memory_type].discard(memory_id)
if not self.type_index[memory_type]:
del self.type_index[memory_type]
# 从用户索引移除
if memory.user_id in self.user_index:
self.user_index[memory.user_id].discard(memory_id)
if not self.user_index[memory.user_id]:
del self.user_index[memory.user_id]
async def store_memories(self, memories: List[MemoryChunk]) -> int:
    """Store a batch of memory chunks; returns how many were stored.

    For each chunk: generate an embedding from its display (preferred) or
    raw text, put it in the caches and the vector index, and register it
    in the keyword/type/user indexes.  Triggers an auto-save once enough
    store operations have accumulated.

    NOTE(review): ``self._lock`` is a ``threading.RLock`` held across
    ``await`` points (embedding generation, auto-save) — other coroutines
    on the same loop that try to take it would block the loop; confirm
    this is intended.
    NOTE(review): ``config.cache_size_limit`` is not enforced in this
    method; the caches grow without bound here.
    """
    if not memories:
        return 0
    stored_count = 0
    with self._lock:
        for memory in memories:
            try:
                # Prefer the human-readable display text as embedding input.
                vector = None
                if memory.display and memory.display.strip():
                    vector = await self._generate_embedding(memory.display)
                elif memory.text_content and memory.text_content.strip():
                    vector = await self._generate_embedding(memory.text_content)
                # Cache the full memory object regardless of embedding success.
                self.memory_cache[memory.memory_id] = memory
                if vector is not None:
                    self.vector_cache[memory.memory_id] = vector
                    if FAISS_AVAILABLE:
                        # FAISS rows are append-only: the new row id is the
                        # current total before the add.
                        index_id = self.vector_index.ntotal
                        self.vector_index.add(vector.reshape(1, -1))
                        self.memory_id_to_index[memory.memory_id] = index_id
                        self.index_to_memory_id[index_id] = memory.memory_id
                    else:
                        # Dict fallback backend.
                        self.vector_index[memory.memory_id] = vector
                # Register in the inverted metadata indexes.
                self._add_to_keyword_index(memory)
                self._add_to_type_index(memory)
                self._add_to_user_index(memory)
                stored_count += 1
            except Exception as e:
                # Best-effort batch: one bad chunk does not abort the rest.
                logger.error(f"存储记忆 {memory.memory_id[:8]} 失败: {e}")
                continue
        # Refresh counters.
        self.stats["total_memories"] = len(self.memory_cache)
        self.stats["total_vectors"] = len(self.vector_cache)
        self.stats["total_stores"] += stored_count
        # Auto-save after enough accumulated store operations.
        # NOTE(review): ``_save_storage`` is defined outside this view —
        # confirm the name (elsewhere the public ``save_storage`` is used).
        self._operation_count += stored_count
        if self._operation_count >= self.config.auto_save_interval:
            await self._save_storage()
            self._operation_count = 0
    logger.debug(f"成功存储 {stored_count}/{len(memories)} 条记忆")
    return stored_count
async def search_similar_memories(
    self,
    query_text: str,
    limit: int = 10,
    scope_id: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None
) -> List[Tuple[str, float]]:
    """Find memories whose embedding is similar to *query_text*.

    Args:
        query_text: Natural-language query; embedded via _generate_embedding.
        limit: Maximum number of results to return.
        scope_id: NOTE(review): accepted but never used in this body —
            scoping appears unimplemented here; confirm intended behavior.
        filters: Optional metadata constraints applied via _apply_filters.

    Returns:
        ``(memory_id, score)`` tuples sorted best-first; empty list on
        empty query, empty vector cache, failed embedding, or any error.
    """
    if not query_text or not self.vector_cache:
        return []
    # Embed the query text.
    query_vector = await self._generate_embedding(query_text)
    if query_vector is None:
        return []
    try:
        results = []
        if FAISS_AVAILABLE and self.vector_index.ntotal > 0:
            # FAISS path: batched nearest-neighbour search.
            query_vector = query_vector.reshape(1, -1)
            scores, indices = self.vector_index.search(
                query_vector,
                min(limit, self.vector_index.ntotal)
            )
            for score, idx in zip(scores[0], indices[0]):
                # FAISS pads missing results with index -1; also enforce
                # the configured similarity threshold.
                if idx >= 0 and score >= self.config.similarity_threshold:
                    memory_id = self.index_to_memory_id.get(idx)
                    if memory_id and memory_id in self.memory_cache:
                        # Apply metadata filters.
                        if self._apply_filters(self.memory_cache[memory_id], filters):
                            results.append((memory_id, float(score)))
        else:
            # Fallback path: brute-force dot product over the whole cache.
            # NOTE(review): this equals cosine similarity only if both the
            # stored vectors and the query vector are L2-normalized —
            # confirm _generate_embedding normalizes its output.
            for memory_id, vector in self.vector_cache.items():
                if memory_id not in self.memory_cache:
                    continue
                # Compute the similarity score.
                similarity = np.dot(query_vector, vector)
                if similarity >= self.config.similarity_threshold:
                    # Apply metadata filters.
                    if self._apply_filters(self.memory_cache[memory_id], filters):
                        results.append((memory_id, float(similarity)))
        # Sort by score (descending) and truncate to the requested limit.
        results.sort(key=lambda x: x[1], reverse=True)
        results = results[:limit]
        self.stats["total_searches"] += 1
        return results
    except Exception as e:
        logger.error(f"搜索相似记忆失败: {e}")
        return []
def _apply_filters(self, memory: MemoryChunk, filters: Optional[Dict[str, Any]]) -> bool:
"""应用搜索过滤器"""
if not filters:
return True
# 用户过滤器
if "user_id" in filters and memory.user_id != filters["user_id"]:
return False
# 类型过滤器
if "memory_types" in filters and memory.memory_type.value not in filters["memory_types"]:
return False
# 关键词过滤器
if "keywords" in filters:
memory_keywords = set(k.lower() for k in memory.keywords)
filter_keywords = set(k.lower() for k in filters["keywords"])
if not memory_keywords.intersection(filter_keywords):
return False
# 重要性过滤器
if "min_importance" in filters and memory.metadata.importance.value < filters["min_importance"]:
return False
return True
def get_memory_by_id(self, memory_id: str) -> Optional["MemoryChunk"]:
    """Look up a memory chunk in the in-memory cache by its id.

    Returns:
        The cached chunk, or None when the id is unknown.
    """
    try:
        return self.memory_cache[memory_id]
    except KeyError:
        return None
def get_memories_by_filters(self, filters: Dict[str, Any], limit: int = 50) -> List["MemoryChunk"]:
    """Collect cached memories that satisfy *filters*, up to *limit*.

    Scans the cache in insertion order and stops as soon as *limit*
    matches have been gathered (the limit check runs after each append,
    matching the original semantics).
    """
    matched = []
    for candidate in self.memory_cache.values():
        if not self._apply_filters(candidate, filters):
            continue
        matched.append(candidate)
        if len(matched) >= limit:
            break
    return matched
async def forget_memories(self, memory_ids: List[str]) -> int:
    """Forget (delete) the given memories from caches and indexes.

    Fix: the original fell through to ``memory_id in self.vector_index``
    even when FAISS was available and the id was missing from the row
    mapping — applying ``in`` to a FAISS index object raises. The FAISS
    and dict-backed paths are now fully separated, and stale id <-> row
    mappings are dropped on the FAISS path.

    Note: FAISS flat indexes do not support in-place row deletion, so the
    vector row itself is left in place; a periodic index rebuild is
    expected to reclaim it.

    Args:
        memory_ids: Ids of the memories to remove.

    Returns:
        Number of memories actually removed.
    """
    if not memory_ids:
        return 0

    forgotten_count = 0
    with self._lock:
        for memory_id in memory_ids:
            try:
                memory = self.memory_cache.get(memory_id)
                if not memory:
                    continue

                # Remove from the vector store.
                if FAISS_AVAILABLE:
                    if memory_id in self.memory_id_to_index:
                        # Drop both directions of the mapping so searches
                        # can no longer resolve this row to a memory.
                        row = self.memory_id_to_index.pop(memory_id)
                        self.index_to_memory_id.pop(row, None)
                        logger.debug(f"FAISS索引删除 {memory_id} (需要重建索引)")
                elif memory_id in self.vector_index:
                    # Dict-backed fallback store supports direct deletion.
                    del self.vector_index[memory_id]

                # Remove from both caches (missing keys are tolerated).
                self.memory_cache.pop(memory_id, None)
                self.vector_cache.pop(memory_id, None)

                # Remove from the metadata indexes.
                self._remove_from_indexes(memory)
                forgotten_count += 1
            except Exception as e:
                logger.error(f"遗忘记忆 {memory_id[:8]} 失败: {e}")
                continue

        # Refresh aggregate statistics.
        self.stats["total_memories"] = len(self.memory_cache)
        self.stats["total_vectors"] = len(self.vector_cache)

    logger.info(f"成功遗忘 {forgotten_count}/{len(memory_ids)} 条记忆")
    return forgotten_count
async def perform_forgetting_check(self) -> Dict[str, Any]:
    """Run one forgetting pass over all cached memories.

    Delegates the decision of what to forget to the forgetting engine,
    deletes whatever it flagged, and refreshes forgetting statistics.

    Returns:
        The engine's result dict (with ``forgotten_count`` added when
        anything was removed), or ``{"error": ...}`` on failure or when
        the engine is disabled.
    """
    if not self.forgetting_engine:
        return {"error": "遗忘引擎未启用"}
    try:
        candidates = list(self.memory_cache.values())
        result = await self.forgetting_engine.perform_forgetting_check(candidates)

        # Forget everything the engine marked, whether normal or forced.
        to_forget = result["normal_forgetting"] + result["force_forgetting"]
        if to_forget:
            result["forgotten_count"] = await self.forget_memories(to_forget)

        # Pull the engine's updated statistics into our stats dict.
        self.stats["forgetting_stats"] = self.forgetting_engine.get_forgetting_stats()
        return result
    except Exception as e:
        logger.error(f"执行遗忘检查失败: {e}")
        return {"error": str(e)}
def _load_storage(self):
    """Load persisted memories and vectors from disk into the caches.

    Reads ``memory_cache.json`` (orjson) and, when compression is
    disabled, ``vectors.npz``; then rebuilds the FAISS index and the
    id <-> row mappings from the loaded vectors. Any failure is logged
    and swallowed so the storage simply starts empty.

    NOTE(review): vectors are loaded only when ``enable_compression`` is
    False — with compression enabled the vector cache starts empty after
    a restart; confirm that is intended.
    """
    try:
        # Load the serialized memory chunks.
        memory_file = self.storage_path / "memory_cache.json"
        if memory_file.exists():
            with open(memory_file, 'rb') as f:
                memory_data = orjson.loads(f.read())
                for memory_id, memory_dict in memory_data.items():
                    self.memory_cache[memory_id] = MemoryChunk.from_dict(memory_dict)
        # Load the vector cache (only when compression is disabled).
        if not self.config.enable_compression:
            vector_file = self.storage_path / "vectors.npz"
            if vector_file.exists():
                vectors = np.load(vector_file)
                # Keep only vectors whose memory chunk is still present.
                self.vector_cache = {
                    memory_id: vectors[memory_id]
                    for memory_id in vectors.files
                    if memory_id in self.memory_cache
                }
        # Rebuild the FAISS vector index from the loaded vectors.
        if FAISS_AVAILABLE and self.vector_cache:
            logger.info("重建FAISS向量索引...")
            vectors = []
            memory_ids = []
            for memory_id, vector in self.vector_cache.items():
                vectors.append(vector)
                memory_ids.append(memory_id)
            if vectors:
                vectors_array = np.vstack(vectors)
                self.vector_index.reset()
                self.vector_index.add(vectors_array)
                # Rebuild the id <-> row mappings to match insertion order.
                for idx, memory_id in enumerate(memory_ids):
                    self.memory_id_to_index[memory_id] = idx
                    self.index_to_memory_id[idx] = memory_id
        logger.info(f"存储数据加载完成,记忆数: {len(self.memory_cache)}")
    except Exception as e:
        logger.warning(f"加载存储数据失败: {e}")
async def _save_storage(self):
    """Persist the memory cache (and optionally the vectors) to disk.

    Writes ``memory_cache.json`` via orjson and, when compression is
    disabled, the vector cache as a compressed ``vectors.npz``. Errors
    are logged but never raised to the caller.

    NOTE(review): this coroutine performs blocking file I/O directly on
    the event loop; for large caches consider offloading the writes to a
    thread (e.g. ``asyncio.to_thread``) — confirm acceptable latency.
    """
    try:
        start_time = time.time()
        # Serialize every cached memory chunk to a plain dict.
        memory_data = {
            memory_id: memory.to_dict()
            for memory_id, memory in self.memory_cache.items()
        }
        memory_file = self.storage_path / "memory_cache.json"
        with open(memory_file, 'wb') as f:
            f.write(orjson.dumps(memory_data, option=orjson.OPT_INDENT_2))
        # Persist vectors only when compression is disabled (the original
        # comment said the opposite of what this condition does).
        if not self.config.enable_compression and self.vector_cache:
            vector_file = self.storage_path / "vectors.npz"
            np.savez_compressed(vector_file, **self.vector_cache)
        save_time = time.time() - start_time
        self.stats["last_save_time"] = time.time()
        logger.debug(f"存储数据保存完成,耗时: {save_time:.3f}s")
    except Exception as e:
        logger.error(f"保存存储数据失败: {e}")
async def save_storage(self):
    """Public entry point for an explicit, on-demand save to disk.

    Thin wrapper around the internal ``_save_storage`` coroutine.
    """
    await self._save_storage()
def get_storage_stats(self) -> Dict[str, Any]:
    """Return a snapshot of storage statistics plus key config values.

    The returned dict is a copy — mutating it does not affect
    ``self.stats``.
    """
    snapshot = dict(self.stats)
    snapshot["cache_size"] = len(self.memory_cache)
    snapshot["vector_count"] = len(self.vector_cache)
    snapshot["keyword_index_size"] = len(self.keyword_index)
    snapshot["type_index_size"] = len(self.type_index)
    snapshot["user_index_size"] = len(self.user_index)
    snapshot["config"] = {
        "dimension": self.config.dimension,
        "similarity_threshold": self.config.similarity_threshold,
        "enable_forgetting": self.config.enable_forgetting,
    }
    return snapshot
async def cleanup(self):
    """Flush state to disk, then release every in-memory structure.

    Intended for shutdown: saves current state, clears all caches and
    metadata indexes, and resets the vector index together with its id
    mappings. Errors are logged, never raised.
    """
    try:
        logger.info("开始清理统一记忆存储...")
        # Persist everything before dropping it.
        await self._save_storage()

        # Release caches and metadata indexes.
        self.memory_cache.clear()
        self.vector_cache.clear()
        self.keyword_index.clear()
        self.type_index.clear()
        self.user_index.clear()

        # Reset the vector index and its id <-> row mappings.
        if FAISS_AVAILABLE:
            self.vector_index.reset()
            self.memory_id_to_index.clear()
            self.index_to_memory_id.clear()
        else:
            # Fix: the dict-backed fallback index was never emptied here,
            # leaking every stored vector across a cleanup.
            self.vector_index.clear()
        logger.info("统一记忆存储清理完成")
    except Exception as e:
        logger.error(f"清理存储系统失败: {e}")
# Module-level singleton holding the process-wide storage instance.
unified_memory_storage: Optional["UnifiedMemoryStorage"] = None


def get_unified_memory_storage() -> Optional["UnifiedMemoryStorage"]:
    """Return the global storage singleton.

    Returns:
        The shared instance, or None until
        ``initialize_unified_memory_storage`` has been awaited.
    """
    return unified_memory_storage
async def initialize_unified_memory_storage(config: Optional[UnifiedStorageConfig] = None) -> UnifiedMemoryStorage:
    """Create (at most once) and return the global unified memory storage.

    On first call this builds the singleton, then attempts to attach an
    embedding model read from the global model configuration; subsequent
    calls return the existing instance and ignore *config*.

    Args:
        config: Optional storage configuration used only on first creation.

    Returns:
        The shared UnifiedMemoryStorage instance.
    """
    global unified_memory_storage
    if unified_memory_storage is None:
        unified_memory_storage = UnifiedMemoryStorage(config)
        # Deferred imports — presumably to avoid circular imports at
        # module load time; TODO confirm.
        from src.llm_models.utils_model import LLMRequest
        from src.config.config import model_config
        try:
            # Attach the embedding model when one is configured; a failure
            # here leaves the storage usable but without embeddings.
            embedding_task = getattr(model_config.model_task_config, "embedding", None)
            if embedding_task:
                unified_memory_storage.set_embedding_model(
                    LLMRequest(model_set=embedding_task, request_type="memory.embedding")
                )
        except Exception as e:
            logger.warning(f"设置嵌入模型失败: {e}")
    return unified_memory_storage

View File

@@ -473,10 +473,10 @@ class DefaultReplyer:
running_memories = []
instant_memory = None
if global_config.memory.enable_instant_memory:
if global_config.memory.enable_memory:
try:
# 使用新的增强记忆系统
from src.chat.memory_system.enhanced_memory_integration import recall_memories, remember_message
# 使用新的统一记忆系统
from src.chat.memory_system import get_memory_system
stream = self.chat_stream
user_info_obj = getattr(stream, "user_info", None)
@@ -570,33 +570,37 @@ class DefaultReplyer:
memory_context = {key: value for key, value in memory_context.items() if value}
# 获取记忆系统实例
memory_system = get_memory_system()
# 检索相关记忆
enhanced_memories = await recall_memories(
enhanced_memories = await memory_system.retrieve_relevant_memories(
query=target,
user_id=memory_user_id,
chat_id=stream.stream_id,
context=memory_context
scope_id=stream.stream_id,
context=memory_context,
limit=10
)
# 注意:记忆存储已迁移到回复生成完成后进行,不在查询阶段执行
# 转换格式以兼容现有代码
running_memories = []
if enhanced_memories and enhanced_memories.get("has_memories"):
for memory in enhanced_memories.get("memories", []):
if enhanced_memories:
for memory_chunk in enhanced_memories:
running_memories.append({
"content": memory.get("content", ""),
"memory_type": memory.get("type", "unknown"),
"confidence": memory.get("confidence"),
"importance": memory.get("importance"),
"relevance": memory.get("relevance"),
"source": memory.get("source"),
"structure": memory.get("structure"),
"content": memory_chunk.display or memory_chunk.text_content or "",
"memory_type": memory_chunk.memory_type.value,
"confidence": memory_chunk.metadata.confidence.value,
"importance": memory_chunk.metadata.importance.value,
"relevance": getattr(memory_chunk, 'relevance_score', 0.5),
"source": memory_chunk.metadata.source,
"structure": memory_chunk.content_structure.value if memory_chunk.content_structure else "unknown",
})
# 构建瞬时记忆字符串
if enhanced_memories and enhanced_memories.get("has_memories"):
top_memory = enhanced_memories.get("memories", [])[:1]
if running_memories:
top_memory = running_memories[:1]
if top_memory:
instant_memory = top_memory[0].get("content", "")
@@ -621,26 +625,45 @@ class DefaultReplyer:
rounded = int(value)
return mapping.get(rounded, f"{value:.2f}")
# 构建记忆字符串,即使某种记忆为空也要继续
# 构建记忆字符串,使用方括号格式
memory_str = ""
has_any_memory = False
# 添加长期记忆(来自增强记忆系统)
if running_memories:
if not memory_str:
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
for running_memory in running_memories:
details = []
details.append(f"类型: {running_memory.get('memory_type', 'unknown')}")
if running_memory.get("confidence") is not None:
details.append(f"置信度: {_format_confidence_label(running_memory.get('confidence'))}")
if running_memory.get("importance") is not None:
details.append(f"重要性: {_format_importance_label(running_memory.get('importance'))}")
if running_memory.get("relevance") is not None:
details.append(f"相关度: {running_memory['relevance']:.2f}")
# 使用方括号格式
memory_parts = ["### 🧠 相关记忆 (Relevant Memories)", ""]
detail_text = f" {''.join(details)}" if details else ""
memory_str += f"- {running_memory['content']}{detail_text}\n"
# 按相关度排序,并记录相关度信息用于调试
sorted_memories = sorted(running_memories, key=lambda x: x.get('relevance', 0.0), reverse=True)
# 调试相关度信息
relevance_info = [(m.get('memory_type', 'unknown'), m.get('relevance', 0.0)) for m in sorted_memories]
logger.debug(f"记忆相关度信息: {relevance_info}")
for running_memory in sorted_memories:
content = running_memory.get('content', '')
memory_type = running_memory.get('memory_type', 'unknown')
# 映射记忆类型到中文标签
type_mapping = {
"personal_fact": "个人事实",
"preference": "偏好",
"event": "事件",
"opinion": "观点",
"relationship": "个人事实",
"unknown": "未知"
}
chinese_type = type_mapping.get(memory_type, "未知")
# 提取纯净内容(如果包含旧格式的元数据)
clean_content = content
if "(类型:" in content and "" in content:
clean_content = content.split("(类型:")[0].strip()
memory_parts.append(f"- **[{chinese_type}]** {clean_content}")
memory_str = "\n".join(memory_parts) + "\n"
has_any_memory = True
# 添加瞬时记忆
@@ -1684,11 +1707,11 @@ class DefaultReplyer:
reply_message: 回复的原始消息
"""
try:
if not global_config.memory.enable_memory or not global_config.memory.enable_instant_memory:
if not global_config.memory.enable_memory:
return
# 使用增强记忆系统存储记忆
from src.chat.memory_system.enhanced_memory_integration import remember_message
# 使用统一记忆系统存储记忆
from src.chat.memory_system import get_memory_system
stream = self.chat_stream
user_info_obj = getattr(stream, "user_info", None)
@@ -1798,12 +1821,15 @@ class DefaultReplyer:
)
# 异步存储聊天历史(完全非阻塞)
memory_system = get_memory_system()
asyncio.create_task(
remember_message(
message=chat_history,
user_id=memory_user_id,
chat_id=stream.stream_id,
context=memory_context
memory_system.process_conversation_memory(
context={
"conversation_text": chat_history,
"user_id": memory_user_id,
"scope_id": stream.stream_id,
**memory_context
}
)
)

View File

@@ -584,13 +584,38 @@ class Prompt:
# 使用记忆格式化器进行格式化
from src.chat.memory_system.memory_formatter import format_memories_bracket_style
# 转换记忆数据格式
# 转换记忆数据格式 - 修复字段映射
formatted_memories = []
for memory in running_memories:
# 从 content 字段提取 display 内容,移除括号中的元数据
content = memory.get("content", "")
display_text = content
# 如果包含元数据括号,提取纯文本部分
if "(类型:" in content and "" in content:
display_text = content.split("(类型:")[0].strip()
# 映射 topic 到 memory_type
topic = memory.get("topic", "personal_fact")
memory_type_mapping = {
"relationship": "personal_fact",
"opinion": "opinion",
"personal_fact": "personal_fact",
"preference": "preference",
"event": "event"
}
mapped_type = memory_type_mapping.get(topic, "personal_fact")
formatted_memories.append({
"display": memory.get("display", memory.get("content", "")),
"memory_type": memory.get("memory_type", "personal_fact"),
"metadata": memory.get("metadata", {})
"display": display_text,
"memory_type": mapped_type,
"metadata": {
"confidence": memory.get("confidence", "未知"),
"importance": memory.get("importance", "一般"),
"timestamp": memory.get("timestamp", ""),
"source": memory.get("source", "unknown"),
"relevance_score": memory.get("relevance_score", 0.0)
}
})
# 使用方括号格式格式化记忆
@@ -600,10 +625,15 @@ class Prompt:
)
except Exception as e:
logger.warning(f"记忆格式化失败,使用简化格式: {e}")
# 备用简化格式
memory_parts = ["以下是当前在聊天中,你回忆起的记忆:"]
# 备用简化格式 - 提取纯净内容
memory_parts = ["## 相关记忆回顾", ""]
for memory in running_memories:
content = memory["content"]
content = memory.get("content", "")
# 提取纯文本内容
if "(类型:" in content and "" in content:
clean_content = content.split("(类型:")[0].strip()
memory_parts.append(f"- {clean_content}")
else:
memory_parts.append(f"- {content}")
memory_block = "\n".join(memory_parts)
else:

View File

@@ -441,15 +441,8 @@ class EmojiConfig(ValidatedConfigBase):
class MemoryConfig(ValidatedConfigBase):
"""记忆配置类"""
enable_memory: bool = Field(default=True, description="启用记忆")
memory_build_interval: int = Field(default=600, description="记忆构建间隔")
enable_instant_memory: bool = Field(default=True, description="启用即时记忆")
enable_llm_instant_memory: bool = Field(default=True, description="启用基于LLM的瞬时记忆")
enable_vector_instant_memory: bool = Field(default=True, description="启用基于向量的瞬时记忆")
# 增强记忆系统配置
enable_enhanced_memory: bool = Field(default=True, description="启用增强记忆系统")
enhanced_memory_auto_save: bool = Field(default=True, description="自动保存增强记忆")
enable_memory: bool = Field(default=True, description="启用记忆系统")
memory_build_interval: int = Field(default=600, description="记忆构建间隔(秒)")
# 记忆构建配置
min_memory_length: int = Field(default=10, description="最小记忆长度")

View File

@@ -34,7 +34,7 @@ from src.plugin_system.core.plugin_manager import plugin_manager
from src.common.message import get_global_api
# 导入增强记忆系统管理器
from src.chat.memory_system.enhanced_memory_manager import enhanced_memory_manager
from src.chat.memory_system.memory_manager import memory_manager
# 插件系统现在使用统一的插件加载器
@@ -60,7 +60,7 @@ def _task_done_callback(task: asyncio.Task, message_id: str, start_time: float):
class MainSystem:
def __init__(self):
# 使用增强记忆系统
self.enhanced_memory_manager = enhanced_memory_manager
self.memory_manager = memory_manager
self.individuality: Individuality = get_individuality()
@@ -126,7 +126,7 @@ class MainSystem:
# 停止增强记忆系统
try:
if global_config.memory.enable_memory:
await self.enhanced_memory_manager.shutdown()
await self.memory_manager.shutdown()
logger.info("🛑 增强记忆系统已停止")
except Exception as e:
logger.error(f"停止增强记忆系统时出错: {e}")
@@ -270,7 +270,7 @@ MoFox_Bot(第三方修改版)
logger.info("聊天管理器初始化成功")
# 初始化增强记忆系统
await self.enhanced_memory_manager.initialize()
await self.memory_manager.initialize()
logger.info("增强记忆系统初始化成功")
# 老记忆系统已完全删除

View File

@@ -41,21 +41,22 @@ async def _calculate_interest(message: MessageRecv) -> Tuple[float, bool]:
if global_config.memory.enable_memory:
with Timer("记忆激活"):
# 使用新的增强记忆系统计算兴趣度
# 使用新的统一记忆系统计算兴趣度
try:
from src.chat.memory_system.enhanced_memory_integration import recall_memories
from src.chat.memory_system import get_memory_system
# 检索相关记忆来估算兴趣度
enhanced_memories = await recall_memories(
query=message.processed_plain_text,
memory_system = get_memory_system()
enhanced_memories = await memory_system.retrieve_relevant_memories(
query_text=message.processed_plain_text,
user_id=str(message.user_info.user_id),
chat_id=message.chat_id
scope_id=message.chat_id,
limit=5
)
# 基于检索结果计算兴趣度
if enhanced_memories:
# 有相关记忆,兴趣度基于相似度计算
max_score = max(score for _, score in enhanced_memories)
max_score = max(getattr(memory, 'relevance_score', 0.5) for memory in enhanced_memories)
interested_rate = min(max_score, 1.0) # 限制在0-1之间
else:
# 没有相关记忆,给予基础兴趣度

View File

@@ -172,20 +172,22 @@ class PromptBuilder:
@staticmethod
async def build_memory_block(text: str) -> str:
# 使用新的增强记忆系统检索记忆
# 使用新的统一记忆系统检索记忆
try:
from src.chat.memory_system.enhanced_memory_integration import recall_memories
from src.chat.memory_system import get_memory_system
enhanced_memories = await recall_memories(
query=text,
memory_system = get_memory_system()
enhanced_memories = await memory_system.retrieve_relevant_memories(
query_text=text,
user_id="system", # 系统查询
chat_id="system"
scope_id="system",
limit=5
)
related_memory_info = ""
if enhanced_memories and enhanced_memories.get("has_memories"):
for memory in enhanced_memories.get("memories", []):
related_memory_info += memory.get("content", "") + " "
if enhanced_memories:
for memory_chunk in enhanced_memories:
related_memory_info += memory_chunk.display or memory_chunk.text_content or ""
return await global_prompt_manager.format_prompt("memory_prompt", memory_info=related_memory_info.strip())
return ""

View File

@@ -603,16 +603,18 @@ class ChatterPlanFilter:
else:
keywords.append("晚上")
# 使用新的增强记忆系统检索记忆
# 使用新的统一记忆系统检索记忆
try:
from src.chat.memory_system.enhanced_memory_integration import recall_memories
from src.chat.memory_system import get_memory_system
memory_system = get_memory_system()
# 将关键词转换为查询字符串
query = " ".join(keywords)
enhanced_memories = await recall_memories(
query=query,
enhanced_memories = await memory_system.retrieve_relevant_memories(
query_text=query,
user_id="system", # 系统查询
chat_id="system"
scope_id="system",
limit=5
)
if not enhanced_memories:
@@ -620,9 +622,10 @@ class ChatterPlanFilter:
# 转换格式以兼容现有代码
retrieved_memories = []
if enhanced_memories and enhanced_memories.get("has_memories"):
for memory in enhanced_memories.get("memories", []):
retrieved_memories.append((memory.get("type", "unknown"), memory.get("content", "")))
for memory_chunk in enhanced_memories:
content = memory_chunk.display or memory_chunk.text_content or ""
memory_type = memory_chunk.memory_type.value if memory_chunk.memory_type else "unknown"
retrieved_memories.append((memory_type, content))
memory_statements = [f"关于'{topic}', 你记得'{memory_item}'" for topic, memory_item in retrieved_memories]

View File

@@ -349,7 +349,7 @@ class RemindAction(BaseAction):
# === 基本信息 ===
action_name = "set_reminder"
action_description = "根据用户的对话内容,智能地设置一个未来的提醒事项。"
activation_type = (ActionActivationType.KEYWORD,)
activation_type = ActionActivationType.KEYWORD
activation_keywords = ["提醒", "叫我", "记得", "别忘了"]
chat_type_allow = ChatType.ALL
parallel_action = True

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.0.3"
version = "7.1.3"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -254,18 +254,13 @@ max_context_emojis = 30 # 每次随机传递给LLM的表情包详细描述的最
[memory]
enable_memory = true # 是否启用记忆系统
memory_build_interval = 600 # 记忆构建间隔 单位秒 间隔越低MoFox-Bot学习越多但是冗余信息也会增多
enable_instant_memory = true # 是否启用即时记忆
enable_llm_instant_memory = true # 是否启用基于LLM的瞬时记忆
enable_vector_instant_memory = true # 是否启用基于向量的瞬时记忆
enable_enhanced_memory = true # 是否启用增强记忆系统
enhanced_memory_auto_save = true # 是否自动保存增强记忆
memory_build_interval = 600 # 记忆构建间隔(秒)。间隔越低,学习越频繁,但可能产生更多冗余信息
min_memory_length = 10 # 最小记忆长度
max_memory_length = 500 # 最大记忆长度
memory_value_threshold = 0.5 # 记忆价值阈值,低于该值的记忆会被丢弃
vector_similarity_threshold = 0.4 # 向量相似度阈值
semantic_similarity_threshold = 0.4 # 语义重排阶段的最低匹配阈值
vector_similarity_threshold = 0.7 # 向量相似度阈值
semantic_similarity_threshold = 0.6 # 语义重排阶段的最低匹配阈值
metadata_filter_limit = 100 # 元数据过滤阶段返回数量上限
vector_search_limit = 50 # 向量搜索阶段返回数量上限
@@ -277,12 +272,42 @@ semantic_weight = 0.3 # 综合评分中语义匹配的权重
context_weight = 0.2 # 综合评分中上下文关联的权重
recency_weight = 0.1 # 综合评分中时效性的权重
fusion_similarity_threshold = 0.6 # 记忆融合时的相似度阈值
fusion_similarity_threshold = 0.85 # 记忆融合时的相似度阈值
deduplication_window_hours = 24 # 记忆去重窗口(小时)
enable_memory_cache = true # 是否启用本地记忆缓存
cache_ttl_seconds = 300 # 缓存有效期(秒)
max_cache_size = 1000 # 缓存中允许的最大记忆条数
# 智能遗忘机制配置 (新增)
enable_memory_forgetting = true # 是否启用智能遗忘机制
forgetting_check_interval_hours = 24 # 遗忘检查间隔(小时)
# 遗忘阈值配置
base_forgetting_days = 30.0 # 基础遗忘天数
min_forgetting_days = 7.0 # 最小遗忘天数(重要记忆也会被保留的最少天数)
max_forgetting_days = 365.0 # 最大遗忘天数(普通记忆最长保留天数)
# 重要程度权重 - 不同重要程度的额外保护天数
critical_importance_bonus = 45.0 # 关键重要性额外天数
high_importance_bonus = 30.0 # 高重要性额外天数
normal_importance_bonus = 15.0 # 一般重要性额外天数
low_importance_bonus = 0.0 # 低重要性额外天数
# 置信度权重 - 不同置信度的额外保护天数
verified_confidence_bonus = 30.0 # 已验证置信度额外天数
high_confidence_bonus = 20.0 # 高置信度额外天数
medium_confidence_bonus = 10.0 # 中等置信度额外天数
low_confidence_bonus = 0.0 # 低置信度额外天数
# 激活频率权重
activation_frequency_weight = 0.5 # 每次激活增加的天数权重
max_frequency_bonus = 10.0 # 最大激活频率奖励天数
# 休眠机制
dormant_threshold_days = 90 # 休眠状态判定天数(超过此天数未访问的记忆进入休眠状态)
# 统一存储配置 (新增)
unified_storage_path = "data/unified_memory" # 统一存储数据路径
unified_storage_cache_limit = 10000 # 内存缓存大小限制
unified_storage_auto_save_interval = 50 # 自动保存间隔(记忆条数)
unified_storage_enable_compression = true # 是否启用数据压缩
[voice]
enable_asr = true # 是否启用语音识别启用后MoFox-Bot可以识别语音消息启用该功能需要配置语音识别模型[model.voice]