This commit is contained in:
tt-P607
2025-10-02 17:13:56 +08:00
22 changed files with 2164 additions and 844 deletions

View File

@@ -0,0 +1,139 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Rebuild the JSON metadata index from existing ChromaDB data
"""
import asyncio
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.chat.memory_system.memory_system import MemorySystem
from src.chat.memory_system.memory_metadata_index import MemoryMetadataIndexEntry
from src.common.logger import get_logger
logger = get_logger(__name__)
async def rebuild_metadata_index():
"""从ChromaDB重建元数据索引"""
print("="*80)
print("重建JSON元数据索引")
print("="*80)
# 初始化记忆系统
print("\n🔧 初始化记忆系统...")
ms = MemorySystem()
await ms.initialize()
print("✅ 记忆系统已初始化")
if not hasattr(ms.unified_storage, 'metadata_index'):
print("❌ 元数据索引管理器未初始化")
return
# 获取所有记忆
print("\n📥 从ChromaDB获取所有记忆...")
from src.common.vector_db import vector_db_service
try:
# Fetch all memory IDs in the collection
collection_name = ms.unified_storage.config.memory_collection
result = vector_db_service.get(
collection_name=collection_name,
include=["documents", "metadatas", "embeddings"]
)
if not result or not result.get("ids"):
print("❌ ChromaDB中没有找到记忆数据")
return
ids = result["ids"]
metadatas = result.get("metadatas", [])
print(f"✅ 找到 {len(ids)} 条记忆")
# 重建元数据索引
print("\n🔨 开始重建元数据索引...")
entries = []
success_count = 0
for i, (memory_id, metadata) in enumerate(zip(ids, metadatas), 1):
try:
# 从ChromaDB元数据重建索引条目
import orjson
entry = MemoryMetadataIndexEntry(
memory_id=memory_id,
user_id=metadata.get("user_id", "unknown"),
memory_type=metadata.get("memory_type", "general"),
subjects=orjson.loads(metadata.get("subjects", "[]")),
objects=[metadata.get("object")] if metadata.get("object") else [],
keywords=orjson.loads(metadata.get("keywords", "[]")),
tags=orjson.loads(metadata.get("tags", "[]")),
importance=2,  # default: NORMAL
confidence=2,  # default: MEDIUM
created_at=metadata.get("created_at", 0.0),
access_count=metadata.get("access_count", 0),
chat_id=metadata.get("chat_id"),
content_preview=None
)
# Map the importance and confidence enum names back to numeric values
imp_map = {"LOW": 1, "NORMAL": 2, "HIGH": 3, "CRITICAL": 4}
conf_map = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "VERIFIED": 4}
if "importance" in metadata:
entry.importance = imp_map.get(metadata["importance"], entry.importance)
if "confidence" in metadata:
entry.confidence = conf_map.get(metadata["confidence"], entry.confidence)
entries.append(entry)
success_count += 1
if i % 100 == 0:
print(f"  progress: {i}/{len(ids)} ({success_count} ok)")
except Exception as e:
logger.warning(f"Failed to process memory {memory_id}: {e}")
continue
print(f"\n✅ Parsed metadata for {success_count}/{len(ids)} memories")
# Batch-update the index
print("\n💾 Saving metadata index...")
ms.unified_storage.metadata_index.batch_add_or_update(entries)
ms.unified_storage.metadata_index.save()
# Show statistics
stats = ms.unified_storage.metadata_index.get_stats()
print("\n📊 Index statistics after rebuild:")
print(f"  - total memories: {stats['total_memories']}")
print(f"  - distinct subjects: {stats['subjects_count']}")
print(f"  - distinct keywords: {stats['keywords_count']}")
print(f"  - distinct tags: {stats['tags_count']}")
print("  - type distribution:")
for mtype, count in stats['types'].items():
print(f"    - {mtype}: {count}")
print("\n✅ Metadata index rebuild complete!")
except Exception as e:
logger.error(f"Index rebuild failed: {e}", exc_info=True)
print(f"❌ Index rebuild failed: {e}")
if __name__ == '__main__':
asyncio.run(rebuild_metadata_index())
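For reference, the script is meant to be run directly; from the repository root that would look something like the following (the scripts/ path is an assumption based on the sys.path manipulation above):

python scripts/rebuild_metadata_index.py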

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Lightweight smoke test: initialize MemorySystem and run one retrieval to verify that accessing MemoryMetadata.source no longer raises
"""
import asyncio
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.chat.memory_system.memory_system import MemorySystem
async def main():
ms = MemorySystem()
await ms.initialize()
results = await ms.retrieve_relevant_memories(query_text="测试查询:杰瑞喵喜欢什么?", limit=3)
print(f"检索到 {len(results)} 条记忆(如果 >0 则表明运行成功)")
for i, m in enumerate(results, 1):
print(f"{i}. id={m.metadata.memory_id} source={getattr(m.metadata, 'source', None)}")
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -51,14 +51,6 @@ from .enhanced_memory_activator import (
enhanced_memory_activator
)
# Formatters
from .memory_formatter import (
MemoryFormatter,
FormatterConfig,
format_memories_for_llm,
format_memories_bracket_style
)
# Compatibility alias
from .memory_chunk import MemoryChunk as Memory
@@ -98,12 +90,6 @@ __all__ = [
"MemoryActivator",
"memory_activator",
"enhanced_memory_activator", # 兼容性别名
# 格式化器
"MemoryFormatter",
"FormatterConfig",
"format_memories_for_llm",
"format_memories_bracket_style",
]
# Version info

View File

@@ -96,19 +96,8 @@ class MemoryBuilder:
try:
logger.debug(f"开始从对话构建记忆,文本长度: {len(conversation_text)}")
# 预处理文本
processed_text = self._preprocess_text(conversation_text)
# 确定提取策略
strategy = self._determine_extraction_strategy(processed_text, context)
# 根据策略提取记忆
if strategy == ExtractionStrategy.LLM_BASED:
memories = await self._extract_with_llm(processed_text, context, user_id, timestamp)
elif strategy == ExtractionStrategy.RULE_BASED:
memories = self._extract_with_rules(processed_text, context, user_id, timestamp)
else: # HYBRID
memories = await self._extract_with_hybrid(processed_text, context, user_id, timestamp)
# 使用LLM提取记忆
memories = await self._extract_with_llm(conversation_text, context, user_id, timestamp)
# 后处理和验证
validated_memories = self._validate_and_enhance_memories(memories, context)
@@ -129,41 +118,6 @@ class MemoryBuilder:
self.extraction_stats["failed_extractions"] += 1
raise
def _preprocess_text(self, text: str) -> str:
"""Preprocess text"""
# Collapse redundant whitespace
text = re.sub(r'\s+', ' ', text.strip())
# Strip special characters while keeping basic punctuation
text = re.sub(r'[^\w\s\u4e00-\u9fff""''()【】]', '', text)
# Truncate overly long text
if len(text) > 2000:
text = text[:2000] + "..."
return text
def _determine_extraction_strategy(self, text: str, context: Dict[str, Any]) -> ExtractionStrategy:
"""Decide which extraction strategy to use"""
text_length = len(text)
has_structured_data = any(key in context for key in ["structured_data", "entities", "keywords"])
message_type = context.get("message_type", "normal")
# Short text: rule-based extraction
if text_length < 50:
return ExtractionStrategy.RULE_BASED
# Structured data present: hybrid strategy
if has_structured_data:
return ExtractionStrategy.HYBRID
# System messages or commands: rule-based extraction
if message_type in ["command", "system"]:
return ExtractionStrategy.RULE_BASED
# Default: LLM-based extraction
return ExtractionStrategy.LLM_BASED
async def _extract_with_llm(
self,
text: str,
@@ -190,79 +144,10 @@ class MemoryBuilder:
logger.error(f"LLM提取失败: {e}")
raise MemoryExtractionError(str(e)) from e
def _extract_with_rules(
self,
text: str,
context: Dict[str, Any],
user_id: str,
timestamp: float
) -> List[MemoryChunk]:
"""使用规则提取记忆"""
memories = []
subjects = self._resolve_conversation_participants(context, user_id)
# 规则1: 检测个人信息
personal_info = self._extract_personal_info(text, user_id, timestamp, context, subjects)
memories.extend(personal_info)
# 规则2: 检测偏好信息
preferences = self._extract_preferences(text, user_id, timestamp, context, subjects)
memories.extend(preferences)
# 规则3: 检测事件信息
events = self._extract_events(text, user_id, timestamp, context, subjects)
memories.extend(events)
return memories
async def _extract_with_hybrid(
self,
text: str,
context: Dict[str, Any],
user_id: str,
timestamp: float
) -> List[MemoryChunk]:
"""混合策略提取记忆"""
all_memories = []
# 首先使用规则提取
rule_memories = self._extract_with_rules(text, context, user_id, timestamp)
all_memories.extend(rule_memories)
# 然后使用LLM提取
llm_memories = await self._extract_with_llm(text, context, user_id, timestamp)
# 合并和去重
final_memories = self._merge_hybrid_results(all_memories, llm_memories)
return final_memories
def _build_llm_extraction_prompt(self, text: str, context: Dict[str, Any]) -> str:
"""构建LLM提取提示"""
current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
chat_id = context.get("chat_id", "unknown")
message_type = context.get("message_type", "normal")
target_user_id = context.get("user_id", "用户")
target_user_id = str(target_user_id)
target_user_name = (
context.get("user_display_name")
or context.get("user_name")
or context.get("nickname")
or context.get("sender_name")
)
if isinstance(target_user_name, str):
target_user_name = target_user_name.strip()
else:
target_user_name = ""
if not target_user_name or self._looks_like_system_identifier(target_user_name):
target_user_name = "该用户"
target_user_id_display = target_user_id
if self._looks_like_system_identifier(target_user_id_display):
target_user_id_display = "系统ID勿写入记忆"
bot_name = context.get("bot_name")
bot_identity = context.get("bot_identity")
@@ -966,145 +851,6 @@ class MemoryBuilder:
return f"{subject_phrase}{predicate}".strip()
return subject_phrase
def _extract_personal_info(
self,
text: str,
user_id: str,
timestamp: float,
context: Dict[str, Any],
subjects: List[str]
) -> List[MemoryChunk]:
"""提取个人信息"""
memories = []
# 常见个人信息模式
patterns = {
r"我叫(\w+)": ("is_named", {"name": "$1"}),
r"我今年(\d+)岁": ("is_age", {"age": "$1"}),
r"我是(\w+)": ("is_profession", {"profession": "$1"}),
r"我住在(\w+)": ("lives_in", {"location": "$1"}),
r"我的电话是(\d+)": ("has_phone", {"phone": "$1"}),
r"我的邮箱是(\w+@\w+\.\w+)": ("has_email", {"email": "$1"}),
}
for pattern, (predicate, obj_template) in patterns.items():
match = re.search(pattern, text)
if match:
obj = obj_template
for i, group in enumerate(match.groups(), 1):
obj = {k: v.replace(f"${i}", group) for k, v in obj.items()}
memory = create_memory_chunk(
user_id=user_id,
subject=subjects,
predicate=predicate,
obj=obj,
memory_type=MemoryType.PERSONAL_FACT,
chat_id=context.get("chat_id"),
importance=ImportanceLevel.HIGH,
confidence=ConfidenceLevel.HIGH,
display=self._compose_display_text(subjects, predicate, obj)
)
memories.append(memory)
return memories
def _extract_preferences(
self,
text: str,
user_id: str,
timestamp: float,
context: Dict[str, Any],
subjects: List[str]
) -> List[MemoryChunk]:
"""提取偏好信息"""
memories = []
# 偏好模式
preference_patterns = [
(r"我喜欢(.+)", "likes"),
(r"我不喜欢(.+)", "dislikes"),
(r"我爱吃(.+)", "likes_food"),
(r"我讨厌(.+)", "hates"),
(r"我最喜欢的(.+)", "favorite_is"),
]
for pattern, predicate in preference_patterns:
match = re.search(pattern, text)
if match:
memory = create_memory_chunk(
user_id=user_id,
subject=subjects,
predicate=predicate,
obj=match.group(1),
memory_type=MemoryType.PREFERENCE,
chat_id=context.get("chat_id"),
importance=ImportanceLevel.NORMAL,
confidence=ConfidenceLevel.MEDIUM,
display=self._compose_display_text(subjects, predicate, match.group(1))
)
memories.append(memory)
return memories
def _extract_events(
self,
text: str,
user_id: str,
timestamp: float,
context: Dict[str, Any],
subjects: List[str]
) -> List[MemoryChunk]:
"""提取事件信息"""
memories = []
# 事件关键词
event_keywords = ["明天", "今天", "昨天", "上周", "下周", "约会", "会议", "活动", "旅行", "生日"]
if any(keyword in text for keyword in event_keywords):
memory = create_memory_chunk(
user_id=user_id,
subject=subjects,
predicate="mentioned_event",
obj={"event_text": text, "timestamp": timestamp},
memory_type=MemoryType.EVENT,
chat_id=context.get("chat_id"),
importance=ImportanceLevel.NORMAL,
confidence=ConfidenceLevel.MEDIUM,
display=self._compose_display_text(subjects, "mentioned_event", text)
)
memories.append(memory)
return memories
def _merge_hybrid_results(
self,
rule_memories: List[MemoryChunk],
llm_memories: List[MemoryChunk]
) -> List[MemoryChunk]:
"""合并混合策略结果"""
all_memories = rule_memories.copy()
# 添加LLM记忆避免重复
for llm_memory in llm_memories:
is_duplicate = False
for rule_memory in rule_memories:
if llm_memory.is_similar_to(rule_memory, threshold=0.7):
is_duplicate = True
# 合并置信度
rule_memory.metadata.confidence = ConfidenceLevel(
max(rule_memory.metadata.confidence.value, llm_memory.metadata.confidence.value)
)
break
if not is_duplicate:
all_memories.append(llm_memory)
return all_memories
def _validate_and_enhance_memories(
self,
memories: List[MemoryChunk],

View File

@@ -127,6 +127,8 @@ class MemoryMetadata:
# Source information
source_context: Optional[str] = None  # source context snippet
# Legacy field: some code paths and older versions access metadata.source directly
source: Optional[str] = None
def __post_init__(self):
"""后初始化处理"""
@@ -150,6 +152,19 @@ class MemoryMetadata:
if self.last_forgetting_check == 0:
self.last_forgetting_check = current_time
# Compatibility: if the legacy source field is in use, keep it in sync with source_context
if not getattr(self, 'source', None) and getattr(self, 'source_context', None):
try:
self.source = str(self.source_context)
except Exception:
self.source = None
# If source is set but source_context is empty, sync the other way as well
if not getattr(self, 'source_context', None) and getattr(self, 'source', None):
try:
self.source_context = str(self.source)
except Exception:
self.source_context = None
def update_access(self):
"""更新访问信息"""
current_time = time.time()
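Since this compatibility shim is easy to get wrong, here is a minimal, self-contained sketch of the same aliasing pattern, independent of the project's classes, showing how the two fields mirror each other after construction:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Meta:
    source_context: Optional[str] = None
    source: Optional[str] = None  # legacy alias

    def __post_init__(self):
        # Whichever field is set wins; the other mirrors it.
        if self.source is None and self.source_context is not None:
            self.source = self.source_context
        elif self.source_context is None and self.source is not None:
            self.source_context = self.source

assert Meta(source_context="chat #42").source == "chat #42"
assert Meta(source="import").source_context == "import"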

View File

@@ -1,331 +0,0 @@
# -*- coding: utf-8 -*-
"""
Memory formatter
Turns recalled memories into LLM-friendly Markdown
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from dataclasses import dataclass
from src.common.logger import get_logger
from src.chat.memory_system.memory_chunk import MemoryChunk, MemoryType
logger = get_logger(__name__)
@dataclass
class FormatterConfig:
"""格式化器配置"""
include_timestamps: bool = True # 是否包含时间信息
include_memory_types: bool = True # 是否包含记忆类型
include_confidence: bool = False # 是否包含置信度信息
max_display_length: int = 200 # 单条记忆最大显示长度
datetime_format: str = "%Y年%m月%d" # 时间格式
use_emoji_icons: bool = True # 是否使用emoji图标
group_by_type: bool = False # 是否按类型分组
use_bracket_format: bool = False # 是否使用方括号格式 [类型] 内容
compact_format: bool = False # 是否使用紧凑格式
class MemoryFormatter:
"""记忆格式化器 - 将记忆转化为提示词友好的格式"""
# 记忆类型对应的emoji图标
TYPE_EMOJI_MAP = {
MemoryType.PERSONAL_FACT: "👤",
MemoryType.EVENT: "📅",
MemoryType.PREFERENCE: "❤️",
MemoryType.OPINION: "💭",
MemoryType.RELATIONSHIP: "👥",
MemoryType.EMOTION: "😊",
MemoryType.KNOWLEDGE: "📚",
MemoryType.SKILL: "🛠️",
MemoryType.GOAL: "🎯",
MemoryType.EXPERIENCE: "🌟",
MemoryType.CONTEXTUAL: "💬"
}
# Human-readable labels per memory type
TYPE_LABELS = {
MemoryType.PERSONAL_FACT: "personal fact",
MemoryType.EVENT: "event",
MemoryType.PREFERENCE: "preference",
MemoryType.OPINION: "opinion",
MemoryType.RELATIONSHIP: "relationship",
MemoryType.EMOTION: "emotion",
MemoryType.KNOWLEDGE: "knowledge",
MemoryType.SKILL: "skill",
MemoryType.GOAL: "goal",
MemoryType.EXPERIENCE: "experience",
MemoryType.CONTEXTUAL: "contextual"
}
def __init__(self, config: Optional[FormatterConfig] = None):
self.config = config or FormatterConfig()
def format_memories_for_prompt(
self,
memories: List[MemoryChunk],
query_context: Optional[str] = None
) -> str:
"""
将记忆列表格式化为LLM提示词
Args:
memories: 记忆列表
query_context: 查询上下文(可选)
Returns:
格式化的Markdown文本
"""
if not memories:
return ""
lines = ["## 🧠 相关记忆回顾", ""]
if self.config.group_by_type:
lines.extend(self._format_memories_by_type(memories))
else:
lines.extend(self._format_memories_chronologically(memories))
return "\n".join(lines)
def _format_memories_by_type(self, memories: List[MemoryChunk]) -> List[str]:
"""按类型分组格式化记忆"""
# 按类型分组
grouped_memories = {}
for memory in memories:
memory_type = memory.memory_type
if memory_type not in grouped_memories:
grouped_memories[memory_type] = []
grouped_memories[memory_type].append(memory)
lines = []
# 为每个类型生成格式化文本
for memory_type, type_memories in grouped_memories.items():
emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝")
label = self.TYPE_LABELS.get(memory_type, memory_type.value)
lines.extend([
f"### {emoji} {label}",
""
])
for memory in type_memories:
formatted_item = self._format_single_memory(memory, include_type=False)
lines.append(formatted_item)
lines.append("") # 类型间空行
return lines
def _format_memories_chronologically(self, memories: List[MemoryChunk]) -> List[str]:
"""按时间顺序格式化记忆"""
lines = []
for i, memory in enumerate(memories, 1):
formatted_item = self._format_single_memory(memory, include_type=True, index=i)
lines.append(formatted_item)
return lines
def _format_single_memory(
self,
memory: MemoryChunk,
include_type: bool = True,
index: Optional[int] = None
) -> str:
"""格式化单条记忆"""
# 如果启用方括号格式,使用新格式
if self.config.use_bracket_format:
return self._format_single_memory_bracket(memory)
# 获取显示文本
display_text = memory.display or memory.text_content
if len(display_text) > self.config.max_display_length:
display_text = display_text[:self.config.max_display_length - 3] + "..."
# 构建前缀
prefix_parts = []
# 添加序号
if index is not None:
prefix_parts.append(f"{index}.")
# 添加类型标签
if include_type and self.config.include_memory_types:
if self.config.use_emoji_icons:
emoji = self.TYPE_EMOJI_MAP.get(memory.memory_type, "📝")
prefix_parts.append(f"**{emoji}")
else:
label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
prefix_parts.append(f"**[{label}]")
# 添加时间信息
if self.config.include_timestamps:
timestamp = memory.metadata.created_at
if timestamp > 0:
dt = datetime.fromtimestamp(timestamp)
time_str = dt.strftime(self.config.datetime_format)
if self.config.use_emoji_icons:
prefix_parts.append(f"{time_str}")
else:
prefix_parts.append(f"({time_str})")
# 添加置信度信息
if self.config.include_confidence:
confidence = memory.metadata.confidence.value
confidence_stars = "" * confidence + "" * (4 - confidence)
prefix_parts.append(f"信度:{confidence_stars}")
# 构建完整格式
if prefix_parts:
if self.config.include_memory_types and self.config.use_emoji_icons:
prefix = " ".join(prefix_parts) + "** "
else:
prefix = " ".join(prefix_parts) + " "
return f"- {prefix}{display_text}"
else:
return f"- {display_text}"
def _format_single_memory_bracket(self, memory: MemoryChunk) -> str:
"""格式化单条记忆 - 使用方括号格式 [类型] 内容"""
# 获取显示文本
display_text = memory.display or memory.text_content
# 如果启用紧凑格式,只显示核心内容
if self.config.compact_format:
if len(display_text) > self.config.max_display_length:
display_text = display_text[:self.config.max_display_length - 3] + "..."
else:
# 非紧凑格式可以包含时间信息
if self.config.include_timestamps:
timestamp = memory.metadata.created_at
if timestamp > 0:
dt = datetime.fromtimestamp(timestamp)
time_str = dt.strftime("%Y年%m月%d")
# 将时间信息自然地整合到内容中
if "" not in display_text and "" not in display_text:
display_text = f"{time_str}{display_text}"
# 获取类型标签
label = self.TYPE_LABELS.get(memory.memory_type, memory.memory_type.value)
# 构建方括号格式: **[类型]** 内容
return f"- **[{label}]** {display_text}"
def format_memory_summary(self, memories: List[MemoryChunk]) -> str:
"""生成记忆摘要统计"""
if not memories:
return "暂无相关记忆。"
# 统计信息
total_count = len(memories)
type_counts = {}
for memory in memories:
memory_type = memory.memory_type
type_counts[memory_type] = type_counts.get(memory_type, 0) + 1
# 生成摘要
lines = [f"**记忆摘要**: 共找到 {total_count} 条相关记忆"]
if len(type_counts) > 1:
type_summaries = []
for memory_type, count in type_counts.items():
emoji = self.TYPE_EMOJI_MAP.get(memory_type, "📝")
label = self.TYPE_LABELS.get(memory_type, memory_type.value)
type_summaries.append(f"{emoji}{label} {count}")
lines.append(f"包括: {', '.join(type_summaries)}")
return " | ".join(lines)
def format_for_debug(self, memories: List[MemoryChunk]) -> str:
"""生成调试格式的记忆列表"""
if not memories:
return "无记忆数据"
lines = ["### 记忆调试信息", ""]
for i, memory in enumerate(memories, 1):
lines.extend([
f"**记忆 {i}** (ID: {memory.memory_id[:8]})",
f"- 类型: {memory.memory_type.value}",
f"- 内容: {memory.display[:100]}{'...' if len(memory.display) > 100 else ''}",
f"- 访问次数: {memory.metadata.access_count}",
f"- 置信度: {memory.metadata.confidence.value}/4",
f"- 重要性: {memory.metadata.importance.value}/4",
f"- 创建时间: {datetime.fromtimestamp(memory.metadata.created_at).strftime('%Y-%m-%d %H:%M')}",
""
])
return "\n".join(lines)
# Default formatter instance
default_formatter = MemoryFormatter()
def format_memories_for_llm(
memories: List[MemoryChunk],
query_context: Optional[str] = None,
config: Optional[FormatterConfig] = None
) -> str:
"""
便捷函数将记忆格式化为LLM提示词
"""
if config:
formatter = MemoryFormatter(config)
else:
formatter = default_formatter
return formatter.format_memories_for_prompt(memories, query_context)
def format_memory_summary(
memories: List[MemoryChunk],
config: Optional[FormatterConfig] = None
) -> str:
"""
Convenience helper: produce a memory summary
"""
if config:
formatter = MemoryFormatter(config)
else:
formatter = default_formatter
return formatter.format_memory_summary(memories)
def format_memories_bracket_style(
memories: List[MemoryChunk],
query_context: Optional[str] = None,
compact: bool = True,
include_timestamps: bool = True
) -> str:
"""
Convenience helper: format memories using the bracket style
Args:
memories: the memory list
query_context: the query context
compact: use the compact format
include_timestamps: include time information
Returns:
formatted Markdown text
"""
config = FormatterConfig(
use_bracket_format=True,
compact_format=compact,
include_timestamps=include_timestamps,
include_memory_types=True,
use_emoji_icons=False,
group_by_type=False
)
formatter = MemoryFormatter(config)
return formatter.format_memories_for_prompt(memories, query_context)

View File

@@ -0,0 +1,496 @@
# -*- coding: utf-8 -*-
"""
Memory metadata index manager
Stores memory metadata in a JSON file to support fast fuzzy search and filtering
"""
import orjson
import threading
from pathlib import Path
from typing import Dict, List, Optional, Set, Any
from dataclasses import dataclass, asdict
from datetime import datetime
from src.common.logger import get_logger
from src.chat.memory_system.memory_chunk import MemoryType, ImportanceLevel, ConfidenceLevel
logger = get_logger(__name__)
@dataclass
class MemoryMetadataIndexEntry:
"""元数据索引条目(轻量级,只用于快速过滤)"""
memory_id: str
user_id: str
# 分类信息
memory_type: str # MemoryType.value
subjects: List[str] # 主语列表
objects: List[str] # 宾语列表
keywords: List[str] # 关键词列表
tags: List[str] # 标签列表
# 数值字段(用于范围过滤)
importance: int # ImportanceLevel.value (1-4)
confidence: int # ConfidenceLevel.value (1-4)
created_at: float # 创建时间戳
access_count: int # 访问次数
# 可选字段
chat_id: Optional[str] = None
content_preview: Optional[str] = None # 内容预览前100字符
class MemoryMetadataIndex:
"""记忆元数据索引管理器"""
def __init__(self, index_file: str = "data/memory_metadata_index.json"):
self.index_file = Path(index_file)
self.index: Dict[str, MemoryMetadataIndexEntry] = {}  # memory_id -> entry
# Inverted indices (for fast lookup)
self.type_index: Dict[str, Set[str]] = {}  # type -> {memory_ids}
self.subject_index: Dict[str, Set[str]] = {}  # subject -> {memory_ids}
self.keyword_index: Dict[str, Set[str]] = {}  # keyword -> {memory_ids}
self.tag_index: Dict[str, Set[str]] = {}  # tag -> {memory_ids}
self.lock = threading.RLock()
# Load any existing index
self._load_index()
def _load_index(self):
"""Load the index from disk"""
if not self.index_file.exists():
logger.info(f"Metadata index file not found; a new index will be created: {self.index_file}")
return
try:
with open(self.index_file, 'rb') as f:
data = orjson.loads(f.read())
# Rebuild the in-memory index
for entry_data in data.get('entries', []):
entry = MemoryMetadataIndexEntry(**entry_data)
self.index[entry.memory_id] = entry
self._update_inverted_indices(entry)
logger.info(f"✅ Loaded metadata index: {len(self.index)} entries")
except Exception as e:
logger.error(f"Failed to load the metadata index: {e}", exc_info=True)
def _save_index(self):
"""Persist the index to disk"""
try:
# Make sure the directory exists
self.index_file.parent.mkdir(parents=True, exist_ok=True)
# Serialize all entries
entries = [asdict(entry) for entry in self.index.values()]
data = {
'version': '1.0',
'count': len(entries),
'last_updated': datetime.now().isoformat(),
'entries': entries
}
# Write via a temp file plus an atomic rename
temp_file = self.index_file.with_suffix('.tmp')
with open(temp_file, 'wb') as f:
f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
temp_file.replace(self.index_file)
logger.debug(f"Metadata index saved: {len(entries)} entries")
except Exception as e:
logger.error(f"Failed to save the metadata index: {e}", exc_info=True)
def _update_inverted_indices(self, entry: MemoryMetadataIndexEntry):
"""Update the inverted indices"""
memory_id = entry.memory_id
# Type index
self.type_index.setdefault(entry.memory_type, set()).add(memory_id)
# Subject index
for subject in entry.subjects:
subject_norm = subject.strip().lower()
if subject_norm:
self.subject_index.setdefault(subject_norm, set()).add(memory_id)
# Keyword index
for keyword in entry.keywords:
keyword_norm = keyword.strip().lower()
if keyword_norm:
self.keyword_index.setdefault(keyword_norm, set()).add(memory_id)
# Tag index
for tag in entry.tags:
tag_norm = tag.strip().lower()
if tag_norm:
self.tag_index.setdefault(tag_norm, set()).add(memory_id)
def add_or_update(self, entry: MemoryMetadataIndexEntry):
"""Add or update an index entry"""
with self.lock:
# If the entry already exists, drop the old record from the inverted indices first
if entry.memory_id in self.index:
self._remove_from_inverted_indices(entry.memory_id)
# Add the new record
self.index[entry.memory_id] = entry
self._update_inverted_indices(entry)
def _remove_from_inverted_indices(self, memory_id: str):
"""Remove a record from the inverted indices"""
if memory_id not in self.index:
return
entry = self.index[memory_id]
# Remove from the type index
if entry.memory_type in self.type_index:
self.type_index[entry.memory_type].discard(memory_id)
# Remove from the subject index
for subject in entry.subjects:
subject_norm = subject.strip().lower()
if subject_norm in self.subject_index:
self.subject_index[subject_norm].discard(memory_id)
# Remove from the keyword index
for keyword in entry.keywords:
keyword_norm = keyword.strip().lower()
if keyword_norm in self.keyword_index:
self.keyword_index[keyword_norm].discard(memory_id)
# Remove from the tag index
for tag in entry.tags:
tag_norm = tag.strip().lower()
if tag_norm in self.tag_index:
self.tag_index[tag_norm].discard(memory_id)
def remove(self, memory_id: str):
"""Remove an index entry"""
with self.lock:
if memory_id in self.index:
self._remove_from_inverted_indices(memory_id)
del self.index[memory_id]
def batch_add_or_update(self, entries: List[MemoryMetadataIndexEntry]):
"""Batch add or update"""
with self.lock:
for entry in entries:
self.add_or_update(entry)
def save(self):
"""Persist the index to disk"""
with self.lock:
self._save_index()
def search(
self,
memory_types: Optional[List[str]] = None,
subjects: Optional[List[str]] = None,
keywords: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
importance_min: Optional[int] = None,
importance_max: Optional[int] = None,
created_after: Optional[float] = None,
created_before: Optional[float] = None,
user_id: Optional[str] = None,
limit: Optional[int] = None,
flexible_mode: bool = True  # new: flexible matching mode
) -> List[str]:
"""
Search for memory IDs matching the given conditions (supports fuzzy matching)
Returns:
List[str]: list of matching memory_id values
"""
with self.lock:
if flexible_mode:
return self._search_flexible(
memory_types=memory_types,
subjects=subjects,
keywords=keywords,  # kept for compatibility
tags=tags,  # kept for compatibility
created_after=created_after,
created_before=created_before,
user_id=user_id,
limit=limit
)
else:
return self._search_strict(
memory_types=memory_types,
subjects=subjects,
keywords=keywords,
tags=tags,
importance_min=importance_min,
importance_max=importance_max,
created_after=created_after,
created_before=created_before,
user_id=user_id,
limit=limit
)
def _search_flexible(
self,
memory_types: Optional[List[str]] = None,
subjects: Optional[List[str]] = None,
created_after: Optional[float] = None,
created_before: Optional[float] = None,
user_id: Optional[str] = None,
limit: Optional[int] = None,
**kwargs  # accepted but unused
) -> List[str]:
"""
Flexible search mode: a memory qualifies when it matches 2 of 4 criteria; partial matches count
Scoring dimensions:
1. memory-type match (0-1 points)
2. subject match (0-1 points)
3. object match (0-1 points)
4. time-range match (0-1 points)
A total score >= 2 counts as a hit
"""
# User filter (mandatory)
if user_id:
base_candidates = {
mid for mid, entry in self.index.items()
if entry.user_id == user_id
}
else:
base_candidates = set(self.index.keys())
scored_candidates = []
for memory_id in base_candidates:
entry = self.index[memory_id]
score = 0
match_details = []
# 1. Memory-type match
if memory_types:
type_score = 0
for mtype in memory_types:
if entry.memory_type == mtype:
type_score = 1
break
# Partial match: one type name contains the other
if mtype.lower() in entry.memory_type.lower() or entry.memory_type.lower() in mtype.lower():
type_score = 0.5
break
score += type_score
if type_score > 0:
match_details.append(f"type: {entry.memory_type}")
else:
match_details.append("type: unspecified")
# 2. Subject match (partial matches supported)
if subjects:
subject_score = 0
for subject in subjects:
subject_norm = subject.strip().lower()
for entry_subject in entry.subjects:
entry_subject_norm = entry_subject.strip().lower()
# Exact match
if subject_norm == entry_subject_norm:
subject_score = 1
break
# Partial match: containment either way
if subject_norm in entry_subject_norm or entry_subject_norm in subject_norm:
subject_score = 0.6
break
if subject_score == 1:
break
score += subject_score
if subject_score > 0:
match_details.append("subject: matched")
else:
match_details.append("subject: unspecified")
# 3. Object match (partial matches supported)
object_score = 0
if entry.objects:
for entry_object in entry.objects:
entry_object_norm = str(entry_object).strip().lower()
# Check for a subject-object association
for subject in subjects or []:
subject_norm = subject.strip().lower()
if subject_norm in entry_object_norm or entry_object_norm in subject_norm:
object_score = 0.8
match_details.append("object: subject-object association")
break
if object_score > 0:
break
score += object_score
if object_score > 0:
match_details.append("object: matched")
elif not entry.objects:
match_details.append("object: none")
# 4. Time-range match
time_score = 0
if created_after is not None or created_before is not None:
time_match = True
if created_after is not None and entry.created_at < created_after:
time_match = False
if created_before is not None and entry.created_at > created_before:
time_match = False
if time_match:
time_score = 1
match_details.append("time: matched")
else:
match_details.append("time: mismatched")
else:
match_details.append("time: unspecified")
score += time_score
# Only memories with a total score >= 2 are returned
if score >= 2:
scored_candidates.append((memory_id, score, match_details))
# Sort by score, then by creation time
scored_candidates.sort(key=lambda x: (x[1], self.index[x[0]].created_at), reverse=True)
if limit:
result_ids = [mid for mid, _, _ in scored_candidates[:limit]]
else:
result_ids = [mid for mid, _, _ in scored_candidates]
logger.debug(
f"[flexible search] filters: types={memory_types}, subjects={subjects}, "
f"time_range=[{created_after}, {created_before}], returned={len(result_ids)}"
)
# Match statistics
if scored_candidates:
avg_score = sum(score for _, score, _ in scored_candidates) / len(scored_candidates)
logger.debug(f"[flexible search] mean match score: {avg_score:.2f}, best: {scored_candidates[0][1]:.2f}")
return result_ids
def _search_strict(
self,
memory_types: Optional[List[str]] = None,
subjects: Optional[List[str]] = None,
keywords: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
importance_min: Optional[int] = None,
importance_max: Optional[int] = None,
created_after: Optional[float] = None,
created_before: Optional[float] = None,
user_id: Optional[str] = None,
limit: Optional[int] = None
) -> List[str]:
"""严格搜索模式(原有逻辑)"""
# 初始候选集(所有记忆)
candidate_ids: Optional[Set[str]] = None
# 用户过滤(必选)
if user_id:
candidate_ids = {
mid for mid, entry in self.index.items()
if entry.user_id == user_id
}
else:
candidate_ids = set(self.index.keys())
# Type filter (OR semantics)
if memory_types:
type_ids = set()
for mtype in memory_types:
type_ids.update(self.type_index.get(mtype, set()))
candidate_ids &= type_ids
# Subject filter (OR semantics, fuzzy matching supported)
if subjects:
subject_ids = set()
for subject in subjects:
subject_norm = subject.strip().lower()
# Exact match
if subject_norm in self.subject_index:
subject_ids.update(self.subject_index[subject_norm])
# Fuzzy match (containment)
for indexed_subject, ids in self.subject_index.items():
if subject_norm in indexed_subject or indexed_subject in subject_norm:
subject_ids.update(ids)
candidate_ids &= subject_ids
# Keyword filter (OR semantics, fuzzy matching supported)
if keywords:
keyword_ids = set()
for keyword in keywords:
keyword_norm = keyword.strip().lower()
# Exact match
if keyword_norm in self.keyword_index:
keyword_ids.update(self.keyword_index[keyword_norm])
# Fuzzy match (containment)
for indexed_keyword, ids in self.keyword_index.items():
if keyword_norm in indexed_keyword or indexed_keyword in keyword_norm:
keyword_ids.update(ids)
candidate_ids &= keyword_ids
# Tag filter (OR semantics)
if tags:
tag_ids = set()
for tag in tags:
tag_norm = tag.strip().lower()
tag_ids.update(self.tag_index.get(tag_norm, set()))
candidate_ids &= tag_ids
# Importance filter
if importance_min is not None or importance_max is not None:
importance_ids = {
mid for mid in candidate_ids
if (importance_min is None or self.index[mid].importance >= importance_min)
and (importance_max is None or self.index[mid].importance <= importance_max)
}
candidate_ids &= importance_ids
# Time-range filter
if created_after is not None or created_before is not None:
time_ids = {
mid for mid in candidate_ids
if (created_after is None or self.index[mid].created_at >= created_after)
and (created_before is None or self.index[mid].created_at <= created_before)
}
candidate_ids &= time_ids
# Convert to a list and sort (newest first)
result_ids = sorted(
candidate_ids,
key=lambda mid: self.index[mid].created_at,
reverse=True
)
# Apply the limit
if limit:
result_ids = result_ids[:limit]
logger.debug(
f"[strict search] types={memory_types}, subjects={subjects}, "
f"keywords={keywords}, returned={len(result_ids)}"
)
return result_ids
def get_entry(self, memory_id: str) -> Optional[MemoryMetadataIndexEntry]:
"""获取单个索引条目"""
return self.index.get(memory_id)
def get_stats(self) -> Dict[str, Any]:
"""获取索引统计信息"""
with self.lock:
return {
'total_memories': len(self.index),
'types': {mtype: len(ids) for mtype, ids in self.type_index.items()},
'subjects_count': len(self.subject_index),
'keywords_count': len(self.keyword_index),
'tags_count': len(self.tag_index),
}
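A minimal usage sketch of the index API defined above (the field values are made up for illustration; the calls match the definitions in this file):

from src.chat.memory_system.memory_metadata_index import (
    MemoryMetadataIndex, MemoryMetadataIndexEntry
)

index = MemoryMetadataIndex("data/memory_metadata_index.json")
index.add_or_update(MemoryMetadataIndexEntry(
    memory_id="m-001", user_id="global", memory_type="preference",
    subjects=["Alice"], objects=["coffee"], keywords=["coffee", "drink"],
    tags=["food"], importance=2, confidence=2,
    created_at=1727800000.0, access_count=0,
))
index.save()

# Flexible mode needs 2 of 4 criteria (type / subject / object / time) to match:
# here the type and subject match, so the entry scores 2 and is returned.
ids = index.search(memory_types=["preference"], subjects=["alice"], limit=5)
print(ids)  # -> ["m-001"]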

View File

@@ -135,22 +135,76 @@ class MemoryQueryPlanner:
persona = context.get("bot_personality") or context.get("bot_identity") or "未知"
# 构建未读消息上下文信息
context_section = ""
if context.get("has_unread_context") and context.get("unread_messages_context"):
unread_context = context["unread_messages_context"]
unread_messages = unread_context.get("messages", [])
unread_keywords = unread_context.get("keywords", [])
unread_participants = unread_context.get("participants", [])
context_summary = unread_context.get("context_summary", "")
if unread_messages:
# 构建未读消息摘要
message_previews = []
for msg in unread_messages[:5]: # 最多显示5条
sender = msg.get("sender", "未知")
content = msg.get("content", "")[:100] # 限制每条消息长度
message_previews.append(f"{sender}: {content}")
context_section = f"""
## 📋 未读消息上下文 (共{unread_context.get('total_count', 0)}条未读消息)
### 最近消息预览:
{chr(10).join(message_previews)}
### 上下文关键词:
{', '.join(unread_keywords[:15]) if unread_keywords else ''}
### 对话参与者:
{', '.join(unread_participants) if unread_participants else ''}
### 上下文摘要:
{context_summary[:300] if context_summary else ''}
"""
else:
context_section = """
## 📋 未读消息上下文:
无未读消息或上下文信息不可用
"""
return f"""
你是一名记忆检索规划助手,请基于输入生成一个简洁的 JSON 检索计划。
你的任务是分析当前查询并结合未读消息的上下文,生成更精准的记忆检索策略。
仅需提供以下字段:
- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询;
- semantic_query: 用于向量召回的自然语言描述,要求具体且贴合当前查询和上下文
- memory_types: 建议检索的记忆类型列表,取值范围来自 MemoryType 枚举 (personal_fact,event,preference,opinion,relationship,emotion,knowledge,skill,goal,experience,contextual)
- subject_includes: 建议出现在记忆主语中的人物或角色;
- object_includes: 建议关注的对象、主题或关键信息;
- required_keywords: 建议必须包含的关键词(从上下文中提取);
- recency: 推荐的时间偏好,可选 recent/any/historical
- limit: 推荐的最大返回数量 (1-15)
- notes: 额外补充说明(可选)
- emphasis: 检索重点,可选 balanced/contextual/recent/comprehensive
请不要生成谓语字段,也不要额外补充其它参数。
当前查询: "{query_text}"
已知的对话参与者: {participant_preview}
机器人设定: {persona}
## 当前查询:
"{query_text}"
## 已知对话参与者:
{participant_preview}
## 机器人设定:
{persona}{context_section}
## 🎯 指导原则:
1. **上下文关联**: 优先分析与当前查询相关的未读消息内容和关键词
2. **语义理解**: 结合上下文理解查询的真实意图,而非字面意思
3. **参与者感知**: 考虑未读消息中的参与者,检索与他们相关的记忆
4. **主题延续**: 关注未读消息中讨论的主题,检索相关的历史记忆
5. **时间相关性**: 如果未读消息讨论最近的事件,偏向检索相关时期的记忆
请直接输出符合要求的 JSON 对象,禁止添加额外文本或 Markdown 代码块。
"""

View File

@@ -380,11 +380,11 @@ class MemorySystem:
self.status = original_status
return []
# 2. Build memory chunks
# 2. Build memory chunks (all memories use the global scope and are fully shared)
memory_chunks = await self.memory_builder.build_memories(
conversation_text,
normalized_context,
GLOBAL_MEMORY_SCOPE,
GLOBAL_MEMORY_SCOPE,  # force the global scope; no per-user separation
timestamp or time.time()
)
@@ -609,7 +609,7 @@ class MemorySystem:
limit: int = 5,
**kwargs
) -> List[MemoryChunk]:
"""检索相关记忆(简化版,使用统一存储"""
"""检索相关记忆(三阶段召回:元数据粗筛 → 向量精筛 → 综合重排"""
raw_query = query_text or kwargs.get("query")
if not raw_query:
raise ValueError("query_text 或 query 参数不能为空")
@@ -619,6 +619,8 @@ class MemorySystem:
return []
context = context or {}
# All memories are fully shared: use the global scope and do not separate by user
resolved_user_id = GLOBAL_MEMORY_SCOPE
self.status = MemorySystemStatus.RETRIEVING
@@ -626,48 +628,152 @@ class MemorySystem:
try:
normalized_context = self._normalize_context(context, GLOBAL_MEMORY_SCOPE, None)
effective_limit = limit or self.config.final_recall_limit
# Build filters
filters = {
"user_id": resolved_user_id
effective_limit = self.config.final_recall_limit
# === Stage 1: metadata coarse filtering (soft filters) ===
coarse_filters = {
"user_id": GLOBAL_MEMORY_SCOPE,  # mandatory: ensures the correct scope
}
# Apply the query-planning result
# Apply query planning (optimize the query text and build metadata filters)
optimized_query = raw_query
metadata_filters = {}
if self.query_planner:
try:
query_plan = await self.query_planner.plan_query(raw_query, normalized_context)
if getattr(query_plan, "memory_types", None):
filters["memory_types"] = [mt.value for mt in query_plan.memory_types]
if getattr(query_plan, "subject_includes", None):
filters["keywords"] = query_plan.subject_includes
# Build an enhanced context that folds in the unread messages
enhanced_context = await self._build_enhanced_query_context(raw_query, normalized_context)
query_plan = await self.query_planner.plan_query(raw_query, enhanced_context)
# Use the LLM-optimized query text (a more precise semantic formulation)
if getattr(query_plan, "semantic_query", None):
raw_query = query_plan.semantic_query
optimized_query = query_plan.semantic_query
# Build the JSON metadata filter conditions (for the stage-1 coarse filter)
# Convert the query-planning result into metadata filter conditions
if getattr(query_plan, "memory_types", None):
metadata_filters['memory_types'] = [mt.value for mt in query_plan.memory_types]
if getattr(query_plan, "subject_includes", None):
metadata_filters['subjects'] = query_plan.subject_includes
if getattr(query_plan, "required_keywords", None):
metadata_filters['keywords'] = query_plan.required_keywords
# Time-range filter
recency = getattr(query_plan, "recency_preference", "any")
current_time = time.time()
if recency == "recent":
# last 7 days
metadata_filters['created_after'] = current_time - (7 * 24 * 3600)
elif recency == "historical":
# older than 30 days
metadata_filters['created_before'] = current_time - (30 * 24 * 3600)
# Add the user ID to the metadata filters
metadata_filters['user_id'] = GLOBAL_MEMORY_SCOPE
logger.debug(f"[stage 1] query optimization: '{raw_query}' → '{optimized_query}'")
logger.debug(f"[stage 1] metadata filters: {metadata_filters}")
except Exception as plan_exc:
logger.warning("Query planning failed, falling back to the default retrieval strategy: %s", plan_exc, exc_info=True)
logger.warning("Query planning failed, using the original query: %s", plan_exc, exc_info=True)
# Keep the basic user_id filter even when query planning fails
metadata_filters = {'user_id': GLOBAL_MEMORY_SCOPE}
# Search via the Vector DB storage
# === Stage 2: vector refinement ===
coarse_limit = self.config.coarse_recall_limit  # the coarse stage returns more candidates
logger.debug(f"[stage 2] starting vector search: query='{optimized_query[:60]}...', limit={coarse_limit}")
search_results = await self.unified_storage.search_similar_memories(
query_text=raw_query,
limit=effective_limit,
filters=filters
query_text=optimized_query,
limit=coarse_limit,
filters=coarse_filters,  # ChromaDB where conditions, kept for compatibility
metadata_filters=metadata_filters  # JSON metadata index filtering
)
logger.info(f"[stage 2] vector search done: {len(search_results)} candidates returned")
# Convert to memory objects - search_results is a List[Tuple[MemoryChunk, float]]
final_memories = []
for memory, similarity_score in search_results:
# === Stage 3: composite re-ranking ===
import math  # math.exp instead of np.exp avoids a numpy dependency
scored_memories = []
current_time = time.time()
for memory, vector_similarity in search_results:
# 1. Vector-similarity score (already normalized to 0-1)
vector_score = vector_similarity
# 2. Recency score: exponential decay with a 30-day time constant
# (drops to 1/e ≈ 0.37 after 30 days; strictly speaking not a half-life)
age_seconds = current_time - memory.metadata.created_at
age_days = age_seconds / (24 * 3600)
recency_score = math.exp(-age_days / 30)
# 3. Importance score (enum value normalized to 0-1)
# ImportanceLevel: LOW=1, NORMAL=2, HIGH=3, CRITICAL=4
importance_enum = memory.metadata.importance
if hasattr(importance_enum, 'value'):
# Map the enum into the 0-1 range: (value - 1) / 3
importance_score = (importance_enum.value - 1) / 3.0
else:
# Already numeric; use it directly
importance_score = float(importance_enum) if importance_enum else 0.5
# 4. Access-frequency score (normalized; 10+ accesses earn full marks)
access_count = memory.metadata.access_count
frequency_score = min(access_count / 10.0, 1.0)
# Composite score (weighted average)
final_score = (
self.config.vector_weight * vector_score +
self.config.recency_weight * recency_score +
self.config.context_weight * importance_score +
0.1 * frequency_score  # access-frequency weight fixed at 10%
)
scored_memories.append((memory, final_score, {
"vector": vector_score,
"recency": recency_score,
"importance": importance_score,
"frequency": frequency_score,
"final": final_score
}))
# Update access bookkeeping
memory.update_access()
final_memories.append(memory)
# Sort by composite score
scored_memories.sort(key=lambda x: x[1], reverse=True)
# Return the top K
final_memories = [mem for mem, score, details in scored_memories[:effective_limit]]
retrieval_time = time.time() - start_time
# Detailed logging
if scored_memories:
logger.info("[stage 3] composite re-ranking done; top-3 score details:")
for i, (mem, score, details) in enumerate(scored_memories[:3], 1):
try:
summary = mem.content[:60] if hasattr(mem, 'content') and mem.content else ""
except Exception:
summary = ""
logger.info(
f"  #{i} | final={details['final']:.3f} "
f"(vec={details['vector']:.3f}, rec={details['recency']:.3f}, "
f"imp={details['importance']:.3f}, freq={details['frequency']:.3f}) "
f"| {summary}"
)
logger.info(
"Simplified memory retrieval complete"
"Three-stage memory retrieval complete"
f" | user={resolved_user_id}"
f" | count={len(final_memories)}"
f" | coarse={len(search_results)}"
f" | re-ranked={len(scored_memories)}"
f" | returned={len(final_memories)}"
f" | duration={retrieval_time:.3f}s"
f" | query='{raw_query}'"
f" | query='{optimized_query[:60]}...'"
)
self.last_retrieval_time = time.time()
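As a worked example of the composite score above (the weights are illustrative assumptions; the real values come from self.config):

import math

vector_weight, recency_weight, context_weight = 0.6, 0.2, 0.1  # assumed weights

vector_score = 0.82                   # cosine similarity from stage 2
recency_score = math.exp(-12 / 30)    # a 12-day-old memory -> ~0.67
importance_score = (3 - 1) / 3.0      # ImportanceLevel.HIGH (value 3) -> ~0.67
frequency_score = min(4 / 10.0, 1.0)  # accessed 4 times -> 0.4

final_score = (vector_weight * vector_score
               + recency_weight * recency_score
               + context_weight * importance_score
               + 0.1 * frequency_score)
print(round(final_score, 3))  # -> 0.733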
@@ -717,8 +823,8 @@ class MemorySystem:
except Exception:
context = dict(raw_context or {})
# 基础字段(统一使用全局作用域
context["user_id"] = GLOBAL_MEMORY_SCOPE
# 基础字段:强制使用传入的 user_id 参数(已统一为 GLOBAL_MEMORY_SCOPE
context["user_id"] = user_id or GLOBAL_MEMORY_SCOPE
context["timestamp"] = context.get("timestamp") or timestamp or time.time()
context["message_type"] = context.get("message_type") or "normal"
context["platform"] = context.get("platform") or context.get("source_platform") or "unknown"
@@ -758,6 +864,150 @@ class MemorySystem:
return context
async def _build_enhanced_query_context(self, raw_query: str, normalized_context: Dict[str, Any]) -> Dict[str, Any]:
"""Build an enhanced query context that folds in the unread-message context
Args:
raw_query: the original query text
normalized_context: the normalized base context
Returns:
Dict[str, Any]: an enhanced context enriched with unread-message information
"""
enhanced_context = dict(normalized_context)  # copy the base context
try:
# Use stream_id to look up unread messages
stream_id = normalized_context.get("stream_id")
if not stream_id:
logger.debug("No stream_id found; planning the query with the base context")
return enhanced_context
# Collect unread messages as context
unread_messages_summary = await self._collect_unread_messages_context(stream_id)
if unread_messages_summary:
enhanced_context["unread_messages_context"] = unread_messages_summary
enhanced_context["has_unread_context"] = True
logger.debug(f"Built an enhanced context for query planning with {len(unread_messages_summary.get('messages', []))} unread messages")
else:
enhanced_context["has_unread_context"] = False
logger.debug("No unread messages found; planning the query with the base context")
except Exception as e:
logger.warning(f"Failed to build the enhanced query context: {e}", exc_info=True)
enhanced_context["has_unread_context"] = False
return enhanced_context
async def _collect_unread_messages_context(self, stream_id: str) -> Optional[Dict[str, Any]]:
"""Collect aggregate context information from unread messages
Args:
stream_id: the stream ID
Returns:
Optional[Dict[str, Any]]: aggregate unread-message info: message list, keywords, topics, etc.
"""
try:
from src.chat.message_receive.chat_stream import get_chat_manager
chat_manager = get_chat_manager()
chat_stream = chat_manager.get_stream(stream_id)
if not chat_stream or not hasattr(chat_stream, "context_manager"):
logger.debug(f"No chat stream or context manager found for stream_id={stream_id}")
return None
# Fetch the unread messages
context_manager = chat_stream.context_manager
unread_messages = context_manager.get_unread_messages()
if not unread_messages:
logger.debug(f"No unread messages for stream_id={stream_id}")
return None
# Build a digest of the unread messages
messages_summary = []
all_keywords = set()
participant_names = set()
for msg in unread_messages[:10]:  # process at most the 10 most recent unread messages
try:
# Extract the message content
content = (getattr(msg, "processed_plain_text", None) or
getattr(msg, "display_message", None) or "")
if not content:
continue
# Extract sender info
sender_name = "unknown user"
if hasattr(msg, "user_info") and msg.user_info:
sender_name = (getattr(msg.user_info, "user_nickname", None) or
getattr(msg.user_info, "user_cardname", None) or
getattr(msg.user_info, "user_id", None) or "unknown user")
participant_names.add(sender_name)
# Append to the digest
messages_summary.append({
"sender": sender_name,
"content": content[:200],  # cap the length
"timestamp": getattr(msg, "time", None)
})
# Extract keywords (naive implementation)
content_lower = content.lower()
# More sophisticated keyword extraction could be added here
words = [w.strip() for w in content_lower.split() if len(w.strip()) > 1]
all_keywords.update(words[:5])  # at most 5 words per message
except Exception as msg_e:
logger.debug(f"Error while processing an unread message: {msg_e}")
continue
if not messages_summary:
return None
# Assemble the aggregate context
unread_context = {
"messages": messages_summary,
"total_count": len(unread_messages),
"processed_count": len(messages_summary),
"keywords": list(all_keywords)[:20],  # at most 20 keywords
"participants": list(participant_names),
"context_summary": self._build_unread_context_summary(messages_summary)
}
logger.debug(f"Collected unread-message context: {len(messages_summary)} messages, {len(all_keywords)} keywords, {len(participant_names)} participants")
return unread_context
except Exception as e:
logger.warning(f"Failed to collect the unread-message context: {e}", exc_info=True)
return None
def _build_unread_context_summary(self, messages_summary: List[Dict[str, Any]]) -> str:
"""Build a text digest of the unread messages
Args:
messages_summary: the unread-message digest list
Returns:
str: a text digest of the unread messages
"""
if not messages_summary:
return ""
summary_parts = []
for msg_info in messages_summary:
sender = msg_info.get("sender", "unknown")
content = msg_info.get("content", "")
if content:
summary_parts.append(f"{sender}: {content}")
return " | ".join(summary_parts)
async def _resolve_conversation_context(self, fallback_text: str, context: Optional[Dict[str, Any]]) -> str:
"""使用 stream_id 历史消息和相关记忆充实对话文本,默认回退到传入文本"""
if not context:

View File

@@ -24,11 +24,63 @@ import numpy as np
from src.common.logger import get_logger
from src.common.vector_db import vector_db_service
from src.chat.utils.utils import get_embedding
from src.chat.memory_system.memory_chunk import MemoryChunk
from src.chat.memory_system.memory_chunk import MemoryChunk, ConfidenceLevel, ImportanceLevel
from src.chat.memory_system.memory_forgetting_engine import MemoryForgettingEngine
from src.chat.memory_system.memory_metadata_index import MemoryMetadataIndex, MemoryMetadataIndexEntry
logger = get_logger(__name__)
# Global cache of enum mapping tables
_ENUM_MAPPINGS_CACHE = {}
def _build_enum_mapping(enum_class: type) -> Dict[str, Any]:
"""Build the full mapping table for an enum class
Args:
enum_class: the enum class
Returns:
Dict[str, Any]: a dict holding the various mapping formats
"""
cache_key = f"{enum_class.__module__}.{enum_class.__name__}"
# Return the cached table if we built it before
if cache_key in _ENUM_MAPPINGS_CACHE:
return _ENUM_MAPPINGS_CACHE[cache_key]
mapping = {
"name_to_enum": {},  # enum name -> enum member (HIGH -> ImportanceLevel.HIGH)
"value_to_enum": {},  # integer value -> enum member (3 -> ImportanceLevel.HIGH)
"value_str_to_enum": {},  # string value -> enum member ("3" -> ImportanceLevel.HIGH)
"enum_value_to_name": {},  # enum member -> name (reverse mapping)
"all_possible_strings": set(),  # every possible string representation
}
for member in enum_class:
# Name mappings (case-insensitive)
mapping["name_to_enum"][member.name] = member
mapping["name_to_enum"][member.name.lower()] = member
mapping["name_to_enum"][member.name.upper()] = member
# Value mappings
mapping["value_to_enum"][member.value] = member
mapping["value_str_to_enum"][str(member.value)] = member
# Reverse mapping
mapping["enum_value_to_name"][member] = member.name
# Collect every possible string representation
mapping["all_possible_strings"].add(member.name)
mapping["all_possible_strings"].add(member.name.lower())
mapping["all_possible_strings"].add(member.name.upper())
mapping["all_possible_strings"].add(str(member.value))
# Cache the result
_ENUM_MAPPINGS_CACHE[cache_key] = mapping
logger.debug(f"Built enum mapping table: {enum_class.__name__} -> {len(mapping['name_to_enum'])} name mappings, {len(mapping['value_to_enum'])} value mappings")
return mapping
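A quick sketch of what the cached table provides, using the ImportanceLevel enum imported above (LOW=1 ... CRITICAL=4, per the comments elsewhere in this commit):

mapping = _build_enum_mapping(ImportanceLevel)
assert mapping["name_to_enum"]["high"] is ImportanceLevel.HIGH
assert mapping["value_to_enum"][3] is ImportanceLevel.HIGH
assert mapping["value_str_to_enum"]["3"] is ImportanceLevel.HIGH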
@dataclass
class VectorStorageConfig:
@@ -38,7 +90,7 @@ class VectorStorageConfig:
metadata_collection: str = "memory_metadata_v2"
# Retrieval settings
similarity_threshold: float = 0.8
similarity_threshold: float = 0.5  # lowered to improve recall (0.5-0.6 is a reasonable range)
search_limit: int = 20
batch_size: int = 100
@@ -50,6 +102,26 @@ class VectorStorageConfig:
# Forgetting settings
enable_forgetting: bool = True
retention_hours: int = 24 * 30  # 30 days
@classmethod
def from_global_config(cls):
"""Create an instance from the global config"""
from src.config.config import global_config
memory_cfg = global_config.memory
return cls(
memory_collection=getattr(memory_cfg, 'vector_db_memory_collection', 'unified_memory_v2'),
metadata_collection=getattr(memory_cfg, 'vector_db_metadata_collection', 'memory_metadata_v2'),
similarity_threshold=getattr(memory_cfg, 'vector_db_similarity_threshold', 0.5),
search_limit=getattr(memory_cfg, 'vector_db_search_limit', 20),
batch_size=getattr(memory_cfg, 'vector_db_batch_size', 100),
enable_caching=getattr(memory_cfg, 'vector_db_enable_caching', True),
cache_size_limit=getattr(memory_cfg, 'vector_db_cache_size_limit', 1000),
auto_cleanup_interval=getattr(memory_cfg, 'vector_db_auto_cleanup_interval', 3600),
enable_forgetting=getattr(memory_cfg, 'enable_memory_forgetting', True),
retention_hours=getattr(memory_cfg, 'vector_db_retention_hours', 720),
)
class VectorMemoryStorage:
@@ -71,11 +143,29 @@ class VectorMemoryStorage:
"""基于Vector DB的记忆存储系统"""
def __init__(self, config: Optional[VectorStorageConfig] = None):
self.config = config or VectorStorageConfig()
# Load from the global config by default when no config is passed in
if config is None:
try:
self.config = VectorStorageConfig.from_global_config()
logger.info("✅ Vector storage config loaded from the global config")
except Exception as e:
logger.warning(f"Loading from the global config failed, using defaults: {e}")
self.config = VectorStorageConfig()
else:
self.config = config
# Pull the batch size and collection name from the config
self.batch_size = self.config.batch_size
self.collection_name = self.config.memory_collection
self.vector_db_service = vector_db_service
# In-memory cache
self.memory_cache: Dict[str, MemoryChunk] = {}
self.cache_timestamps: Dict[str, float] = {}
self._cache = self.memory_cache  # alias for older code
# Metadata index manager (JSON file index)
self.metadata_index = MemoryMetadataIndex()
# Forgetting engine
self.forgetting_engine: Optional[MemoryForgettingEngine] = None
@@ -180,29 +270,59 @@ class VectorMemoryStorage:
except Exception as e:
logger.error(f"自动清理失败: {e}")
def _memory_to_vector_format(self, memory: MemoryChunk) -> Tuple[Dict[str, Any], str]:
"""Convert a MemoryChunk into the Vector DB format"""
# Pick the text used for embedding
content = getattr(memory, 'display', None) or getattr(memory, 'text_content', None) or ""
def _memory_to_vector_format(self, memory: MemoryChunk) -> Dict[str, Any]:
"""Convert a MemoryChunk into the vector storage format"""
try:
# Resolve the memory_id
memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', None)
# Produce the text used for the vector representation
display_text = getattr(memory, 'display', None) or getattr(memory, 'text_content', None) or str(memory.content)
if not display_text.strip():
logger.warning(f"Memory {memory_id} has no usable display text")
display_text = f"{memory.memory_type.value}: {', '.join(memory.subjects)}"
# Build the metadata (everything comes from memory.metadata)
meta = getattr(memory, 'metadata', None)
metadata = {
"user_id": getattr(meta, 'user_id', None),
"chat_id": getattr(meta, 'chat_id', 'unknown'),
"memory_type": memory.memory_type.value,
"keywords": orjson.dumps(getattr(memory, 'keywords', [])).decode("utf-8"),
"importance": getattr(meta, 'importance', None),
"timestamp": getattr(meta, 'created_at', None),
"access_count": getattr(meta, 'access_count', 0),
"last_access_time": getattr(meta, 'last_accessed', 0),
"confidence": getattr(meta, 'confidence', None),
"source": "vector_storage_v2",
# Store the full memory payload
"memory_data": orjson.dumps(memory.to_dict()).decode("utf-8")
}
# Build the metadata - fixes enum and list serialization
metadata = {
"memory_id": memory_id,
"user_id": memory.metadata.user_id or "unknown",
"memory_type": memory.memory_type.value,
"importance": memory.metadata.importance.name,  # use .name rather than the enum object
"confidence": memory.metadata.confidence.name,  # use .name rather than the enum object
"created_at": memory.metadata.created_at,
"last_accessed": memory.metadata.last_accessed or memory.metadata.created_at,
"access_count": memory.metadata.access_count,
"subjects": orjson.dumps(memory.subjects).decode("utf-8"),  # list as a JSON string
"keywords": orjson.dumps(memory.keywords).decode("utf-8"),  # list as a JSON string
"tags": orjson.dumps(memory.tags).decode("utf-8"),  # list as a JSON string
"categories": orjson.dumps(memory.categories).decode("utf-8"),  # list as a JSON string
"relevance_score": memory.metadata.relevance_score
}
return metadata, content
# Optional fields
if memory.metadata.source_context:
metadata["source_context"] = str(memory.metadata.source_context)
if memory.content.predicate:
metadata["predicate"] = memory.content.predicate
if memory.content.object:
if isinstance(memory.content.object, (dict, list)):
metadata["object"] = orjson.dumps(memory.content.object).decode()
else:
metadata["object"] = str(memory.content.object)
return {
"id": memory_id,
"embedding": None,  # generated later by vector_db_service
"metadata": metadata,
"document": display_text
}
except Exception as e:
memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown')
logger.error(f"Failed to convert memory {memory_id} to the vector format: {e}", exc_info=True)
raise
def _vector_result_to_memory(self, document: str, metadata: Dict[str, Any]) -> Optional[MemoryChunk]:
"""将Vector DB结果转换为MemoryChunk"""
@@ -212,27 +332,108 @@ class VectorMemoryStorage:
memory_dict = orjson.loads(metadata["memory_data"])
return MemoryChunk.from_dict(memory_dict)
# Fallback: rebuild from the basic fields
# Fallback: rebuild from the basic fields (using the new structured format)
logger.warning(f"memory_data missing; rebuilding the memory via the fallback path (id={metadata.get('memory_id', 'unknown')})")
# Build a structure that matches what MemoryChunk.from_dict expects
memory_dict = {
"memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"),
"user_id": metadata.get("user_id", "unknown"),
"text_content": document,
"display": document,
"memory_type": metadata.get("memory_type", "general"),
"keywords": orjson.loads(metadata.get("keywords", "[]")),
"importance": metadata.get("importance", 0.5),
"timestamp": metadata.get("timestamp", time.time()),
"access_count": metadata.get("access_count", 0),
"last_access_time": metadata.get("last_access_time", 0),
"confidence": metadata.get("confidence", 0.8),
"metadata": {}
"metadata": {
"memory_id": metadata.get("memory_id", f"recovered_{int(time.time())}"),
"user_id": metadata.get("user_id", "unknown"),
"created_at": metadata.get("timestamp", time.time()),
"last_accessed": metadata.get("last_access_time", time.time()),
"last_modified": metadata.get("timestamp", time.time()),
"access_count": metadata.get("access_count", 0),
"relevance_score": 0.0,
"confidence": self._parse_enum_value(metadata.get("confidence", 2), ConfidenceLevel, ConfidenceLevel.MEDIUM),
"importance": self._parse_enum_value(metadata.get("importance", 2), ImportanceLevel, ImportanceLevel.NORMAL),
"source_context": None,
},
"content": {
"subject": "",
"predicate": "",
"object": "",
"display": document # 使用document作为显示文本
},
"memory_type": metadata.get("memory_type", "contextual"),
"keywords": orjson.loads(metadata.get("keywords", "[]")) if isinstance(metadata.get("keywords"), str) else metadata.get("keywords", []),
"tags": [],
"categories": [],
"embedding": None,
"semantic_hash": None,
"related_memories": [],
"temporal_context": None
}
return MemoryChunk.from_dict(memory_dict)
except Exception as e:
logger.warning(f"转换Vector结果到MemoryChunk失败: {e}")
logger.error(f"转换Vector结果到MemoryChunk失败: {e}", exc_info=True)
return None
def _parse_enum_value(self, value: Any, enum_class: type, default: Any) -> Any:
"""Parse an enum value given as a string, an integer, or an enum member
Args:
value: the value to parse (string, integer, or enum member)
enum_class: the target enum class
default: the default value
Returns:
the parsed enum member
"""
if value is None:
return default
# Already an enum member: return it directly
if isinstance(value, enum_class):
return value
# Integer: match on the member value
if isinstance(value, int):
try:
for member in enum_class:
if member.value == value:
return member
# No match found: fall back to the default
logger.warning(f"No {enum_class.__name__} member with value={value}; using the default")
return default
except Exception as e:
logger.warning(f"Error while parsing {enum_class.__name__} integer value {value}: {e}; using the default")
return default
# String: match on the member name or the member value
if isinstance(value, str):
str_value = value.strip().upper()
# Try the enum name first
try:
if hasattr(enum_class, str_value):
return getattr(enum_class, str_value)
except AttributeError:
pass
# Then try the value (for strings that hold a number)
try:
int_value = int(str_value)
return self._parse_enum_value(int_value, enum_class, default)
except ValueError:
pass
# Finally, try a case-insensitive match on the member name
try:
for member in enum_class:
if member.name.upper() == str_value:
return member
logger.warning(f"No {enum_class.__name__} member named or valued '{value}'; using the default")
return default
except Exception as e:
logger.warning(f"Error while parsing {enum_class.__name__} string value '{value}': {e}; using the default")
return default
# Unsupported type: return the default
logger.warning(f"Unsupported {enum_class.__name__} value type: {type(value)}; using the default")
return default
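For example, every one of the following inputs resolves to the same member (a sketch; constructing the storage object is only needed because the helper is an instance method, and it does not depend on instance state):

storage = VectorMemoryStorage()
default = ImportanceLevel.NORMAL
assert storage._parse_enum_value(ImportanceLevel.HIGH, ImportanceLevel, default) is ImportanceLevel.HIGH
assert storage._parse_enum_value(3, ImportanceLevel, default) is ImportanceLevel.HIGH
assert storage._parse_enum_value("HIGH", ImportanceLevel, default) is ImportanceLevel.HIGH
assert storage._parse_enum_value("3", ImportanceLevel, default) is ImportanceLevel.HIGH
assert storage._parse_enum_value(None, ImportanceLevel, default) is default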
def _get_from_cache(self, memory_id: str) -> Optional[MemoryChunk]:
"""从缓存获取记忆"""
@@ -262,70 +463,124 @@ class VectorMemoryStorage:
self.memory_cache.pop(oldest_id, None)
self.cache_timestamps.pop(oldest_id, None)
self.memory_cache[memory.memory_id] = memory
self.cache_timestamps[memory.memory_id] = time.time()
memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', None)
if memory_id:
self.memory_cache[memory_id] = memory
self.cache_timestamps[memory_id] = time.time()
async def store_memories(self, memories: List[MemoryChunk]) -> int:
"""批量存储记忆"""
if not memories:
return 0
start_time = datetime.now()
success_count = 0
try:
# Prepare the batched data
embeddings = []
documents = []
metadatas = []
ids = []
# Convert to the vector format
vector_data_list = []
for memory in memories:
try:
# Convert the format
metadata, content = self._memory_to_vector_format(memory)
if not content.strip():
logger.warning(f"Memory {memory.memory_id} has empty content; skipping")
continue
# Generate the embedding
embedding = await get_embedding(content)
if not embedding:
logger.warning(f"Embedding generation failed; skipping memory: {memory.memory_id}")
continue
embeddings.append(embedding)
documents.append(content)
metadatas.append(metadata)
ids.append(memory.memory_id)
# Add to the cache
self._add_to_cache(memory)
vector_data = self._memory_to_vector_format(memory)
vector_data_list.append(vector_data)
except Exception as e:
logger.error(f"Failed to process memory {memory.memory_id}: {e}")
memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown')
logger.error(f"Failed to process memory {memory_id}: {e}")
continue
# Bulk insert into the Vector DB
if embeddings:
vector_db_service.add(
collection_name=self.config.memory_collection,
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
ids=ids
)
if not vector_data_list:
logger.warning("No valid memory data to store")
return 0
# Store into the vector database in batches
for i in range(0, len(vector_data_list), self.batch_size):
batch = vector_data_list[i:i + self.batch_size]
stored_count = len(embeddings)
self.stats["total_stores"] += stored_count
self.stats["total_memories"] += stored_count
logger.info(f"Stored {stored_count}/{len(memories)} memories")
return stored_count
try:
# 生成embeddings
embeddings = []
for item in batch:
try:
embedding = await get_embedding(item["document"])
embeddings.append(embedding)
except Exception as e:
logger.error(f"生成embedding失败: {e}")
# 使用零向量作为后备
embeddings.append([0.0] * 768) # 默认维度
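# Note (added commentary): the 768-dim zero vector is only a placeholder for
# a failed embedding call; under the cosine space configured for these
# collections a zero vector has no direction, so such entries will
# effectively never rank in similarity search.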
# vector_db_service.add 需要embeddings参数
self.vector_db_service.add(
collection_name=self.collection_name,
embeddings=embeddings,
ids=[item["id"] for item in batch],
documents=[item["document"] for item in batch],
metadatas=[item["metadata"] for item in batch]
)
success = True
if success:
# 更新缓存和元数据索引
metadata_entries = []
for item in batch:
memory_id = item["id"]
# 从原始 memories 列表中找到对应的 MemoryChunk
memory = next((m for m in memories if (getattr(m.metadata, 'memory_id', None) or getattr(m, 'memory_id', None)) == memory_id), None)
if memory:
# 更新缓存
self._cache[memory_id] = memory
success_count += 1
# 创建元数据索引条目
try:
index_entry = MemoryMetadataIndexEntry(
memory_id=memory_id,
user_id=memory.metadata.user_id or "unknown",
memory_type=memory.memory_type.value,
subjects=memory.subjects,
objects=[str(memory.content.object)] if memory.content.object else [],
keywords=memory.keywords,
tags=memory.tags,
importance=memory.metadata.importance.value,
confidence=memory.metadata.confidence.value,
created_at=memory.metadata.created_at,
access_count=memory.metadata.access_count,
chat_id=memory.metadata.chat_id,
content_preview=str(memory.content)[:100] if memory.content else None
)
metadata_entries.append(index_entry)
except Exception as e:
logger.warning(f"创建元数据索引条目失败 (memory_id={memory_id}): {e}")
# 批量更新元数据索引
if metadata_entries:
try:
self.metadata_index.batch_add_or_update(metadata_entries)
logger.debug(f"更新元数据索引: {len(metadata_entries)}")
except Exception as e:
logger.error(f"批量更新元数据索引失败: {e}")
else:
logger.warning(f"批次存储失败,跳过 {len(batch)} 条记忆")
except Exception as e:
logger.error(f"批量存储失败: {e}", exc_info=True)
continue
duration = (datetime.now() - start_time).total_seconds()
logger.info(f"成功存储 {success_count}/{len(memories)} 条记忆,耗时 {duration:.2f}")
# 保存元数据索引到磁盘
if success_count > 0:
try:
self.metadata_index.save()
logger.debug("元数据索引已保存到磁盘")
except Exception as e:
logger.error(f"保存元数据索引失败: {e}")
return success_count
except Exception as e:
logger.error(f"批量存储记忆失败: {e}")
return 0
logger.error(f"批量存储记忆失败: {e}", exc_info=True)
return success_count
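# Illustrative call pattern (assumed surroundings, not part of the diff):
#
#   stored = await storage.store_memories(chunks)
#   logger.info(f"persisted {stored}/{len(chunks)} chunks")
#
# The return value counts only chunks that were written to the vector DB and
# registered in the JSON metadata index; chunks whose conversion or batch
# insert failed are skipped, so stored may be less than len(chunks).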
async def store_memory(self, memory: MemoryChunk) -> bool:
"""存储单条记忆"""
@@ -337,13 +592,62 @@ class VectorMemoryStorage:
query_text: str,
limit: int = 10,
similarity_threshold: Optional[float] = None,
filters: Optional[Dict[str, Any]] = None,
# 新增元数据过滤参数用于JSON索引粗筛
metadata_filters: Optional[Dict[str, Any]] = None
) -> List[Tuple[MemoryChunk, float]]:
"""
搜索相似记忆(混合索引模式)
Args:
query_text: 查询文本
limit: 返回数量限制
similarity_threshold: 相似度阈值
filters: ChromaDB where条件保留用于兼容
metadata_filters: JSON元数据索引过滤条件支持:
- memory_types: List[str]
- subjects: List[str]
- keywords: List[str]
- tags: List[str]
- importance_min: int
- importance_max: int
- created_after: float
- created_before: float
- user_id: str
"""
if not query_text.strip():
return []
try:
# === 阶段一JSON元数据粗筛可选 ===
candidate_ids: Optional[List[str]] = None
if metadata_filters:
logger.debug(f"[JSON元数据粗筛] 开始,过滤条件: {metadata_filters}")
candidate_ids = self.metadata_index.search(
memory_types=metadata_filters.get('memory_types'),
subjects=metadata_filters.get('subjects'),
keywords=metadata_filters.get('keywords'),
tags=metadata_filters.get('tags'),
importance_min=metadata_filters.get('importance_min'),
importance_max=metadata_filters.get('importance_max'),
created_after=metadata_filters.get('created_after'),
created_before=metadata_filters.get('created_before'),
user_id=metadata_filters.get('user_id'),
limit=self.config.search_limit * 2, # 粗筛返回更多候选
flexible_mode=True # 使用灵活匹配模式
)
logger.info(f"[JSON元数据粗筛] 完成,筛选出 {len(candidate_ids)} 个候选ID")
# 如果粗筛后没有结果,回退到全部记忆搜索
if not candidate_ids:
total_memories = len(self.metadata_index.index)
logger.warning(f"JSON元数据粗筛后无候选启用回退机制在全部 {total_memories} 条记忆中进行向量搜索")
logger.info("💡 提示:这可能是因为查询条件过于严格,或相关记忆的元数据与查询条件不完全匹配")
candidate_ids = None # 设为None表示不限制候选ID
else:
logger.debug(f"[JSON元数据粗筛] 成功筛选出候选,进入向量精筛阶段")
# === 阶段二:向量精筛 ===
# 生成查询向量
query_embedding = await get_embedding(query_text)
if not query_embedding:
@@ -354,7 +658,16 @@ class VectorMemoryStorage:
# 构建where条件
where_conditions = filters or {}
# 如果有候选ID列表添加到where条件
if candidate_ids:
# ChromaDB的where条件需要使用$in操作符
where_conditions["memory_id"] = {"$in": candidate_ids}
logger.debug(f"[向量精筛] 限制在 {len(candidate_ids)} 个候选ID内搜索")
else:
logger.info("[向量精筛] 在全部记忆中搜索(元数据筛选无结果回退)")
# 查询Vector DB
logger.debug(f"[向量精筛] 开始limit={min(limit, self.config.search_limit)}")
results = vector_db_service.query(
collection_name=self.config.memory_collection,
query_embeddings=[query_embedding],
@@ -371,6 +684,7 @@ class VectorMemoryStorage:
metadatas = results.get("metadatas", [[]])[0]
ids = results.get("ids", [[]])[0]
logger.info(f"向量检索返回原始结果documents={len(documents)}, ids={len(ids)}, metadatas={len(metadatas)}")
for i, (doc, metadata, memory_id) in enumerate(zip(documents, metadatas, ids)):
# 计算相似度
distance = distances[i] if i < len(distances) else 1.0
@@ -390,12 +704,19 @@ class VectorMemoryStorage:
if memory:
similar_memories.append((memory, similarity))
# 记录单条结果的关键日志id相似度简短文本
try:
short_text = (str(memory.content)[:120]) if hasattr(memory, 'content') else (doc[:120] if isinstance(doc, str) else '')
except Exception:
short_text = ''
logger.info(f"检索结果 - id={memory_id}, similarity={similarity:.4f}, summary={short_text}")
# 按相似度排序
similar_memories.sort(key=lambda x: x[1], reverse=True)
self.stats["total_searches"] += 1
logger.debug(f"搜索相似记忆: 查询='{query_text[:30]}...', 结果数={len(similar_memories)}")
logger.info(f"搜索相似记忆: query='{query_text[:60]}...', limit={limit}, threshold={threshold}, filters={where_conditions}, 返回数={len(similar_memories)}")
logger.debug(f"搜索相似记忆 详细结果数={len(similar_memories)}")
return similar_memories
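# Illustrative usage of the two-stage retrieval (assumed call site, not part
# of the diff): metadata_filters drive the JSON-index coarse pass, and the
# vector query re-ranks the surviving candidate IDs.
#
#   results = await storage.search_memories(
#       query_text="用户喜欢吃什么",
#       limit=10,
#       metadata_filters={
#           "memory_types": ["preference"],
#           "user_id": "12345",
#           "importance_min": 2,
#       },
#   )
#   for memory, similarity in results:
#       mid = getattr(memory.metadata, 'memory_id', None)
#       print(mid, f"{similarity:.3f}")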
@@ -451,6 +772,7 @@ class VectorMemoryStorage:
metadatas = results.get("metadatas", [{}] * len(documents))
ids = results.get("ids", [])
logger.info(f"按过滤条件获取返回: docs={len(documents)}, ids={len(ids)}")
for i, (doc, metadata) in enumerate(zip(documents, metadatas)):
memory_id = ids[i] if i < len(ids) else None
@@ -459,6 +781,7 @@ class VectorMemoryStorage:
memory = self._get_from_cache(memory_id)
if memory:
memories.append(memory)
logger.debug(f"过滤获取命中缓存: id={memory_id}")
continue
# 从Vector结果重建
@@ -467,6 +790,7 @@ class VectorMemoryStorage:
memories.append(memory)
if memory_id:
self._add_to_cache(memory)
logger.debug(f"过滤获取结果: id={memory_id}, meta_keys={list(metadata.keys())}")
return memories
@@ -477,14 +801,20 @@ class VectorMemoryStorage:
async def update_memory(self, memory: MemoryChunk) -> bool:
"""更新记忆"""
try:
memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', None)
if not memory_id:
logger.error("无法更新记忆缺少memory_id")
return False
# 先删除旧记忆
await self.delete_memory(memory.memory_id)
await self.delete_memory(memory_id)
# 重新存储更新后的记忆
return await self.store_memory(memory)
except Exception as e:
logger.error(f"更新记忆 {memory.memory_id} 失败: {e}")
memory_id = getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown')
logger.error(f"更新记忆 {memory_id} 失败: {e}")
return False
async def delete_memory(self, memory_id: str) -> bool:
@@ -658,7 +988,7 @@ class VectorMemoryStorageAdapter:
query_text, limit, filters=filters
)
# 转换为原格式:(memory_id, similarity)
return [(getattr(memory.metadata, 'memory_id', None) or getattr(memory, 'memory_id', 'unknown'), similarity) for memory, similarity in results]
def get_stats(self) -> Dict[str, Any]:
return self.storage.get_storage_stats()

View File

@@ -18,6 +18,7 @@ from src.individuality.individuality import get_individuality
from src.llm_models.utils_model import LLMRequest
from src.chat.message_receive.message import UserInfo, Seg, MessageRecv, MessageSending
from src.chat.message_receive.chat_stream import ChatStream
from src.chat.utils.memory_mappings import get_memory_type_chinese_label
from src.chat.message_receive.uni_message_sender import HeartFCSender
from src.chat.utils.timer_calculator import Timer
from src.chat.utils.utils import get_chat_type_and_target_info
@@ -587,15 +588,25 @@ class DefaultReplyer:
# 转换格式以兼容现有代码
running_memories = []
if enhanced_memories:
logger.debug(f"[记忆转换] 收到 {len(enhanced_memories)} 条原始记忆")
for idx, memory_chunk in enumerate(enhanced_memories, 1):
# 获取结构化内容的字符串表示
structure_display = str(memory_chunk.content) if hasattr(memory_chunk, 'content') else "unknown"
# 获取记忆内容优先使用display
content = memory_chunk.display or memory_chunk.text_content or ""
# 调试:记录每条记忆的内容获取情况
logger.debug(f"[记忆转换] 第{idx}条: display={repr(memory_chunk.display)[:80]}, text_content={repr(memory_chunk.text_content)[:80]}, final_content={repr(content)[:80]}")
running_memories.append({
"content": memory_chunk.display or memory_chunk.text_content or "",
"content": content,
"memory_type": memory_chunk.memory_type.value,
"confidence": memory_chunk.metadata.confidence.value,
"importance": memory_chunk.metadata.importance.value,
"relevance": getattr(memory_chunk, 'relevance_score', 0.5),
"relevance": getattr(memory_chunk.metadata, 'relevance_score', 0.5),
"source": memory_chunk.metadata.source,
"structure": memory_chunk.content_structure.value if memory_chunk.content_structure else "unknown",
"structure": structure_display,
})
# 构建瞬时记忆字符串
@@ -604,27 +615,13 @@ class DefaultReplyer:
if top_memory:
instant_memory = top_memory[0].get("content", "")
logger.info(f"增强记忆系统检索到 {len(running_memories)} 条记忆")
logger.info(f"增强记忆系统检索到 {len(enhanced_memories)} 条原始记忆,转换为 {len(running_memories)}可用记忆")
except Exception as e:
logger.warning(f"增强记忆系统检索失败: {e}")
running_memories = []
instant_memory = ""
# 构建记忆字符串,使用方括号格式
memory_str = ""
has_any_memory = False
@@ -640,31 +637,32 @@ class DefaultReplyer:
# 调试相关度信息
relevance_info = [(m.get('memory_type', 'unknown'), m.get('relevance', 0.0)) for m in sorted_memories]
logger.debug(f"记忆相关度信息: {relevance_info}")
logger.debug(f"[记忆构建] 准备将 {len(sorted_memories)} 条记忆添加到提示词")
for idx, running_memory in enumerate(sorted_memories, 1):
content = running_memory.get('content', '')
memory_type = running_memory.get('memory_type', 'unknown')
# 跳过空内容
if not content or not content.strip():
logger.warning(f"[记忆构建] 跳过第 {idx} 条记忆:内容为空 (type={memory_type})")
logger.debug(f"[记忆构建] 空记忆详情: {running_memory}")
continue
# 使用全局记忆类型映射表
chinese_type = get_memory_type_chinese_label(memory_type)
# 提取纯净内容(如果包含旧格式的元数据)
clean_content = content
if "(类型:" in content and ")" in content:
clean_content = content.split("(类型:")[0].strip()
logger.debug(f"[记忆构建] 添加第 {idx} 条记忆: [{chinese_type}] {clean_content[:50]}...")
memory_parts.append(f"- **[{chinese_type}]** {clean_content}")
memory_str = "\n".join(memory_parts) + "\n"
has_any_memory = True
logger.debug(f"[记忆构建] 成功构建记忆字符串,包含 {len(memory_parts) - 2} 条记忆")
# 添加瞬时记忆
if instant_memory:

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
"""
记忆系统相关的映射表和工具函数
提供记忆类型、置信度、重要性等的中文标签映射
"""
# 记忆类型到中文标签的完整映射表
MEMORY_TYPE_CHINESE_MAPPING = {
"personal_fact": "个人事实",
"preference": "偏好",
"event": "事件",
"opinion": "观点",
"relationship": "人际关系",
"emotion": "情感状态",
"knowledge": "知识信息",
"skill": "技能能力",
"goal": "目标计划",
"experience": "经验教训",
"contextual": "上下文信息",
"unknown": "未知"
}
# 置信度等级到中文标签的映射表
CONFIDENCE_LEVEL_CHINESE_MAPPING = {
1: "低置信度",
2: "中等置信度",
3: "高置信度",
4: "已验证",
"LOW": "低置信度",
"MEDIUM": "中等置信度",
"HIGH": "高置信度",
"VERIFIED": "已验证",
"unknown": "未知"
}
# 重要性等级到中文标签的映射表
IMPORTANCE_LEVEL_CHINESE_MAPPING = {
1: "低重要性",
2: "一般重要性",
3: "高重要性",
4: "关键重要性",
"LOW": "低重要性",
"NORMAL": "一般重要性",
"HIGH": "高重要性",
"CRITICAL": "关键重要性",
"unknown": "未知"
}
def get_memory_type_chinese_label(memory_type: str) -> str:
"""获取记忆类型的中文标签
Args:
memory_type: 记忆类型字符串
Returns:
str: 对应的中文标签,如果找不到则返回"未知"
"""
return MEMORY_TYPE_CHINESE_MAPPING.get(memory_type, "未知")
def get_confidence_level_chinese_label(level) -> str:
"""获取置信度等级的中文标签
Args:
level: 置信度等级(可以是数字、字符串或枚举实例)
Returns:
str: 对应的中文标签,如果找不到则返回"未知"
"""
# 处理枚举实例
if hasattr(level, 'value'):
level = level.value
# 处理数字
if isinstance(level, int):
return CONFIDENCE_LEVEL_CHINESE_MAPPING.get(level, "未知")
# 处理字符串
if isinstance(level, str):
level_upper = level.upper()
return CONFIDENCE_LEVEL_CHINESE_MAPPING.get(level_upper, "未知")
return "未知"
def get_importance_level_chinese_label(level) -> str:
"""获取重要性等级的中文标签
Args:
level: 重要性等级(可以是数字、字符串或枚举实例)
Returns:
str: 对应的中文标签,如果找不到则返回"未知"
"""
# 处理枚举实例
if hasattr(level, 'value'):
level = level.value
# 处理数字
if isinstance(level, int):
return IMPORTANCE_LEVEL_CHINESE_MAPPING.get(level, "未知")
# 处理字符串
if isinstance(level, str):
level_upper = level.upper()
return IMPORTANCE_LEVEL_CHINESE_MAPPING.get(level_upper, "未知")
return "未知"

View File

@@ -124,6 +124,10 @@ async def db_query(
raise ValueError("query_type must be 'get', 'create', 'update', 'delete' or 'count'")
async with get_db_session() as session:
if not session:
logger.error("[SQLAlchemy] 无法获取数据库会话")
return None if single_result else []
if query_type == "get":
query = select(model_class)
@@ -221,7 +225,7 @@ async def db_query(
# 删除记录
affected_rows = 0
for record in records_to_delete:
await session.delete(record)
affected_rows += 1
return affected_rows
@@ -274,6 +278,9 @@ async def db_save(
"""
try:
async with get_db_session() as session:
if not session:
logger.error("[SQLAlchemy] 无法获取数据库会话")
return None
# 如果提供了key_field和key_value尝试更新现有记录
if key_field and key_value is not None:
if hasattr(model_class, key_field):

View File

@@ -420,7 +420,9 @@ class Config(ValidatedConfigBase):
default_factory=lambda: CrossContextConfig(), description="跨群聊上下文共享配置"
)
affinity_flow: AffinityFlowConfig = Field(default_factory=lambda: AffinityFlowConfig(), description="亲和流配置")
proactive_thinking: ProactiveThinkingConfig = Field(
default_factory=lambda: ProactiveThinkingConfig(), description="主动思考配置"
)
class APIAdapterConfig(ValidatedConfigBase):

View File

@@ -311,11 +311,12 @@ class MemoryConfig(ValidatedConfigBase):
enable_vector_instant_memory: bool = Field(default=True, description="启用基于向量的瞬时记忆")
# Vector DB配置
vector_db_memory_collection: str = Field(default="unified_memory_v2", description="Vector DB集合名称")
vector_db_similarity_threshold: float = Field(default=0.8, description="Vector DB相似度阈值")
vector_db_memory_collection: str = Field(default="unified_memory_v2", description="Vector DB记忆集合名称")
vector_db_metadata_collection: str = Field(default="memory_metadata_v2", description="Vector DB元数据集合名称")
vector_db_similarity_threshold: float = Field(default=0.5, description="Vector DB相似度阈值推荐0.5-0.6,过高会导致检索不到结果)")
vector_db_search_limit: int = Field(default=20, description="Vector DB搜索限制")
vector_db_batch_size: int = Field(default=100, description="批处理大小")
vector_db_enable_caching: bool = Field(default=True, description="启用缓存")
vector_db_enable_caching: bool = Field(default=True, description="启用内存缓存")
vector_db_cache_size_limit: int = Field(default=1000, description="缓存大小限制")
vector_db_auto_cleanup_interval: int = Field(default=3600, description="自动清理间隔(秒)")
vector_db_retention_hours: int = Field(default=720, description="记忆保留时间小时默认30天")

View File

@@ -382,21 +382,19 @@ class BaseAction(ABC):
# 构造命令数据
command_data = {"name": command_name, "args": args or {}}
success = await send_api.command_to_stream(
command=command_data,
stream_id=self.chat_id,
storage_message=storage_message,
display_message=display_message,
)
if success:
logger.info(f"{self.log_prefix} 成功发送命令: {command_name}")
else:
logger.error(f"{self.log_prefix} 发送命令失败: {command_name}")
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送命令时出错: {e}")

View File

@@ -127,7 +127,7 @@ class ChatterActionPlanner:
}
)
logger.info(
f"消息 {message.message_id} 兴趣度: {message_interest:.3f}, 应回复: {message.should_reply}"
)

View File

@@ -23,7 +23,7 @@ logger = get_logger(__name__)
class ProactiveThinkerPlugin(BasePlugin):
"""一个主动思考的插件,但现在还只是个空壳子"""
plugin_name: str = "proactive_thinker"
enable_plugin: bool = False
dependencies: list[str] = []
python_dependencies: list[str] = []
config_file_name: str = "config.toml"

View File

@@ -1,23 +1,220 @@
import asyncio
import random
import time
from datetime import datetime
from typing import List, Union, Type, Optional
from maim_message import UserInfo
from src.chat.message_receive.chat_stream import get_chat_manager
from src.common.logger import get_logger
from src.config.config import global_config
from src.manager.async_task_manager import async_task_manager, AsyncTask
from src.plugin_system import (
EventType,
BaseEventHandler,
HandlerResult,
)
from src.plugin_system.apis import chat_api, person_api
from .proactive_thinker_executor import ProactiveThinkerExecutor
logger = get_logger(__name__)
class ColdStartTask(AsyncTask):
"""
冷启动任务,专门用于处理那些在白名单里,但从未与机器人发生过交互的用户。
它的核心职责是“破冰”,主动创建聊天流并发起第一次问候。
"""
def __init__(self):
super().__init__(task_name="ColdStartTask")
self.chat_manager = get_chat_manager()
self.executor = ProactiveThinkerExecutor()
async def run(self):
"""任务主循环,周期性地检查是否有需要“破冰”的新用户。"""
logger.info("冷启动任务已启动,将周期性检查白名单中的新朋友。")
# 初始等待一段时间,确保其他服务(如数据库)完全启动
await asyncio.sleep(20)
while True:
try:
logger.info("【冷启动】开始扫描白名单,寻找从未聊过的用户...")
# 从全局配置中获取私聊白名单
enabled_private_chats = global_config.proactive_thinking.enabled_private_chats
if not enabled_private_chats:
logger.debug("【冷启动】私聊白名单为空,任务暂停一小时。")
await asyncio.sleep(3600) # 白名单为空时,没必要频繁检查
continue
# 遍历白名单中的每一个用户
for chat_id in enabled_private_chats:
try:
platform, user_id_str = chat_id.split(":")
user_id = int(user_id_str)
# 【核心逻辑】使用 chat_api 检查该用户是否已经存在聊天流ChatStream
# 如果返回了 ChatStream 对象,说明已经聊过天了,不是本次任务的目标
if chat_api.get_stream_by_user_id(user_id_str, platform):
continue # 跳过已存在的用户
logger.info(f"【冷启动】发现白名单新用户 {chat_id},准备发起第一次问候。")
# 【增强体验】尝试从关系数据库中获取该用户的昵称
# 这样打招呼时可以更亲切而不是只知道一个冷冰冰的ID
person_id = person_api.get_person_id(platform, user_id)
nickname = await person_api.get_person_value(person_id, "nickname")
# 如果数据库里有昵称,就用数据库里的;如果没有,就用 "用户+ID" 作为备用
user_nickname = nickname or f"用户{user_id}"
# 创建 UserInfo 对象,这是创建聊天流的必要信息
user_info = UserInfo(platform=platform, user_id=str(user_id), user_nickname=user_nickname)
# 【关键步骤】主动创建聊天流。
# 创建后,该用户就进入了机器人的“好友列表”,后续将由 ProactiveThinkingTask 接管
stream = await self.chat_manager.get_or_create_stream(platform, user_info)
await self.executor.execute(stream_id=stream.stream_id, start_mode="cold_start")
logger.info(f"【冷启动】已为新用户 {chat_id} (昵称: {user_nickname}) 创建聊天流并发送问候。")
except ValueError:
logger.warning(f"【冷启动】白名单条目格式错误或用户ID无效已跳过: {chat_id}")
except Exception as e:
logger.error(f"【冷启动】处理用户 {chat_id} 时发生未知错误: {e}", exc_info=True)
# 完成一轮检查后,进入长时休眠
await asyncio.sleep(3600)
except asyncio.CancelledError:
logger.info("冷启动任务被正常取消。")
break
except Exception as e:
logger.error(f"【冷启动】任务出现严重错误将在5分钟后重试: {e}", exc_info=True)
await asyncio.sleep(300)
class ProactiveThinkingTask(AsyncTask):
"""
主动思考的后台任务(日常唤醒),负责在聊天“冷却”后重新活跃气氛。
它只处理已经存在的聊天流。
"""
def __init__(self):
super().__init__(task_name="ProactiveThinkingTask")
self.chat_manager = get_chat_manager()
self.executor = ProactiveThinkerExecutor()
def _get_next_interval(self) -> float:
"""
动态计算下一次执行的时间间隔,模拟人类行为的随机性。
结合了基础间隔、随机偏移和每日不同时段的活跃度调整。
"""
# 从配置中读取基础间隔和随机范围
base_interval = global_config.proactive_thinking.interval
sigma = global_config.proactive_thinking.interval_sigma
# 1. 在 [base - sigma, base + sigma] 范围内随机取一个值
interval = random.uniform(base_interval - sigma, base_interval + sigma)
# 2. 根据当前时间,应用活跃度调整因子
now = datetime.now()
current_time_str = now.strftime("%H:%M")
adjust_rules = global_config.proactive_thinking.talk_frequency_adjust
if adjust_rules and adjust_rules[0]:
# 按时间对规则排序,确保能找到正确的时间段
# 注意:比较前需要把时间零填充为 HH:MM(如 "8:00" -> "08:00"),否则字符串比较会出错
def _norm_time(t: str) -> str:
h, m = t.split(":")
return f"{int(h):02d}:{int(m):02d}"
rules = sorted([rule.split(",") for rule in adjust_rules[0][1:]], key=lambda x: _norm_time(x[0]))
factor = 1.0
# 找到最后一个小于等于当前时间的规则
for time_str, factor_str in rules:
if current_time_str >= _norm_time(time_str):
factor = float(factor_str)
else:
break  # 后面的时间都比当前晚,无需再找
# factor > 1 表示更活跃,所以用除法来缩短间隔
interval /= factor
# 保证最小间隔,防止过于频繁的骚扰
return max(60.0, interval)
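# Worked example (added commentary): with interval=1500 and interval_sigma=120,
# the raw draw is uniform in [1380, 1620] seconds. If the rule active at the
# current time is "18:00,1.5", the draw is divided by 1.5, giving roughly
# 920-1080 seconds; the final value is clamped to at least 60 seconds.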
async def run(self):
"""任务主循环,周期性地检查所有已存在的聊天是否需要“唤醒”。"""
logger.info("日常唤醒任务已启动,将根据动态间隔检查聊天活跃度。")
await asyncio.sleep(15) # 初始等待
while True:
# 计算下一次检查前的休眠时间
next_interval = self._get_next_interval()
try:
logger.debug(f"【日常唤醒】下一次检查将在 {next_interval:.2f} 秒后进行。")
await asyncio.sleep(next_interval)
logger.info("【日常唤醒】开始检查不活跃的聊天...")
# 加载白名单配置
enabled_private = set(global_config.proactive_thinking.enabled_private_chats)
enabled_groups = set(global_config.proactive_thinking.enabled_group_chats)
# 获取当前所有聊天流的快照
all_streams = list(self.chat_manager.streams.values())
for stream in all_streams:
# 1. 检查该聊天是否在白名单内(或白名单为空时默认允许)
is_whitelisted = False
if stream.group_info: # 群聊
if not enabled_groups or f"qq:{stream.group_info.group_id}" in enabled_groups:
is_whitelisted = True
else: # 私聊
if not enabled_private or f"qq:{stream.user_info.user_id}" in enabled_private:
is_whitelisted = True
if not is_whitelisted:
continue # 不在白名单内,跳过
# 2. 【核心逻辑】检查聊天冷却时间是否足够长
time_since_last_active = time.time() - stream.last_active_time
if time_since_last_active > next_interval:
logger.info(f"【日常唤醒】聊天流 {stream.stream_id} 已冷却 {time_since_last_active:.2f} 秒,触发主动对话。")
await self.executor.execute(stream_id=stream.stream_id, start_mode="wake_up")
# 【关键步骤】在触发后,立刻更新活跃时间并保存。
# 这可以防止在同一个检查周期内,对同一个目标因为意外的延迟而发送多条消息。
stream.update_active_time()
await self.chat_manager._save_stream(stream)
except asyncio.CancelledError:
logger.info("日常唤醒任务被正常取消。")
break
except Exception as e:
logger.error(f"【日常唤醒】任务出现错误将在60秒后重试: {e}", exc_info=True)
await asyncio.sleep(60)
class ProactiveThinkerEventHandler(BaseEventHandler):
"""主动思考需要的启动时触发的事件处理器"""
"""主动思考插件的启动事件处理器,负责根据配置启动一个或两个后台任务。"""
handler_name: str = "proactive_thinker_on_start"
handler_description: str = "主动思考插件的启动事件处理器"
init_subscribe: List[Union[EventType, str]] = [EventType.ON_START]
async def execute(self, kwargs: dict | None) -> "HandlerResult":
"""执行事件处理"""
logger.info("ProactiveThinkerPlugin on_start event triggered.")
# 返回 (是否执行成功, 是否需要继续处理, 可选的返回消息)
"""在机器人启动时执行,根据配置决定是否启动后台任务。"""
logger.info("检测到插件启动事件,正在初始化【主动思考】插件...")
# 检查总开关
if global_config.proactive_thinking.enable:
# 启动负责“日常唤醒”的核心任务
logger.info("【主动思考】功能已启用,正在启动“日常唤醒”任务...")
proactive_task = ProactiveThinkingTask()
await async_task_manager.add_task(proactive_task)
# 检查“冷启动”功能的独立开关
if global_config.proactive_thinking.enable_cold_start:
logger.info("“冷启动”功能已启用,正在启动“破冰”任务...")
cold_start_task = ColdStartTask()
await async_task_manager.add_task(cold_start_task)
else:
logger.info("【主动思考】功能未启用,所有任务均跳过启动。")
return HandlerResult(success=True, continue_process=True, message=None)

View File

@@ -0,0 +1,284 @@
import orjson
from typing import Optional, Dict, Any
from datetime import datetime
from src.common.logger import get_logger
from src.plugin_system.apis import chat_api, person_api, schedule_api, send_api, llm_api, message_api, generator_api, database_api
from src.config.config import global_config, model_config
from src.person_info.person_info import get_person_info_manager
logger = get_logger(__name__)
class ProactiveThinkerExecutor:
"""
主动思考执行器 V2
- 统一执行入口
- 引入决策模块,判断是否及如何发起对话
- 结合人设、日程、关系信息生成更具情境的对话
"""
def __init__(self):
# 可以在此初始化所需模块例如LLM请求器等
pass
async def execute(self, stream_id: str, start_mode: str = "wake_up"):
"""
统一执行入口
Args:
stream_id: 聊天流ID
start_mode: 启动模式, 'cold_start''wake_up'
"""
logger.info(f"开始为聊天流 {stream_id} 执行主动思考,模式: {start_mode}")
# 1. 信息收集
context = await self._gather_context(stream_id)
if not context:
return
# 2. 决策阶段
decision_result = await self._make_decision(context, start_mode)
if not decision_result or not decision_result.get("should_reply"):
reason = decision_result.get("reason", "未提供") if decision_result else "决策过程返回None"
logger.info(f"决策结果为:不回复。原因: {reason}")
await database_api.store_action_info(
chat_stream=self._get_stream_from_id(stream_id),
action_name="proactive_decision",
action_prompt_display=f"主动思考决定不回复,原因: {reason}",
action_done = True,
action_data=decision_result
)
return
# 3. 规划与执行阶段
topic = decision_result.get("topic", "打个招呼")
reason = decision_result.get("reason", "")
await database_api.store_action_info(
chat_stream=self._get_stream_from_id(stream_id),
action_name="proactive_decision",
action_prompt_display=f"主动思考决定回复,原因: {reason},话题:{topic}",
action_done = True,
action_data=decision_result
)
logger.info(f"决策结果为:回复。话题: {topic}")
plan_prompt = self._build_plan_prompt(context, start_mode, topic, reason)
is_success, response, _, _ = await llm_api.generate_with_model(prompt=plan_prompt, model_config=model_config.model_task_config.utils)
if is_success and response:
stream = self._get_stream_from_id(stream_id)
if stream:
# 使用消息分割器处理并发送消息
reply_set = generator_api.process_human_text(response, enable_splitter=True, enable_chinese_typo=False)
for reply_type, content in reply_set:
if reply_type == "text":
await send_api.text_to_stream(stream_id=stream.stream_id, text=content)
else:
logger.warning(f"无法发送消息,因为找不到 stream_id 为 {stream_id} 的聊天流")
def _get_stream_from_id(self, stream_id: str):
"""根据stream_id解析并获取stream对象"""
try:
platform, chat_id, stream_type = stream_id.split(":")
if stream_type == "private":
return chat_api.ChatManager.get_private_stream_by_user_id(platform, chat_id)
elif stream_type == "group":
return chat_api.ChatManager.get_group_stream_by_group_id(platform, chat_id)
except Exception as e:
logger.error(f"解析 stream_id ({stream_id}) 或获取 stream 失败: {e}")
return None
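# Note (added commentary): this assumes stream_id follows the
# "platform:chat_id:type" convention, e.g. "qq:123456:private" or
# "qq:7891011:group"; any other shape raises in the split/lookup above and
# falls through to return None.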
async def _gather_context(self, stream_id: str) -> Optional[Dict[str, Any]]:
"""
收集构建提示词所需的所有上下文信息
"""
stream = self._get_stream_from_id(stream_id)
if not stream:
logger.warning(f"无法找到 stream_id 为 {stream_id} 的聊天流")
return None
user_info = stream.user_info
if not user_info or not user_info.platform or not user_info.user_id:
logger.warning(f"Stream {stream_id} 的 user_info 不完整")
return None
person_id = person_api.get_person_id(user_info.platform, int(user_info.user_id))
person_info_manager = get_person_info_manager()
# 获取日程
schedules = await schedule_api.ScheduleAPI.get_today_schedule()
schedule_context = "\n".join([f"- {s['title']} ({s['start_time']}-{s['end_time']})" for s in schedules]) if schedules else "今天没有日程安排。"
# 获取关系信息
short_impression = await person_info_manager.get_value(person_id, "short_impression") or ""
impression = await person_info_manager.get_value(person_id, "impression") or ""
attitude = await person_info_manager.get_value(person_id, "attitude") or 50
# 获取最近聊天记录
recent_messages = await message_api.get_recent_messages(stream_id, limit=10)
recent_chat_history = await message_api.build_readable_messages_to_str(recent_messages) if recent_messages else ""
# 获取最近的动作历史
action_history = await database_api.db_query(
database_api.MODEL_MAPPING["ActionRecords"],
filters={"chat_id": stream_id, "action_name": "proactive_decision"},
limit=3,
order_by=["-time"]
)
action_history_context = ""
if isinstance(action_history, list):
action_history_context = "\n".join([f"- {a['action_data']}" for a in action_history if isinstance(a, dict)]) or ""
return {
"person_id": person_id,
"user_info": user_info,
"schedule_context": schedule_context,
"recent_chat_history": recent_chat_history,
"action_history_context": action_history_context,
"relationship": {
"short_impression": short_impression,
"impression": impression,
"attitude": attitude
},
"persona": {
"core": global_config.personality.personality_core,
"side": global_config.personality.personality_side,
"identity": global_config.personality.identity,
},
"current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
async def _make_decision(self, context: Dict[str, Any], start_mode: str) -> Optional[Dict[str, Any]]:
"""
决策模块:判断是否应该主动发起对话,以及聊什么话题
"""
persona = context['persona']
user_info = context['user_info']
relationship = context['relationship']
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}
# 任务
现在是 {context['current_time']},你需要根据当前的情境,决定是否要主动向用户 '{user_info.user_nickname}' 发起对话。
# 情境分析
1. **启动模式**: {start_mode} ({'初次见面/很久未见' if start_mode == 'cold_start' else '日常唤醒'})
2. **你的日程**:
{context['schedule_context']}
3. **你和Ta的关系**:
- 简短印象: {relationship['short_impression']}
- 详细印象: {relationship['impression']}
- 好感度: {relationship['attitude']}/100
4. **最近的聊天摘要**:
{context['recent_chat_history']}
# 决策指令
请综合以上所有信息做出决策。你的决策需要以JSON格式输出包含以下字段
- `should_reply`: bool, 是否应该发起对话。
- `topic`: str, 如果 `should_reply` 为 true你打算聊什么话题(例如:问候一下今天的日程、关心一下昨天的某件事、分享一个你自己的趣事等)
- `reason`: str, 做出此决策的简要理由。
---
示例1 (应该回复):
{{
"should_reply": true,
"topic": "提醒Ta今天下午有'项目会议'的日程",
"reason": "现在是上午Ta下午有个重要会议我觉得应该主动提醒一下这会显得我很贴心。"
}}
示例2 (不应回复):
{{
"should_reply": false,
"topic": null,
"reason": "虽然我们的关系不错但现在是深夜而且Ta今天的日程都已经完成了我没有合适的理由去打扰Ta。"
}}
---
请输出你的决策:
"""
is_success, response, _, _ = await llm_api.generate_with_model(prompt=prompt, model_config=model_config.model_task_config.utils)
if not is_success:
return {"should_reply": False, "reason": "决策模型生成失败"}
try:
# 假设LLM返回JSON格式的决策结果
decision = orjson.loads(response)
return decision
except orjson.JSONDecodeError:
logger.error(f"决策LLM返回的JSON格式无效: {response}")
return {"should_reply": False, "reason": "决策模型返回格式错误"}
def _build_plan_prompt(self, context: Dict[str, Any], start_mode: str, topic: str, reason: str) -> str:
"""
根据启动模式和决策话题,构建最终的规划提示词
"""
persona = context['persona']
user_info = context['user_info']
relationship = context['relationship']
if start_mode == "cold_start":
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}
# 任务
你需要主动向一个新朋友 '{user_info.user_nickname}' 发起对话。这是你们的第一次交流,或者很久没聊了。
# 决策上下文
- **决策理由**: {reason}
- **你和Ta的关系**:
- 简短印象: {relationship['short_impression']}
- 详细印象: {relationship['impression']}
- 好感度: {relationship['attitude']}/100
# 对话指引
- 你的目标是“破冰”,让对话自然地开始。
- 你应该围绕这个话题展开: {topic}
- 你的语气应该符合你的人设,友好且真诚。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
"""
else: # wake_up
prompt = f"""
# 角色
你的名字是{global_config.bot.nickname},你的人设如下:
- 核心人设: {persona['core']}
- 侧面人设: {persona['side']}
- 身份: {persona['identity']}
# 任务
现在是 {context['current_time']},你需要主动向你的朋友 '{user_info.user_nickname}' 发起对话。
# 决策上下文
- **决策理由**: {reason}
# 情境分析
1. **你的日程**:
{context['schedule_context']}
2. **你和Ta的关系**:
- 详细印象: {relationship['impression']}
- 好感度: {relationship['attitude']}/100
3. **最近的聊天摘要**:
{context['recent_chat_history']}
4. **你最近的相关动作**:
{context['action_history_context']}
# 对话指引
- 你决定和Ta聊聊关于“{topic}”的话题。
- 请结合以上所有情境信息,自然地开启对话。
- 你的语气应该符合你的人设以及你对Ta的好感度。
- 直接输出你要说的第一句话,不要包含任何额外的前缀或解释。
"""
return prompt

View File

@@ -319,7 +319,7 @@ class SetEmojiLikeAction(BaseAction):
try:
success = await self.send_command(
command_name="set_msg_emoji_like",
command_name="set_emoji_like",
args={"message_id": message_id, "emoji_id": emoji_id, "set": set_like},
storage_message=False,
)

View File

@@ -1,5 +1,5 @@
[inner]
version = "7.1.3"
version = "7.1.4"
#----以下是给开发人员阅读的如果你只是部署了MoFox-Bot不需要阅读----
#如果你想要修改配置文件请递增version的值
@@ -133,54 +133,6 @@ dynamic_distribution_max_interval = 30.0 # 最大分发间隔(秒)
dynamic_distribution_jitter_factor = 0.2 # 分发间隔随机扰动因子
max_concurrent_distributions = 10 # 最大并发处理的消息流数量可以根据API性能和服务器负载调整
talk_frequency_adjust = [
["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"],
["qq:114514:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"],
["qq:1919810:private", "8:20,1", "12:10,2", "20:10,1.5", "00:10,0.2"]
]
# 基于聊天流的个性化活跃度配置
# 格式:[["platform:chat_id:type", "HH:MM,frequency", "HH:MM,frequency", ...], ...]
# 全局配置示例:
# [["", "8:00,1", "12:00,2", "18:00,1.5", "00:00,0.5"]]
# 特定聊天流配置示例:
# [
# ["", "8:00,1", "12:00,1.2", "18:00,1.5", "01:00,0.6"], # 全局默认配置
# ["qq:1026294844:group", "12:20,1", "16:10,2", "20:10,1", "00:10,0.3"], # 特定群聊配置
# ["qq:729957033:private", "8:20,1", "12:10,2", "20:10,1.5", "00:10,0.2"] # 特定私聊配置
# ]
# 说明:
# - 当第一个元素为空字符串""时,表示全局默认配置
# - 当第一个元素为"platform:id:type"格式时,表示特定聊天流配置
# - 后续元素是"时间,频率"格式,表示从该时间开始使用该活跃度,直到下一个时间点
# - 优先级:特定聊天流配置 > 全局配置 > 默认 talk_frequency
# 主动思考功能配置仅在focus模式下生效
enable_proactive_thinking = false # 是否启用主动思考功能
proactive_thinking_interval = 1500 # 主动思考触发间隔时间默认1500秒25分钟
# TIPS:
# 创意玩法可以设置为0设置为0时将基于delta_sigma生成纯随机间隔
# 负数保险:如果出现了负数,会自动使用绝对值
proactive_thinking_in_private = true # 主动思考可以在私聊里面启用
proactive_thinking_in_group = true # 主动思考可以在群聊里面启用
# 主动思考启用范围配置 - 按平台和类型分别配置,建议平台配置为小写
# 格式:["platform:user_id", "platform:user_id", ...]
# 示例:["qq:123456789", "telegram:user123", "bilibili:987654321"]
proactive_thinking_enable_in_private = [] # 启用主动思考的私聊范围,为空则不限制
proactive_thinking_enable_in_groups = [] # 启用主动思考的群聊范围,为空则不限制
delta_sigma = 120 # 正态分布的标准差,控制时间间隔的随机程度
# 特殊用法:
# - 设置为0禁用正态分布使用固定间隔
# - 设置得很大如6000产生高度随机的间隔即使基础间隔为0也能工作
# - 负数会自动转换为正数,不用担心配置错误以及极端边界情况
# 实验建议:试试 proactive_thinking_interval=0 + delta_sigma 非常大 的纯随机模式!
# 结果保证生成的间隔永远为正数负数会取绝对值最小1秒最大24小时
[relationship]
enable_relationship = true # 是否启用关系系统
@@ -303,11 +255,46 @@ max_frequency_bonus = 10.0 # 最大激活频率奖励天数
# 休眠机制
dormant_threshold_days = 90 # 休眠状态判定天数(超过此天数未访问的记忆进入休眠状态)
# 统一存储配置 (已弃用 - 请使用Vector DB配置)
# DEPRECATED: unified_storage_path = "data/unified_memory"
# DEPRECATED: unified_storage_cache_limit = 10000
# DEPRECATED: unified_storage_auto_save_interval = 50
# DEPRECATED: unified_storage_enable_compression = true
# Vector DB存储配置 (新增 - 替代JSON存储)
enable_vector_memory_storage = true # 启用Vector DB存储
enable_llm_instant_memory = true # 启用基于LLM的瞬时记忆
enable_vector_instant_memory = true # 启用基于向量的瞬时记忆
# Vector DB配置
vector_db_memory_collection = "unified_memory_v2" # Vector DB主记忆集合名称
vector_db_metadata_collection = "memory_metadata_v2" # Vector DB元数据集合名称
vector_db_similarity_threshold = 0.5 # Vector DB相似度阈值 (推荐范围: 0.5-0.6, 过高会导致检索不到结果)
vector_db_search_limit = 20 # Vector DB单次搜索返回的最大结果数
vector_db_batch_size = 100 # 批处理大小 (批量存储记忆时每批处理的记忆条数)
vector_db_enable_caching = true # 启用内存缓存
vector_db_cache_size_limit = 1000 # 缓存大小限制 (内存缓存最多保存的记忆条数)
vector_db_auto_cleanup_interval = 3600 # 自动清理间隔(秒)
vector_db_retention_hours = 720 # 记忆保留时间小时默认30天
# 多阶段召回配置(可选)
# 取消注释以启用更严格的粗筛,适用于大规模记忆库(>10万条)
# memory_importance_threshold = 0.3 # 重要性阈值过滤低价值记忆范围0.0-1.0
# memory_recency_days = 30 # 时间范围只搜索最近N天的记忆0表示不限制
# Vector DB配置 (ChromaDB)
[vector_db]
type = "chromadb" # Vector DB类型
path = "data/chroma_db" # Vector DB数据路径
[vector_db.settings]
anonymized_telemetry = false # 禁用匿名遥测
allow_reset = true # 允许重置
[vector_db.collections]
unified_memory_v2 = { description = "统一记忆存储V2", hnsw_space = "cosine", version = "2.0" }
memory_metadata_v2 = { description = "记忆元数据索引", hnsw_space = "cosine", version = "2.0" }
semantic_cache = { description = "语义缓存", hnsw_space = "cosine" }
[voice]
enable_asr = true # 是否启用语音识别启用后MoFox-Bot可以识别语音消息启用该功能需要配置语音识别模型[model.voice]
@@ -570,4 +557,33 @@ relationship_weight = 0.3 # 人物关系分数权重
# 提及bot相关参数
mention_bot_adjustment_threshold = 0.3 # 提及bot后的调整阈值
mention_bot_interest_score = 0.6 # 提及bot的兴趣分
base_relationship_score = 0.3 # 基础人物关系分
[proactive_thinking] # 主动思考(主动发起对话)功能配置
# --- 总开关 ---
enable = true # 是否启用主动发起对话功能
# --- 触发时机 ---
# 基础触发间隔AI会围绕这个时间点主动发起对话
interval = 1500 # 默认25分钟
# 间隔随机化标准差让触发时间更自然。设为0则为固定间隔。
interval_sigma = 120
# 每日活跃度调整,格式:[["", "HH:MM,factor", ...], ["stream_id", ...]]
# factor > 1.0 会缩短思考间隔更活跃factor < 1.0 会延长间隔。
talk_frequency_adjust = [['', '8:00,1', '12:00,1.2', '18:00,1.5', '01:00,0.6']]
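# Worked example (added commentary): with the default rule above, at 13:00 the
# last entry whose time is <= now is "12:00,1.2", so the base interval is
# divided by 1.2 (shorter, more active); at 02:00 the "01:00,0.6" rule applies
# and the interval is divided by 0.6, i.e. lengthened.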
# --- 作用范围 ---
enable_in_private = true # 是否允许在私聊中主动发起对话
enable_in_group = true # 是否允许在群聊中主动发起对话
# 私聊白名单,为空则对所有私聊生效
# 格式: ["platform:user_id", ...] e.g., ["qq:123456"]
enabled_private_chats = []
# 群聊白名单,为空则对所有群聊生效
# 格式: ["platform:group_id", ...] e.g., ["qq:7891011"]
enabled_group_chats = []
# --- 冷启动配置 (针对私聊) ---
# 对于白名单中不活跃的私聊,是否允许进行一次“冷启动”问候
enable_cold_start = true
# 冷启动后,该私聊的下一次主动思考需要等待的最小时间(秒)
cold_start_cooldown = 86400 # 默认24小时