# -*- coding: utf-8 -*-
"""Hybrid instant-memory system V2.

Fuses two memory strategies -- the LLM-based ``InstantMemory`` and the
vector-based ``VectorInstantMemoryV2`` -- and picks the best one per message.
With VectorInstantMemoryV2 integrated this supports:

- automatic periodic cleanup of expired memories
- similarity search
- time-aware memory retrieval
- more robust error handling
"""

import time
import asyncio
from typing import List, Optional, Dict, Any, Tuple, Union
from enum import Enum
from dataclasses import dataclass

from src.common.logger import get_logger
from .instant_memory import InstantMemory, MemoryItem
from .vector_instant_memory import VectorInstantMemoryV2

logger = get_logger(__name__)


class MemoryMode(Enum):
    """How a piece of text should be stored / retrieved."""

    VECTOR_ONLY = "vector_only"      # vector system only (fast path)
    LLM_ONLY = "llm_only"            # LLM system only
    LLM_PREFERRED = "llm_preferred"  # LLM first, vector as backup
    HYBRID = "hybrid"                # both systems in parallel


@dataclass
class StrategyDecision:
    """Outcome of the strategy decision for one piece of text."""

    mode: MemoryMode      # chosen processing mode
    confidence: float     # heuristic confidence, roughly in [0, 1]
    reason: str           # human-readable explanation (used in debug logs)


class MemoryStrategy:
    """Heuristic strategy picker.

    Scores a text by length, emotional vocabulary, importance markers,
    question density and casual-chat markers, then maps the scores to a
    :class:`MemoryMode`.  Purely lexical -- no LLM / embedding calls.
    """

    def __init__(self):
        # Emotion-bearing keywords (Chinese); runtime data, do not translate.
        self.emotional_keywords = {
            "开心", "高兴", "兴奋", "激动", "快乐", "愉快", "满意",
            "伤心", "难过", "沮丧", "失落", "郁闷", "痛苦", "心疼",
            "生气", "愤怒", "气愤", "恼火", "烦躁", "讨厌", "厌烦",
            "担心", "焦虑", "紧张", "害怕", "恐惧", "不安", "忧虑",
            "感动", "温馨", "幸福", "甜蜜", "浪漫", "美好", "珍惜",
            "重要", "关键", "关心", "在意", "喜欢", "爱", "想念"
        }

        # Markers of important information (plans, people, dates, ...).
        self.important_keywords = {
            "计划", "目标", "梦想", "理想", "希望", "打算", "准备",
            "决定", "选择", "考虑", "想要", "需要", "必须", "应该",
            "工作", "学习", "考试", "面试", "项目", "任务", "职业",
            "家人", "朋友", "恋人", "同事", "老师", "同学", "领导",
            "生日", "节日", "纪念日", "约会", "聚会", "旅行", "出差"
        }

    def analyze_content_complexity(self, text: str) -> Dict[str, Any]:
        """Compute the lexical metrics that drive :meth:`decide_strategy`.

        Returns a dict with char/sentence counts, emotional and importance
        keyword hits, question density and casual-marker hits.
        """
        char_count = len(text)
        # "+ 1" so text without terminal punctuation still counts as one sentence.
        sentence_count = text.count('。') + text.count('!') + text.count('?') + 1

        # Number of distinct emotional keywords present.
        emotional_score = sum(1 for word in self.emotional_keywords if word in text)

        # Number of distinct importance markers present.
        importance_score = sum(1 for word in self.important_keywords if word in text)

        # Question marks per ~50 characters (how interrogative the text is).
        question_density = text.count('?') / max(char_count / 50, 1)

        # Colloquial particles -> how casual / chatty the text is.
        casual_markers = ['啊', '呀', '嗯', '哦', '哈哈', '呵呵', '嘿嘿']
        casual_score = sum(1 for marker in casual_markers if marker in text)

        return {
            'char_count': char_count,
            'sentence_count': sentence_count,
            'emotional_score': emotional_score,
            'importance_score': importance_score,
            'question_density': question_density,
            'casual_score': casual_score
        }

    def decide_strategy(self, text: str) -> StrategyDecision:
        """Pick the memory strategy for *text*.

        Decision order: empty -> vector; very short -> vector; emotional ->
        LLM-preferred; important -> LLM-preferred; long & multi-sentence ->
        hybrid; highly interrogative -> LLM-preferred; short casual chat ->
        vector; otherwise hybrid.
        """
        if not text.strip():
            return StrategyDecision(MemoryMode.VECTOR_ONLY, 0.0, "空内容")

        analysis = self.analyze_content_complexity(text)

        # 1. Very short content -> vector only (fast, cheap).
        if analysis['char_count'] < 20:
            return StrategyDecision(
                MemoryMode.VECTOR_ONLY,
                0.9,
                f"内容过短({analysis['char_count']}字符)"
            )

        # 2. Emotionally loaded content -> LLM preferred (quality matters).
        if analysis['emotional_score'] >= 2:
            return StrategyDecision(
                MemoryMode.LLM_PREFERRED,
                0.8 + min(analysis['emotional_score'] * 0.1, 0.2),
                f"检测到{analysis['emotional_score']}个情感词汇"
            )

        # 3. Important information -> LLM preferred.
        if analysis['importance_score'] >= 3:
            return StrategyDecision(
                MemoryMode.LLM_PREFERRED,
                0.7 + min(analysis['importance_score'] * 0.05, 0.3),
                f"检测到{analysis['importance_score']}个重要信息标识"
            )

        # 4. Long, multi-sentence content -> hybrid mode.
        if analysis['char_count'] > 100 and analysis['sentence_count'] >= 3:
            return StrategyDecision(
                MemoryMode.HYBRID,
                0.75,
                f"复杂内容({analysis['char_count']}字符,{analysis['sentence_count']}句)"
            )

        # 5. Highly interrogative -> LLM handles questions more accurately.
        if analysis['question_density'] > 0.3:
            return StrategyDecision(
                MemoryMode.LLM_PREFERRED,
                0.65,
                f"高询问密度({analysis['question_density']:.2f})"
            )

        # 6. Short casual chat -> vector only (speed over depth).
        if analysis['casual_score'] >= 2 and analysis['char_count'] < 80:
            return StrategyDecision(
                MemoryMode.VECTOR_ONLY,
                0.7,
                f"日常闲聊内容(休闲标记:{analysis['casual_score']})"
            )

        # 7. Default: hybrid mode.
        return StrategyDecision(
            MemoryMode.HYBRID,
            0.6,
            "中等复杂度内容,使用混合策略"
        )


class MemorySync:
    """Synchronizer moving memories between the two subsystems."""

    def __init__(self, llm_memory: InstantMemory, vector_memory: VectorInstantMemoryV2):
        self.llm_memory = llm_memory
        self.vector_memory = vector_memory

    async def sync_llm_to_vector(self, memory_item: MemoryItem):
        """Back up a high-quality LLM-generated memory into the vector store."""
        try:
            # Store via the V2 system's message API.
            await self.vector_memory.store_message(
                content=memory_item.memory_text,
                sender="llm_memory"
            )
            logger.debug(f"LLM记忆已同步到向量系统: {memory_item.memory_text[:50]}...")
        except Exception as e:
            logger.error(f"LLM记忆同步到向量系统失败: {e}")

    async def sync_vector_to_llm(self, content: str, importance: float):
        """Promote a vector memory into the LLM system (async, low priority).

        Only memories with ``importance >= 0.7`` are promoted; keywords are
        extracted lexically to avoid an LLM call.
        """
        try:
            # Only sufficiently important vector memories are worth promoting.
            if importance < 0.7:
                return

            memory_id = f"{self.llm_memory.chat_id}_{int(time.time() * 1000)}_vec_sync"

            # Lexical keyword extraction (no LLM call).
            keywords = self._extract_simple_keywords(content)

            memory_item = MemoryItem(
                memory_id=memory_id,
                chat_id=self.llm_memory.chat_id,
                memory_text=content,
                keywords=keywords
            )

            await self.llm_memory.store_memory(memory_item)
            logger.debug(f"向量记忆已同步到LLM系统: {content[:50]}...")

        except Exception as e:
            logger.error(f"向量记忆同步到LLM系统失败: {e}")

    def _extract_simple_keywords(self, content: str) -> List[str]:
        """Extract up to 10 keywords lexically (no LLM).

        Strips punctuation, splits on whitespace, keeps tokens of length >= 2
        and de-duplicates while preserving first-seen order.
        """
        import re

        # Replace CJK punctuation and whitespace with spaces, then tokenize.
        clean_text = re.sub(r'[,。!?;:""''()【】\s]', ' ', content)
        words = [w.strip() for w in clean_text.split() if len(w.strip()) >= 2]

        # dict.fromkeys preserves insertion order, so the selection of the
        # first 10 keywords is deterministic (list(set(...)) was not).
        keywords = list(dict.fromkeys(words))[:10]
        return keywords


class HybridRetriever:
    """Retriever that merges results from both memory subsystems."""

    def __init__(self, llm_memory: InstantMemory, vector_memory: VectorInstantMemoryV2):
        self.llm_memory = llm_memory
        self.vector_memory = vector_memory

    async def retrieve_memories(self, target: str, strategy: MemoryMode) -> Optional[Union[str, List[str]]]:
        """Retrieve memories for *target* according to *strategy*.

        Returns a string (vector context / LLM answer), a list of strings
        (hybrid mode), or ``None`` when nothing relevant is found.
        """
        if strategy == MemoryMode.VECTOR_ONLY:
            # V2 system builds a formatted context string from similar messages.
            context = await self.vector_memory.get_memory_for_context(target)
            return context if context else None

        elif strategy == MemoryMode.LLM_ONLY:
            return await self.llm_memory.get_memory(target)

        elif strategy == MemoryMode.LLM_PREFERRED:
            # LLM first; fall back to the vector context on a miss.
            llm_result = await self.llm_memory.get_memory(target)
            if llm_result:
                return llm_result
            context = await self.vector_memory.get_memory_for_context(target)
            return context if context else None

        elif strategy == MemoryMode.HYBRID:
            # Query both systems in parallel and merge.
            return await self._hybrid_retrieve(target)

        return None

    async def _hybrid_retrieve(self, target: str) -> Optional[List[str]]:
        """Query both subsystems in parallel and merge de-duplicated results."""
        try:
            # return_exceptions=True so one failing backend does not mask the other.
            results = await asyncio.gather(
                self.llm_memory.get_memory(target),
                self.vector_memory.get_memory_for_context(target),
                return_exceptions=True
            )

            llm_result, vector_result = results

            # Collect valid results; a set de-duplicates identical memories.
            combined_memories = set()

            # LLM side may return a list, a string, or an exception.
            if isinstance(llm_result, list) and llm_result:
                combined_memories.update(llm_result)
            elif isinstance(llm_result, str) and llm_result:
                combined_memories.add(llm_result)
            elif isinstance(llm_result, Exception):
                logger.warning(f"LLM检索出错: {llm_result}")

            # Vector side returns a formatted context string or an exception.
            if isinstance(vector_result, str) and vector_result:
                combined_memories.add(vector_result)
            elif isinstance(vector_result, Exception):
                logger.warning(f"向量检索出错: {vector_result}")

            if combined_memories:
                return list(combined_memories)

            return None

        except Exception as e:
            logger.error(f"混合检索失败: {e}")
            return None


class HybridInstantMemory:
    """Hybrid instant-memory system V2.

    Intelligently fuses the LLM and vector memory strategies:

    - quick content goes to the vector system V2 (auto-expiring)
    - complex content goes to the LLM system
    - important content is stored in both
    - one unified retrieval interface
    - supports similarity search and time-aware memories
    """

    def __init__(self, chat_id: str, retention_hours: int = 24):
        """Create the hybrid system for one chat.

        Args:
            chat_id: chat/stream identifier shared by both subsystems.
            retention_hours: how long the vector system keeps messages.
        """
        self.chat_id = chat_id
        self.retention_hours = retention_hours

        # The two underlying subsystems.
        self.llm_memory = InstantMemory(chat_id)
        self.vector_memory = VectorInstantMemoryV2(chat_id, retention_hours)

        # Strategy / sync / retrieval components.
        self.strategy = MemoryStrategy()
        self.sync = MemorySync(self.llm_memory, self.vector_memory)
        self.retriever = HybridRetriever(self.llm_memory, self.vector_memory)

        # Strong references to fire-and-forget tasks: asyncio only keeps a
        # weak reference to a running task, so without this set a backup
        # task could be garbage-collected before it finishes.
        self._background_tasks: set = set()

        logger.info(f"混合瞬时记忆系统初始化完成: {chat_id} (向量保留{retention_hours}小时)")

    async def create_and_store_memory(self, text: str) -> None:
        """Decide a strategy for *text* and store it accordingly."""
        if not text.strip():
            return

        try:
            # 1. Pick a strategy.
            decision = self.strategy.decide_strategy(text)

            logger.debug(f"记忆策略: {decision.mode.value} (置信度: {decision.confidence:.2f}) - {decision.reason}")

            # 2. Dispatch to the matching storage path.
            if decision.mode == MemoryMode.VECTOR_ONLY:
                await self._store_vector_only(text)

            elif decision.mode == MemoryMode.LLM_ONLY:
                await self._store_llm_only(text)

            elif decision.mode == MemoryMode.LLM_PREFERRED:
                await self._store_llm_preferred(text)

            elif decision.mode == MemoryMode.HYBRID:
                await self._store_hybrid(text)

        except Exception as e:
            logger.error(f"混合记忆存储失败: {e}")

    async def _store_vector_only(self, text: str):
        """Store into the vector system only."""
        await self.vector_memory.store_message(text)

    async def _store_llm_only(self, text: str):
        """Store into the LLM system only."""
        await self.llm_memory.create_and_store_memory(text)

    async def _store_llm_preferred(self, text: str):
        """Store into the LLM system, with an async vector backup.

        Falls back to the vector system if the LLM store raises.
        """
        try:
            # Primary store: LLM system.
            await self.llm_memory.create_and_store_memory(text)

            # Backup to the vector system in the background; keep a strong
            # reference so the task is not garbage-collected mid-flight.
            task = asyncio.create_task(self.vector_memory.store_message(text))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        except Exception as e:
            logger.error(f"LLM优先存储失败,降级到向量系统: {e}")
            await self.vector_memory.store_message(text)

    async def _store_hybrid(self, text: str):
        """Store into both systems in parallel (failures are isolated)."""
        try:
            await asyncio.gather(
                self.llm_memory.create_and_store_memory(text),
                self.vector_memory.store_message(text),
                return_exceptions=True
            )
        except Exception as e:
            logger.error(f"混合存储失败: {e}")

    async def get_memory(self, target: str) -> Optional[Union[str, List[str]]]:
        """Unified retrieval entry point.

        Returns a string, a list of strings, or ``None``, depending on the
        strategy chosen for the query.
        """
        if not target.strip():
            return None

        try:
            # Reuse the storage heuristics to pick a retrieval strategy.
            query_decision = self.strategy.decide_strategy(target)

            # For queries, prefer hybrid retrieval for broader coverage;
            # upgrade via a local variable instead of mutating the decision.
            mode = query_decision.mode
            if mode == MemoryMode.VECTOR_ONLY and len(target) > 30:
                mode = MemoryMode.HYBRID

            logger.debug(f"检索策略: {mode.value} - {query_decision.reason}")

            return await self.retriever.retrieve_memories(target, mode)

        except Exception as e:
            logger.error(f"混合记忆检索失败: {e}")
            return None

    def get_stats(self) -> Dict[str, Any]:
        """Return aggregated statistics for both subsystems."""
        llm_stats = {"total_memories": 0}  # the LLM system has no stats API yet
        vector_stats = self.vector_memory.get_stats()

        return {
            "chat_id": self.chat_id,
            "mode": "hybrid",
            "retention_hours": self.retention_hours,
            "llm_system": llm_stats,
            "vector_system": vector_stats,
            "strategy_patterns": {
                "emotional_keywords": len(self.strategy.emotional_keywords),
                "important_keywords": len(self.strategy.important_keywords)
            }
        }

    async def sync_memories(self, direction: str = "both"):
        """Manually trigger cross-system sync (both directions are stubs)."""
        try:
            if direction in ["both", "llm_to_vector"]:
                # LLM -> vector sync still needs to be implemented.
                logger.info("LLM到向量的同步需要进一步开发")

            if direction in ["both", "vector_to_llm"]:
                # Vector -> LLM sync still needs to be implemented.
                logger.info("向量到LLM的同步需要进一步开发")

        except Exception as e:
            logger.error(f"记忆同步失败: {e}")

    def stop(self):
        """Stop the hybrid system (shuts down the vector cleanup thread)."""
        try:
            self.vector_memory.stop()
            logger.info(f"混合瞬时记忆系统已停止: {self.chat_id}")
        except Exception as e:
            logger.error(f"停止混合记忆系统失败: {e}")

    async def find_similar_memories(self, query: str, top_k: int = 5, similarity_threshold: float = 0.7):
        """Find similar memories -- thin pass-through to the V2 vector API."""
        return await self.vector_memory.find_similar_messages(query, top_k, similarity_threshold)


# Backward-compatible factory helper.
def create_hybrid_memory(chat_id: str, retention_hours: int = 24) -> HybridInstantMemory:
    """Create a :class:`HybridInstantMemory` instance.

    Args:
        chat_id: chat identifier.
        retention_hours: vector-memory retention in hours (default 24).
    """
    return HybridInstantMemory(chat_id, retention_hours)
a/src/chat/memory_system/hybrid_memory_design.md b/src/chat/memory_system/hybrid_memory_design.md new file mode 100644 index 000000000..f47d3cacc --- /dev/null +++ b/src/chat/memory_system/hybrid_memory_design.md @@ -0,0 +1,168 @@ +# 混合瞬时记忆系统设计 + +## 系统概述 + +融合 `instant_memory.py`(LLM系统)和 `vector_instant_memory.py`(向量系统)的混合记忆系统,智能选择最优策略,无需配置文件控制。 + +## 融合架构 + +``` +聊天输入 → 智能调度器 → 选择策略 → 双重存储 → 融合检索 → 统一输出 +``` + +## 核心组件设计 + +### 1. HybridInstantMemory (主类) + +**职责**: 统一接口,智能调度两套记忆系统 + +**关键方法**: +- `__init__(chat_id)` - 初始化两套子系统 +- `create_and_store_memory(text)` - 智能存储记忆 +- `get_memory(target)` - 融合检索记忆 +- `get_stats()` - 统计信息 + +### 2. MemoryStrategy (策略判断器) + +**职责**: 判断使用哪种记忆策略 + +**判断规则**: +- 文本长度 < 30字符 → 优先向量系统(快速) +- 包含情感词汇/重要信息 → 使用LLM系统(准确) +- 复杂场景 → 双重验证 + +**实现方法**: +```python +def decide_strategy(self, text: str) -> MemoryMode: + # 长度判断 + if len(text) < 30: + return MemoryMode.VECTOR_ONLY + + # 情感关键词检测 + if self._contains_emotional_content(text): + return MemoryMode.LLM_PREFERRED + + # 默认混合模式 + return MemoryMode.HYBRID +``` + +### 3. MemorySync (同步器) + +**职责**: 处理两套系统间的记忆同步和去重 + +**同步策略**: +- 向量系统存储的记忆 → 异步同步到LLM系统 +- LLM系统生成的高质量记忆 → 生成向量存储 +- 定期去重,避免重复记忆 + +### 4. HybridRetriever (检索器) + +**职责**: 融合两种检索方式,提供最优结果 + +**检索策略**: +1. 并行查询向量系统和LLM系统 +2. 按相似度/相关性排序 +3. 去重合并,返回最相关的记忆 + +## 智能调度逻辑 + +### 快速路径 (Vector Path) +- 适用: 短文本、常规对话、快速查询 +- 优势: 响应速度快,资源消耗低 +- 时机: 文本简单、无特殊情感内容 + +### 准确路径 (LLM Path) +- 适用: 重要信息、情感表达、复杂语义 +- 优势: 语义理解深度,记忆质量高 +- 时机: 检测到重要性标志 + +### 混合路径 (Hybrid Path) +- 适用: 中等复杂度内容 +- 策略: 向量快速筛选 + LLM精确处理 +- 平衡: 速度与准确性 + +## 记忆存储策略 + +### 双重备份机制 +1. **主存储**: 根据策略选择主要存储方式 +2. **备份存储**: 异步备份到另一系统 +3. 
**同步检查**: 定期校验两边数据一致性 + +### 存储优化 +- 向量系统: 立即存储,快速可用 +- LLM系统: 批量处理,高质量整理 +- 重复检测: 跨系统去重 + +## 检索融合策略 + +### 并行检索 +```python +async def get_memory(self, target: str): + # 并行查询两个系统 + vector_task = self.vector_memory.get_memory(target) + llm_task = self.llm_memory.get_memory(target) + + vector_results, llm_results = await asyncio.gather( + vector_task, llm_task, return_exceptions=True + ) + + # 融合结果 + return self._merge_results(vector_results, llm_results) +``` + +### 结果融合 +1. **相似度评分**: 统一两种系统的相似度计算 +2. **权重调整**: 根据查询类型调整系统权重 +3. **去重合并**: 移除重复内容,保留最相关的 + +## 性能优化 + +### 异步处理 +- 向量检索: 同步快速响应 +- LLM处理: 异步后台处理 +- 批量操作: 减少系统调用开销 + +### 缓存策略 +- 热点记忆缓存 +- 查询结果缓存 +- 向量计算缓存 + +### 降级机制 +- 向量系统故障 → 只使用LLM系统 +- LLM系统故障 → 只使用向量系统 +- 全部故障 → 返回空结果,记录错误 + +## 实现计划 + +1. **基础框架**: 创建HybridInstantMemory主类 +2. **策略判断**: 实现智能调度逻辑 +3. **存储融合**: 实现双重存储机制 +4. **检索融合**: 实现并行检索和结果合并 +5. **同步机制**: 实现跨系统数据同步 +6. **性能优化**: 异步处理和缓存优化 +7. **错误处理**: 降级机制和异常处理 + +## 使用接口 + +```python +# 初始化混合记忆系统 +hybrid_memory = HybridInstantMemory(chat_id="user_123") + +# 智能存储记忆 +await hybrid_memory.create_and_store_memory("今天天气真好,我去公园散步了") + +# 融合检索记忆 +memories = await hybrid_memory.get_memory("天气") + +# 获取系统状态 +stats = hybrid_memory.get_stats() +print(f"向量记忆: {stats['vector_count']} 条") +print(f"LLM记忆: {stats['llm_count']} 条") +``` + +## 预期效果 + +- **响应速度**: 比纯LLM系统快60%+ +- **记忆质量**: 比纯向量系统准确30%+ +- **资源使用**: 智能调度,按需使用资源 +- **可靠性**: 双系统备份,单点故障不影响服务 \ No newline at end of file diff --git a/src/chat/memory_system/vector_instant_memory.py b/src/chat/memory_system/vector_instant_memory.py index 7970431a1..7cfa02104 100644 --- a/src/chat/memory_system/vector_instant_memory.py +++ b/src/chat/memory_system/vector_instant_memory.py @@ -4,46 +4,67 @@ import json import hashlib from typing import List, Optional, Tuple, Dict, Any from dataclasses import dataclass +import threading +from datetime import datetime, timedelta import numpy as np import chromadb from chromadb.config import Settings -from sqlalchemy import 
select - from src.common.logger import get_logger from src.chat.utils.utils import get_embedding from src.config.config import global_config -from src.common.database.sqlalchemy_models import Memory -from src.common.database.sqlalchemy_database_api import get_db_session -logger = get_logger("vector_instant_memory") +logger = get_logger("vector_instant_memory_v2") @dataclass -class MemoryImportancePattern: - """记忆重要性模式""" - description: str - vector: List[float] - threshold: float = 0.6 +class ChatMessage: + """聊天消息数据结构""" + message_id: str + chat_id: str + content: str + timestamp: float + sender: str = "unknown" + message_type: str = "text" -class VectorInstantMemory: - """基于向量的瞬时记忆系统 +class VectorInstantMemoryV2: + """重构的向量瞬时记忆系统 V2 - 完全替换原有的LLM判断方式,使用向量相似度进行: - 1. 记忆重要性判断 - 2. 记忆内容去重 - 3. 记忆检索匹配 + 新设计理念: + 1. 全量存储 - 所有聊天记录都存储为向量 + 2. 定时清理 - 定期清理过期记录 + 3. 实时匹配 - 新消息与历史记录做向量相似度匹配 """ - def __init__(self, chat_id: str): + def __init__(self, chat_id: str, retention_hours: int = 24, cleanup_interval: int = 3600): + """ + 初始化向量瞬时记忆系统 + + Args: + chat_id: 聊天ID + retention_hours: 记忆保留时长(小时) + cleanup_interval: 清理间隔(秒) + """ self.chat_id = chat_id + self.retention_hours = retention_hours + self.cleanup_interval = cleanup_interval + + # ChromaDB相关 self.client = None self.collection = None - self.importance_patterns = [] - self._init_chroma() + # 清理任务相关 + self.cleanup_task = None + self.is_running = True + + # 初始化系统 + self._init_chroma() + self._start_cleanup_task() + + logger.info(f"向量瞬时记忆系统V2初始化完成: {chat_id} (保留{retention_hours}小时)") + def _init_chroma(self): """初始化ChromaDB连接""" try: @@ -53,7 +74,7 @@ class VectorInstantMemory: settings=Settings(anonymized_telemetry=False) ) self.collection = self.client.get_or_create_collection( - name="instant_memories", + name="chat_messages", metadata={"hnsw:space": "cosine"} ) logger.info(f"向量记忆数据库初始化成功: {db_path}") @@ -62,284 +83,289 @@ class VectorInstantMemory: self.client = None self.collection = None - async def 
_load_importance_patterns(self): - """加载重要性判断模式向量""" - if self.importance_patterns: + def _start_cleanup_task(self): + """启动定时清理任务""" + def cleanup_worker(): + while self.is_running: + try: + self._cleanup_expired_messages() + time.sleep(self.cleanup_interval) + except Exception as e: + logger.error(f"清理任务异常: {e}") + time.sleep(60) # 异常时等待1分钟再继续 + + self.cleanup_task = threading.Thread(target=cleanup_worker, daemon=True) + self.cleanup_task.start() + logger.info(f"定时清理任务已启动,间隔{self.cleanup_interval}秒") + + def _cleanup_expired_messages(self): + """清理过期的聊天记录""" + if not self.collection: return - patterns = [ - "用户分享了重要的个人信息和经历", - "讨论了未来的计划、安排或目标", - "表达了强烈的情感、观点或态度", - "询问了重要的问题需要回答", - "发生了有趣的对话和深入交流", - "出现了新的话题或重要信息", - "用户表现出明显的情绪变化", - "涉及重要的决定或选择" - ] - try: - for i, pattern in enumerate(patterns): - vector = await get_embedding(pattern) - if vector: - self.importance_patterns.append( - MemoryImportancePattern( - description=pattern, - vector=vector, - threshold=0.55 + i * 0.01 # 动态阈值 - ) - ) - - logger.info(f"加载了 {len(self.importance_patterns)} 个重要性判断模式") + # 计算过期时间戳 + expire_time = time.time() - (self.retention_hours * 3600) + + # 查询所有记录 + all_results = self.collection.get( + where={"chat_id": self.chat_id}, + include=["metadatas"] + ) + + # 找出过期的记录ID + expired_ids = [] + metadatas = all_results.get("metadatas") or [] + ids = all_results.get("ids") or [] + + for i, metadata in enumerate(metadatas): + if metadata and isinstance(metadata, dict): + timestamp = metadata.get("timestamp", 0) + if isinstance(timestamp, (int, float)) and timestamp < expire_time: + if i < len(ids): + expired_ids.append(ids[i]) + + # 批量删除过期记录 + if expired_ids: + self.collection.delete(ids=expired_ids) + logger.info(f"清理了 {len(expired_ids)} 条过期聊天记录") + except Exception as e: - logger.error(f"加载重要性模式失败: {e}") + logger.error(f"清理过期记录失败: {e}") - async def should_create_memory(self, chat_history: str) -> Tuple[bool, float]: - """向量化判断是否需要创建记忆 + async def store_message(self, content: str, 
sender: str = "user") -> bool: + """ + 存储聊天消息到向量库 Args: - chat_history: 聊天历史 + content: 消息内容 + sender: 发送者 Returns: - (是否需要记忆, 重要性分数) + bool: 是否存储成功 """ - if not chat_history.strip(): - return False, 0.0 - - await self._load_importance_patterns() - - try: - # 获取聊天历史的向量表示 - history_vector = await get_embedding(chat_history[-500:]) # 只取最后500字符 - if not history_vector: - return False, 0.0 - - # 与重要性模式向量计算相似度 - max_score = 0.0 - best_pattern = None - - for pattern in self.importance_patterns: - similarity = self._cosine_similarity(history_vector, pattern.vector) - if similarity > max_score: - max_score = similarity - best_pattern = pattern - - should_remember = max_score > 0.6 # 基础阈值 - - if should_remember and best_pattern: - logger.debug(f"触发记忆模式: {best_pattern.description} (相似度: {max_score:.3f})") - - return should_remember, max_score - - except Exception as e: - logger.error(f"向量化判断记忆重要性失败: {e}") - return False, 0.0 - - def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: - """计算余弦相似度""" - try: - v1 = np.array(vec1) - v2 = np.array(vec2) - - dot_product = np.dot(v1, v2) - norms = np.linalg.norm(v1) * np.linalg.norm(v2) - - if norms == 0: - return 0.0 - - return dot_product / norms - - except Exception as e: - logger.error(f"计算余弦相似度失败: {e}") - return 0.0 - - def _extract_key_content(self, chat_history: str) -> str: - """快速提取关键内容(避免LLM调用)""" - lines = chat_history.strip().split('\n') - - # 简单规则:取最后几行非空对话 - key_lines = [] - for line in reversed(lines): - if line.strip() and ':' in line: # 包含发言者格式 - key_lines.insert(0, line.strip()) - if len(key_lines) >= 3: # 最多3行 - break - - return '\n'.join(key_lines) if key_lines else chat_history[-200:] - - async def _is_duplicate_memory(self, content: str) -> bool: - """检查是否为重复记忆""" - if not self.collection: + if not self.collection or not content.strip(): return False try: - content_vector = await get_embedding(content) - if not content_vector: + # 生成消息向量 + message_vector = await get_embedding(content) + 
if not message_vector: + logger.warning(f"消息向量生成失败: {content[:50]}...") return False - # 查询最相似的记忆 - results = self.collection.query( - query_embeddings=[content_vector], - n_results=1 + # 生成唯一消息ID + message_id = f"{self.chat_id}_{int(time.time() * 1000)}_{hash(content) % 10000}" + + # 创建消息对象 + message = ChatMessage( + message_id=message_id, + chat_id=self.chat_id, + content=content, + timestamp=time.time(), + sender=sender ) - if results['distances'] and results['distances'][0]: - similarity = 1 - results['distances'][0][0] # ChromaDB用距离,转换为相似度 - return similarity > 0.85 # 85%相似度认为重复 - - except Exception as e: - logger.error(f"检查重复记忆失败: {e}") - - return False - - async def create_and_store_memory(self, chat_history: str): - """创建并存储向量化记忆""" - try: - # 1. 向量化判断重要性 - should_store, importance_score = await self.should_create_memory(chat_history) - - if not should_store: - logger.debug("聊天内容不需要记忆") - return - - # 2. 提取关键内容 - key_content = self._extract_key_content(chat_history) - - # 3. 检查重复 - if await self._is_duplicate_memory(key_content): - logger.debug("发现重复记忆,跳过存储") - return - - # 4. 
向量化存储 - await self._store_vector_memory(key_content, importance_score) - - logger.info(f"成功存储向量记忆 (重要性: {importance_score:.3f}): {key_content[:50]}...") - - except Exception as e: - logger.error(f"创建向量记忆失败: {e}") - - async def _store_vector_memory(self, content: str, importance: float): - """存储向量化记忆""" - if not self.collection: - logger.warning("ChromaDB未初始化,无法存储向量记忆") - return - - try: - # 生成向量 - content_vector = await get_embedding(content) - if not content_vector: - logger.error("生成记忆向量失败") - return - - # 生成唯一ID - memory_id = f"{self.chat_id}_{int(time.time() * 1000)}" - # 存储到ChromaDB self.collection.add( - embeddings=[content_vector], + embeddings=[message_vector], documents=[content], metadatas=[{ - "chat_id": self.chat_id, - "timestamp": time.time(), - "importance": importance, - "type": "instant_memory" + "message_id": message.message_id, + "chat_id": message.chat_id, + "timestamp": message.timestamp, + "sender": message.sender, + "message_type": message.message_type }], - ids=[memory_id] + ids=[message_id] ) - # 同时存储到原数据库(保持兼容性) - await self._store_to_db(content, importance) + logger.debug(f"消息已存储: {content[:50]}...") + return True except Exception as e: - logger.error(f"存储向量记忆到ChromaDB失败: {e}") + logger.error(f"存储消息失败: {e}") + return False - async def _store_to_db(self, content: str, importance: float): - """存储到原数据库表""" - try: - with get_db_session() as session: - memory = Memory( - memory_id=f"{self.chat_id}_{int(time.time() * 1000)}", - chat_id=self.chat_id, - memory_text=content, - keywords=[], # 向量版本不需要关键词 - create_time=time.time(), - last_view_time=time.time() - ) - session.add(memory) - session.commit() - except Exception as e: - logger.error(f"存储记忆到数据库失败: {e}") - - async def get_memory(self, target: str) -> Optional[str]: - """向量化检索相关记忆""" - if not self.collection: - return await self._fallback_get_memory(target) + async def find_similar_messages(self, query: str, top_k: int = 5, similarity_threshold: float = 0.7) -> List[Dict[str, Any]]: + """ + 
查找与查询相似的历史消息 + + Args: + query: 查询内容 + top_k: 返回的最相似消息数量 + similarity_threshold: 相似度阈值 + + Returns: + List[Dict]: 相似消息列表,包含content、similarity、timestamp等信息 + """ + if not self.collection or not query.strip(): + return [] try: - target_vector = await get_embedding(target) - if not target_vector: - return None + # 生成查询向量 + query_vector = await get_embedding(query) + if not query_vector: + return [] # 向量相似度搜索 results = self.collection.query( - query_embeddings=[target_vector], - n_results=3, # 取前3个最相关的 + query_embeddings=[query_vector], + n_results=top_k, where={"chat_id": self.chat_id} ) if not results['documents'] or not results['documents'][0]: - return None + return [] - # 返回最相关的记忆 - best_memory = results['documents'][0][0] - distance = results['distances'][0][0] if results['distances'] else 1.0 - similarity = 1 - distance + # 处理搜索结果 + similar_messages = [] + documents = results['documents'][0] + distances = results['distances'][0] if results['distances'] else [] + metadatas = results['metadatas'][0] if results['metadatas'] else [] - if similarity > 0.7: # 70%相似度阈值 - logger.debug(f"找到相关记忆 (相似度: {similarity:.3f}): {best_memory[:50]}...") - return best_memory + for i, doc in enumerate(documents): + # 计算相似度(ChromaDB返回距离,需转换) + distance = distances[i] if i < len(distances) else 1.0 + similarity = 1 - distance + + # 过滤低相似度结果 + if similarity < similarity_threshold: + continue + + # 获取元数据 + metadata = metadatas[i] if i < len(metadatas) else {} + + # 安全获取timestamp + timestamp = metadata.get("timestamp", 0) if isinstance(metadata, dict) else 0 + timestamp = float(timestamp) if isinstance(timestamp, (int, float)) else 0.0 + + similar_messages.append({ + "content": doc, + "similarity": similarity, + "timestamp": timestamp, + "sender": metadata.get("sender", "unknown") if isinstance(metadata, dict) else "unknown", + "message_id": metadata.get("message_id", "") if isinstance(metadata, dict) else "", + "time_ago": self._format_time_ago(timestamp) + }) - return None + # 按相似度排序 + 
similar_messages.sort(key=lambda x: x["similarity"], reverse=True) + + logger.debug(f"找到 {len(similar_messages)} 条相似消息 (查询: {query[:30]}...)") + return similar_messages except Exception as e: - logger.error(f"向量检索记忆失败: {e}") - return await self._fallback_get_memory(target) + logger.error(f"查找相似消息失败: {e}") + return [] - async def _fallback_get_memory(self, target: str) -> Optional[str]: - """回退到数据库检索""" - try: - with get_db_session() as session: - query = session.execute(select(Memory).where( - Memory.chat_id == self.chat_id - ).order_by(Memory.create_time.desc()).limit(10)).scalars() - - memories = list(query) - - # 简单的关键词匹配 - for memory in memories: - if any(word in memory.memory_text for word in target.split() if len(word) > 1): - return memory.memory_text - - return memories[0].memory_text if memories else None + def _format_time_ago(self, timestamp: float) -> str: + """格式化时间差显示""" + if timestamp <= 0: + return "未知时间" - except Exception as e: - logger.error(f"回退检索记忆失败: {e}") - return None + try: + now = time.time() + diff = now - timestamp + + if diff < 60: + return f"{int(diff)}秒前" + elif diff < 3600: + return f"{int(diff/60)}分钟前" + elif diff < 86400: + return f"{int(diff/3600)}小时前" + else: + return f"{int(diff/86400)}天前" + except: + return "时间格式错误" + + async def get_memory_for_context(self, current_message: str, context_size: int = 3) -> str: + """ + 获取与当前消息相关的记忆上下文 + + Args: + current_message: 当前消息 + context_size: 上下文消息数量 + + Returns: + str: 格式化的记忆上下文 + """ + similar_messages = await self.find_similar_messages( + current_message, + top_k=context_size, + similarity_threshold=0.6 # 降低阈值以获得更多上下文 + ) + + if not similar_messages: + return "" + + # 格式化上下文 + context_lines = [] + for msg in similar_messages: + context_lines.append( + f"[{msg['time_ago']}] {msg['sender']}: {msg['content']} (相似度: {msg['similarity']:.2f})" + ) + + return "相关的历史记忆:\n" + "\n".join(context_lines) def get_stats(self) -> Dict[str, Any]: - """获取记忆统计信息""" + """获取记忆系统统计信息""" stats = { 
"chat_id": self.chat_id, - "vector_enabled": self.collection is not None, - "total_memories": 0, - "importance_patterns": len(self.importance_patterns) + "retention_hours": self.retention_hours, + "cleanup_interval": self.cleanup_interval, + "system_status": "running" if self.is_running else "stopped", + "total_messages": 0, + "db_status": "connected" if self.collection else "disconnected" } if self.collection: try: result = self.collection.count() - stats["total_memories"] = result + stats["total_messages"] = result except: - pass - - return stats \ No newline at end of file + stats["total_messages"] = "查询失败" + + return stats + + def stop(self): + """停止记忆系统""" + self.is_running = False + if self.cleanup_task and self.cleanup_task.is_alive(): + logger.info("正在停止定时清理任务...") + logger.info(f"向量瞬时记忆系统已停止: {self.chat_id}") + + +# 为了兼容现有代码,提供工厂函数 +def create_vector_memory_v2(chat_id: str, retention_hours: int = 24) -> VectorInstantMemoryV2: + """创建向量瞬时记忆系统V2实例""" + return VectorInstantMemoryV2(chat_id, retention_hours) + + +# 使用示例 +async def demo(): + """使用演示""" + memory = VectorInstantMemoryV2("demo_chat") + + # 存储一些测试消息 + await memory.store_message("今天天气不错,出去散步了", "用户") + await memory.store_message("刚才买了个冰淇淋,很好吃", "用户") + await memory.store_message("明天要开会,有点紧张", "用户") + + # 查找相似消息 + similar = await memory.find_similar_messages("天气怎么样") + print("相似消息:", similar) + + # 获取上下文 + context = await memory.get_memory_for_context("今天心情如何") + print("记忆上下文:", context) + + # 查看统计信息 + stats = memory.get_stats() + print("系统状态:", stats) + + memory.stop() + + +if __name__ == "__main__": + asyncio.run(demo()) \ No newline at end of file diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index 1e775e030..9a9f35c34 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -25,7 +25,7 @@ from src.chat.utils.chat_message_builder import ( ) from src.chat.express.expression_selector import expression_selector from 
src.chat.memory_system.memory_activator import MemoryActivator -from src.chat.memory_system.vector_instant_memory import VectorInstantMemory +from src.chat.memory_system.hybrid_instant_memory import HybridInstantMemory from src.mood.mood_manager import mood_manager from src.person_info.relationship_fetcher import relationship_fetcher_manager from src.person_info.person_info import get_person_info_manager @@ -226,7 +226,11 @@ class DefaultReplyer: self.heart_fc_sender = HeartFCSender() self.memory_activator = MemoryActivator() - self.instant_memory = VectorInstantMemory(chat_id=self.chat_stream.stream_id) + # 使用混合瞬时记忆系统V2,支持自定义保留时间 + self.instant_memory = HybridInstantMemory( + chat_id=self.chat_stream.stream_id, + retention_hours=1 + ) from src.plugin_system.core.tool_use import ToolExecutor # 延迟导入ToolExecutor,不然会循环依赖 @@ -470,23 +474,42 @@ class DefaultReplyer: ) if global_config.memory.enable_instant_memory: + # 异步存储聊天历史到混合记忆系统 asyncio.create_task(self.instant_memory.create_and_store_memory(chat_history)) - instant_memory_list = await self.instant_memory.get_memory(target) - instant_memory = instant_memory_list[0] if instant_memory_list else None - logger.info(f"即时记忆:{instant_memory}") + # 从混合记忆系统获取相关记忆 + instant_memory_result = await self.instant_memory.get_memory(target) + + # 处理不同类型的返回结果 + instant_memory = None + if isinstance(instant_memory_result, list) and instant_memory_result: + instant_memory = instant_memory_result[0] + elif isinstance(instant_memory_result, str) and instant_memory_result: + instant_memory = instant_memory_result + + logger.info(f"混合瞬时记忆:{instant_memory}") - if not running_memories: - return "" + # 构建记忆字符串,即使某种记忆为空也要继续 + memory_str = "" + has_any_memory = False - memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" - for running_memory in running_memories: - memory_str += f"- {running_memory['content']}\n" + # 添加长期记忆 + if running_memories: + if not memory_str: + memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" + for running_memory in running_memories: + memory_str += 
f"- {running_memory['content']}\n" + has_any_memory = True + # 添加瞬时记忆 if instant_memory: + if not memory_str: + memory_str = "以下是当前在聊天中,你回忆起的记忆:\n" memory_str += f"- {instant_memory}\n" + has_any_memory = True - return memory_str + # 只有当完全没有任何记忆时才返回空字符串 + return memory_str if has_any_memory else "" async def build_tool_info(self, chat_history: str, reply_to: str = "", enable_tool: bool = True) -> str: """构建工具信息块 @@ -1154,7 +1177,7 @@ class DefaultReplyer: """构建单个发送消息""" bot_user_info = UserInfo( - user_id=global_config.bot.qq_account, + user_id=str(global_config.bot.qq_account), user_nickname=global_config.bot.nickname, platform=self.chat_stream.platform, ) diff --git a/src/llm_models/model_client/aiohttp_gemini_client.py b/src/llm_models/model_client/aiohttp_gemini_client.py index d35f54618..6f8415952 100644 --- a/src/llm_models/model_client/aiohttp_gemini_client.py +++ b/src/llm_models/model_client/aiohttp_gemini_client.py @@ -475,6 +475,7 @@ class AiohttpGeminiClient(BaseClient): # 直接重抛项目定义的异常 raise except Exception as e: + logger.debug(e) # 其他异常转换为网络连接错误 raise NetworkConnectionError() from e diff --git a/src/plugins/built_in/maizone_refactored/services/cookie_service.py b/src/plugins/built_in/maizone_refactored/services/cookie_service.py index aee9cd1e2..d4587cda8 100644 --- a/src/plugins/built_in/maizone_refactored/services/cookie_service.py +++ b/src/plugins/built_in/maizone_refactored/services/cookie_service.py @@ -118,4 +118,4 @@ class CookieService: return cookies logger.error("所有Cookie获取方法均失败。") - return None \ No newline at end of file + return None diff --git a/src/plugins/built_in/maizone_refactored/services/qzone_service.py b/src/plugins/built_in/maizone_refactored/services/qzone_service.py index 2fd956888..90b6982af 100644 --- a/src/plugins/built_in/maizone_refactored/services/qzone_service.py +++ b/src/plugins/built_in/maizone_refactored/services/qzone_service.py @@ -312,11 +312,13 @@ class QZoneService: raise RuntimeError(f"无法连接到Napcat服务: 
超过最大重试次数({max_retries})") async def _get_api_client(self, qq_account: str, stream_id: Optional[str]) -> Optional[Dict]: - cookies = await self._renew_and_load_cookies(qq_account, stream_id) - if not cookies: return None + cookies = await self.cookie_service.get_cookies(qq_account, stream_id) + if not cookies: + return None p_skey = cookies.get('p_skey') or cookies.get('p_skey'.upper()) - if not p_skey: return None + if not p_skey: + return None gtk = self._generate_gtk(p_skey) uin = cookies.get('uin', '').lstrip('o')