refactor(memory): 重构瞬时记忆为全量向量化存储模型

新系统采用“全量存储,定时清理”的设计理念,将所有聊天消息向量化并存入ChromaDB。通过后台线程定时清理过期消息,取代了之前基于“重要性模式”判断是否记忆的复杂逻辑。

主要变更:
- **全量存储**: 不再进行前置判断,所有消息均被向量化存储,简化了记忆创建流程。
- **定时清理**: 引入基于`threading`的后台任务,根据设定的`retention_hours`自动清理过期记忆,确保系统轻量高效。
- **简化检索**: 检索逻辑更新为直接查询相似消息,并增加了相似度阈值过滤和时间差格式化,提高了上下文的准确性和可读性。

在 `DefaultReplyer` 中,已切换至新的 `HybridInstantMemory`(其底层实现为V2),并优化了记忆上下文的构建逻辑,使其能更稳定地处理不同类型的记忆返回结果。
This commit is contained in:
minecraft1024a
2025-08-19 19:56:56 +08:00
parent e5ebd6879d
commit f2c46d0d1d
7 changed files with 929 additions and 264 deletions

View File

@@ -0,0 +1,445 @@
# -*- coding: utf-8 -*-
"""
混合瞬时记忆系统 V2
融合LLM和向量两种记忆策略智能选择最优方式
现已集成VectorInstantMemoryV2支持
- 自动定时清理过期记忆
- 相似度搜索
- 时间感知的记忆检索
- 更完善的错误处理
"""
import time
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Union
from enum import Enum
from dataclasses import dataclass
from src.common.logger import get_logger
from .instant_memory import InstantMemory, MemoryItem
from .vector_instant_memory import VectorInstantMemoryV2
logger = get_logger(__name__)
class MemoryMode(Enum):
"""记忆处理模式"""
VECTOR_ONLY = "vector_only" # 仅使用向量系统
LLM_ONLY = "llm_only" # 仅使用LLM系统
LLM_PREFERRED = "llm_preferred" # 优先LLM向量备份
HYBRID = "hybrid" # 混合模式,并行处理
@dataclass
class StrategyDecision:
"""策略决策结果"""
mode: MemoryMode
confidence: float
reason: str
class MemoryStrategy:
"""记忆策略判断器"""
def __init__(self):
# 情感关键词
self.emotional_keywords = {
"开心", "高兴", "兴奋", "激动", "快乐", "愉快", "满意",
"伤心", "难过", "沮丧", "失落", "郁闷", "痛苦", "心疼",
"生气", "愤怒", "气愤", "恼火", "烦躁", "讨厌", "厌烦",
"担心", "焦虑", "紧张", "害怕", "恐惧", "不安", "忧虑",
"感动", "温馨", "幸福", "甜蜜", "浪漫", "美好", "珍惜",
"重要", "关键", "关心", "在意", "喜欢", "", "想念"
}
# 重要信息标识词
self.important_keywords = {
"计划", "目标", "梦想", "理想", "希望", "打算", "准备",
"决定", "选择", "考虑", "想要", "需要", "必须", "应该",
"工作", "学习", "考试", "面试", "项目", "任务", "职业",
"家人", "朋友", "恋人", "同事", "老师", "同学", "领导",
"生日", "节日", "纪念日", "约会", "聚会", "旅行", "出差"
}
def analyze_content_complexity(self, text: str) -> Dict[str, Any]:
"""分析内容复杂度"""
# 基础指标
char_count = len(text)
sentence_count = text.count('') + text.count('!') + text.count('?') + 1
# 情感词汇检测
emotional_score = sum(1 for word in self.emotional_keywords if word in text)
# 重要信息检测
importance_score = sum(1 for word in self.important_keywords if word in text)
# 问号密度(询问程度)
question_density = text.count('?') / max(char_count / 50, 1)
# 语气词检测(口语化程度)
casual_markers = ['', '', '', '', '哈哈', '呵呵', '嘿嘿']
casual_score = sum(1 for marker in casual_markers if marker in text)
return {
'char_count': char_count,
'sentence_count': sentence_count,
'emotional_score': emotional_score,
'importance_score': importance_score,
'question_density': question_density,
'casual_score': casual_score
}
def decide_strategy(self, text: str) -> StrategyDecision:
"""智能决策使用哪种记忆策略"""
if not text.strip():
return StrategyDecision(MemoryMode.VECTOR_ONLY, 0.0, "空内容")
analysis = self.analyze_content_complexity(text)
# 决策逻辑
# 1. 极短内容 -> 向量优先
if analysis['char_count'] < 20:
return StrategyDecision(
MemoryMode.VECTOR_ONLY,
0.9,
f"内容过短({analysis['char_count']}字符)"
)
# 2. 高情感内容 -> LLM优先
if analysis['emotional_score'] >= 2:
return StrategyDecision(
MemoryMode.LLM_PREFERRED,
0.8 + min(analysis['emotional_score'] * 0.1, 0.2),
f"检测到{analysis['emotional_score']}个情感词汇"
)
# 3. 重要信息 -> LLM优先
if analysis['importance_score'] >= 3:
return StrategyDecision(
MemoryMode.LLM_PREFERRED,
0.7 + min(analysis['importance_score'] * 0.05, 0.3),
f"检测到{analysis['importance_score']}个重要信息标识"
)
# 4. 复杂长文本 -> 混合模式
if analysis['char_count'] > 100 and analysis['sentence_count'] >= 3:
return StrategyDecision(
MemoryMode.HYBRID,
0.75,
f"复杂内容({analysis['char_count']}字符,{analysis['sentence_count']}句)"
)
# 5. 高询问度 -> LLM处理更准确
if analysis['question_density'] > 0.3:
return StrategyDecision(
MemoryMode.LLM_PREFERRED,
0.65,
f"高询问密度({analysis['question_density']:.2f})"
)
# 6. 日常闲聊 -> 向量优先(快速)
if analysis['casual_score'] >= 2 and analysis['char_count'] < 80:
return StrategyDecision(
MemoryMode.VECTOR_ONLY,
0.7,
f"日常闲聊内容(休闲标记:{analysis['casual_score']})"
)
# 7. 默认混合模式
return StrategyDecision(
MemoryMode.HYBRID,
0.6,
"中等复杂度内容,使用混合策略"
)
class MemorySync:
"""记忆同步器 - 处理两套系统间的数据同步"""
def __init__(self, llm_memory: InstantMemory, vector_memory: VectorInstantMemoryV2):
self.llm_memory = llm_memory
self.vector_memory = vector_memory
async def sync_llm_to_vector(self, memory_item: MemoryItem):
"""将LLM生成的高质量记忆同步到向量系统"""
try:
# 使用新的V2系统的存储方法
await self.vector_memory.store_message(
content=memory_item.memory_text,
sender="llm_memory"
)
logger.debug(f"LLM记忆已同步到向量系统: {memory_item.memory_text[:50]}...")
except Exception as e:
logger.error(f"LLM记忆同步到向量系统失败: {e}")
async def sync_vector_to_llm(self, content: str, importance: float):
"""将向量系统的记忆同步到LLM系统异步低优先级"""
try:
# 只有高重要性的向量记忆才值得同步到LLM系统
if importance < 0.7:
return
# 创建MemoryItem
memory_id = f"{self.llm_memory.chat_id}_{int(time.time() * 1000)}_vec_sync"
# 简化的关键词提取避免LLM调用
keywords = self._extract_simple_keywords(content)
memory_item = MemoryItem(
memory_id=memory_id,
chat_id=self.llm_memory.chat_id,
memory_text=content,
keywords=keywords
)
await self.llm_memory.store_memory(memory_item)
logger.debug(f"向量记忆已同步到LLM系统: {content[:50]}...")
except Exception as e:
logger.error(f"向量记忆同步到LLM系统失败: {e}")
def _extract_simple_keywords(self, content: str) -> List[str]:
"""简单关键词提取不使用LLM"""
# 基于常见词汇的简单提取
import re
# 移除标点符号,分词
clean_text = re.sub(r'[,。!?;:""''()【】\s]', ' ', content)
words = [w.strip() for w in clean_text.split() if len(w.strip()) >= 2]
# 简单去重和筛选
keywords = list(set(words))[:10] # 最多10个关键词
return keywords
class HybridRetriever:
"""融合检索器 - 合并两种检索策略的结果"""
def __init__(self, llm_memory: InstantMemory, vector_memory: VectorInstantMemoryV2):
self.llm_memory = llm_memory
self.vector_memory = vector_memory
async def retrieve_memories(self, target: str, strategy: MemoryMode) -> Optional[Union[str, List[str]]]:
"""根据策略检索记忆"""
if strategy == MemoryMode.VECTOR_ONLY:
# 使用V2系统的获取相关记忆上下文方法
context = await self.vector_memory.get_memory_for_context(target)
return context if context else None
elif strategy == MemoryMode.LLM_ONLY:
return await self.llm_memory.get_memory(target)
elif strategy == MemoryMode.LLM_PREFERRED:
# 优先LLM失败则降级到向量
llm_result = await self.llm_memory.get_memory(target)
if llm_result:
return llm_result
context = await self.vector_memory.get_memory_for_context(target)
return context if context else None
elif strategy == MemoryMode.HYBRID:
# 并行查询两个系统
return await self._hybrid_retrieve(target)
return None
async def _hybrid_retrieve(self, target: str) -> Optional[List[str]]:
"""混合检索 - 并行查询并融合结果"""
try:
# 并行查询
results = await asyncio.gather(
self.llm_memory.get_memory(target),
self.vector_memory.get_memory_for_context(target),
return_exceptions=True
)
llm_result, vector_result = results
# 收集有效结果
combined_memories = set()
# 处理LLM结果
if isinstance(llm_result, list) and llm_result:
combined_memories.update(llm_result)
elif isinstance(llm_result, str) and llm_result:
combined_memories.add(llm_result)
elif isinstance(llm_result, Exception):
logger.warning(f"LLM检索出错: {llm_result}")
# 处理向量结果
if isinstance(vector_result, str) and vector_result:
combined_memories.add(vector_result)
elif isinstance(vector_result, Exception):
logger.warning(f"向量检索出错: {vector_result}")
if combined_memories:
# 转换为列表并去重
return list(combined_memories)
return None
except Exception as e:
logger.error(f"混合检索失败: {e}")
return None
class HybridInstantMemory:
"""混合瞬时记忆系统 V2
智能融合LLM和向量两种记忆策略
- 快速内容使用向量系统V2 (自动清理过期记忆)
- 复杂内容使用LLM系统
- 重要内容双重备份
- 统一检索接口
- 支持相似度搜索和时间感知记忆
"""
def __init__(self, chat_id: str, retention_hours: int = 24):
self.chat_id = chat_id
self.retention_hours = retention_hours
# 初始化两个子系统
self.llm_memory = InstantMemory(chat_id)
self.vector_memory = VectorInstantMemoryV2(chat_id, retention_hours)
# 初始化策略组件
self.strategy = MemoryStrategy()
self.sync = MemorySync(self.llm_memory, self.vector_memory)
self.retriever = HybridRetriever(self.llm_memory, self.vector_memory)
logger.info(f"混合瞬时记忆系统初始化完成: {chat_id} (向量保留{retention_hours}小时)")
async def create_and_store_memory(self, text: str) -> None:
"""智能创建和存储记忆"""
if not text.strip():
return
try:
# 1. 策略决策
decision = self.strategy.decide_strategy(text)
logger.debug(f"记忆策略: {decision.mode.value} (置信度: {decision.confidence:.2f}) - {decision.reason}")
# 2. 根据策略执行存储
if decision.mode == MemoryMode.VECTOR_ONLY:
await self._store_vector_only(text)
elif decision.mode == MemoryMode.LLM_ONLY:
await self._store_llm_only(text)
elif decision.mode == MemoryMode.LLM_PREFERRED:
await self._store_llm_preferred(text)
elif decision.mode == MemoryMode.HYBRID:
await self._store_hybrid(text)
except Exception as e:
logger.error(f"混合记忆存储失败: {e}")
async def _store_vector_only(self, text: str):
"""仅向量存储"""
await self.vector_memory.store_message(text)
async def _store_llm_only(self, text: str):
"""仅LLM存储"""
await self.llm_memory.create_and_store_memory(text)
async def _store_llm_preferred(self, text: str):
"""LLM优先存储向量备份"""
try:
# 主存储LLM系统
await self.llm_memory.create_and_store_memory(text)
# 异步备份到向量系统
asyncio.create_task(self.vector_memory.store_message(text))
except Exception as e:
logger.error(f"LLM优先存储失败降级到向量系统: {e}")
await self.vector_memory.store_message(text)
async def _store_hybrid(self, text: str):
"""混合存储 - 并行存储到两个系统"""
try:
await asyncio.gather(
self.llm_memory.create_and_store_memory(text),
self.vector_memory.store_message(text),
return_exceptions=True
)
except Exception as e:
logger.error(f"混合存储失败: {e}")
async def get_memory(self, target: str) -> Optional[Union[str, List[str]]]:
"""统一记忆检索接口"""
if not target.strip():
return None
try:
# 根据查询复杂度选择检索策略
query_decision = self.strategy.decide_strategy(target)
# 对于查询,更偏向混合检索以获得更全面的结果
if query_decision.mode == MemoryMode.VECTOR_ONLY and len(target) > 30:
query_decision.mode = MemoryMode.HYBRID
logger.debug(f"检索策略: {query_decision.mode.value} - {query_decision.reason}")
return await self.retriever.retrieve_memories(target, query_decision.mode)
except Exception as e:
logger.error(f"混合记忆检索失败: {e}")
return None
def get_stats(self) -> Dict[str, Any]:
"""获取系统统计信息"""
llm_stats = {"total_memories": 0} # LLM系统暂无统计接口
vector_stats = self.vector_memory.get_stats()
return {
"chat_id": self.chat_id,
"mode": "hybrid",
"retention_hours": self.retention_hours,
"llm_system": llm_stats,
"vector_system": vector_stats,
"strategy_patterns": {
"emotional_keywords": len(self.strategy.emotional_keywords),
"important_keywords": len(self.strategy.important_keywords)
}
}
async def sync_memories(self, direction: str = "both"):
"""手动同步记忆"""
try:
if direction in ["both", "llm_to_vector"]:
# LLM -> 向量的同步需要额外实现
logger.info("LLM到向量的同步需要进一步开发")
if direction in ["both", "vector_to_llm"]:
# 向量 -> LLM的同步也需要额外实现
logger.info("向量到LLM的同步需要进一步开发")
except Exception as e:
logger.error(f"记忆同步失败: {e}")
def stop(self):
"""停止混合记忆系统"""
try:
self.vector_memory.stop()
logger.info(f"混合瞬时记忆系统已停止: {self.chat_id}")
except Exception as e:
logger.error(f"停止混合记忆系统失败: {e}")
async def find_similar_memories(self, query: str, top_k: int = 5, similarity_threshold: float = 0.7):
"""查找相似记忆 - 利用V2系统的新功能"""
return await self.vector_memory.find_similar_messages(query, top_k, similarity_threshold)
# 为了保持向后兼容,提供快捷创建函数
def create_hybrid_memory(chat_id: str, retention_hours: int = 24) -> HybridInstantMemory:
"""创建混合瞬时记忆系统实例
Args:
chat_id: 聊天ID
retention_hours: 向量记忆保留时长(小时)默认24小时
"""
return HybridInstantMemory(chat_id, retention_hours)

View File

@@ -0,0 +1,168 @@
# 混合瞬时记忆系统设计
## 系统概述
融合 `instant_memory.py`LLM系统`vector_instant_memory.py`(向量系统)的混合记忆系统,智能选择最优策略,无需配置文件控制。
## 融合架构
```
聊天输入 → 智能调度器 → 选择策略 → 双重存储 → 融合检索 → 统一输出
```
## 核心组件设计
### 1. HybridInstantMemory (主类)
**职责**: 统一接口,智能调度两套记忆系统
**关键方法**:
- `__init__(chat_id)` - 初始化两套子系统
- `create_and_store_memory(text)` - 智能存储记忆
- `get_memory(target)` - 融合检索记忆
- `get_stats()` - 统计信息
### 2. MemoryStrategy (策略判断器)
**职责**: 判断使用哪种记忆策略
**判断规则**:
- 文本长度 < 30字符 优先向量系统快速
- 包含情感词汇/重要信息 使用LLM系统准确
- 复杂场景 双重验证
**实现方法**:
```python
def decide_strategy(self, text: str) -> MemoryMode:
# 长度判断
if len(text) < 30:
return MemoryMode.VECTOR_ONLY
# 情感关键词检测
if self._contains_emotional_content(text):
return MemoryMode.LLM_PREFERRED
# 默认混合模式
return MemoryMode.HYBRID
```
### 3. MemorySync (同步器)
**职责**: 处理两套系统间的记忆同步和去重
**同步策略**:
- 向量系统存储的记忆 异步同步到LLM系统
- LLM系统生成的高质量记忆 生成向量存储
- 定期去重避免重复记忆
### 4. HybridRetriever (检索器)
**职责**: 融合两种检索方式提供最优结果
**检索策略**:
1. 并行查询向量系统和LLM系统
2. 按相似度/相关性排序
3. 去重合并返回最相关的记忆
## 智能调度逻辑
### 快速路径 (Vector Path)
- 适用: 短文本常规对话快速查询
- 优势: 响应速度快资源消耗低
- 时机: 文本简单无特殊情感内容
### 准确路径 (LLM Path)
- 适用: 重要信息情感表达复杂语义
- 优势: 语义理解深度记忆质量高
- 时机: 检测到重要性标志
### 混合路径 (Hybrid Path)
- 适用: 中等复杂度内容
- 策略: 向量快速筛选 + LLM精确处理
- 平衡: 速度与准确性
## 记忆存储策略
### 双重备份机制
1. **主存储**: 根据策略选择主要存储方式
2. **备份存储**: 异步备份到另一系统
3. **同步检查**: 定期校验两边数据一致性
### 存储优化
- 向量系统: 立即存储快速可用
- LLM系统: 批量处理高质量整理
- 重复检测: 跨系统去重
## 检索融合策略
### 并行检索
```python
async def get_memory(self, target: str):
# 并行查询两个系统
vector_task = self.vector_memory.get_memory(target)
llm_task = self.llm_memory.get_memory(target)
vector_results, llm_results = await asyncio.gather(
vector_task, llm_task, return_exceptions=True
)
# 融合结果
return self._merge_results(vector_results, llm_results)
```
### 结果融合
1. **相似度评分**: 统一两种系统的相似度计算
2. **权重调整**: 根据查询类型调整系统权重
3. **去重合并**: 移除重复内容保留最相关的
## 性能优化
### 异步处理
- 向量检索: 同步快速响应
- LLM处理: 异步后台处理
- 批量操作: 减少系统调用开销
### 缓存策略
- 热点记忆缓存
- 查询结果缓存
- 向量计算缓存
### 降级机制
- 向量系统故障 只使用LLM系统
- LLM系统故障 只使用向量系统
- 全部故障 返回空结果记录错误
## 实现计划
1. **基础框架**: 创建HybridInstantMemory主类
2. **策略判断**: 实现智能调度逻辑
3. **存储融合**: 实现双重存储机制
4. **检索融合**: 实现并行检索和结果合并
5. **同步机制**: 实现跨系统数据同步
6. **性能优化**: 异步处理和缓存优化
7. **错误处理**: 降级机制和异常处理
## 使用接口
```python
# 初始化混合记忆系统
hybrid_memory = HybridInstantMemory(chat_id="user_123")
# 智能存储记忆
await hybrid_memory.create_and_store_memory("今天天气真好,我去公园散步了")
# 融合检索记忆
memories = await hybrid_memory.get_memory("天气")
# 获取系统状态
stats = hybrid_memory.get_stats()
print(f"向量记忆: {stats['vector_count']} 条")
print(f"LLM记忆: {stats['llm_count']} 条")
```
## 预期效果
- **响应速度**: 比纯LLM系统快60%+
- **记忆质量**: 比纯向量系统准确30%+
- **资源使用**: 智能调度按需使用资源
- **可靠性**: 双系统备份单点故障不影响服务

View File

@@ -4,46 +4,67 @@ import json
import hashlib import hashlib
from typing import List, Optional, Tuple, Dict, Any from typing import List, Optional, Tuple, Dict, Any
from dataclasses import dataclass from dataclasses import dataclass
import threading
from datetime import datetime, timedelta
import numpy as np import numpy as np
import chromadb import chromadb
from chromadb.config import Settings from chromadb.config import Settings
from sqlalchemy import select
from src.common.logger import get_logger from src.common.logger import get_logger
from src.chat.utils.utils import get_embedding from src.chat.utils.utils import get_embedding
from src.config.config import global_config from src.config.config import global_config
from src.common.database.sqlalchemy_models import Memory
from src.common.database.sqlalchemy_database_api import get_db_session
logger = get_logger("vector_instant_memory") logger = get_logger("vector_instant_memory_v2")
@dataclass @dataclass
class MemoryImportancePattern: class ChatMessage:
"""记忆重要性模式""" """聊天消息数据结构"""
description: str message_id: str
vector: List[float] chat_id: str
threshold: float = 0.6 content: str
timestamp: float
sender: str = "unknown"
message_type: str = "text"
class VectorInstantMemory: class VectorInstantMemoryV2:
"""基于向量瞬时记忆系统 """重构的向量瞬时记忆系统 V2
完全替换原有的LLM判断方式使用向量相似度进行 新设计理念
1. 记忆重要性判断 1. 全量存储 - 所有聊天记录都存储为向量
2. 记忆内容去重 2. 定时清理 - 定期清理过期记录
3. 记忆检索匹配 3. 实时匹配 - 新消息与历史记录做向量相似度匹配
""" """
def __init__(self, chat_id: str): def __init__(self, chat_id: str, retention_hours: int = 24, cleanup_interval: int = 3600):
"""
初始化向量瞬时记忆系统
Args:
chat_id: 聊天ID
retention_hours: 记忆保留时长(小时)
cleanup_interval: 清理间隔(秒)
"""
self.chat_id = chat_id self.chat_id = chat_id
self.retention_hours = retention_hours
self.cleanup_interval = cleanup_interval
# ChromaDB相关
self.client = None self.client = None
self.collection = None self.collection = None
self.importance_patterns = []
self._init_chroma()
# 清理任务相关
self.cleanup_task = None
self.is_running = True
# 初始化系统
self._init_chroma()
self._start_cleanup_task()
logger.info(f"向量瞬时记忆系统V2初始化完成: {chat_id} (保留{retention_hours}小时)")
def _init_chroma(self): def _init_chroma(self):
"""初始化ChromaDB连接""" """初始化ChromaDB连接"""
try: try:
@@ -53,7 +74,7 @@ class VectorInstantMemory:
settings=Settings(anonymized_telemetry=False) settings=Settings(anonymized_telemetry=False)
) )
self.collection = self.client.get_or_create_collection( self.collection = self.client.get_or_create_collection(
name="instant_memories", name="chat_messages",
metadata={"hnsw:space": "cosine"} metadata={"hnsw:space": "cosine"}
) )
logger.info(f"向量记忆数据库初始化成功: {db_path}") logger.info(f"向量记忆数据库初始化成功: {db_path}")
@@ -62,284 +83,289 @@ class VectorInstantMemory:
self.client = None self.client = None
self.collection = None self.collection = None
async def _load_importance_patterns(self): def _start_cleanup_task(self):
"""加载重要性判断模式向量""" """启动定时清理任务"""
if self.importance_patterns: def cleanup_worker():
while self.is_running:
try:
self._cleanup_expired_messages()
time.sleep(self.cleanup_interval)
except Exception as e:
logger.error(f"清理任务异常: {e}")
time.sleep(60) # 异常时等待1分钟再继续
self.cleanup_task = threading.Thread(target=cleanup_worker, daemon=True)
self.cleanup_task.start()
logger.info(f"定时清理任务已启动,间隔{self.cleanup_interval}")
def _cleanup_expired_messages(self):
"""清理过期的聊天记录"""
if not self.collection:
return return
patterns = [
"用户分享了重要的个人信息和经历",
"讨论了未来的计划、安排或目标",
"表达了强烈的情感、观点或态度",
"询问了重要的问题需要回答",
"发生了有趣的对话和深入交流",
"出现了新的话题或重要信息",
"用户表现出明显的情绪变化",
"涉及重要的决定或选择"
]
try: try:
for i, pattern in enumerate(patterns): # 计算过期时间戳
vector = await get_embedding(pattern) expire_time = time.time() - (self.retention_hours * 3600)
if vector:
self.importance_patterns.append( # 查询所有记录
MemoryImportancePattern( all_results = self.collection.get(
description=pattern, where={"chat_id": self.chat_id},
vector=vector, include=["metadatas"]
threshold=0.55 + i * 0.01 # 动态阈值 )
)
) # 找出过期的记录ID
expired_ids = []
logger.info(f"加载了 {len(self.importance_patterns)} 个重要性判断模式") metadatas = all_results.get("metadatas") or []
ids = all_results.get("ids") or []
for i, metadata in enumerate(metadatas):
if metadata and isinstance(metadata, dict):
timestamp = metadata.get("timestamp", 0)
if isinstance(timestamp, (int, float)) and timestamp < expire_time:
if i < len(ids):
expired_ids.append(ids[i])
# 批量删除过期记录
if expired_ids:
self.collection.delete(ids=expired_ids)
logger.info(f"清理了 {len(expired_ids)} 条过期聊天记录")
except Exception as e: except Exception as e:
logger.error(f"加载重要性模式失败: {e}") logger.error(f"清理过期记录失败: {e}")
async def should_create_memory(self, chat_history: str) -> Tuple[bool, float]: async def store_message(self, content: str, sender: str = "user") -> bool:
"""向量化判断是否需要创建记忆 """
存储聊天消息到向量库
Args: Args:
chat_history: 聊天历史 content: 消息内容
sender: 发送者
Returns: Returns:
(是否需要记忆, 重要性分数) bool: 是否存储成功
""" """
if not chat_history.strip(): if not self.collection or not content.strip():
return False, 0.0
await self._load_importance_patterns()
try:
# 获取聊天历史的向量表示
history_vector = await get_embedding(chat_history[-500:]) # 只取最后500字符
if not history_vector:
return False, 0.0
# 与重要性模式向量计算相似度
max_score = 0.0
best_pattern = None
for pattern in self.importance_patterns:
similarity = self._cosine_similarity(history_vector, pattern.vector)
if similarity > max_score:
max_score = similarity
best_pattern = pattern
should_remember = max_score > 0.6 # 基础阈值
if should_remember and best_pattern:
logger.debug(f"触发记忆模式: {best_pattern.description} (相似度: {max_score:.3f})")
return should_remember, max_score
except Exception as e:
logger.error(f"向量化判断记忆重要性失败: {e}")
return False, 0.0
def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""计算余弦相似度"""
try:
v1 = np.array(vec1)
v2 = np.array(vec2)
dot_product = np.dot(v1, v2)
norms = np.linalg.norm(v1) * np.linalg.norm(v2)
if norms == 0:
return 0.0
return dot_product / norms
except Exception as e:
logger.error(f"计算余弦相似度失败: {e}")
return 0.0
def _extract_key_content(self, chat_history: str) -> str:
"""快速提取关键内容避免LLM调用"""
lines = chat_history.strip().split('\n')
# 简单规则:取最后几行非空对话
key_lines = []
for line in reversed(lines):
if line.strip() and ':' in line: # 包含发言者格式
key_lines.insert(0, line.strip())
if len(key_lines) >= 3: # 最多3行
break
return '\n'.join(key_lines) if key_lines else chat_history[-200:]
async def _is_duplicate_memory(self, content: str) -> bool:
"""检查是否为重复记忆"""
if not self.collection:
return False return False
try: try:
content_vector = await get_embedding(content) # 生成消息向量
if not content_vector: message_vector = await get_embedding(content)
if not message_vector:
logger.warning(f"消息向量生成失败: {content[:50]}...")
return False return False
# 查询最相似的记忆 # 生成唯一消息ID
results = self.collection.query( message_id = f"{self.chat_id}_{int(time.time() * 1000)}_{hash(content) % 10000}"
query_embeddings=[content_vector],
n_results=1 # 创建消息对象
message = ChatMessage(
message_id=message_id,
chat_id=self.chat_id,
content=content,
timestamp=time.time(),
sender=sender
) )
if results['distances'] and results['distances'][0]:
similarity = 1 - results['distances'][0][0] # ChromaDB用距离转换为相似度
return similarity > 0.85 # 85%相似度认为重复
except Exception as e:
logger.error(f"检查重复记忆失败: {e}")
return False
async def create_and_store_memory(self, chat_history: str):
"""创建并存储向量化记忆"""
try:
# 1. 向量化判断重要性
should_store, importance_score = await self.should_create_memory(chat_history)
if not should_store:
logger.debug("聊天内容不需要记忆")
return
# 2. 提取关键内容
key_content = self._extract_key_content(chat_history)
# 3. 检查重复
if await self._is_duplicate_memory(key_content):
logger.debug("发现重复记忆,跳过存储")
return
# 4. 向量化存储
await self._store_vector_memory(key_content, importance_score)
logger.info(f"成功存储向量记忆 (重要性: {importance_score:.3f}): {key_content[:50]}...")
except Exception as e:
logger.error(f"创建向量记忆失败: {e}")
async def _store_vector_memory(self, content: str, importance: float):
"""存储向量化记忆"""
if not self.collection:
logger.warning("ChromaDB未初始化无法存储向量记忆")
return
try:
# 生成向量
content_vector = await get_embedding(content)
if not content_vector:
logger.error("生成记忆向量失败")
return
# 生成唯一ID
memory_id = f"{self.chat_id}_{int(time.time() * 1000)}"
# 存储到ChromaDB # 存储到ChromaDB
self.collection.add( self.collection.add(
embeddings=[content_vector], embeddings=[message_vector],
documents=[content], documents=[content],
metadatas=[{ metadatas=[{
"chat_id": self.chat_id, "message_id": message.message_id,
"timestamp": time.time(), "chat_id": message.chat_id,
"importance": importance, "timestamp": message.timestamp,
"type": "instant_memory" "sender": message.sender,
"message_type": message.message_type
}], }],
ids=[memory_id] ids=[message_id]
) )
# 同时存储到原数据库(保持兼容性) logger.debug(f"消息已存储: {content[:50]}...")
await self._store_to_db(content, importance) return True
except Exception as e: except Exception as e:
logger.error(f"存储向量记忆到ChromaDB失败: {e}") logger.error(f"存储消息失败: {e}")
return False
async def _store_to_db(self, content: str, importance: float): async def find_similar_messages(self, query: str, top_k: int = 5, similarity_threshold: float = 0.7) -> List[Dict[str, Any]]:
"""存储到原数据库表""" """
try: 查找与查询相似的历史消息
with get_db_session() as session:
memory = Memory( Args:
memory_id=f"{self.chat_id}_{int(time.time() * 1000)}", query: 查询内容
chat_id=self.chat_id, top_k: 返回的最相似消息数量
memory_text=content, similarity_threshold: 相似度阈值
keywords=[], # 向量版本不需要关键词
create_time=time.time(), Returns:
last_view_time=time.time() List[Dict]: 相似消息列表包含content、similarity、timestamp等信息
) """
session.add(memory) if not self.collection or not query.strip():
session.commit() return []
except Exception as e:
logger.error(f"存储记忆到数据库失败: {e}")
async def get_memory(self, target: str) -> Optional[str]:
"""向量化检索相关记忆"""
if not self.collection:
return await self._fallback_get_memory(target)
try: try:
target_vector = await get_embedding(target) # 生成查询向量
if not target_vector: query_vector = await get_embedding(query)
return None if not query_vector:
return []
# 向量相似度搜索 # 向量相似度搜索
results = self.collection.query( results = self.collection.query(
query_embeddings=[target_vector], query_embeddings=[query_vector],
n_results=3, # 取前3个最相关的 n_results=top_k,
where={"chat_id": self.chat_id} where={"chat_id": self.chat_id}
) )
if not results['documents'] or not results['documents'][0]: if not results['documents'] or not results['documents'][0]:
return None return []
# 返回最相关的记忆 # 处理搜索结果
best_memory = results['documents'][0][0] similar_messages = []
distance = results['distances'][0][0] if results['distances'] else 1.0 documents = results['documents'][0]
similarity = 1 - distance distances = results['distances'][0] if results['distances'] else []
metadatas = results['metadatas'][0] if results['metadatas'] else []
if similarity > 0.7: # 70%相似度阈值 for i, doc in enumerate(documents):
logger.debug(f"找到相关记忆 (相似度: {similarity:.3f}): {best_memory[:50]}...") # 计算相似度ChromaDB返回距离需转换
return best_memory distance = distances[i] if i < len(distances) else 1.0
similarity = 1 - distance
# 过滤低相似度结果
if similarity < similarity_threshold:
continue
# 获取元数据
metadata = metadatas[i] if i < len(metadatas) else {}
# 安全获取timestamp
timestamp = metadata.get("timestamp", 0) if isinstance(metadata, dict) else 0
timestamp = float(timestamp) if isinstance(timestamp, (int, float)) else 0.0
similar_messages.append({
"content": doc,
"similarity": similarity,
"timestamp": timestamp,
"sender": metadata.get("sender", "unknown") if isinstance(metadata, dict) else "unknown",
"message_id": metadata.get("message_id", "") if isinstance(metadata, dict) else "",
"time_ago": self._format_time_ago(timestamp)
})
return None # 按相似度排序
similar_messages.sort(key=lambda x: x["similarity"], reverse=True)
logger.debug(f"找到 {len(similar_messages)} 条相似消息 (查询: {query[:30]}...)")
return similar_messages
except Exception as e: except Exception as e:
logger.error(f"向量检索记忆失败: {e}") logger.error(f"查找相似消息失败: {e}")
return await self._fallback_get_memory(target) return []
async def _fallback_get_memory(self, target: str) -> Optional[str]: def _format_time_ago(self, timestamp: float) -> str:
"""回退到数据库检索""" """格式化时间差显示"""
try: if timestamp <= 0:
with get_db_session() as session: return "未知时间"
query = session.execute(select(Memory).where(
Memory.chat_id == self.chat_id
).order_by(Memory.create_time.desc()).limit(10)).scalars()
memories = list(query)
# 简单的关键词匹配
for memory in memories:
if any(word in memory.memory_text for word in target.split() if len(word) > 1):
return memory.memory_text
return memories[0].memory_text if memories else None
except Exception as e: try:
logger.error(f"回退检索记忆失败: {e}") now = time.time()
return None diff = now - timestamp
if diff < 60:
return f"{int(diff)}秒前"
elif diff < 3600:
return f"{int(diff/60)}分钟前"
elif diff < 86400:
return f"{int(diff/3600)}小时前"
else:
return f"{int(diff/86400)}天前"
except:
return "时间格式错误"
async def get_memory_for_context(self, current_message: str, context_size: int = 3) -> str:
"""
获取与当前消息相关的记忆上下文
Args:
current_message: 当前消息
context_size: 上下文消息数量
Returns:
str: 格式化的记忆上下文
"""
similar_messages = await self.find_similar_messages(
current_message,
top_k=context_size,
similarity_threshold=0.6 # 降低阈值以获得更多上下文
)
if not similar_messages:
return ""
# 格式化上下文
context_lines = []
for msg in similar_messages:
context_lines.append(
f"[{msg['time_ago']}] {msg['sender']}: {msg['content']} (相似度: {msg['similarity']:.2f})"
)
return "相关的历史记忆:\n" + "\n".join(context_lines)
def get_stats(self) -> Dict[str, Any]: def get_stats(self) -> Dict[str, Any]:
"""获取记忆统计信息""" """获取记忆系统统计信息"""
stats = { stats = {
"chat_id": self.chat_id, "chat_id": self.chat_id,
"vector_enabled": self.collection is not None, "retention_hours": self.retention_hours,
"total_memories": 0, "cleanup_interval": self.cleanup_interval,
"importance_patterns": len(self.importance_patterns) "system_status": "running" if self.is_running else "stopped",
"total_messages": 0,
"db_status": "connected" if self.collection else "disconnected"
} }
if self.collection: if self.collection:
try: try:
result = self.collection.count() result = self.collection.count()
stats["total_memories"] = result stats["total_messages"] = result
except: except:
pass stats["total_messages"] = "查询失败"
return stats return stats
def stop(self):
"""停止记忆系统"""
self.is_running = False
if self.cleanup_task and self.cleanup_task.is_alive():
logger.info("正在停止定时清理任务...")
logger.info(f"向量瞬时记忆系统已停止: {self.chat_id}")
# 为了兼容现有代码,提供工厂函数
def create_vector_memory_v2(chat_id: str, retention_hours: int = 24) -> VectorInstantMemoryV2:
"""创建向量瞬时记忆系统V2实例"""
return VectorInstantMemoryV2(chat_id, retention_hours)
# 使用示例
async def demo():
"""使用演示"""
memory = VectorInstantMemoryV2("demo_chat")
# 存储一些测试消息
await memory.store_message("今天天气不错,出去散步了", "用户")
await memory.store_message("刚才买了个冰淇淋,很好吃", "用户")
await memory.store_message("明天要开会,有点紧张", "用户")
# 查找相似消息
similar = await memory.find_similar_messages("天气怎么样")
print("相似消息:", similar)
# 获取上下文
context = await memory.get_memory_for_context("今天心情如何")
print("记忆上下文:", context)
# 查看统计信息
stats = memory.get_stats()
print("系统状态:", stats)
memory.stop()
if __name__ == "__main__":
asyncio.run(demo())

View File

@@ -25,7 +25,7 @@ from src.chat.utils.chat_message_builder import (
) )
from src.chat.express.expression_selector import expression_selector from src.chat.express.expression_selector import expression_selector
from src.chat.memory_system.memory_activator import MemoryActivator from src.chat.memory_system.memory_activator import MemoryActivator
from src.chat.memory_system.vector_instant_memory import VectorInstantMemory from src.chat.memory_system.hybrid_instant_memory import HybridInstantMemory
from src.mood.mood_manager import mood_manager from src.mood.mood_manager import mood_manager
from src.person_info.relationship_fetcher import relationship_fetcher_manager from src.person_info.relationship_fetcher import relationship_fetcher_manager
from src.person_info.person_info import get_person_info_manager from src.person_info.person_info import get_person_info_manager
@@ -226,7 +226,11 @@ class DefaultReplyer:
self.heart_fc_sender = HeartFCSender() self.heart_fc_sender = HeartFCSender()
self.memory_activator = MemoryActivator() self.memory_activator = MemoryActivator()
self.instant_memory = VectorInstantMemory(chat_id=self.chat_stream.stream_id) # 使用混合瞬时记忆系统V2支持自定义保留时间
self.instant_memory = HybridInstantMemory(
chat_id=self.chat_stream.stream_id,
retention_hours=1
)
from src.plugin_system.core.tool_use import ToolExecutor # 延迟导入ToolExecutor不然会循环依赖 from src.plugin_system.core.tool_use import ToolExecutor # 延迟导入ToolExecutor不然会循环依赖
@@ -470,23 +474,42 @@ class DefaultReplyer:
) )
if global_config.memory.enable_instant_memory: if global_config.memory.enable_instant_memory:
# 异步存储聊天历史到混合记忆系统
asyncio.create_task(self.instant_memory.create_and_store_memory(chat_history)) asyncio.create_task(self.instant_memory.create_and_store_memory(chat_history))
instant_memory_list = await self.instant_memory.get_memory(target) # 从混合记忆系统获取相关记忆
instant_memory = instant_memory_list[0] if instant_memory_list else None instant_memory_result = await self.instant_memory.get_memory(target)
logger.info(f"即时记忆:{instant_memory}")
# 处理不同类型的返回结果
instant_memory = None
if isinstance(instant_memory_result, list) and instant_memory_result:
instant_memory = instant_memory_result[0]
elif isinstance(instant_memory_result, str) and instant_memory_result:
instant_memory = instant_memory_result
logger.info(f"混合瞬时记忆:{instant_memory}")
if not running_memories: # 构建记忆字符串,即使某种记忆为空也要继续
return "" memory_str = ""
has_any_memory = False
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" # 添加长期记忆
for running_memory in running_memories: if running_memories:
memory_str += f"- {running_memory['content']}\n" if not memory_str:
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
for running_memory in running_memories:
memory_str += f"- {running_memory['content']}\n"
has_any_memory = True
# 添加瞬时记忆
if instant_memory: if instant_memory:
if not memory_str:
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
memory_str += f"- {instant_memory}\n" memory_str += f"- {instant_memory}\n"
has_any_memory = True
return memory_str # 只有当完全没有任何记忆时才返回空字符串
return memory_str if has_any_memory else ""
async def build_tool_info(self, chat_history: str, reply_to: str = "", enable_tool: bool = True) -> str: async def build_tool_info(self, chat_history: str, reply_to: str = "", enable_tool: bool = True) -> str:
"""构建工具信息块 """构建工具信息块
@@ -1154,7 +1177,7 @@ class DefaultReplyer:
"""构建单个发送消息""" """构建单个发送消息"""
bot_user_info = UserInfo( bot_user_info = UserInfo(
user_id=global_config.bot.qq_account, user_id=str(global_config.bot.qq_account),
user_nickname=global_config.bot.nickname, user_nickname=global_config.bot.nickname,
platform=self.chat_stream.platform, platform=self.chat_stream.platform,
) )

View File

@@ -475,6 +475,7 @@ class AiohttpGeminiClient(BaseClient):
# 直接重抛项目定义的异常 # 直接重抛项目定义的异常
raise raise
except Exception as e: except Exception as e:
logger.debug(e)
# 其他异常转换为网络连接错误 # 其他异常转换为网络连接错误
raise NetworkConnectionError() from e raise NetworkConnectionError() from e

View File

@@ -118,4 +118,4 @@ class CookieService:
return cookies return cookies
logger.error("所有Cookie获取方法均失败。") logger.error("所有Cookie获取方法均失败。")
return None return None

View File

@@ -312,11 +312,13 @@ class QZoneService:
raise RuntimeError(f"无法连接到Napcat服务: 超过最大重试次数({max_retries})") raise RuntimeError(f"无法连接到Napcat服务: 超过最大重试次数({max_retries})")
async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]: async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]:
cookies = await self._renew_and_load_cookies(qq_account, stream_id) cookies = await self.cookie_service.get_cookies(qq_account, stream_id)
if not cookies: return None if not cookies:
return None
p_skey = cookies.get('p_skey') or cookies.get('p_skey'.upper()) p_skey = cookies.get('p_skey') or cookies.get('p_skey'.upper())
if not p_skey: return None if not p_skey:
return None
gtk = self._generate_gtk(p_skey) gtk = self._generate_gtk(p_skey)
uin = cookies.get('uin', '').lstrip('o') uin = cookies.get('uin', '').lstrip('o')