# Enum-name -> numeric-level tables for the 1-4 scales stored in index entries.
_IMPORTANCE_BY_NAME = {"LOW": 1, "NORMAL": 2, "HIGH": 3, "CRITICAL": 4}
_CONFIDENCE_BY_NAME = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "VERIFIED": 4}


def _level_from_name(raw, table, default):
    """Return the numeric level for enum name ``raw``, or ``default`` if unknown."""
    return table.get(raw, default) if isinstance(raw, str) else default


def _decode_str_list(raw):
    """Decode a list-valued metadata field into a Python list.

    Accepts a JSON-encoded string, an already-decoded list/tuple, or a
    missing/empty value (treated as an empty list). Malformed JSON still
    raises, so the caller can skip and log that memory.
    """
    import orjson

    if raw is None or raw == "":
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    decoded = orjson.loads(raw)
    if isinstance(decoded, list):
        return decoded
    # A JSON scalar is wrapped so the index never iterates a bare string
    # character by character.
    return [] if decoded is None else [str(decoded)]


def _entry_from_metadata(memory_id, metadata):
    """Build one ``MemoryMetadataIndexEntry`` from a ChromaDB metadata mapping.

    Raises:
        ValueError: if ``metadata`` is missing for this memory.
    """
    if metadata is None:
        raise ValueError("缺少元数据")

    obj = metadata.get("object")
    return MemoryMetadataIndexEntry(
        memory_id=memory_id,
        user_id=metadata.get("user_id", "unknown"),
        memory_type=metadata.get("memory_type", "general"),
        subjects=_decode_str_list(metadata.get("subjects")),
        objects=[obj] if obj else [],
        keywords=_decode_str_list(metadata.get("keywords")),
        tags=_decode_str_list(metadata.get("tags")),
        # Unknown or absent names fall back to NORMAL / MEDIUM (2).
        importance=_level_from_name(metadata.get("importance"), _IMPORTANCE_BY_NAME, 2),
        confidence=_level_from_name(metadata.get("confidence"), _CONFIDENCE_BY_NAME, 2),
        created_at=metadata.get("created_at", 0.0),
        access_count=metadata.get("access_count", 0),
        chat_id=metadata.get("chat_id"),
        content_preview=None,
    )


async def rebuild_metadata_index():
    """Rebuild the JSON metadata index from the memories stored in ChromaDB.

    Initialises ``MemorySystem``, reads every id/metadata pair of the
    configured memory collection, converts each pair into a
    ``MemoryMetadataIndexEntry``, then batch-updates and saves
    ``ms.unified_storage.metadata_index`` and prints its statistics.

    Memories whose metadata cannot be converted are logged and skipped.
    Any other failure is logged and reported on stdout; nothing is raised.

    NOTE(review): entries are only added or updated. If the index object
    already holds entries for memories that no longer exist in ChromaDB,
    they are not removed here -- confirm whether a rebuild should start
    from an empty index.
    """
    print("="*80)
    print("重建JSON元数据索引")
    print("="*80)

    # Bring up the memory system; it owns the storage and the index.
    print("\n🔧 初始化记忆系统...")
    ms = MemorySystem()
    await ms.initialize()
    print("✅ 记忆系统已初始化")

    if not hasattr(ms.unified_storage, 'metadata_index'):
        print("❌ 元数据索引管理器未初始化")
        return

    print("\n📥 从ChromaDB获取所有记忆...")
    from src.common.vector_db import vector_db_service

    try:
        collection_name = ms.unified_storage.config.memory_collection
        # Only ids and metadata are consumed below, so documents and
        # embeddings are not requested.
        result = vector_db_service.get(
            collection_name=collection_name,
            include=["metadatas"]
        )

        if not result or not result.get("ids"):
            print("❌ ChromaDB中没有找到记忆数据")
            return

        ids = result["ids"]
        # The key may be present with a None value; normalise to a list.
        metadatas = list(result.get("metadatas") or [])
        if len(metadatas) != len(ids):
            logger.warning(f"ids 与 metadatas 数量不一致: {len(ids)} != {len(metadatas)}")
            # Pad so every id is visited (and reported) instead of being
            # silently dropped by zip().
            metadatas += [None] * (len(ids) - len(metadatas))

        print(f"✅ 找到 {len(ids)} 条记忆")

        print("\n🔨 开始重建元数据索引...")
        entries = []
        success_count = 0

        for i, (memory_id, metadata) in enumerate(zip(ids, metadatas), 1):
            try:
                entries.append(_entry_from_metadata(memory_id, metadata))
            except Exception as e:
                # Best effort: one bad record must not abort the rebuild.
                logger.warning(f"处理记忆 {memory_id} 失败: {e}")
                continue

            success_count += 1
            if i % 100 == 0:
                print(f" 处理进度: {i}/{len(ids)} ({success_count} 成功)")

        print(f"\n✅ 成功解析 {success_count}/{len(ids)} 条记忆元数据")

        # Push everything into the index in one batch, then persist it.
        print("\n💾 保存元数据索引...")
        ms.unified_storage.metadata_index.batch_add_or_update(entries)
        ms.unified_storage.metadata_index.save()

        stats = ms.unified_storage.metadata_index.get_stats()
        print("\n📊 重建后的索引统计:")
        print(f" - 总记忆数: {stats['total_memories']}")
        print(f" - 主语数量: {stats['subjects_count']}")
        print(f" - 关键词数量: {stats['keywords_count']}")
        print(f" - 标签数量: {stats['tags_count']}")
        print(" - 类型分布:")
        for mtype, count in stats['types'].items():
            print(f" - {mtype}: {count}")

        print("\n✅ 元数据索引重建完成!")

    except Exception as e:
        logger.error(f"重建索引失败: {e}", exc_info=True)
        print(f"❌ 重建索引失败: {e}")
async def main():
    """Smoke test: initialise ``MemorySystem`` and run a single retrieval.

    Prints how many memories were returned, then one line per memory with
    its id and its ``metadata.source`` (``None`` if the attribute is absent).
    """
    memory_system = MemorySystem()
    await memory_system.initialize()

    hits = await memory_system.retrieve_relevant_memories(
        query_text="测试查询:杰瑞喵喜欢什么?",
        limit=3,
    )
    print(f"检索到 {len(hits)} 条记忆(如果 >0 则表明运行成功)")

    rank = 0
    for hit in hits:
        rank += 1
        memory_id = hit.metadata.memory_id
        # getattr with a default: the point of the smoke test is that a
        # missing ``source`` attribute must not raise.
        source = getattr(hit.metadata, 'source', None)
        print(f"{rank}. id={memory_id} source={source}")
self._extract_with_llm(processed_text, context, user_id, timestamp) - elif strategy == ExtractionStrategy.RULE_BASED: - memories = self._extract_with_rules(processed_text, context, user_id, timestamp) - else: # HYBRID - memories = await self._extract_with_hybrid(processed_text, context, user_id, timestamp) + # 使用LLM提取记忆 + memories = await self._extract_with_llm(conversation_text, context, user_id, timestamp) # 后处理和验证 validated_memories = self._validate_and_enhance_memories(memories, context) @@ -129,41 +118,6 @@ class MemoryBuilder: self.extraction_stats["failed_extractions"] += 1 raise - def _preprocess_text(self, text: str) -> str: - """预处理文本""" - # 移除多余的空白字符 - text = re.sub(r'\s+', ' ', text.strip()) - - # 移除特殊字符,但保留基本标点 - text = re.sub(r'[^\w\s\u4e00-\u9fff,。!?、;:""''()【】]', '', text) - - # 截断过长的文本 - if len(text) > 2000: - text = text[:2000] + "..." - - return text - - def _determine_extraction_strategy(self, text: str, context: Dict[str, Any]) -> ExtractionStrategy: - """确定提取策略""" - text_length = len(text) - has_structured_data = any(key in context for key in ["structured_data", "entities", "keywords"]) - message_type = context.get("message_type", "normal") - - # 短文本使用规则提取 - if text_length < 50: - return ExtractionStrategy.RULE_BASED - - # 包含结构化数据使用混合策略 - if has_structured_data: - return ExtractionStrategy.HYBRID - - # 系统消息或命令使用规则提取 - if message_type in ["command", "system"]: - return ExtractionStrategy.RULE_BASED - - # 默认使用LLM提取 - return ExtractionStrategy.LLM_BASED - async def _extract_with_llm( self, text: str, @@ -190,79 +144,10 @@ class MemoryBuilder: logger.error(f"LLM提取失败: {e}") raise MemoryExtractionError(str(e)) from e - def _extract_with_rules( - self, - text: str, - context: Dict[str, Any], - user_id: str, - timestamp: float - ) -> List[MemoryChunk]: - """使用规则提取记忆""" - memories = [] - - subjects = self._resolve_conversation_participants(context, user_id) - - # 规则1: 检测个人信息 - personal_info = self._extract_personal_info(text, user_id, timestamp, 
context, subjects) - memories.extend(personal_info) - - # 规则2: 检测偏好信息 - preferences = self._extract_preferences(text, user_id, timestamp, context, subjects) - memories.extend(preferences) - - # 规则3: 检测事件信息 - events = self._extract_events(text, user_id, timestamp, context, subjects) - memories.extend(events) - - return memories - - async def _extract_with_hybrid( - self, - text: str, - context: Dict[str, Any], - user_id: str, - timestamp: float - ) -> List[MemoryChunk]: - """混合策略提取记忆""" - all_memories = [] - - # 首先使用规则提取 - rule_memories = self._extract_with_rules(text, context, user_id, timestamp) - all_memories.extend(rule_memories) - - # 然后使用LLM提取 - llm_memories = await self._extract_with_llm(text, context, user_id, timestamp) - - # 合并和去重 - final_memories = self._merge_hybrid_results(all_memories, llm_memories) - - return final_memories - def _build_llm_extraction_prompt(self, text: str, context: Dict[str, Any]) -> str: """构建LLM提取提示""" current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - chat_id = context.get("chat_id", "unknown") message_type = context.get("message_type", "normal") - target_user_id = context.get("user_id", "用户") - target_user_id = str(target_user_id) - - target_user_name = ( - context.get("user_display_name") - or context.get("user_name") - or context.get("nickname") - or context.get("sender_name") - ) - if isinstance(target_user_name, str): - target_user_name = target_user_name.strip() - else: - target_user_name = "" - - if not target_user_name or self._looks_like_system_identifier(target_user_name): - target_user_name = "该用户" - - target_user_id_display = target_user_id - if self._looks_like_system_identifier(target_user_id_display): - target_user_id_display = "(系统ID,勿写入记忆)" bot_name = context.get("bot_name") bot_identity = context.get("bot_identity") @@ -966,145 +851,6 @@ class MemoryBuilder: return f"{subject_phrase}{predicate}".strip() return subject_phrase - def _extract_personal_info( - self, - text: str, - user_id: str, - 
timestamp: float, - context: Dict[str, Any], - subjects: List[str] - ) -> List[MemoryChunk]: - """提取个人信息""" - memories = [] - - # 常见个人信息模式 - patterns = { - r"我叫(\w+)": ("is_named", {"name": "$1"}), - r"我今年(\d+)岁": ("is_age", {"age": "$1"}), - r"我是(\w+)": ("is_profession", {"profession": "$1"}), - r"我住在(\w+)": ("lives_in", {"location": "$1"}), - r"我的电话是(\d+)": ("has_phone", {"phone": "$1"}), - r"我的邮箱是(\w+@\w+\.\w+)": ("has_email", {"email": "$1"}), - } - - for pattern, (predicate, obj_template) in patterns.items(): - match = re.search(pattern, text) - if match: - obj = obj_template - for i, group in enumerate(match.groups(), 1): - obj = {k: v.replace(f"${i}", group) for k, v in obj.items()} - - memory = create_memory_chunk( - user_id=user_id, - subject=subjects, - predicate=predicate, - obj=obj, - memory_type=MemoryType.PERSONAL_FACT, - chat_id=context.get("chat_id"), - importance=ImportanceLevel.HIGH, - confidence=ConfidenceLevel.HIGH, - display=self._compose_display_text(subjects, predicate, obj) - ) - - memories.append(memory) - - return memories - - def _extract_preferences( - self, - text: str, - user_id: str, - timestamp: float, - context: Dict[str, Any], - subjects: List[str] - ) -> List[MemoryChunk]: - """提取偏好信息""" - memories = [] - - # 偏好模式 - preference_patterns = [ - (r"我喜欢(.+)", "likes"), - (r"我不喜欢(.+)", "dislikes"), - (r"我爱吃(.+)", "likes_food"), - (r"我讨厌(.+)", "hates"), - (r"我最喜欢的(.+)", "favorite_is"), - ] - - for pattern, predicate in preference_patterns: - match = re.search(pattern, text) - if match: - memory = create_memory_chunk( - user_id=user_id, - subject=subjects, - predicate=predicate, - obj=match.group(1), - memory_type=MemoryType.PREFERENCE, - chat_id=context.get("chat_id"), - importance=ImportanceLevel.NORMAL, - confidence=ConfidenceLevel.MEDIUM, - display=self._compose_display_text(subjects, predicate, match.group(1)) - ) - - memories.append(memory) - - return memories - - def _extract_events( - self, - text: str, - user_id: str, - 
timestamp: float, - context: Dict[str, Any], - subjects: List[str] - ) -> List[MemoryChunk]: - """提取事件信息""" - memories = [] - - # 事件关键词 - event_keywords = ["明天", "今天", "昨天", "上周", "下周", "约会", "会议", "活动", "旅行", "生日"] - - if any(keyword in text for keyword in event_keywords): - memory = create_memory_chunk( - user_id=user_id, - subject=subjects, - predicate="mentioned_event", - obj={"event_text": text, "timestamp": timestamp}, - memory_type=MemoryType.EVENT, - chat_id=context.get("chat_id"), - importance=ImportanceLevel.NORMAL, - confidence=ConfidenceLevel.MEDIUM, - display=self._compose_display_text(subjects, "mentioned_event", text) - ) - - memories.append(memory) - - return memories - - def _merge_hybrid_results( - self, - rule_memories: List[MemoryChunk], - llm_memories: List[MemoryChunk] - ) -> List[MemoryChunk]: - """合并混合策略结果""" - all_memories = rule_memories.copy() - - # 添加LLM记忆,避免重复 - for llm_memory in llm_memories: - is_duplicate = False - for rule_memory in rule_memories: - if llm_memory.is_similar_to(rule_memory, threshold=0.7): - is_duplicate = True - # 合并置信度 - rule_memory.metadata.confidence = ConfidenceLevel( - max(rule_memory.metadata.confidence.value, llm_memory.metadata.confidence.value) - ) - break - - if not is_duplicate: - all_memories.append(llm_memory) - - return all_memories - def _validate_and_enhance_memories( self, memories: List[MemoryChunk], diff --git a/src/chat/memory_system/memory_chunk.py b/src/chat/memory_system/memory_chunk.py index 54138412d..0ddf6bd0f 100644 --- a/src/chat/memory_system/memory_chunk.py +++ b/src/chat/memory_system/memory_chunk.py @@ -127,6 +127,8 @@ class MemoryMetadata: # 来源信息 source_context: Optional[str] = None # 来源上下文片段 + # 兼容旧字段: 一些代码或旧版本可能直接访问 metadata.source + source: Optional[str] = None def __post_init__(self): """后初始化处理""" @@ -150,6 +152,19 @@ class MemoryMetadata: if self.last_forgetting_check == 0: self.last_forgetting_check = current_time + # 兼容性:如果旧字段 source 被使用,保证 source 与 source_context 同步 + if not 
getattr(self, 'source', None) and getattr(self, 'source_context', None): + try: + self.source = str(self.source_context) + except Exception: + self.source = None + # 如果有 source 字段但 source_context 为空,也同步回去 + if not getattr(self, 'source_context', None) and getattr(self, 'source', None): + try: + self.source_context = str(self.source) + except Exception: + self.source_context = None + def update_access(self): """更新访问信息""" current_time = time.time() diff --git a/src/chat/memory_system/memory_formatter.py b/src/chat/memory_system/memory_formatter.py deleted file mode 100644 index 87339c823..000000000 --- a/src/chat/memory_system/memory_formatter.py +++ /dev/null @@ -1,331 +0,0 @@ -# -*- coding: utf-8 -*- -""" -记忆格式化器 -将召回的记忆转化为LLM友好的Markdown格式 -""" - -from typing import List, Dict, Any, Optional -from datetime import datetime -from dataclasses import dataclass - -from src.common.logger import get_logger -from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType - -logger = get_logger(__name__) - - -@dataclass -class FormatterConfig: - """格式化器配置""" - include_timestamps: bool = True # 是否包含时间信息 - include_memory_types: bool = True # 是否包含记忆类型 - include_confidence: bool = False # 是否包含置信度信息 - max_display_length: int = 200 # 单条记忆最大显示长度 - datetime_format: str = "%Y年%m月%d日" # 时间格式 - use_emoji_icons: bool = True # 是否使用emoji图标 - group_by_type: bool = False # 是否按类型分组 - use_bracket_format: bool = False # 是否使用方括号格式 [类型] 内容 - compact_format: bool = False # 是否使用紧凑格式 - - -class MemoryFormatter: - """记忆格式化器 - 将记忆转化为提示词友好的格式""" - - # 记忆类型对应的emoji图标 - TYPE_EMOJI_MAP = { - MemoryType.PERSONAL_FACT: "👤", - MemoryType.EVENT: "📅", - MemoryType.PREFERENCE: "❤️", - MemoryType.OPINION: "💭", - MemoryType.RELATIONSHIP: "👥", - MemoryType.EMOTION: "😊", - MemoryType.KNOWLEDGE: "📚", - MemoryType.SKILL: "🛠️", - MemoryType.GOAL: "🎯", - MemoryType.EXPERIENCE: "🌟", - MemoryType.CONTEXTUAL: "💬" - } - - # 记忆类型的中文标签 - 优化格式 - TYPE_LABELS = { - MemoryType.PERSONAL_FACT: "个人事实", - MemoryType.EVENT: 
"事件", - MemoryType.PREFERENCE: "偏好", - MemoryType.OPINION: "观点", - MemoryType.RELATIONSHIP: "关系", - MemoryType.EMOTION: "情感", - MemoryType.KNOWLEDGE: "知识", - MemoryType.SKILL: "技能", - MemoryType.GOAL: "目标", - MemoryType.EXPERIENCE: "经验", - MemoryType.CONTEXTUAL: "上下文" - } - - def __init__(self, config: Optional[FormatterConfig] = None): - self.config = config or FormatterConfig() - - def format_memories_for_prompt( - self, - memories: List[MemoryChunk], - query_context: Optional[str] = None - ) -> str: - """ - 将记忆列表格式化为LLM提示词 - - Args: - memories: 记忆列表 - query_context: 查询上下文(可选) - - Returns: - 格式化的Markdown文本 - """ - if not memories: - return "" - - lines = ["## 🧠 相关记忆回顾", ""] - - if self.config.group_by_type: - lines.extend(self._format_memories_by_type(memories)) - else: - lines.extend(self._format_memories_chronologically(memories)) - - return "\n".join(lines) - - def _format_memories_by_type(self, memories: List[MemoryChunk]) -> List[str]: - """按类型分组格式化记忆""" - # 按类型分组 - grouped_memories = {} - for memory in memories: - memory_type = memory.memory_type - if memory_type not in grouped_memories: - grouped_memories[memory_type] = [] - grouped_memories[memory_type].append(memory) - - lines = [] - - # 为每个类型生成格式化文本 - for memory_type, type_memories in grouped_memories.items(): - emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝") - label = self.TYPE_LABELS.get(memory_type, memory_type.value) - - lines.extend([ - f"### {emoji} {label}", - "" - ]) - - for memory in type_memories: - formatted_item = self._format_single_memory(memory, include_type=False) - lines.append(formatted_item) - - lines.append("") # 类型间空行 - - return lines - - def _format_memories_chronologically(self, memories: List[MemoryChunk]) -> List[str]: - """按时间顺序格式化记忆""" - lines = [] - - for i, memory in enumerate(memories, 1): - formatted_item = self._format_single_memory(memory, include_type=True, index=i) - lines.append(formatted_item) - - return lines - - def _format_single_memory( - self, - memory: 
MemoryChunk, - include_type: bool = True, - index: Optional[int] = None - ) -> str: - """格式化单条记忆""" - # 如果启用方括号格式,使用新格式 - if self.config.use_bracket_format: - return self._format_single_memory_bracket(memory) - - # 获取显示文本 - display_text = memory.display or memory.text_content - if len(display_text) > self.config.max_display_length: - display_text = display_text[:self.config.max_display_length - 3] + "..." - - # 构建前缀 - prefix_parts = [] - - # 添加序号 - if index is not None: - prefix_parts.append(f"{index}.") - - # 添加类型标签 - if include_type and self.config.include_memory_types: - if self.config.use_emoji_icons: - emoji = self.TYPE_EMOJI_MAP.get(memory.memory_type, "📝") - prefix_parts.append(f"**{emoji}") - else: - label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value) - prefix_parts.append(f"**[{label}]") - - # 添加时间信息 - if self.config.include_timestamps: - timestamp = memory.metadata.created_at - if timestamp > 0: - dt = datetime.fromtimestamp(timestamp) - time_str = dt.strftime(self.config.datetime_format) - if self.config.use_emoji_icons: - prefix_parts.append(f"⏰ {time_str}") - else: - prefix_parts.append(f"({time_str})") - - # 添加置信度信息 - if self.config.include_confidence: - confidence = memory.metadata.confidence.value - confidence_stars = "★" * confidence + "☆" * (4 - confidence) - prefix_parts.append(f"信度:{confidence_stars}") - - # 构建完整格式 - if prefix_parts: - if self.config.include_memory_types and self.config.use_emoji_icons: - prefix = " ".join(prefix_parts) + "** " - else: - prefix = " ".join(prefix_parts) + " " - return f"- {prefix}{display_text}" - else: - return f"- {display_text}" - - def _format_single_memory_bracket(self, memory: MemoryChunk) -> str: - """格式化单条记忆 - 使用方括号格式 [类型] 内容""" - # 获取显示文本 - display_text = memory.display or memory.text_content - - # 如果启用紧凑格式,只显示核心内容 - if self.config.compact_format: - if len(display_text) > self.config.max_display_length: - display_text = display_text[:self.config.max_display_length - 3] + "..." 
- else: - # 非紧凑格式可以包含时间信息 - if self.config.include_timestamps: - timestamp = memory.metadata.created_at - if timestamp > 0: - dt = datetime.fromtimestamp(timestamp) - time_str = dt.strftime("%Y年%m月%d日") - # 将时间信息自然地整合到内容中 - if "在" not in display_text and "当" not in display_text: - display_text = f"在{time_str},{display_text}" - - # 获取类型标签 - label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value) - - # 构建方括号格式: **[类型]** 内容 - return f"- **[{label}]** {display_text}" - - def format_memory_summary(self, memories: List[MemoryChunk]) -> str: - """生成记忆摘要统计""" - if not memories: - return "暂无相关记忆。" - - # 统计信息 - total_count = len(memories) - type_counts = {} - - for memory in memories: - memory_type = memory.memory_type - type_counts[memory_type] = type_counts.get(memory_type, 0) + 1 - - # 生成摘要 - lines = [f"**记忆摘要**: 共找到 {total_count} 条相关记忆"] - - if len(type_counts) > 1: - type_summaries = [] - for memory_type, count in type_counts.items(): - emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝") - label = self.TYPE_LABELS.get(memory_type, memory_type.value) - type_summaries.append(f"{emoji}{label} {count}条") - - lines.append(f"包括: {', '.join(type_summaries)}") - - return " | ".join(lines) - - def format_for_debug(self, memories: List[MemoryChunk]) -> str: - """生成调试格式的记忆列表""" - if not memories: - return "无记忆数据" - - lines = ["### 记忆调试信息", ""] - - for i, memory in enumerate(memories, 1): - lines.extend([ - f"**记忆 {i}** (ID: {memory.memory_id[:8]})", - f"- 类型: {memory.memory_type.value}", - f"- 内容: {memory.display[:100]}{'...' 
if len(memory.display) > 100 else ''}", - f"- 访问次数: {memory.metadata.access_count}", - f"- 置信度: {memory.metadata.confidence.value}/4", - f"- 重要性: {memory.metadata.importance.value}/4", - f"- 创建时间: {datetime.fromtimestamp(memory.metadata.created_at).strftime('%Y-%m-%d %H:%M')}", - "" - ]) - - return "\n".join(lines) - - -# 创建默认格式化器实例 -default_formatter = MemoryFormatter() - - -def format_memories_for_llm( - memories: List[MemoryChunk], - query_context: Optional[str] = None, - config: Optional[FormatterConfig] = None -) -> str: - """ - 便捷函数:将记忆格式化为LLM提示词 - """ - if config: - formatter = MemoryFormatter(config) - else: - formatter = default_formatter - - return formatter.format_memories_for_prompt(memories, query_context) - - -def format_memory_summary( - memories: List[MemoryChunk], - config: Optional[FormatterConfig] = None -) -> str: - """ - 便捷函数:生成记忆摘要 - """ - if config: - formatter = MemoryFormatter(config) - else: - formatter = default_formatter - - return formatter.format_memory_summary(memories) - - -def format_memories_bracket_style( - memories: List[MemoryChunk], - query_context: Optional[str] = None, - compact: bool = True, - include_timestamps: bool = True -) -> str: - """ - 便捷函数:使用方括号格式格式化记忆 - - Args: - memories: 记忆列表 - query_context: 查询上下文 - compact: 是否使用紧凑格式 - include_timestamps: 是否包含时间信息 - - Returns: - 格式化的Markdown文本 - """ - config = FormatterConfig( - use_bracket_format=True, - compact_format=compact, - include_timestamps=include_timestamps, - include_memory_types=True, - use_emoji_icons=False, - group_by_type=False - ) - - formatter = MemoryFormatter(config) - return formatter.format_memories_for_prompt(memories, query_context) \ No newline at end of file diff --git a/src/chat/memory_system/memory_metadata_index.py b/src/chat/memory_system/memory_metadata_index.py new file mode 100644 index 000000000..32104ffab --- /dev/null +++ b/src/chat/memory_system/memory_metadata_index.py @@ -0,0 +1,496 @@ +# -*- coding: utf-8 -*- +""" +记忆元数据索引管理器 
@dataclass
class MemoryMetadataIndexEntry:
    """Metadata index entry (lightweight, used only for fast filtering)."""
    memory_id: str
    user_id: str

    # Classification info
    memory_type: str  # MemoryType.value
    subjects: List[str]  # list of subjects
    objects: List[str]  # list of objects
    keywords: List[str]  # list of keywords
    tags: List[str]  # list of tags

    # Numeric fields (used for range filtering)
    importance: int  # ImportanceLevel.value (1-4)
    confidence: int  # ConfidenceLevel.value (1-4)
    created_at: float  # creation timestamp
    access_count: int  # number of accesses

    # Optional fields
    chat_id: Optional[str] = None
    content_preview: Optional[str] = None  # content preview (first 100 characters)
+ except Exception as e: + logger.error(f"加载元数据索引失败: {e}", exc_info=True) + + def _save_index(self): + """保存索引到文件""" + try: + # 确保目录存在 + self.index_file.parent.mkdir(parents=True, exist_ok=True) + + # 序列化所有条目 + entries = [asdict(entry) for entry in self.index.values()] + data = { + 'version': '1.0', + 'count': len(entries), + 'last_updated': datetime.now().isoformat(), + 'entries': entries + } + + # 写入文件(使用临时文件 + 原子重命名) + temp_file = self.index_file.with_suffix('.tmp') + with open(temp_file, 'wb') as f: + f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2)) + + temp_file.replace(self.index_file) + logger.debug(f"元数据索引已保存: {len(entries)} 条记录") + + except Exception as e: + logger.error(f"保存元数据索引失败: {e}", exc_info=True) + + def _update_inverted_indices(self, entry: MemoryMetadataIndexEntry): + """更新倒排索引""" + memory_id = entry.memory_id + + # 类型索引 + self.type_index.setdefault(entry.memory_type, set()).add(memory_id) + + # 主语索引 + for subject in entry.subjects: + subject_norm = subject.strip().lower() + if subject_norm: + self.subject_index.setdefault(subject_norm, set()).add(memory_id) + + # 关键词索引 + for keyword in entry.keywords: + keyword_norm = keyword.strip().lower() + if keyword_norm: + self.keyword_index.setdefault(keyword_norm, set()).add(memory_id) + + # 标签索引 + for tag in entry.tags: + tag_norm = tag.strip().lower() + if tag_norm: + self.tag_index.setdefault(tag_norm, set()).add(memory_id) + + def add_or_update(self, entry: MemoryMetadataIndexEntry): + """添加或更新索引条目""" + with self.lock: + # 如果已存在,先从倒排索引中移除旧记录 + if entry.memory_id in self.index: + self._remove_from_inverted_indices(entry.memory_id) + + # 添加新记录 + self.index[entry.memory_id] = entry + self._update_inverted_indices(entry) + + def _remove_from_inverted_indices(self, memory_id: str): + """从倒排索引中移除记录""" + if memory_id not in self.index: + return + + entry = self.index[memory_id] + + # 从类型索引移除 + if entry.memory_type in self.type_index: + self.type_index[entry.memory_type].discard(memory_id) + + # 
从主语索引移除 + for subject in entry.subjects: + subject_norm = subject.strip().lower() + if subject_norm in self.subject_index: + self.subject_index[subject_norm].discard(memory_id) + + # 从关键词索引移除 + for keyword in entry.keywords: + keyword_norm = keyword.strip().lower() + if keyword_norm in self.keyword_index: + self.keyword_index[keyword_norm].discard(memory_id) + + # 从标签索引移除 + for tag in entry.tags: + tag_norm = tag.strip().lower() + if tag_norm in self.tag_index: + self.tag_index[tag_norm].discard(memory_id) + + def remove(self, memory_id: str): + """移除索引条目""" + with self.lock: + if memory_id in self.index: + self._remove_from_inverted_indices(memory_id) + del self.index[memory_id] + + def batch_add_or_update(self, entries: List[MemoryMetadataIndexEntry]): + """批量添加或更新""" + with self.lock: + for entry in entries: + self.add_or_update(entry) + + def save(self): + """保存索引到磁盘""" + with self.lock: + self._save_index() + + def search( + self, + memory_types: Optional[List[str]] = None, + subjects: Optional[List[str]] = None, + keywords: Optional[List[str]] = None, + tags: Optional[List[str]] = None, + importance_min: Optional[int] = None, + importance_max: Optional[int] = None, + created_after: Optional[float] = None, + created_before: Optional[float] = None, + user_id: Optional[str] = None, + limit: Optional[int] = None, + flexible_mode: bool = True # 新增:灵活匹配模式 + ) -> List[str]: + """ + 搜索符合条件的记忆ID列表(支持模糊匹配) + + Returns: + List[str]: 符合条件的 memory_id 列表 + """ + with self.lock: + if flexible_mode: + return self._search_flexible( + memory_types=memory_types, + subjects=subjects, + keywords=keywords, # 保留用于兼容性 + tags=tags, # 保留用于兼容性 + created_after=created_after, + created_before=created_before, + user_id=user_id, + limit=limit + ) + else: + return self._search_strict( + memory_types=memory_types, + subjects=subjects, + keywords=keywords, + tags=tags, + importance_min=importance_min, + importance_max=importance_max, + created_after=created_after, + 
def _search_flexible(
    self,
    memory_types: Optional[List[str]] = None,
    subjects: Optional[List[str]] = None,
    created_after: Optional[float] = None,
    created_before: Optional[float] = None,
    user_id: Optional[str] = None,
    limit: Optional[int] = None,
    **kwargs  # accepted so callers may pass extra filters; not used here
) -> List[str]:
    """Score-based search: an entry is returned when its score is >= 2.

    Score components (summed per entry):
        1. Memory type: 1 for an exact match with any requested type,
           0.5 when a requested type and the entry type contain one
           another (case-insensitive).
        2. Subject: 1 when any requested subject equals any entry subject
           after strip/lower, 0.6 when one contains the other.
        3. Object: 0.8 when any requested subject and any entry object
           contain one another.
        4. Time range: 1 when ``created_at`` lies inside the requested
           bounds (inclusive).

    Each component takes the best match over all pairs, so the order of
    the requested values does not change the score. Blank strings are
    ignored for containment checks.

    NOTE(review): with the fixed threshold of 2, a query that specifies
    only one dimension (only types, only subjects, or only a time range)
    can never reach the threshold and returns nothing -- confirm that
    callers always provide at least two.

    Args:
        memory_types: requested ``memory_type`` values.
        subjects: requested subjects.
        created_after: lower bound on ``created_at``.
        created_before: upper bound on ``created_at``.
        user_id: when truthy, only entries of this user are considered.
        limit: maximum number of ids; a falsy value means no cap.

    Returns:
        Matching memory ids, best score first, newer first within a score.
    """

    def overlaps(a: str, b: str) -> bool:
        """True when either string contains the other."""
        return a in b or b in a

    # Normalise the query once instead of once per candidate.
    exact_types = set(memory_types or [])
    loose_types = [t.lower() for t in memory_types or [] if t.strip()]
    query_subjects = [s for s in (raw.strip().lower() for raw in subjects or []) if s]
    query_subject_set = set(query_subjects)
    time_filtered = created_after is not None or created_before is not None

    ranked = []  # (memory_id, score, created_at)

    # Iterate the dict itself (insertion order) rather than a set of ids so
    # that ties are ordered the same way on every run.
    for memory_id, entry in self.index.items():
        if user_id and entry.user_id != user_id:
            continue

        # 1. Memory type
        type_score = 0
        if memory_types:
            if entry.memory_type in exact_types:
                type_score = 1
            else:
                entry_type = entry.memory_type.lower()
                if entry_type and any(overlaps(t, entry_type) for t in loose_types):
                    type_score = 0.5

        # 2. Subject
        subject_score = 0
        if query_subjects:
            entry_subjects = [
                s for s in (raw.strip().lower() for raw in entry.subjects) if s
            ]
            if not query_subject_set.isdisjoint(entry_subjects):
                subject_score = 1
            elif any(overlaps(q, e) for q in query_subjects for e in entry_subjects):
                subject_score = 0.6

        # 3. Object (related to a requested subject)
        object_score = 0
        if query_subjects and entry.objects:
            entry_objects = [
                o for o in (str(raw).strip().lower() for raw in entry.objects) if o
            ]
            if any(overlaps(q, o) for q in query_subjects for o in entry_objects):
                object_score = 0.8

        # 4. Time range
        time_score = 0
        if time_filtered:
            after_ok = created_after is None or entry.created_at >= created_after
            before_ok = created_before is None or entry.created_at <= created_before
            if after_ok and before_ok:
                time_score = 1

        score = type_score + subject_score + object_score + time_score

        # Only entries with a total score of at least 2 are returned.
        if score >= 2:
            ranked.append((memory_id, score, entry.created_at))

    # Highest score first; newer entries first within the same score.
    ranked.sort(key=lambda item: (item[1], item[2]), reverse=True)

    top = ranked[:limit] if limit else ranked
    result_ids = [memory_id for memory_id, _, _ in top]

    logger.debug(
        f"[灵活搜索] 过滤条件: types={memory_types}, subjects={subjects}, "
        f"time_range=[{created_after}, {created_before}], 返回={len(result_ids)}条"
    )

    if ranked:
        avg_score = sum(score for _, score, _ in ranked) / len(ranked)
        logger.debug(f"[灵活搜索] 平均匹配分数: {avg_score:.2f}, 最高分: {ranked[0][1]:.2f}")

    return result_ids
Optional[int] = None, + importance_max: Optional[int] = None, + created_after: Optional[float] = None, + created_before: Optional[float] = None, + user_id: Optional[str] = None, + limit: Optional[int] = None + ) -> List[str]: + """严格搜索模式(原有逻辑)""" + # 初始候选集(所有记忆) + candidate_ids: Optional[Set[str]] = None + + # 用户过滤(必选) + if user_id: + candidate_ids = { + mid for mid, entry in self.index.items() + if entry.user_id == user_id + } + else: + candidate_ids = set(self.index.keys()) + + # 类型过滤(OR关系) + if memory_types: + type_ids = set() + for mtype in memory_types: + type_ids.update(self.type_index.get(mtype, set())) + candidate_ids &= type_ids + + # 主语过滤(OR关系,支持模糊匹配) + if subjects: + subject_ids = set() + for subject in subjects: + subject_norm = subject.strip().lower() + # 精确匹配 + if subject_norm in self.subject_index: + subject_ids.update(self.subject_index[subject_norm]) + # 模糊匹配(包含) + for indexed_subject, ids in self.subject_index.items(): + if subject_norm in indexed_subject or indexed_subject in subject_norm: + subject_ids.update(ids) + candidate_ids &= subject_ids + + # 关键词过滤(OR关系,支持模糊匹配) + if keywords: + keyword_ids = set() + for keyword in keywords: + keyword_norm = keyword.strip().lower() + # 精确匹配 + if keyword_norm in self.keyword_index: + keyword_ids.update(self.keyword_index[keyword_norm]) + # 模糊匹配(包含) + for indexed_keyword, ids in self.keyword_index.items(): + if keyword_norm in indexed_keyword or indexed_keyword in keyword_norm: + keyword_ids.update(ids) + candidate_ids &= keyword_ids + + # 标签过滤(OR关系) + if tags: + tag_ids = set() + for tag in tags: + tag_norm = tag.strip().lower() + tag_ids.update(self.tag_index.get(tag_norm, set())) + candidate_ids &= tag_ids + + # 重要性过滤 + if importance_min is not None or importance_max is not None: + importance_ids = { + mid for mid in candidate_ids + if (importance_min is None or self.index[mid].importance >= importance_min) + and (importance_max is None or self.index[mid].importance <= importance_max) + } + candidate_ids 
&= importance_ids + + # 时间范围过滤 + if created_after is not None or created_before is not None: + time_ids = { + mid for mid in candidate_ids + if (created_after is None or self.index[mid].created_at >= created_after) + and (created_before is None or self.index[mid].created_at <= created_before) + } + candidate_ids &= time_ids + + # 转换为列表并排序(按创建时间倒序) + result_ids = sorted( + candidate_ids, + key=lambda mid: self.index[mid].created_at, + reverse=True + ) + + # 限制数量 + if limit: + result_ids = result_ids[:limit] + + logger.debug( + f"[严格搜索] types={memory_types}, subjects={subjects}, " + f"keywords={keywords}, 返回={len(result_ids)}条" + ) + + return result_ids + + def get_entry(self, memory_id: str) -> Optional[MemoryMetadataIndexEntry]: + """获取单个索引条目""" + return self.index.get(memory_id) + + def get_stats(self) -> Dict[str, Any]: + """获取索引统计信息""" + with self.lock: + return { + 'total_memories': len(self.index), + 'types': {mtype: len(ids) for mtype, ids in self.type_index.items()}, + 'subjects_count': len(self.subject_index), + 'keywords_count': len(self.keyword_index), + 'tags_count': len(self.tag_index), + } diff --git a/src/chat/memory_system/memory_query_planner.py b/src/chat/memory_system/memory_query_planner.py index e6f64c97b..690e26627 100644 --- a/src/chat/memory_system/memory_query_planner.py +++ b/src/chat/memory_system/memory_query_planner.py @@ -135,22 +135,76 @@ class MemoryQueryPlanner: persona = context.get("bot_personality") or context.get("bot_identity") or "未知" + # 构建未读消息上下文信息 + context_section = "" + if context.get("has_unread_context") and context.get("unread_messages_context"): + unread_context = context["unread_messages_context"] + unread_messages = unread_context.get("messages", []) + unread_keywords = unread_context.get("keywords", []) + unread_participants = unread_context.get("participants", []) + context_summary = unread_context.get("context_summary", "") + + if unread_messages: + # 构建未读消息摘要 + message_previews = [] + for msg in 
unread_messages[:5]: # 最多显示5条 + sender = msg.get("sender", "未知") + content = msg.get("content", "")[:100] # 限制每条消息长度 + message_previews.append(f"{sender}: {content}") + + context_section = f""" + +## 📋 未读消息上下文 (共{unread_context.get('total_count', 0)}条未读消息) +### 最近消息预览: +{chr(10).join(message_previews)} + +### 上下文关键词: +{', '.join(unread_keywords[:15]) if unread_keywords else '无'} + +### 对话参与者: +{', '.join(unread_participants) if unread_participants else '无'} + +### 上下文摘要: +{context_summary[:300] if context_summary else '无'} +""" + else: + context_section = """ + +## 📋 未读消息上下文: +无未读消息或上下文信息不可用 +""" + return f""" 你是一名记忆检索规划助手,请基于输入生成一个简洁的 JSON 检索计划。 +你的任务是分析当前查询并结合未读消息的上下文,生成更精准的记忆检索策略。 + 仅需提供以下字段: -- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询; +- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询和上下文; - memory_types: 建议检索的记忆类型列表,取值范围来自 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual); - subject_includes: 建议出现在记忆主语中的人物或角色; - object_includes: 建议关注的对象、主题或关键信息; +- required_keywords: 建议必须包含的关键词(从上下文中提取); - recency: 推荐的时间偏好,可选 recent/any/historical; - limit: 推荐的最大返回数量 (1-15); -- notes: 额外补充说明(可选)。 +- emphasis: 检索重点,可选 balanced/contextual/recent/comprehensive。 请不要生成谓语字段,也不要额外补充其它参数。 -当前查询: "{query_text}" -已知的对话参与者: {participant_preview} -机器人设定: {persona} +## 当前查询: +"{query_text}" + +## 已知对话参与者: +{participant_preview} + +## 机器人设定: +{persona}{context_section} + +## 🎯 指导原则: +1. **上下文关联**: 优先分析与当前查询相关的未读消息内容和关键词 +2. **语义理解**: 结合上下文理解查询的真实意图,而非字面意思 +3. **参与者感知**: 考虑未读消息中的参与者,检索与他们相关的记忆 +4. **主题延续**: 关注未读消息中讨论的主题,检索相关的历史记忆 +5. **时间相关性**: 如果未读消息讨论最近的事件,偏向检索相关时期的记忆 请直接输出符合要求的 JSON 对象,禁止添加额外文本或 Markdown 代码块。 """ diff --git a/src/chat/memory_system/memory_system.py b/src/chat/memory_system/memory_system.py index b5f6af77e..6f82ef184 100644 --- a/src/chat/memory_system/memory_system.py +++ b/src/chat/memory_system/memory_system.py @@ -380,11 +380,11 @@ class MemorySystem: self.status = original_status return [] - # 2. 
构建记忆块 + # 2. 构建记忆块(所有记忆统一使用 global 作用域,实现完全共享) memory_chunks = await self.memory_builder.build_memories( conversation_text, normalized_context, - GLOBAL_MEMORY_SCOPE, + GLOBAL_MEMORY_SCOPE, # 强制使用 global,不区分用户 timestamp or time.time() ) @@ -609,7 +609,7 @@ class MemorySystem: limit: int = 5, **kwargs ) -> List[MemoryChunk]: - """检索相关记忆(简化版,使用统一存储)""" + """检索相关记忆(三阶段召回:元数据粗筛 → 向量精筛 → 综合重排)""" raw_query = query_text or kwargs.get("query") if not raw_query: raise ValueError("query_text 或 query 参数不能为空") @@ -619,6 +619,8 @@ class MemorySystem: return [] context = context or {} + + # 所有记忆完全共享,统一使用 global 作用域,不区分用户 resolved_user_id = GLOBAL_MEMORY_SCOPE self.status = MemorySystemStatus.RETRIEVING @@ -626,48 +628,152 @@ class MemorySystem: try: normalized_context = self._normalize_context(context, GLOBAL_MEMORY_SCOPE, None) - - effective_limit = limit or self.config.final_recall_limit - - # 构建过滤器 - filters = { - "user_id": resolved_user_id + effective_limit = self.config.final_recall_limit + + # === 阶段一:元数据粗筛(软性过滤) === + coarse_filters = { + "user_id": GLOBAL_MEMORY_SCOPE, # 必选:确保作用域正确 } - # 应用查询规划结果 + # 应用查询规划(优化查询语句并构建元数据过滤) + optimized_query = raw_query + metadata_filters = {} + if self.query_planner: try: - query_plan = await self.query_planner.plan_query(raw_query, normalized_context) - if getattr(query_plan, "memory_types", None): - filters["memory_types"] = [mt.value for mt in query_plan.memory_types] - if getattr(query_plan, "subject_includes", None): - filters["keywords"] = query_plan.subject_includes + # 构建包含未读消息的增强上下文 + enhanced_context = await self._build_enhanced_query_context(raw_query, normalized_context) + query_plan = await self.query_planner.plan_query(raw_query, enhanced_context) + + # 使用LLM优化后的查询语句(更精确的语义表达) if getattr(query_plan, "semantic_query", None): - raw_query = query_plan.semantic_query + optimized_query = query_plan.semantic_query + + # 构建JSON元数据过滤条件(用于阶段一粗筛) + # 将查询规划的结果转换为元数据过滤条件 + if getattr(query_plan, "memory_types", None): + 
metadata_filters['memory_types'] = [mt.value for mt in query_plan.memory_types] + + if getattr(query_plan, "subject_includes", None): + metadata_filters['subjects'] = query_plan.subject_includes + + if getattr(query_plan, "required_keywords", None): + metadata_filters['keywords'] = query_plan.required_keywords + + # 时间范围过滤 + recency = getattr(query_plan, "recency_preference", "any") + current_time = time.time() + if recency == "recent": + # 最近7天 + metadata_filters['created_after'] = current_time - (7 * 24 * 3600) + elif recency == "historical": + # 30天以前 + metadata_filters['created_before'] = current_time - (30 * 24 * 3600) + + # 添加用户ID到元数据过滤 + metadata_filters['user_id'] = GLOBAL_MEMORY_SCOPE + + logger.debug(f"[阶段一] 查询优化: '{raw_query}' → '{optimized_query}'") + logger.debug(f"[阶段一] 元数据过滤条件: {metadata_filters}") + except Exception as plan_exc: - logger.warning("查询规划失败,使用默认检索策略: %s", plan_exc, exc_info=True) + logger.warning("查询规划失败,使用原始查询: %s", plan_exc, exc_info=True) + # 即使查询规划失败,也保留基本的user_id过滤 + metadata_filters = {'user_id': GLOBAL_MEMORY_SCOPE} - # 使用Vector DB存储搜索 + # === 阶段二:向量精筛 === + coarse_limit = self.config.coarse_recall_limit # 粗筛阶段返回更多候选 + + logger.debug(f"[阶段二] 开始向量搜索: query='{optimized_query[:60]}...', limit={coarse_limit}") + search_results = await self.unified_storage.search_similar_memories( - query_text=raw_query, - limit=effective_limit, - filters=filters + query_text=optimized_query, + limit=coarse_limit, + filters=coarse_filters, # ChromaDB where条件(保留兼容) + metadata_filters=metadata_filters # JSON元数据索引过滤 ) + + logger.info(f"[阶段二] 向量搜索完成: 返回 {len(search_results)} 条候选") - # 转换为记忆对象 - search_results 返回 List[Tuple[MemoryChunk, float]] - final_memories = [] - for memory, similarity_score in search_results: + # === 阶段三:综合重排 === + scored_memories = [] + current_time = time.time() + + for memory, vector_similarity in search_results: + # 1. 向量相似度得分(已归一化到 0-1) + vector_score = vector_similarity + + # 2. 
时效性得分(指数衰减,30天半衰期) + age_seconds = current_time - memory.metadata.created_at + age_days = age_seconds / (24 * 3600) + # 使用 math.exp 而非 np.exp(避免依赖numpy) + import math + recency_score = math.exp(-age_days / 30) + + # 3. 重要性得分(枚举值转换为归一化得分 0-1) + # ImportanceLevel: LOW=1, NORMAL=2, HIGH=3, CRITICAL=4 + importance_enum = memory.metadata.importance + if hasattr(importance_enum, 'value'): + # 枚举类型,转换为0-1范围:(value - 1) / 3 + importance_score = (importance_enum.value - 1) / 3.0 + else: + # 如果已经是数值,直接使用 + importance_score = float(importance_enum) if importance_enum else 0.5 + + # 4. 访问频率得分(归一化,访问10次以上得满分) + access_count = memory.metadata.access_count + frequency_score = min(access_count / 10.0, 1.0) + + # 综合得分(加权平均) + final_score = ( + self.config.vector_weight * vector_score + + self.config.recency_weight * recency_score + + self.config.context_weight * importance_score + + 0.1 * frequency_score # 访问频率权重(固定10%) + ) + + scored_memories.append((memory, final_score, { + "vector": vector_score, + "recency": recency_score, + "importance": importance_score, + "frequency": frequency_score, + "final": final_score + })) + + # 更新访问记录 memory.update_access() - final_memories.append(memory) + # 按综合得分排序 + scored_memories.sort(key=lambda x: x[1], reverse=True) + + # 返回 Top-K + final_memories = [mem for mem, score, details in scored_memories[:effective_limit]] + retrieval_time = time.time() - start_time + # 详细日志 + if scored_memories: + logger.info(f"[阶段三] 综合重排完成: Top 3 得分详情") + for i, (mem, score, details) in enumerate(scored_memories[:3], 1): + try: + summary = mem.content[:60] if hasattr(mem, 'content') and mem.content else "" + except: + summary = "" + logger.info( + f" #{i} | final={details['final']:.3f} " + f"(vec={details['vector']:.3f}, rec={details['recency']:.3f}, " + f"imp={details['importance']:.3f}, freq={details['frequency']:.3f}) " + f"| {summary}" + ) + logger.info( - "✅ 简化记忆检索完成" + "✅ 三阶段记忆检索完成" f" | user={resolved_user_id}" - f" | count={len(final_memories)}" + f" | 
粗筛={len(search_results)}" + f" | 精筛={len(scored_memories)}" + f" | 返回={len(final_memories)}" f" | duration={retrieval_time:.3f}s" - f" | query='{raw_query}'" + f" | query='{optimized_query[:60]}...'" ) self.last_retrieval_time = time.time() @@ -717,8 +823,8 @@ class MemorySystem: except Exception: context = dict(raw_context or {}) - # 基础字段(统一使用全局作用域) - context["user_id"] = GLOBAL_MEMORY_SCOPE + # 基础字段:强制使用传入的 user_id 参数(已统一为 GLOBAL_MEMORY_SCOPE) + context["user_id"] = user_id or GLOBAL_MEMORY_SCOPE context["timestamp"] = context.get("timestamp") or timestamp or time.time() context["message_type"] = context.get("message_type") or "normal" context["platform"] = context.get("platform") or context.get("source_platform") or "unknown" @@ -758,6 +864,150 @@ class MemorySystem: return context + async def _build_enhanced_query_context(self, raw_query: str, normalized_context: Dict[str, Any]) -> Dict[str, Any]: + """构建包含未读消息综合上下文的增强查询上下文 + + Args: + raw_query: 原始查询文本 + normalized_context: 标准化后的基础上下文 + + Returns: + Dict[str, Any]: 包含未读消息综合信息的增强上下文 + """ + enhanced_context = dict(normalized_context) # 复制基础上下文 + + try: + # 获取stream_id以查找未读消息 + stream_id = normalized_context.get("stream_id") + if not stream_id: + logger.debug("未找到stream_id,使用基础上下文进行查询规划") + return enhanced_context + + # 获取未读消息作为上下文 + unread_messages_summary = await self._collect_unread_messages_context(stream_id) + + if unread_messages_summary: + enhanced_context["unread_messages_context"] = unread_messages_summary + enhanced_context["has_unread_context"] = True + + logger.debug(f"为查询规划构建了增强上下文,包含 {len(unread_messages_summary.get('messages', []))} 条未读消息") + else: + enhanced_context["has_unread_context"] = False + logger.debug("未找到未读消息,使用基础上下文进行查询规划") + + except Exception as e: + logger.warning(f"构建增强查询上下文失败: {e}", exc_info=True) + enhanced_context["has_unread_context"] = False + + return enhanced_context + + async def _collect_unread_messages_context(self, stream_id: str) -> Optional[Dict[str, Any]]: + 
"""收集未读消息的综合上下文信息 + + Args: + stream_id: 流ID + + Returns: + Optional[Dict[str, Any]]: 未读消息的综合信息,包含消息列表、关键词、主题等 + """ + try: + from src.chat.message_receive.chat_stream import get_chat_manager + + chat_manager = get_chat_manager() + chat_stream = chat_manager.get_stream(stream_id) + + if not chat_stream or not hasattr(chat_stream, "context_manager"): + logger.debug(f"未找到stream_id={stream_id}的聊天流或上下文管理器") + return None + + # 获取未读消息 + context_manager = chat_stream.context_manager + unread_messages = context_manager.get_unread_messages() + + if not unread_messages: + logger.debug(f"stream_id={stream_id}没有未读消息") + return None + + # 构建未读消息摘要 + messages_summary = [] + all_keywords = set() + participant_names = set() + + for msg in unread_messages[:10]: # 限制处理最近10条未读消息 + try: + # 提取消息内容 + content = (getattr(msg, "processed_plain_text", None) or + getattr(msg, "display_message", None) or "") + if not content: + continue + + # 提取发送者信息 + sender_name = "未知用户" + if hasattr(msg, "user_info") and msg.user_info: + sender_name = (getattr(msg.user_info, "user_nickname", None) or + getattr(msg.user_info, "user_cardname", None) or + getattr(msg.user_info, "user_id", None) or "未知用户") + + participant_names.add(sender_name) + + # 添加到消息摘要 + messages_summary.append({ + "sender": sender_name, + "content": content[:200], # 限制长度避免过长 + "timestamp": getattr(msg, "time", None) + }) + + # 提取关键词(简单实现) + content_lower = content.lower() + # 这里可以添加更复杂的关键词提取逻辑 + words = [w.strip() for w in content_lower.split() if len(w.strip()) > 1] + all_keywords.update(words[:5]) # 每条消息最多取5个词 + + except Exception as msg_e: + logger.debug(f"处理未读消息时出错: {msg_e}") + continue + + if not messages_summary: + return None + + # 构建综合上下文信息 + unread_context = { + "messages": messages_summary, + "total_count": len(unread_messages), + "processed_count": len(messages_summary), + "keywords": list(all_keywords)[:20], # 最多20个关键词 + "participants": list(participant_names), + "context_summary": 
self._build_unread_context_summary(messages_summary) + } + + logger.debug(f"收集到未读消息上下文: {len(messages_summary)}条消息,{len(all_keywords)}个关键词,{len(participant_names)}个参与者") + return unread_context + + except Exception as e: + logger.warning(f"收集未读消息上下文失败: {e}", exc_info=True) + return None + + def _build_unread_context_summary(self, messages_summary: List[Dict[str, Any]]) -> str: + """构建未读消息的文本摘要 + + Args: + messages_summary: 未读消息摘要列表 + + Returns: + str: 未读消息的文本摘要 + """ + if not messages_summary: + return "" + + summary_parts = [] + for msg_info in messages_summary: + sender = msg_info.get("sender", "未知") + content = msg_info.get("content", "") + if content: + summary_parts.append(f"{sender}: {content}") + + return " | ".join(summary_parts) + async def _resolve_conversation_context(self, fallback_text: str, context: Optional[Dict[str, Any]]) -> str: """使用 stream_id 历史消息和相关记忆充实对话文本,默认回退到传入文本""" if not context: diff --git a/src/chat/memory_system/vector_memory_storage_v2.py b/src/chat/memory_system/vector_memory_storage_v2.py index 7722324dd..6c590d888 100644 --- a/src/chat/memory_system/vector_memory_storage_v2.py +++ b/src/chat/memory_system/vector_memory_storage_v2.py @@ -24,11 +24,63 @@ import numpy as np from src.common.logger import get_logger from src.common.vector_db import vector_db_service from src.chat.utils.utils import get_embedding -from src.chat.memory_system.memory_chunk import MemoryChunk +from src.chat.memory_system.memory_chunk import MemoryChunk, ConfidenceLevel, ImportanceLevel from src.chat.memory_system.memory_forgetting_engine import MemoryForgettingEngine +from src.chat.memory_system.memory_metadata_index import MemoryMetadataIndex, MemoryMetadataIndexEntry logger = get_logger(__name__) +# 全局枚举映射表缓存 +_ENUM_MAPPINGS_CACHE = {} + +def _build_enum_mapping(enum_class: type) -> Dict[str, Any]: + """构建枚举类的完整映射表 + + Args: + enum_class: 枚举类 + + Returns: + Dict[str, Any]: 包含各种映射格式的字典 + """ + cache_key = f"{enum_class.__module__}.{enum_class.__name__}" + 
+ # 如果已经缓存过,直接返回 + if cache_key in _ENUM_MAPPINGS_CACHE: + return _ENUM_MAPPINGS_CACHE[cache_key] + + mapping = { + "name_to_enum": {}, # 枚举名称 -> 枚举实例 (HIGH -> ImportanceLevel.HIGH) + "value_to_enum": {}, # 整数值 -> 枚举实例 (3 -> ImportanceLevel.HIGH) + "value_str_to_enum": {}, # 字符串value -> 枚举实例 ("3" -> ImportanceLevel.HIGH) + "enum_value_to_name": {}, # 枚举实例 -> 名称映射 (反向) + "all_possible_strings": set(), # 所有可能的字符串表示 + } + + for member in enum_class: + # 名称映射 (支持大小写) + mapping["name_to_enum"][member.name] = member + mapping["name_to_enum"][member.name.lower()] = member + mapping["name_to_enum"][member.name.upper()] = member + + # 值映射 + mapping["value_to_enum"][member.value] = member + mapping["value_str_to_enum"][str(member.value)] = member + + # 反向映射 + mapping["enum_value_to_name"][member] = member.name + + # 收集所有可能的字符串表示 + mapping["all_possible_strings"].add(member.name) + mapping["all_possible_strings"].add(member.name.lower()) + mapping["all_possible_strings"].add(member.name.upper()) + mapping["all_possible_strings"].add(str(member.value)) + + # 缓存结果 + _ENUM_MAPPINGS_CACHE[cache_key] = mapping + logger.debug(f"构建枚举映射表: {enum_class.__name__} -> {len(mapping['name_to_enum'])} 个名称映射, {len(mapping['value_to_enum'])} 个值映射") + + return mapping + @dataclass class VectorStorageConfig: @@ -38,7 +90,7 @@ class VectorStorageConfig: metadata_collection: str = "memory_metadata_v2" # 检索配置 - similarity_threshold: float = 0.8 + similarity_threshold: float = 0.5 # 降低阈值以提高召回率(0.5-0.6 是合理范围) search_limit: int = 20 batch_size: int = 100 @@ -50,6 +102,26 @@ class VectorStorageConfig: # 遗忘配置 enable_forgetting: bool = True retention_hours: int = 24 * 30 # 30天 + + @classmethod + def from_global_config(cls): + """从全局配置创建实例""" + from src.config.config import global_config + + memory_cfg = global_config.memory + + return cls( + memory_collection=getattr(memory_cfg, 'vector_db_memory_collection', 'unified_memory_v2'), + metadata_collection=getattr(memory_cfg, 'vector_db_metadata_collection', 
'memory_metadata_v2'), + similarity_threshold=getattr(memory_cfg, 'vector_db_similarity_threshold', 0.5), + search_limit=getattr(memory_cfg, 'vector_db_search_limit', 20), + batch_size=getattr(memory_cfg, 'vector_db_batch_size', 100), + enable_caching=getattr(memory_cfg, 'vector_db_enable_caching', True), + cache_size_limit=getattr(memory_cfg, 'vector_db_cache_size_limit', 1000), + auto_cleanup_interval=getattr(memory_cfg, 'vector_db_auto_cleanup_interval', 3600), + enable_forgetting=getattr(memory_cfg, 'enable_memory_forgetting', True), + retention_hours=getattr(memory_cfg, 'vector_db_retention_hours', 720), + ) class VectorMemoryStorage: @@ -71,11 +143,29 @@ class VectorMemoryStorage: """基于Vector DB的记忆存储系统""" def __init__(self, config: Optional[VectorStorageConfig] = None): - self.config = config or VectorStorageConfig() + # 默认从全局配置读取,如果没有传入config + if config is None: + try: + self.config = VectorStorageConfig.from_global_config() + logger.info("✅ Vector存储配置已从全局配置加载") + except Exception as e: + logger.warning(f"从全局配置加载失败,使用默认配置: {e}") + self.config = VectorStorageConfig() + else: + self.config = config + + # 从配置中获取批处理大小和集合名称 + self.batch_size = self.config.batch_size + self.collection_name = self.config.memory_collection + self.vector_db_service = vector_db_service # 内存缓存 self.memory_cache: Dict[str, MemoryChunk] = {} self.cache_timestamps: Dict[str, float] = {} + self._cache = self.memory_cache # 别名,兼容旧代码 + + # 元数据索引管理器(JSON文件索引) + self.metadata_index = MemoryMetadataIndex() # 遗忘引擎 self.forgetting_engine: Optional[MemoryForgettingEngine] = None @@ -180,29 +270,59 @@ class VectorMemoryStorage: except Exception as e: logger.error(f"自动清理失败: {e}") - def _memory_to_vector_format(self, memory: MemoryChunk) -> Tuple[Dict[str, Any], str]: - """将MemoryChunk转换为Vector DB格式""" - # 选择用于向量化的文本 - content = getattr(memory, 'display', None) or getattr(memory, 'text_content', None) or "" + def _memory_to_vector_format(self, memory: MemoryChunk) -> Dict[str, Any]: + 
"""将MemoryChunk转换为向量存储格式""" + try: + # 获取memory_id + memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', None) + + # 生成向量表示的文本 + display_text = getattr(memory, 'display', None) or getattr(memory, 'text_content', None) or str(memory.content) + if not display_text.strip(): + logger.warning(f"记忆 {memory_id} 缺少有效的显示文本") + display_text = f"{memory.memory_type.value}: {', '.join(memory.subjects)}" - # 构建元数据(全部从memory.metadata获取) - meta = getattr(memory, 'metadata', None) - metadata = { - "user_id": getattr(meta, 'user_id', None), - "chat_id": getattr(meta, 'chat_id', 'unknown'), - "memory_type": memory.memory_type.value, - "keywords": orjson.dumps(getattr(memory, 'keywords', [])).decode("utf-8"), - "importance": getattr(meta, 'importance', None), - "timestamp": getattr(meta, 'created_at', None), - "access_count": getattr(meta, 'access_count', 0), - "last_access_time": getattr(meta, 'last_accessed', 0), - "confidence": getattr(meta, 'confidence', None), - "source": "vector_storage_v2", - # 存储完整的记忆数据 - "memory_data": orjson.dumps(memory.to_dict()).decode("utf-8") - } + # 构建元数据 - 修复枚举值和列表序列化 + metadata = { + "memory_id": memory_id, + "user_id": memory.metadata.user_id or "unknown", + "memory_type": memory.memory_type.value, + "importance": memory.metadata.importance.name, # 使用 .name 而不是枚举对象 + "confidence": memory.metadata.confidence.name, # 使用 .name 而不是枚举对象 + "created_at": memory.metadata.created_at, + "last_accessed": memory.metadata.last_accessed or memory.metadata.created_at, + "access_count": memory.metadata.access_count, + "subjects": orjson.dumps(memory.subjects).decode("utf-8"), # 列表转JSON字符串 + "keywords": orjson.dumps(memory.keywords).decode("utf-8"), # 列表转JSON字符串 + "tags": orjson.dumps(memory.tags).decode("utf-8"), # 列表转JSON字符串 + "categories": orjson.dumps(memory.categories).decode("utf-8"), # 列表转JSON字符串 + "relevance_score": memory.metadata.relevance_score + } - return metadata, content + # 添加可选字段 + if 
memory.metadata.source_context: + metadata["source_context"] = str(memory.metadata.source_context) + + if memory.content.predicate: + metadata["predicate"] = memory.content.predicate + + if memory.content.object: + if isinstance(memory.content.object, (dict, list)): + metadata["object"] = orjson.dumps(memory.content.object).decode() + else: + metadata["object"] = str(memory.content.object) + + return { + "id": memory_id, + "embedding": None, # 将由vector_db_service生成 + "metadata": metadata, + "document": display_text + } + + except Exception as e: + memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown') + logger.error(f"转换记忆 {memory_id} 到向量格式失败: {e}", exc_info=True) + raise def _vector_result_to_memory(self, document: str, metadata: Dict[str, Any]) -> Optional[MemoryChunk]: """将Vector DB结果转换为MemoryChunk""" @@ -212,27 +332,108 @@ class VectorMemoryStorage: memory_dict = orjson.loads(metadata["memory_data"]) return MemoryChunk.from_dict(memory_dict) - # 兜底:从基础字段重建 + # 兜底:从基础字段重建(使用新的结构化格式) + logger.warning(f"未找到memory_data,使用兜底逻辑重建记忆 (id={metadata.get('memory_id', 'unknown')})") + + # 构建符合MemoryChunk.from_dict期望的结构 memory_dict = { - "memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"), - "user_id": metadata.get("user_id", "unknown"), - "text_content": document, - "display": document, - "memory_type": metadata.get("memory_type", "general"), - "keywords": orjson.loads(metadata.get("keywords", "[]")), - "importance": metadata.get("importance", 0.5), - "timestamp": metadata.get("timestamp", time.time()), - "access_count": metadata.get("access_count", 0), - "last_access_time": metadata.get("last_access_time", 0), - "confidence": metadata.get("confidence", 0.8), - "metadata": {} + "metadata": { + "memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"), + "user_id": metadata.get("user_id", "unknown"), + "created_at": metadata.get("timestamp", time.time()), + "last_accessed": 
metadata.get("last_access_time", time.time()), + "last_modified": metadata.get("timestamp", time.time()), + "access_count": metadata.get("access_count", 0), + "relevance_score": 0.0, + "confidence": self._parse_enum_value(metadata.get("confidence", 2), ConfidenceLevel, ConfidenceLevel.MEDIUM), + "importance": self._parse_enum_value(metadata.get("importance", 2), ImportanceLevel, ImportanceLevel.NORMAL), + "source_context": None, + }, + "content": { + "subject": "", + "predicate": "", + "object": "", + "display": document # 使用document作为显示文本 + }, + "memory_type": metadata.get("memory_type", "contextual"), + "keywords": orjson.loads(metadata.get("keywords", "[]")) if isinstance(metadata.get("keywords"), str) else metadata.get("keywords", []), + "tags": [], + "categories": [], + "embedding": None, + "semantic_hash": None, + "related_memories": [], + "temporal_context": None } - + return MemoryChunk.from_dict(memory_dict) - + except Exception as e: - logger.warning(f"转换Vector结果到MemoryChunk失败: {e}") + logger.error(f"转换Vector结果到MemoryChunk失败: {e}", exc_info=True) return None + + def _parse_enum_value(self, value: Any, enum_class: type, default: Any) -> Any: + """解析枚举值,支持字符串、整数和枚举实例 + + Args: + value: 要解析的值(可能是字符串、整数或枚举实例) + enum_class: 目标枚举类 + default: 默认值 + + Returns: + 解析后的枚举实例 + """ + if value is None: + return default + + # 如果已经是枚举实例,直接返回 + if isinstance(value, enum_class): + return value + + # 如果是整数,尝试按value值匹配 + if isinstance(value, int): + try: + for member in enum_class: + if member.value == value: + return member + # 如果没找到匹配的,返回默认值 + logger.warning(f"无法找到{enum_class.__name__}中value={value}的枚举项,使用默认值") + return default + except Exception as e: + logger.warning(f"解析{enum_class.__name__}整数值{value}时出错: {e},使用默认值") + return default + + # 如果是字符串,尝试按名称或value值匹配 + if isinstance(value, str): + str_value = value.strip().upper() + + # 先尝试按枚举名称匹配 + try: + if hasattr(enum_class, str_value): + return getattr(enum_class, str_value) + except AttributeError: + pass + + # 
再尝试按value值匹配(如果value是字符串形式的数字) + try: + int_value = int(str_value) + return self._parse_enum_value(int_value, enum_class, default) + except ValueError: + pass + + # 最后尝试按小写名称匹配 + try: + for member in enum_class: + if member.value.upper() == str_value: + return member + logger.warning(f"无法找到{enum_class.__name__}中名称或value为'{value}'的枚举项,使用默认值") + return default + except Exception as e: + logger.warning(f"解析{enum_class.__name__}字符串值'{value}'时出错: {e},使用默认值") + return default + + # 其他类型,返回默认值 + logger.warning(f"不支持的{enum_class.__name__}值类型: {type(value)},使用默认值") + return default def _get_from_cache(self, memory_id: str) -> Optional[MemoryChunk]: """从缓存获取记忆""" @@ -262,70 +463,124 @@ class VectorMemoryStorage: self.memory_cache.pop(oldest_id, None) self.cache_timestamps.pop(oldest_id, None) - self.memory_cache[memory.memory_id] = memory - self.cache_timestamps[memory.memory_id] = time.time() + memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', None) + if memory_id: + self.memory_cache[memory_id] = memory + self.cache_timestamps[memory_id] = time.time() async def store_memories(self, memories: List[MemoryChunk]) -> int: """批量存储记忆""" if not memories: return 0 + + start_time = datetime.now() + success_count = 0 try: - # 准备批量数据 - embeddings = [] - documents = [] - metadatas = [] - ids = [] - + # 转换为向量格式 + vector_data_list = [] for memory in memories: try: - # 转换格式 - metadata, content = self._memory_to_vector_format(memory) - - if not content.strip(): - logger.warning(f"记忆 {memory.memory_id} 内容为空,跳过") - continue - - # 生成向量 - embedding = await get_embedding(content) - if not embedding: - logger.warning(f"生成向量失败,跳过记忆: {memory.memory_id}") - continue - - embeddings.append(embedding) - documents.append(content) - metadatas.append(metadata) - ids.append(memory.memory_id) - - # 添加到缓存 - self._add_to_cache(memory) - + vector_data = self._memory_to_vector_format(memory) + vector_data_list.append(vector_data) except Exception as e: - 
logger.error(f"处理记忆 {memory.memory_id} 失败: {e}") + memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown') + logger.error(f"处理记忆 {memory_id} 失败: {e}") continue - - # 批量插入Vector DB - if embeddings: - vector_db_service.add( - collection_name=self.config.memory_collection, - embeddings=embeddings, - documents=documents, - metadatas=metadatas, - ids=ids - ) + + if not vector_data_list: + logger.warning("没有有效的记忆数据可存储") + return 0 + + # 批量存储到向量数据库 + for i in range(0, len(vector_data_list), self.batch_size): + batch = vector_data_list[i:i + self.batch_size] - stored_count = len(embeddings) - self.stats["total_stores"] += stored_count - self.stats["total_memories"] += stored_count - - logger.info(f"成功存储 {stored_count}/{len(memories)} 条记忆") - return stored_count + try: + # 生成embeddings + embeddings = [] + for item in batch: + try: + embedding = await get_embedding(item["document"]) + embeddings.append(embedding) + except Exception as e: + logger.error(f"生成embedding失败: {e}") + # 使用零向量作为后备 + embeddings.append([0.0] * 768) # 默认维度 + + # vector_db_service.add 需要embeddings参数 + self.vector_db_service.add( + collection_name=self.collection_name, + embeddings=embeddings, + ids=[item["id"] for item in batch], + documents=[item["document"] for item in batch], + metadatas=[item["metadata"] for item in batch] + ) + success = True + + if success: + # 更新缓存和元数据索引 + metadata_entries = [] + for item in batch: + memory_id = item["id"] + # 从原始 memories 列表中找到对应的 MemoryChunk + memory = next((m for m in memories if (getattr(m.metadata, 'memory_id', None) or getattr(m, 'memory_id', None)) == memory_id), None) + if memory: + # 更新缓存 + self._cache[memory_id] = memory + success_count += 1 + + # 创建元数据索引条目 + try: + index_entry = MemoryMetadataIndexEntry( + memory_id=memory_id, + user_id=memory.metadata.user_id or "unknown", + memory_type=memory.memory_type.value, + subjects=memory.subjects, + objects=[str(memory.content.object)] if memory.content.object else 
[], + keywords=memory.keywords, + tags=memory.tags, + importance=memory.metadata.importance.value, + confidence=memory.metadata.confidence.value, + created_at=memory.metadata.created_at, + access_count=memory.metadata.access_count, + chat_id=memory.metadata.chat_id, + content_preview=str(memory.content)[:100] if memory.content else None + ) + metadata_entries.append(index_entry) + except Exception as e: + logger.warning(f"创建元数据索引条目失败 (memory_id={memory_id}): {e}") + + # 批量更新元数据索引 + if metadata_entries: + try: + self.metadata_index.batch_add_or_update(metadata_entries) + logger.debug(f"更新元数据索引: {len(metadata_entries)} 条") + except Exception as e: + logger.error(f"批量更新元数据索引失败: {e}") + else: + logger.warning(f"批次存储失败,跳过 {len(batch)} 条记忆") + + except Exception as e: + logger.error(f"批量存储失败: {e}", exc_info=True) + continue + + duration = (datetime.now() - start_time).total_seconds() + logger.info(f"成功存储 {success_count}/{len(memories)} 条记忆,耗时 {duration:.2f}秒") - return 0 + # 保存元数据索引到磁盘 + if success_count > 0: + try: + self.metadata_index.save() + logger.debug("元数据索引已保存到磁盘") + except Exception as e: + logger.error(f"保存元数据索引失败: {e}") + return success_count + except Exception as e: - logger.error(f"批量存储记忆失败: {e}") - return 0 + logger.error(f"批量存储记忆失败: {e}", exc_info=True) + return success_count async def store_memory(self, memory: MemoryChunk) -> bool: """存储单条记忆""" @@ -337,13 +592,62 @@ class VectorMemoryStorage: query_text: str, limit: int = 10, similarity_threshold: Optional[float] = None, - filters: Optional[Dict[str, Any]] = None + filters: Optional[Dict[str, Any]] = None, + # 新增:元数据过滤参数(用于JSON索引粗筛) + metadata_filters: Optional[Dict[str, Any]] = None ) -> List[Tuple[MemoryChunk, float]]: - """搜索相似记忆""" + """ + 搜索相似记忆(混合索引模式) + + Args: + query_text: 查询文本 + limit: 返回数量限制 + similarity_threshold: 相似度阈值 + filters: ChromaDB where条件(保留用于兼容) + metadata_filters: JSON元数据索引过滤条件,支持: + - memory_types: List[str] + - subjects: List[str] + - keywords: List[str] + - tags: List[str] + - 
importance_min: int + - importance_max: int + - created_after: float + - created_before: float + - user_id: str + """ if not query_text.strip(): return [] try: + # === 阶段一:JSON元数据粗筛(可选) === + candidate_ids: Optional[List[str]] = None + if metadata_filters: + logger.debug(f"[JSON元数据粗筛] 开始,过滤条件: {metadata_filters}") + candidate_ids = self.metadata_index.search( + memory_types=metadata_filters.get('memory_types'), + subjects=metadata_filters.get('subjects'), + keywords=metadata_filters.get('keywords'), + tags=metadata_filters.get('tags'), + importance_min=metadata_filters.get('importance_min'), + importance_max=metadata_filters.get('importance_max'), + created_after=metadata_filters.get('created_after'), + created_before=metadata_filters.get('created_before'), + user_id=metadata_filters.get('user_id'), + limit=self.config.search_limit * 2, # 粗筛返回更多候选 + flexible_mode=True # 使用灵活匹配模式 + ) + logger.info(f"[JSON元数据粗筛] 完成,筛选出 {len(candidate_ids)} 个候选ID") + + # 如果粗筛后没有结果,回退到全部记忆搜索 + if not candidate_ids: + total_memories = len(self.metadata_index.index) + logger.warning(f"JSON元数据粗筛后无候选,启用回退机制:在全部 {total_memories} 条记忆中进行向量搜索") + logger.info("💡 提示:这可能是因为查询条件过于严格,或相关记忆的元数据与查询条件不完全匹配") + candidate_ids = None # 设为None表示不限制候选ID + else: + logger.debug(f"[JSON元数据粗筛] 成功筛选出候选,进入向量精筛阶段") + + # === 阶段二:向量精筛 === # 生成查询向量 query_embedding = await get_embedding(query_text) if not query_embedding: @@ -354,7 +658,16 @@ class VectorMemoryStorage: # 构建where条件 where_conditions = filters or {} + # 如果有候选ID列表,添加到where条件 + if candidate_ids: + # ChromaDB的where条件需要使用$in操作符 + where_conditions["memory_id"] = {"$in": candidate_ids} + logger.debug(f"[向量精筛] 限制在 {len(candidate_ids)} 个候选ID内搜索") + else: + logger.info("[向量精筛] 在全部记忆中搜索(元数据筛选无结果回退)") + # 查询Vector DB + logger.debug(f"[向量精筛] 开始,limit={min(limit, self.config.search_limit)}") results = vector_db_service.query( collection_name=self.config.memory_collection, query_embeddings=[query_embedding], @@ -371,6 +684,7 @@ class VectorMemoryStorage: metadatas = 
results.get("metadatas", [[]])[0] ids = results.get("ids", [[]])[0] + logger.info(f"向量检索返回原始结果:documents={len(documents)}, ids={len(ids)}, metadatas={len(metadatas)}") for i, (doc, metadata, memory_id) in enumerate(zip(documents, metadatas, ids)): # 计算相似度 distance = distances[i] if i < len(distances) else 1.0 @@ -390,12 +704,19 @@ class VectorMemoryStorage: if memory: similar_memories.append((memory, similarity)) + # 记录单条结果的关键日志(id,相似度,简短文本) + try: + short_text = (str(memory.content)[:120]) if hasattr(memory, 'content') else (doc[:120] if isinstance(doc, str) else '') + except Exception: + short_text = '' + logger.info(f"检索结果 - id={memory_id}, similarity={similarity:.4f}, summary={short_text}") # 按相似度排序 similar_memories.sort(key=lambda x: x[1], reverse=True) self.stats["total_searches"] += 1 - logger.debug(f"搜索相似记忆: 查询='{query_text[:30]}...', 结果数={len(similar_memories)}") + logger.info(f"搜索相似记忆: query='{query_text[:60]}...', limit={limit}, threshold={threshold}, filters={where_conditions}, 返回数={len(similar_memories)}") + logger.debug(f"搜索相似记忆 详细结果数={len(similar_memories)}") return similar_memories @@ -451,6 +772,7 @@ class VectorMemoryStorage: metadatas = results.get("metadatas", [{}] * len(documents)) ids = results.get("ids", []) + logger.info(f"按过滤条件获取返回: docs={len(documents)}, ids={len(ids)}") for i, (doc, metadata) in enumerate(zip(documents, metadatas)): memory_id = ids[i] if i < len(ids) else None @@ -459,6 +781,7 @@ class VectorMemoryStorage: memory = self._get_from_cache(memory_id) if memory: memories.append(memory) + logger.debug(f"过滤获取命中缓存: id={memory_id}") continue # 从Vector结果重建 @@ -467,6 +790,7 @@ class VectorMemoryStorage: memories.append(memory) if memory_id: self._add_to_cache(memory) + logger.debug(f"过滤获取结果: id={memory_id}, meta_keys={list(metadata.keys())}") return memories @@ -477,14 +801,20 @@ class VectorMemoryStorage: async def update_memory(self, memory: MemoryChunk) -> bool: """更新记忆""" try: + memory_id = getattr(memory.metadata, 'memory_id', 
None) or getattr(memory, 'memory_id', None) + if not memory_id: + logger.error("无法更新记忆:缺少memory_id") + return False + # 先删除旧记忆 - await self.delete_memory(memory.memory_id) + await self.delete_memory(memory_id) # 重新存储更新后的记忆 return await self.store_memory(memory) except Exception as e: - logger.error(f"更新记忆 {memory.memory_id} 失败: {e}") + memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown') + logger.error(f"更新记忆 {memory_id} 失败: {e}") return False async def delete_memory(self, memory_id: str) -> bool: @@ -658,7 +988,7 @@ class VectorMemoryStorageAdapter: query_text, limit, filters=filters ) # 转换为原格式:(memory_id, similarity) - return [(memory.memory_id, similarity) for memory, similarity in results] + return [(getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown'), similarity) for memory, similarity in results] def get_stats(self) -> Dict[str, Any]: return self.storage.get_storage_stats() diff --git a/src/chat/replyer/default_generator.py b/src/chat/replyer/default_generator.py index d44e0a354..ecd57639b 100644 --- a/src/chat/replyer/default_generator.py +++ b/src/chat/replyer/default_generator.py @@ -18,6 +18,7 @@ from src.individuality.individuality import get_individuality from src.llm_models.utils_model import LLMRequest from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending from src.chat.message_receive.chat_stream import ChatStream +from src.chat.utils.memory_mappings import get_memory_type_chinese_label from src.chat.message_receive.uni_message_sender import HeartFCSender from src.chat.utils.timer_calculator import Timer from src.chat.utils.utils import get_chat_type_and_target_info @@ -587,15 +588,25 @@ class DefaultReplyer: # 转换格式以兼容现有代码 running_memories = [] if enhanced_memories: - for memory_chunk in enhanced_memories: + logger.debug(f"[记忆转换] 收到 {len(enhanced_memories)} 条原始记忆") + for idx, memory_chunk in enumerate(enhanced_memories, 1): + # 
获取结构化内容的字符串表示 + structure_display = str(memory_chunk.content) if hasattr(memory_chunk, 'content') else "unknown" + + # 获取记忆内容,优先使用display + content = memory_chunk.display or memory_chunk.text_content or "" + + # 调试:记录每条记忆的内容获取情况 + logger.debug(f"[记忆转换] 第{idx}条: display={repr(memory_chunk.display)[:80]}, text_content={repr(memory_chunk.text_content)[:80]}, final_content={repr(content)[:80]}") + running_memories.append({ - "content": memory_chunk.display or memory_chunk.text_content or "", + "content": content, "memory_type": memory_chunk.memory_type.value, "confidence": memory_chunk.metadata.confidence.value, "importance": memory_chunk.metadata.importance.value, - "relevance": getattr(memory_chunk, 'relevance_score', 0.5), + "relevance": getattr(memory_chunk.metadata, 'relevance_score', 0.5), "source": memory_chunk.metadata.source, - "structure": memory_chunk.content_structure.value if memory_chunk.content_structure else "unknown", + "structure": structure_display, }) # 构建瞬时记忆字符串 @@ -604,27 +615,13 @@ class DefaultReplyer: if top_memory: instant_memory = top_memory[0].get("content", "") - logger.info(f"增强记忆系统检索到 {len(running_memories)} 条记忆") + logger.info(f"增强记忆系统检索到 {len(enhanced_memories)} 条原始记忆,转换为 {len(running_memories)} 条可用记忆") except Exception as e: logger.warning(f"增强记忆系统检索失败: {e}") running_memories = [] instant_memory = "" - def _format_confidence_label(value: Optional[float]) -> str: - if value is None: - return "未知" - mapping = {4: "已验证", 3: "高", 2: "中等", 1: "较低"} - rounded = int(value) - return mapping.get(rounded, f"{value:.2f}") - - def _format_importance_label(value: Optional[float]) -> str: - if value is None: - return "未知" - mapping = {4: "关键", 3: "高", 2: "一般", 1: "较低"} - rounded = int(value) - return mapping.get(rounded, f"{value:.2f}") - # 构建记忆字符串,使用方括号格式 memory_str = "" has_any_memory = False @@ -640,31 +637,32 @@ class DefaultReplyer: # 调试相关度信息 relevance_info = [(m.get('memory_type', 'unknown'), m.get('relevance', 0.0)) for m in sorted_memories] 
# -*- coding: utf-8 -*-
"""
Mapping tables and helper functions for the memory system.

Provides Chinese display labels for memory types, confidence levels and
importance levels.
"""

# Label returned whenever a value cannot be resolved.
_UNKNOWN_LABEL = "未知"

# Complete mapping from memory-type identifiers to Chinese labels.
MEMORY_TYPE_CHINESE_MAPPING = {
    "personal_fact": "个人事实",
    "preference": "偏好",
    "event": "事件",
    "opinion": "观点",
    "relationship": "人际关系",
    "emotion": "情感状态",
    "knowledge": "知识信息",
    "skill": "技能能力",
    "goal": "目标计划",
    "experience": "经验教训",
    "contextual": "上下文信息",
    "unknown": "未知"
}

# Mapping from confidence levels (numeric value or enum name) to Chinese labels.
CONFIDENCE_LEVEL_CHINESE_MAPPING = {
    1: "低置信度",
    2: "中等置信度",
    3: "高置信度",
    4: "已验证",
    "LOW": "低置信度",
    "MEDIUM": "中等置信度",
    "HIGH": "高置信度",
    "VERIFIED": "已验证",
    "unknown": "未知"
}

# Mapping from importance levels (numeric value or enum name) to Chinese labels.
IMPORTANCE_LEVEL_CHINESE_MAPPING = {
    1: "低重要性",
    2: "一般重要性",
    3: "高重要性",
    4: "关键重要性",
    "LOW": "低重要性",
    "NORMAL": "一般重要性",
    "HIGH": "高重要性",
    "CRITICAL": "关键重要性",
    "unknown": "未知"
}


def _resolve_level_label(level, mapping) -> str:
    """Look up ``level`` in a level mapping table.

    Shared implementation for the confidence and importance helpers.

    Args:
        level: An int, an integral float, a string (enum name in any case,
            or a decimal number), or an object exposing ``.value`` (enum).
        mapping: One of the ``*_LEVEL_CHINESE_MAPPING`` tables.

    Returns:
        The matching label, or "未知" when nothing matches.
    """
    # Unwrap enum instances (anything exposing ``.value``).
    if hasattr(level, "value"):
        level = level.value

    # bool is a subclass of int, so True would otherwise silently resolve to
    # level 1. A boolean is never a meaningful level.
    if isinstance(level, bool):
        return _UNKNOWN_LABEL

    # Accept integral floats such as 3.0; reject 2.5, NaN and infinities.
    if isinstance(level, float):
        if not level.is_integer():
            return _UNKNOWN_LABEL
        level = int(level)

    if isinstance(level, int):
        return mapping.get(level, _UNKNOWN_LABEL)

    if isinstance(level, str):
        key = level.strip()
        # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
        if key.isdecimal():
            return mapping.get(int(key), _UNKNOWN_LABEL)
        return mapping.get(key.upper(), _UNKNOWN_LABEL)

    return _UNKNOWN_LABEL


def get_memory_type_chinese_label(memory_type: str) -> str:
    """Return the Chinese label for a memory type.

    Args:
        memory_type: Memory type identifier. An enum instance exposing
            ``.value`` is also accepted.

    Returns:
        str: The matching label, or "未知" if the type is not known.
    """
    if hasattr(memory_type, "value"):
        memory_type = memory_type.value

    # Non-string (possibly unhashable) input can never be a valid key.
    if not isinstance(memory_type, str):
        return _UNKNOWN_LABEL

    label = MEMORY_TYPE_CHINESE_MAPPING.get(memory_type)
    if label is None:
        # Fall back to a whitespace/case-insensitive match.
        label = MEMORY_TYPE_CHINESE_MAPPING.get(memory_type.strip().lower(), _UNKNOWN_LABEL)
    return label


def get_confidence_level_chinese_label(level) -> str:
    """Return the Chinese label for a confidence level.

    Args:
        level: Confidence level (number, string or enum instance).

    Returns:
        str: The matching label, or "未知" if it cannot be resolved.
    """
    return _resolve_level_label(level, CONFIDENCE_LEVEL_CHINESE_MAPPING)


def get_importance_level_chinese_label(level) -> str:
    """Return the Chinese label for an importance level.

    Args:
        level: Importance level (number, string or enum instance).

    Returns:
        str: The matching label, or "未知" if it cannot be resolved.
    """
    return _resolve_level_label(level, IMPORTANCE_LEVEL_CHINESE_MAPPING)
query = select(model_class) @@ -221,7 +225,7 @@ async def db_query( # 删除记录 affected_rows = 0 for record in records_to_delete: - session.delete(record) + await session.delete(record) affected_rows += 1 return affected_rows @@ -274,6 +278,9 @@ async def db_save( """ try: async with get_db_session() as session: + if not session: + logger.error("[SQLAlchemy] 无法获取数据库会话") + return None # 如果提供了key_field和key_value,尝试更新现有记录 if key_field and key_value is not None: if hasattr(model_class, key_field): diff --git a/src/config/config.py b/src/config/config.py index eb98238d4..50b826bf0 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -420,7 +420,9 @@ class Config(ValidatedConfigBase): default_factory=lambda: CrossContextConfig(), description="跨群聊上下文共享配置" ) affinity_flow: AffinityFlowConfig = Field(default_factory=lambda: AffinityFlowConfig(), description="亲和流配置") - ProactiveThinking: ProactiveThinkingConfig = Field(default_factory=lambda: AffinityFlowConfig(), description="主动思考配置") + proactive_thinking: ProactiveThinkingConfig = Field( + default_factory=lambda: ProactiveThinkingConfig(), description="主动思考配置" + ) class APIAdapterConfig(ValidatedConfigBase): diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 6f12acf77..48cec6599 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -311,11 +311,12 @@ class MemoryConfig(ValidatedConfigBase): enable_vector_instant_memory: bool = Field(default=True, description="启用基于向量的瞬时记忆") # Vector DB配置 - vector_db_memory_collection: str = Field(default="unified_memory_v2", description="Vector DB集合名称") - vector_db_similarity_threshold: float = Field(default=0.8, description="Vector DB相似度阈值") + vector_db_memory_collection: str = Field(default="unified_memory_v2", description="Vector DB记忆集合名称") + vector_db_metadata_collection: str = Field(default="memory_metadata_v2", description="Vector DB元数据集合名称") + vector_db_similarity_threshold: float = Field(default=0.5, 
description="Vector DB相似度阈值(推荐0.5-0.6,过高会导致检索不到结果)") vector_db_search_limit: int = Field(default=20, description="Vector DB搜索限制") vector_db_batch_size: int = Field(default=100, description="批处理大小") - vector_db_enable_caching: bool = Field(default=True, description="启用缓存") + vector_db_enable_caching: bool = Field(default=True, description="启用内存缓存") vector_db_cache_size_limit: int = Field(default=1000, description="缓存大小限制") vector_db_auto_cleanup_interval: int = Field(default=3600, description="自动清理间隔(秒)") vector_db_retention_hours: int = Field(default=720, description="记忆保留时间(小时,默认30天)") diff --git a/src/plugin_system/base/base_action.py b/src/plugin_system/base/base_action.py index 9fd7c0002..26b79d4df 100644 --- a/src/plugin_system/base/base_action.py +++ b/src/plugin_system/base/base_action.py @@ -382,21 +382,19 @@ class BaseAction(ABC): # 构造命令数据 command_data = {"name": command_name, "args": args or {}} - response = await send_api.adapter_command_to_stream( - action=command_name, - params=args or {}, + success = await send_api.command_to_stream( + command=command_data, stream_id=self.chat_id, - platform=self.platform + storage_message=storage_message, + display_message=display_message, ) - # 根据响应判断成功与否 - if response and response.get("status") == "ok": - logger.info(f"{self.log_prefix} 成功执行适配器命令: {command_name}, 响应: {response.get('data')}") - return True + if success: + logger.info(f"{self.log_prefix} 成功发送命令: {command_name}") else: - error_message = response.get('message', '未知错误') - logger.error(f"{self.log_prefix} 执行适配器命令失败: {command_name}, 错误: {error_message}") - return False + logger.error(f"{self.log_prefix} 发送命令失败: {command_name}") + + return success except Exception as e: logger.error(f"{self.log_prefix} 发送命令时出错: {e}") diff --git a/src/plugins/built_in/affinity_flow_chatter/planner.py b/src/plugins/built_in/affinity_flow_chatter/planner.py index 412c33512..420487d4f 100644 --- a/src/plugins/built_in/affinity_flow_chatter/planner.py +++ 
b/src/plugins/built_in/affinity_flow_chatter/planner.py @@ -127,7 +127,7 @@ class ChatterActionPlanner: } ) - logger.debug( + logger.info( f"消息 {message.message_id} 兴趣度: {message_interest:.3f}, 应回复: {message.should_reply}" ) diff --git a/src/plugins/built_in/proactive_thinker/plugin.py b/src/plugins/built_in/proactive_thinker/plugin.py index 0a37531ff..c04f82927 100644 --- a/src/plugins/built_in/proactive_thinker/plugin.py +++ b/src/plugins/built_in/proactive_thinker/plugin.py @@ -23,7 +23,7 @@ logger = get_logger(__name__) class ProactiveThinkerPlugin(BasePlugin): """一个主动思考的插件,但现在还只是个空壳子""" plugin_name: str = "proactive_thinker" - enable_plugin: bool = True + enable_plugin: bool = False dependencies: list[str] = [] python_dependencies: list[str] = [] config_file_name: str = "config.toml" diff --git a/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py b/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py index 91b802afc..55492c7f9 100644 --- a/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py +++ b/src/plugins/built_in/proactive_thinker/proacive_thinker_event.py @@ -1,23 +1,220 @@ +import asyncio +import random +import time +from datetime import datetime from typing import List, Union, Type, Optional + +from maim_message import UserInfo + +from src.chat.message_receive.chat_stream import get_chat_manager from src.common.logger import get_logger +from src.config.config import global_config +from src.manager.async_task_manager import async_task_manager, AsyncTask +from src.plugin_system import EventType, BaseEventHandler +from src.plugin_system.apis import chat_api, person_api +from src.plugin_system.base.base_event import HandlerResult +from .proactive_thinker_executor import ProactiveThinkerExecutor logger = get_logger(__name__) -from src.plugin_system import ( - EventType, - BaseEventHandler, - HandlerResult, -) + + +class ColdStartTask(AsyncTask): + """ + 冷启动任务,专门用于处理那些在白名单里,但从未与机器人发生过交互的用户。 + 它的核心职责是“破冰”,主动创建聊天流并发起第一次问候。 
+ """ + + def __init__(self): + super().__init__(task_name="ColdStartTask") + self.chat_manager = get_chat_manager() + self.executor = ProactiveThinkerExecutor() + + async def run(self): + """任务主循环,周期性地检查是否有需要“破冰”的新用户。""" + logger.info("冷启动任务已启动,将周期性检查白名单中的新朋友。") + # 初始等待一段时间,确保其他服务(如数据库)完全启动 + await asyncio.sleep(20) + + while True: + try: + logger.info("【冷启动】开始扫描白名单,寻找从未聊过的用户...") + + # 从全局配置中获取私聊白名单 + enabled_private_chats = global_config.proactive_thinking.enabled_private_chats + if not enabled_private_chats: + logger.debug("【冷启动】私聊白名单为空,任务暂停一小时。") + await asyncio.sleep(3600) # 白名单为空时,没必要频繁检查 + continue + + # 遍历白名单中的每一个用户 + for chat_id in enabled_private_chats: + try: + platform, user_id_str = chat_id.split(":") + user_id = int(user_id_str) + + # 【核心逻辑】使用 chat_api 检查该用户是否已经存在聊天流(ChatStream) + # 如果返回了 ChatStream 对象,说明已经聊过天了,不是本次任务的目标 + if chat_api.get_stream_by_user_id(user_id_str, platform): + continue # 跳过已存在的用户 + + logger.info(f"【冷启动】发现白名单新用户 {chat_id},准备发起第一次问候。") + + # 【增强体验】尝试从关系数据库中获取该用户的昵称 + # 这样打招呼时可以更亲切,而不是只知道一个冷冰冰的ID + person_id = person_api.get_person_id(platform, user_id) + nickname = await person_api.get_person_value(person_id, "nickname") + + # 如果数据库里有昵称,就用数据库里的;如果没有,就用 "用户+ID" 作为备用 + user_nickname = nickname or f"用户{user_id}" + + # 创建 UserInfo 对象,这是创建聊天流的必要信息 + user_info = UserInfo(platform=platform, user_id=str(user_id), user_nickname=user_nickname) + + # 【关键步骤】主动创建聊天流。 + # 创建后,该用户就进入了机器人的“好友列表”,后续将由 ProactiveThinkingTask 接管 + stream = await self.chat_manager.get_or_create_stream(platform, user_info) + + await self.executor.execute(stream_id=stream.stream_id, start_mode="cold_start") + logger.info(f"【冷启动】已为新用户 {chat_id} (昵称: {user_nickname}) 创建聊天流并发送问候。") + + except ValueError: + logger.warning(f"【冷启动】白名单条目格式错误或用户ID无效,已跳过: {chat_id}") + except Exception as e: + logger.error(f"【冷启动】处理用户 {chat_id} 时发生未知错误: {e}", exc_info=True) + + # 完成一轮检查后,进入长时休眠 + await asyncio.sleep(3600) + + except asyncio.CancelledError: + logger.info("冷启动任务被正常取消。") + break + 
class ProactiveThinkingTask(AsyncTask):
    """
    Background "daily wake-up" task for proactive thinking.

    Periodically scans the chat streams that already exist and, when one has
    been idle for longer than the current interval, asks the executor to
    start a conversation in it. It never creates new streams.
    """

    def __init__(self):
        super().__init__(task_name="ProactiveThinkingTask")
        self.chat_manager = get_chat_manager()
        self.executor = ProactiveThinkerExecutor()

    @staticmethod
    def _clock_to_minutes(clock: str) -> int:
        """Convert an ``H:MM`` / ``HH:MM`` string to minutes since midnight.

        Raises:
            ValueError: If the text is not a valid time of day.
        """
        hours_text, minutes_text = clock.strip().split(":")
        hours, minutes = int(hours_text), int(minutes_text)
        if not (0 <= hours < 24 and 0 <= minutes < 60):
            raise ValueError(f"time out of range: {clock!r}")
        return hours * 60 + minutes

    @classmethod
    def _resolve_frequency_factor(cls, rule_items, now_minutes: int) -> float:
        """Pick the activity factor that applies at ``now_minutes``.

        Args:
            rule_items: Iterable of ``"HH:MM,factor"`` strings.
            now_minutes: Current time as minutes since midnight.

        Returns:
            The factor of the latest rule that starts at or before now. Before
            the first rule of the day, the last rule of the previous day still
            applies (rules hold "until the next time point"). ``1.0`` when no
            usable rule exists.
        """
        schedule = []
        for raw_rule in rule_items:
            try:
                clock_text, factor_text = str(raw_rule).split(",")
                schedule.append((cls._clock_to_minutes(clock_text), float(factor_text)))
            except ValueError:
                # A single bad entry must not take the whole task down.
                logger.warning(f"【日常唤醒】忽略格式错误的活跃度规则: {raw_rule!r}")

        if not schedule:
            return 1.0

        # Sort numerically. Comparing "8:00" and "18:00" as strings orders
        # them wrongly and never matches zero-padded strftime output.
        schedule.sort(key=lambda item: item[0])

        factor = schedule[-1][1]  # wrap-around from the previous day
        for start_minutes, rule_factor in schedule:
            if start_minutes > now_minutes:
                break  # every later rule starts after "now"
            factor = rule_factor
        return factor

    @staticmethod
    def _is_whitelisted(stream, enabled_private, enabled_groups) -> bool:
        """Return True if proactive thinking is allowed for ``stream``.

        An empty whitelist means "allow everything" for that chat kind.
        """
        # NOTE(review): assumes the stream object exposes ``platform``; falls
        # back to the previously hard-coded "qq" when it does not — confirm.
        platform = getattr(stream, "platform", None) or "qq"
        if stream.group_info:  # group chat
            return not enabled_groups or f"{platform}:{stream.group_info.group_id}" in enabled_groups
        # private chat
        return not enabled_private or f"{platform}:{stream.user_info.user_id}" in enabled_private

    def _get_next_interval(self) -> float:
        """
        Compute the delay before the next scan.

        Takes the configured base interval, applies a uniform random offset of
        +/- ``interval_sigma``, then divides by the activity factor configured
        for the current time of day.

        Returns:
            The interval in seconds, never less than 60.
        """
        settings = global_config.proactive_thinking
        base_interval = settings.interval
        sigma = settings.interval_sigma

        # 1. Random value in [base - sigma, base + sigma].
        interval = random.uniform(base_interval - sigma, base_interval + sigma)

        # 2. Apply the time-of-day activity factor (global rules only: the
        #    first entry, whose first element is the chat identifier).
        adjust_rules = settings.talk_frequency_adjust
        if adjust_rules and adjust_rules[0]:
            now = datetime.now()
            factor = self._resolve_frequency_factor(adjust_rules[0][1:], now.hour * 60 + now.minute)
            if factor > 0:
                # factor > 1 means "more active", hence a shorter interval.
                interval /= factor
            else:
                logger.warning(f"【日常唤醒】活跃度因子无效 ({factor}),本次不调整间隔。")

        # Enforce a floor so users are never pestered too frequently.
        return max(60.0, interval)

    async def run(self):
        """Main loop: periodically wake up existing chats that have gone quiet."""
        logger.info("日常唤醒任务已启动,将根据动态间隔检查聊天活跃度。")
        await asyncio.sleep(15)  # initial grace period

        while True:
            try:
                # Computed inside the try block so that a configuration error
                # is logged and retried instead of terminating the task.
                next_interval = self._get_next_interval()
                logger.debug(f"【日常唤醒】下一次检查将在 {next_interval:.2f} 秒后进行。")
                await asyncio.sleep(next_interval)

                logger.info("【日常唤醒】开始检查不活跃的聊天...")

                # Load the whitelists.
                enabled_private = set(global_config.proactive_thinking.enabled_private_chats)
                enabled_groups = set(global_config.proactive_thinking.enabled_group_chats)

                # Snapshot the streams so the dict may change while we await.
                all_streams = list(self.chat_manager.streams.values())

                for stream in all_streams:
                    if not self._is_whitelisted(stream, enabled_private, enabled_groups):
                        continue

                    # Only act when the chat has been idle long enough.
                    idle_seconds = time.time() - stream.last_active_time
                    if idle_seconds <= next_interval:
                        continue

                    logger.info(f"【日常唤醒】聊天流 {stream.stream_id} 已冷却 {idle_seconds:.2f} 秒,触发主动对话。")
                    try:
                        await self.executor.execute(stream_id=stream.stream_id, start_mode="wake_up")

                        # Refresh and persist the activity time right away so
                        # an unexpected delay cannot trigger a second message
                        # for the same target within this cycle.
                        stream.update_active_time()
                        await self.chat_manager._save_stream(stream)
                    except Exception as e:
                        # One failing stream must not abort the whole scan.
                        logger.error(f"【日常唤醒】处理聊天流 {stream.stream_id} 时出错: {e}", exc_info=True)

            except asyncio.CancelledError:
                logger.info("日常唤醒任务被正常取消。")
                break
            except Exception as e:
                logger.error(f"【日常唤醒】任务出现错误,将在60秒后重试: {e}", exc_info=True)
                await asyncio.sleep(60)
class ProactiveThinkerExecutor:
    """
    Proactive-thinking executor (V2).

    - Single entry point (``execute``).
    - A decision step asks the LLM whether, and about what, to start talking.
    - The final message is generated from persona, schedule and relationship
      information.
    """

    def __init__(self):
        # Place to initialise shared helpers (e.g. an LLM requester) if needed.
        pass

    async def execute(self, stream_id: str, start_mode: str = "wake_up"):
        """
        Run one proactive-thinking pass for a chat stream.

        Args:
            stream_id: Chat stream ID.
            start_mode: ``'cold_start'`` or ``'wake_up'``.
        """
        logger.info(f"开始为聊天流 {stream_id} 执行主动思考,模式: {start_mode}")

        # 1. Gather context.
        context = await self._gather_context(stream_id)
        if not context:
            return

        # 2. Decide.
        decision_result = await self._make_decision(context, start_mode)

        # Resolve the stream once and reuse it for logging and sending.
        stream = self._get_stream_from_id(stream_id)

        if not decision_result or not decision_result.get("should_reply"):
            reason = decision_result.get("reason", "未提供") if decision_result else "决策过程返回None"
            logger.info(f"决策结果为:不回复。原因: {reason}")
            await database_api.store_action_info(
                chat_stream=stream,
                action_name="proactive_decision",
                action_prompt_display=f"主动思考决定不回复,原因: {reason}",
                action_done=True,
                action_data=decision_result
            )
            return

        # 3. Plan and send.
        # ``or`` instead of a .get() default: the model may emit an explicit
        # null, which must not end up in the prompt as the text "None".
        topic = decision_result.get("topic") or "打个招呼"
        reason = decision_result.get("reason") or "无"
        await database_api.store_action_info(
            chat_stream=stream,
            action_name="proactive_decision",
            action_prompt_display=f"主动思考决定回复,原因: {reason},话题:{topic}",
            action_done=True,
            action_data=decision_result
        )
        logger.info(f"决策结果为:回复。话题: {topic}")

        plan_prompt = self._build_plan_prompt(context, start_mode, topic, reason)

        is_success, response, _, _ = await llm_api.generate_with_model(
            prompt=plan_prompt, model_config=model_config.model_task_config.utils
        )

        if not (is_success and response):
            logger.warning(f"主动思考回复生成失败,已跳过发送 (stream_id={stream_id})")
            return

        if not stream:
            logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流")
            return

        # Split the reply into natural chunks and send the text parts.
        reply_set = generator_api.process_human_text(response, enable_splitter=True, enable_chinese_typo=False)
        for reply_type, content in reply_set:
            if reply_type == "text":
                await send_api.text_to_stream(stream_id=stream.stream_id, text=content)

    def _get_stream_from_id(self, stream_id: str):
        """Parse ``stream_id`` and return the matching stream, or None.

        NOTE(review): assumes IDs of the form ``platform:chat_id:type`` with
        type ``private`` or ``group`` — confirm against how stream IDs are
        actually generated by the chat manager.
        """
        try:
            platform, chat_id, stream_type = stream_id.split(":")
            if stream_type == "private":
                return chat_api.ChatManager.get_private_stream_by_user_id(platform, chat_id)
            if stream_type == "group":
                return chat_api.ChatManager.get_group_stream_by_group_id(platform, chat_id)
            logger.warning(f"未知的聊天流类型 '{stream_type}' (stream_id={stream_id})")
        except Exception as e:
            logger.error(f"解析 stream_id ({stream_id}) 或获取 stream 失败: {e}")
        return None

    async def _gather_context(self, stream_id: str) -> Optional[Dict[str, Any]]:
        """
        Collect everything needed to build the prompts.

        Returns:
            A context dict, or None when the stream or its user information
            cannot be resolved.
        """
        stream = self._get_stream_from_id(stream_id)
        if not stream:
            logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流")
            return None

        user_info = stream.user_info
        if not user_info or not user_info.platform or not user_info.user_id:
            logger.warning(f"Stream {stream_id} 的 user_info 不完整")
            return None

        try:
            numeric_user_id = int(user_info.user_id)
        except (TypeError, ValueError):
            # Non-numeric IDs cannot be converted; skip rather than raise.
            logger.warning(f"Stream {stream_id} 的 user_id 不是数字,无法查询关系信息: {user_info.user_id!r}")
            return None

        person_id = person_api.get_person_id(user_info.platform, numeric_user_id)
        person_info_manager = get_person_info_manager()

        # Today's schedule.
        schedules = await schedule_api.ScheduleAPI.get_today_schedule()
        schedule_context = (
            "\n".join([f"- {s['title']} ({s['start_time']}-{s['end_time']})" for s in schedules])
            if schedules else "今天没有日程安排。"
        )

        # Relationship information.
        short_impression = await person_info_manager.get_value(person_id, "short_impression") or "无"
        impression = await person_info_manager.get_value(person_id, "impression") or "无"
        attitude = await person_info_manager.get_value(person_id, "attitude")
        if attitude is None or attitude == "":
            # Only substitute the default when the value is really missing;
            # a stored attitude of 0 is legitimate and must be kept.
            attitude = 50

        # Recent chat history.
        recent_messages = await message_api.get_recent_messages(stream_id, limit=10)
        recent_chat_history = (
            await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else "无"
        )

        # Recent proactive decisions for this chat.
        action_history = await database_api.db_query(
            database_api.MODEL_MAPPING["ActionRecords"],
            filters={"chat_id": stream_id, "action_name": "proactive_decision"},
            limit=3,
            order_by=["-time"]
        )
        action_history_context = "无"
        if isinstance(action_history, list):
            action_history_context = "\n".join(
                [f"- {a.get('action_data')}" for a in action_history if isinstance(a, dict)]
            ) or "无"

        return {
            "person_id": person_id,
            "user_info": user_info,
            "schedule_context": schedule_context,
            "recent_chat_history": recent_chat_history,
            "action_history_context": action_history_context,
            "relationship": {
                "short_impression": short_impression,
                "impression": impression,
                "attitude": attitude
            },
            "persona": {
                "core": global_config.personality.personality_core,
                "side": global_config.personality.personality_side,
                "identity": global_config.personality.identity,
            },
            "current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    @staticmethod
    def _parse_decision(response) -> Optional[Dict[str, Any]]:
        """Extract the decision object from raw LLM output.

        Tolerates Markdown code fences and surrounding prose by parsing only
        the outermost ``{...}`` span.

        Returns:
            The decision dict, or None when no JSON object can be parsed.
        """
        if not isinstance(response, str):
            return None

        start, end = response.find("{"), response.rfind("}")
        if start == -1 or end <= start:
            return None

        try:
            decision = orjson.loads(response[start:end + 1])
        except orjson.JSONDecodeError:
            return None

        # Callers use .get(); anything but an object is unusable.
        if not isinstance(decision, dict):
            return None

        # A model may emit "true"/"false" as strings; any non-empty string
        # would otherwise count as True.
        flag = decision.get("should_reply")
        if isinstance(flag, str):
            decision["should_reply"] = flag.strip().lower() in {"true", "yes", "1"}

        return decision

    async def _make_decision(self, context: Dict[str, Any], start_mode: str) -> Optional[Dict[str, Any]]:
        """
        Decision step: should a conversation be started, and about what?

        Returns:
            A dict with ``should_reply`` and ``reason`` (plus ``topic`` when
            the model supplies one). Model failures and unparsable output
            yield ``should_reply=False`` with an explanatory reason.
        """
        persona = context['persona']
        user_info = context['user_info']
        relationship = context['relationship']

        prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}

# 任务
现在是 {context['current_time']},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。

# 情境分析
1. **启动模式**: {start_mode} ({'初次见面/很久未见' if start_mode == 'cold_start' else '日常唤醒'})
2. **你的日程**:
{context['schedule_context']}
3. **你和Ta的关系**:
   - 简短印象: {relationship['short_impression']}
   - 详细印象: {relationship['impression']}
   - 好感度: {relationship['attitude']}/100
4. **最近的聊天摘要**:
{context['recent_chat_history']}

# 决策指令
请综合以上所有信息,做出决策。你的决策需要以JSON格式输出,包含以下字段:
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true,你打算聊什么话题?(例如:问候一下今天的日程、关心一下昨天的某件事、分享一个你自己的趣事等)
- `reason`: str, 做出此决策的简要理由。

---
示例1 (应该回复):
{{
  "should_reply": true,
  "topic": "提醒Ta今天下午有'项目会议'的日程",
  "reason": "现在是上午,Ta下午有个重要会议,我觉得应该主动提醒一下,这会显得我很贴心。"
}}

示例2 (不应回复):
{{
  "should_reply": false,
  "topic": null,
  "reason": "虽然我们的关系不错,但现在是深夜,而且Ta今天的日程都已经完成了,我没有合适的理由去打扰Ta。"
}}
---

请输出你的决策:
"""

        is_success, response, _, _ = await llm_api.generate_with_model(
            prompt=prompt, model_config=model_config.model_task_config.utils
        )

        if not is_success:
            return {"should_reply": False, "reason": "决策模型生成失败"}

        decision = self._parse_decision(response)
        if decision is None:
            logger.error(f"决策LLM返回的JSON格式无效: {response}")
            return {"should_reply": False, "reason": "决策模型返回格式错误"}
        return decision

    def _build_plan_prompt(self, context: Dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
        """
        Build the generation prompt for the opening message.

        ``cold_start`` produces an ice-breaking prompt; any other mode is
        treated as ``wake_up`` and also includes the schedule, recent chat
        and recent action history.
        """
        persona = context['persona']
        user_info = context['user_info']
        relationship = context['relationship']

        if start_mode == "cold_start":
            prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}

# 任务
你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。

# 决策上下文
- **决策理由**: {reason}
- **你和Ta的关系**:
  - 简短印象: {relationship['short_impression']}
  - 详细印象: {relationship['impression']}
  - 好感度: {relationship['attitude']}/100

# 对话指引
- 你的目标是“破冰”,让对话自然地开始。
- 你应该围绕这个话题展开: {topic}
- 你的语气应该符合你的人设,友好且真诚。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
"""
        else:  # wake_up
            prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}

# 任务
现在是 {context['current_time']},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。

# 决策上下文
- **决策理由**: {reason}

# 情境分析
1. **你的日程**:
{context['schedule_context']}
2. **你和Ta的关系**:
   - 详细印象: {relationship['impression']}
   - 好感度: {relationship['attitude']}/100
3. **最近的聊天摘要**:
{context['recent_chat_history']}
4. **你最近的相关动作**:
{context['action_history_context']}

# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- 请结合以上所有情境信息,自然地开启对话。
- 你的语气应该符合你的人设以及你对Ta的好感度。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
"""
        return prompt
max_concurrent_distributions = 10 # 最大并发处理的消息流数量,可以根据API性能和服务器负载调整 -talk_frequency_adjust = [ - ["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"], - ["qq:114514:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"], - ["qq:1919810:private", "8:20,1", "12:10,2", "20:10,1.5", "00:10,0.2"] -] -# 基于聊天流的个性化活跃度配置 -# 格式:[["platform:chat_id:type", "HH:MM,frequency", "HH:MM,frequency", ...], ...] - -# 全局配置示例: -# [["", "8:00,1", "12:00,2", "18:00,1.5", "00:00,0.5"]] - -# 特定聊天流配置示例: -# [ -# ["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"], # 全局默认配置 -# ["qq:1026294844:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"], # 特定群聊配置 -# ["qq:729957033:private", "8:20,1", "12:10,2", "20:10,1.5", "00:10,0.2"] # 特定私聊配置 -# ] - -# 说明: -# - 当第一个元素为空字符串""时,表示全局默认配置 -# - 当第一个元素为"platform:id:type"格式时,表示特定聊天流配置 -# - 后续元素是"时间,频率"格式,表示从该时间开始使用该活跃度,直到下一个时间点 -# - 优先级:特定聊天流配置 > 全局配置 > 默认 talk_frequency - -# 主动思考功能配置(仅在focus模式下生效) - -enable_proactive_thinking = false # 是否启用主动思考功能 -proactive_thinking_interval = 1500 # 主动思考触发间隔时间(秒),默认1500秒(25分钟) -# TIPS: -# 创意玩法:可以设置为0!设置为0时将基于delta_sigma生成纯随机间隔 -# 负数保险:如果出现了负数,会自动使用绝对值 - -proactive_thinking_in_private = true # 主动思考可以在私聊里面启用 -proactive_thinking_in_group = true # 主动思考可以在群聊里面启用 -# 主动思考启用范围配置 - 按平台和类型分别配置,建议平台配置为小写 -# 格式:["platform:user_id", "platform:user_id", ...] -# 示例:["qq:123456789", "telegram:user123", "bilibili:987654321"] -proactive_thinking_enable_in_private = [] # 启用主动思考的私聊范围,为空则不限制 -proactive_thinking_enable_in_groups = [] # 启用主动思考的群聊范围,为空则不限制 - -delta_sigma = 120 # 正态分布的标准差,控制时间间隔的随机程度 -# 特殊用法: -# - 设置为0:禁用正态分布,使用固定间隔 -# - 设置得很大(如6000):产生高度随机的间隔,即使基础间隔为0也能工作 -# - 负数会自动转换为正数,不用担心配置错误以及极端边界情况 -# 实验建议:试试 proactive_thinking_interval=0 + delta_sigma 非常大 的纯随机模式! 
-# 结果保证:生成的间隔永远为正数(负数会取绝对值),最小1秒,最大24小时 - [relationship] enable_relationship = true # 是否启用关系系统 @@ -303,11 +255,46 @@ max_frequency_bonus = 10.0 # 最大激活频率奖励天数 # 休眠机制 dormant_threshold_days = 90 # 休眠状态判定天数(超过此天数未访问的记忆进入休眠状态) -# 统一存储配置 (新增) -unified_storage_path = "data/unified_memory" # 统一存储数据路径 -unified_storage_cache_limit = 10000 # 内存缓存大小限制 -unified_storage_auto_save_interval = 50 # 自动保存间隔(记忆条数) -unified_storage_enable_compression = true # 是否启用数据压缩 +# 统一存储配置 (已弃用 - 请使用Vector DB配置) +# DEPRECATED: unified_storage_path = "data/unified_memory" +# DEPRECATED: unified_storage_cache_limit = 10000 +# DEPRECATED: unified_storage_auto_save_interval = 50 +# DEPRECATED: unified_storage_enable_compression = true + +# Vector DB存储配置 (新增 - 替代JSON存储) +enable_vector_memory_storage = true # 启用Vector DB存储 +enable_llm_instant_memory = true # 启用基于LLM的瞬时记忆 +enable_vector_instant_memory = true # 启用基于向量的瞬时记忆 + +# Vector DB配置 +vector_db_memory_collection = "unified_memory_v2" # Vector DB主记忆集合名称 +vector_db_metadata_collection = "memory_metadata_v2" # Vector DB元数据集合名称 +vector_db_similarity_threshold = 0.5 # Vector DB相似度阈值 (推荐范围: 0.5-0.6, 过高会导致检索不到结果) +vector_db_search_limit = 20 # Vector DB单次搜索返回的最大结果数 +vector_db_batch_size = 100 # 批处理大小 (批量存储记忆时每批处理的记忆条数) +vector_db_enable_caching = true # 启用内存缓存 +vector_db_cache_size_limit = 1000 # 缓存大小限制 (内存缓存最多保存的记忆条数) +vector_db_auto_cleanup_interval = 3600 # 自动清理间隔(秒) +vector_db_retention_hours = 720 # 记忆保留时间(小时,默认30天) + +# 多阶段召回配置(可选) +# 取消注释以启用更严格的粗筛,适用于大规模记忆库(>10万条) +# memory_importance_threshold = 0.3 # 重要性阈值(过滤低价值记忆,范围0.0-1.0) +# memory_recency_days = 30 # 时间范围(只搜索最近N天的记忆,0表示不限制) + +# Vector DB配置 (ChromaDB) +[vector_db] +type = "chromadb" # Vector DB类型 +path = "data/chroma_db" # Vector DB数据路径 + +[vector_db.settings] +anonymized_telemetry = false # 禁用匿名遥测 +allow_reset = true # 允许重置 + +[vector_db.collections] +unified_memory_v2 = { description = "统一记忆存储V2", hnsw_space = "cosine", version = "2.0" } +memory_metadata_v2 = { description = "记忆元数据索引", 
hnsw_space = "cosine", version = "2.0" } +semantic_cache = { description = "语义缓存", hnsw_space = "cosine" } [voice] enable_asr = true # 是否启用语音识别,启用后MoFox-Bot可以识别语音消息,启用该功能需要配置语音识别模型[model.voice] @@ -570,4 +557,33 @@ relationship_weight = 0.3 # 人物关系分数权重 # 提及bot相关参数 mention_bot_adjustment_threshold = 0.3 # 提及bot后的调整阈值 mention_bot_interest_score = 0.6 # 提及bot的兴趣分 -base_relationship_score = 0.3 # 基础人物关系分 \ No newline at end of file +base_relationship_score = 0.3 # 基础人物关系分 + +[proactive_thinking] # 主动思考(主动发起对话)功能配置 +# --- 总开关 --- +enable = true # 是否启用主动发起对话功能 + +# --- 触发时机 --- +# 基础触发间隔(秒),AI会围绕这个时间点主动发起对话 +interval = 1500 # 默认25分钟 +# 间隔随机化标准差(秒),让触发时间更自然。设为0则为固定间隔。 +interval_sigma = 120 +# 每日活跃度调整,格式:[["", "HH:MM,factor", ...], ["stream_id", ...]] +# factor > 1.0 会缩短思考间隔,更活跃;factor < 1.0 会延长间隔。 +talk_frequency_adjust = [['', '8:00,1', '12:00,1.2', '18:00,1.5', '01:00,0.6']] + +# --- 作用范围 --- +enable_in_private = true # 是否允许在私聊中主动发起对话 +enable_in_group = true # 是否允许在群聊中主动发起对话 +# 私聊白名单,为空则对所有私聊生效 +# 格式: ["platform:user_id", ...] e.g., ["qq:123456"] +enabled_private_chats = [] +# 群聊白名单,为空则对所有群聊生效 +# 格式: ["platform:group_id", ...] e.g., ["qq:7891011"] +enabled_group_chats = [] + +# --- 冷启动配置 (针对私聊) --- +# 对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候 +enable_cold_start = true +# 冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒) +cold_start_cooldown = 86400 # 默认24小时 \ No newline at end of file