From 1eb41f8372176c8bcba349014e54e372deeaaf96 Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Fri, 3 Oct 2025 22:00:53 +0800 Subject: [PATCH] =?UTF-8?q?refactor(memory):=20=E9=87=8D=E6=9E=84=E8=AE=B0?= =?UTF-8?q?=E5=BF=86=E7=B3=BB=E7=BB=9F=E6=9E=B6=E6=9E=84=EF=BC=8C=E5=BC=95?= =?UTF-8?q?=E5=85=A5=E5=8F=AF=E9=85=8D=E7=BD=AE=E7=9A=84=E9=87=87=E6=A0=B7?= =?UTF-8?q?=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将记忆系统从单一构建模式重构为多策略可配置架构,支持灵活的采样行为控制: 核心架构改进: - 重构记忆构建流程,分离采样策略与核心逻辑 - 引入MemorySamplingMode枚举,标准化采样模式定义 - 设计插件化采样器接口,支持海马体后台定时采样 - 优化记忆处理管道,添加bypass_interval机制支持后台采样 配置系统增强: - 新增memory_sampling_mode配置项,支持hippocampus/immediate/all三种模式 - 添加海马体双峰采样参数配置,支持自定义采样间隔和分布 - 引入自适应采样阈值控制,动态调整记忆构建频率 - 完善精准记忆配置,支持基于价值评分的触发机制 兼容性与性能优化: - 修复Python 3.9兼容性问题,替换strict=False参数 - 优化记忆检索逻辑,统一使用filters参数替代scope_id - 改进错误处理机制,增强系统稳定性 BREAKING CHANGE: 新增memory_sampling_mode配置项,默认值从adaptive改为immediate --- scripts/log_viewer_optimized.py | 6 +- src/chat/memory_system/hippocampus_sampler.py | 731 ++++++++++++++++++ src/chat/memory_system/memory_system.py | 199 ++++- src/config/official_configs.py | 35 + template/bot_config_template.toml | 15 +- 5 files changed, 969 insertions(+), 17 deletions(-) create mode 100644 src/chat/memory_system/hippocampus_sampler.py diff --git a/scripts/log_viewer_optimized.py b/scripts/log_viewer_optimized.py index f38dafa64..65cf579c0 100644 --- a/scripts/log_viewer_optimized.py +++ b/scripts/log_viewer_optimized.py @@ -373,7 +373,11 @@ class VirtualLogDisplay: # 为每个部分应用正确的标签 current_len = 0 - for part, tag_name in zip(parts, tags, strict=False): + # Python 3.9 兼容性:不使用 strict=False 参数 + min_len = min(len(parts), len(tags)) + for i in range(min_len): + part = parts[i] + tag_name = tags[i] start_index = f"{start_pos}+{current_len}c" end_index = f"{start_pos}+{current_len + len(part)}c" self.text_widget.tag_add(tag_name, start_index, end_index) diff --git a/src/chat/memory_system/hippocampus_sampler.py b/src/chat/memory_system/hippocampus_sampler.py new file mode 100644 index 000000000..0cc6b61d5 --- /dev/null +++ b/src/chat/memory_system/hippocampus_sampler.py @@ -0,0 +1,731 @@ +# -*- coding: utf-8 -*- +""" +海马体双峰分布采样器 +基于旧版海马体的采样策略,适配新版记忆系统 +实现低消耗、高效率的记忆采样模式 +""" + +import asyncio +import random +import time +from datetime import datetime, timedelta +from typing import List, Optional, Tuple, Dict, Any +from dataclasses import dataclass + +import numpy as np +import orjson + +from src.chat.utils.chat_message_builder import ( + get_raw_msg_by_timestamp, + build_readable_messages, + get_raw_msg_by_timestamp_with_chat, +) +from src.chat.utils.utils import translate_timestamp_to_human_readable +from src.common.logger import get_logger +from src.config.config import global_config +from src.llm_models.utils_model import LLMRequest + +logger = get_logger(__name__) + + +@dataclass +class HippocampusSampleConfig: + """海马体采样配置""" + # 双峰分布参数 + recent_mean_hours: float = 12.0 # 近期分布均值(小时) + recent_std_hours: float = 8.0 # 近期分布标准差(小时) + recent_weight: float = 0.7 # 近期分布权重 + + distant_mean_hours: float = 48.0 # 远期分布均值(小时) + distant_std_hours: float = 24.0 # 远期分布标准差(小时) + distant_weight: float = 0.3 # 远期分布权重 + + # 采样参数 + total_samples: int = 50 # 总采样数 + sample_interval: int = 1800 # 采样间隔(秒) + max_sample_length: int = 30 # 每次采样的最大消息数量 + batch_size: int = 5 # 批处理大小 + + @classmethod + def from_global_config(cls) -> 'HippocampusSampleConfig': + """从全局配置创建海马体采样配置""" + config = global_config.memory.hippocampus_distribution_config + return cls( + recent_mean_hours=config[0], + recent_std_hours=config[1], + recent_weight=config[2], + distant_mean_hours=config[3], + distant_std_hours=config[4], + distant_weight=config[5], + total_samples=global_config.memory.hippocampus_sample_size, + sample_interval=global_config.memory.hippocampus_sample_interval, + max_sample_length=global_config.memory.hippocampus_batch_size, + batch_size=global_config.memory.hippocampus_batch_size, + ) + + +class HippocampusSampler: + """海马体双峰分布采样器""" + + def __init__(self, memory_system=None): + self.memory_system = memory_system + self.config = HippocampusSampleConfig.from_global_config() + self.last_sample_time = 0 + self.is_running = False + + # 记忆构建模型 + self.memory_builder_model: Optional[LLMRequest] = None + + # 统计信息 + self.sample_count = 0 + self.success_count = 0 + self.last_sample_results: List[Dict[str, Any]] = [] + + async def initialize(self): + """初始化采样器""" + try: + # 初始化LLM模型 + from src.config.config import model_config + task_config = getattr(model_config.model_task_config, "utils", None) + if task_config: + self.memory_builder_model = LLMRequest( + model_set=task_config, + request_type="memory.hippocampus_build" + ) + asyncio.create_task(self.start_background_sampling()) + logger.info("✅ 海马体采样器初始化成功") + else: + raise RuntimeError("未找到记忆构建模型配置") + + except Exception as e: + logger.error(f"❌ 海马体采样器初始化失败: {e}") + raise + + def generate_time_samples(self) -> List[datetime]: + """生成双峰分布的时间采样点""" + # 计算每个分布的样本数 + recent_samples = max(1, int(self.config.total_samples * self.config.recent_weight)) + distant_samples = max(1, self.config.total_samples - recent_samples) + + # 生成两个正态分布的小时偏移 + recent_offsets = np.random.normal( + loc=self.config.recent_mean_hours, + scale=self.config.recent_std_hours, + size=recent_samples + ) + distant_offsets = np.random.normal( + loc=self.config.distant_mean_hours, + scale=self.config.distant_std_hours, + size=distant_samples + ) + + # 合并两个分布的偏移 + all_offsets = np.concatenate([recent_offsets, distant_offsets]) + + # 转换为时间戳(使用绝对值确保时间点在过去) + base_time = datetime.now() + timestamps = [ + base_time - timedelta(hours=abs(offset)) + for offset in all_offsets + ] + + # 按时间排序(从最早到最近) + return sorted(timestamps) + + async def collect_message_samples(self, target_timestamp: float) -> Optional[List[Dict[str, Any]]]: + """收集指定时间戳附近的消息样本""" + try: + # 随机时间窗口:5-30分钟 + time_window_seconds = random.randint(300, 1800) + + # 尝试3次获取消息 + for attempt in range(3): + timestamp_start = target_timestamp + timestamp_end = target_timestamp + time_window_seconds + + # 获取单条消息作为锚点 + anchor_messages = await get_raw_msg_by_timestamp( + timestamp_start=timestamp_start, + timestamp_end=timestamp_end, + limit=1, + limit_mode="earliest", + ) + + if not anchor_messages: + target_timestamp -= 120 # 向前调整2分钟 + continue + + anchor_message = anchor_messages[0] + chat_id = anchor_message.get("chat_id") + + if not chat_id: + continue + + # 获取同聊天的多条消息 + messages = await get_raw_msg_by_timestamp_with_chat( + timestamp_start=timestamp_start, + timestamp_end=timestamp_end, + limit=self.config.max_sample_length, + limit_mode="earliest", + chat_id=chat_id, + ) + + if messages and len(messages) >= 2: # 至少需要2条消息 + # 过滤掉已经记忆过的消息 + filtered_messages = [ + msg for msg in messages + if msg.get("memorized_times", 0) < 2 # 最多记忆2次 + ] + + if filtered_messages: + logger.debug(f"成功收集 {len(filtered_messages)} 条消息样本") + return filtered_messages + + target_timestamp -= 120 # 向前调整再试 + + logger.debug(f"时间戳 {target_timestamp} 附近未找到有效消息样本") + return None + + except Exception as e: + logger.error(f"收集消息样本失败: {e}") + return None + + async def build_memory_from_samples(self, messages: List[Dict[str, Any]], target_timestamp: float) -> Optional[str]: + """从消息样本构建记忆""" + if not messages or not self.memory_system or not self.memory_builder_model: + return None + + try: + # 构建可读消息文本 + readable_text = await build_readable_messages( + messages, + merge_messages=True, + timestamp_mode="normal_no_YMD", + replace_bot_name=False, + ) + + if not readable_text: + logger.warning("无法从消息样本生成可读文本") + return None + + # 添加当前日期信息 + current_date = f"当前日期: {datetime.now().isoformat()}" + input_text = f"{current_date}\n{readable_text}" + + logger.debug(f"开始构建记忆,文本长度: {len(input_text)}") + + # 构建上下文 + context = { + "user_id": "hippocampus_sampler", + "timestamp": time.time(), + "source": "hippocampus_sampling", + "message_count": len(messages), + "sample_mode": "bimodal_distribution", + "is_hippocampus_sample": True, # 标识为海马体样本 + "bypass_value_threshold": True, # 绕过价值阈值检查 + "hippocampus_sample_time": target_timestamp, # 记录样本时间 + } + + # 使用记忆系统构建记忆(绕过构建间隔检查) + memories = await self.memory_system.build_memory_from_conversation( + conversation_text=input_text, + context=context, + timestamp=time.time(), + bypass_interval=True # 海马体采样器绕过构建间隔限制 + ) + + if memories: + memory_count = len(memories) + self.success_count += 1 + + # 记录采样结果 + result = { + "timestamp": time.time(), + "memory_count": memory_count, + "message_count": len(messages), + "text_preview": readable_text[:100] + "..." if len(readable_text) > 100 else readable_text, + "memory_types": [m.memory_type.value for m in memories], + } + self.last_sample_results.append(result) + + # 限制结果历史长度 + if len(self.last_sample_results) > 10: + self.last_sample_results.pop(0) + + logger.info(f"✅ 海马体采样成功构建 {memory_count} 条记忆") + return f"构建{memory_count}条记忆" + else: + logger.debug("海马体采样未生成有效记忆") + return None + + except Exception as e: + logger.error(f"海马体采样构建记忆失败: {e}") + return None + + async def perform_sampling_cycle(self) -> Dict[str, Any]: + """执行一次完整的采样周期(优化版:批量融合构建)""" + if not self.should_sample(): + return {"status": "skipped", "reason": "interval_not_met"} + + start_time = time.time() + self.sample_count += 1 + + try: + # 生成时间采样点 + time_samples = self.generate_time_samples() + logger.debug(f"生成 {len(time_samples)} 个时间采样点") + + # 记录时间采样点(调试用) + readable_timestamps = [ + translate_timestamp_to_human_readable(int(ts.timestamp()), mode="normal") + for ts in time_samples[:5] # 只显示前5个 + ] + logger.debug(f"时间采样点示例: {readable_timestamps}") + + # 第一步:批量收集所有消息样本 + logger.debug("开始批量收集消息样本...") + collected_messages = await self._collect_all_message_samples(time_samples) + + if not collected_messages: + logger.info("未收集到有效消息样本,跳过本次采样") + self.last_sample_time = time.time() + return { + "status": "success", + "sample_count": self.sample_count, + "success_count": self.success_count, + "processed_samples": len(time_samples), + "successful_builds": 0, + "duration": time.time() - start_time, + "samples_generated": len(time_samples), + "message": "未收集到有效消息样本", + } + + logger.info(f"收集到 {len(collected_messages)} 组消息样本") + + # 第二步:融合和去重消息 + logger.debug("开始融合和去重消息...") + fused_messages = await self._fuse_and_deduplicate_messages(collected_messages) + + if not fused_messages: + logger.info("消息融合后为空,跳过记忆构建") + self.last_sample_time = time.time() + return { + "status": "success", + "sample_count": self.sample_count, + "success_count": self.success_count, + "processed_samples": len(time_samples), + "successful_builds": 0, + "duration": time.time() - start_time, + "samples_generated": len(time_samples), + "message": "消息融合后为空", + } + + logger.info(f"融合后得到 {len(fused_messages)} 组有效消息") + + # 第三步:一次性构建记忆 + logger.debug("开始批量构建记忆...") + build_result = await self._build_batch_memory(fused_messages, time_samples) + + # 更新最后采样时间 + self.last_sample_time = time.time() + + duration = time.time() - start_time + result = { + "status": "success", + "sample_count": self.sample_count, + "success_count": self.success_count, + "processed_samples": len(time_samples), + "successful_builds": build_result.get("memory_count", 0), + "duration": duration, + "samples_generated": len(time_samples), + "messages_collected": len(collected_messages), + "messages_fused": len(fused_messages), + "optimization_mode": "batch_fusion", + } + + logger.info( + f"✅ 海马体采样周期完成(批量融合模式) | " + f"采样点: {len(time_samples)} | " + f"收集消息: {len(collected_messages)} | " + f"融合消息: {len(fused_messages)} | " + f"构建记忆: {build_result.get('memory_count', 0)} | " + f"耗时: {duration:.2f}s" + ) + + return result + + except Exception as e: + logger.error(f"❌ 海马体采样周期失败: {e}") + return { + "status": "error", + "error": str(e), + "sample_count": self.sample_count, + "duration": time.time() - start_time, + } + + async def _collect_all_message_samples(self, time_samples: List[datetime]) -> List[List[Dict[str, Any]]]: + """批量收集所有时间点的消息样本""" + collected_messages = [] + max_concurrent = min(5, len(time_samples)) # 提高并发数到5 + + for i in range(0, len(time_samples), max_concurrent): + batch = time_samples[i:i + max_concurrent] + tasks = [] + + # 创建并发收集任务 + for timestamp in batch: + target_ts = timestamp.timestamp() + task = self.collect_message_samples(target_ts) + tasks.append(task) + + # 执行并发收集 + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 处理收集结果 + for result in results: + if isinstance(result, list) and result: + collected_messages.append(result) + elif isinstance(result, Exception): + logger.debug(f"消息收集异常: {result}") + + # 批次间短暂延迟 + if i + max_concurrent < len(time_samples): + await asyncio.sleep(0.5) + + return collected_messages + + async def _fuse_and_deduplicate_messages(self, collected_messages: List[List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]: + """融合和去重消息样本""" + if not collected_messages: + return [] + + try: + # 展平所有消息 + all_messages = [] + for message_group in collected_messages: + all_messages.extend(message_group) + + logger.debug(f"展开后总消息数: {len(all_messages)}") + + # 去重逻辑:基于消息内容和时间戳 + unique_messages = [] + seen_hashes = set() + + for message in all_messages: + # 创建消息哈希用于去重 + content = message.get("processed_plain_text", "") or message.get("display_message", "") + timestamp = message.get("time", 0) + chat_id = message.get("chat_id", "") + + # 简单哈希:内容前50字符 + 时间戳(精确到分钟) + 聊天ID + hash_key = f"{content[:50]}_{int(timestamp//60)}_{chat_id}" + + if hash_key not in seen_hashes and len(content.strip()) > 10: + seen_hashes.add(hash_key) + unique_messages.append(message) + + logger.debug(f"去重后消息数: {len(unique_messages)}") + + # 按时间排序 + unique_messages.sort(key=lambda x: x.get("time", 0)) + + # 按聊天ID分组重新组织 + chat_groups = {} + for message in unique_messages: + chat_id = message.get("chat_id", "unknown") + if chat_id not in chat_groups: + chat_groups[chat_id] = [] + chat_groups[chat_id].append(message) + + # 合并相邻时间范围内的消息 + fused_groups = [] + for chat_id, messages in chat_groups.items(): + fused_groups.extend(self._merge_adjacent_messages(messages)) + + logger.debug(f"融合后消息组数: {len(fused_groups)}") + return fused_groups + + except Exception as e: + logger.error(f"消息融合失败: {e}") + # 返回原始消息组作为备选 + return collected_messages[:5] # 限制返回数量 + + def _merge_adjacent_messages(self, messages: List[Dict[str, Any]], time_gap: int = 1800) -> List[List[Dict[str, Any]]]: + """合并时间间隔内的消息""" + if not messages: + return [] + + merged_groups = [] + current_group = [messages[0]] + + for i in range(1, len(messages)): + current_time = messages[i].get("time", 0) + prev_time = current_group[-1].get("time", 0) + + # 如果时间间隔小于阈值,合并到当前组 + if current_time - prev_time <= time_gap: + current_group.append(messages[i]) + else: + # 否则开始新组 + merged_groups.append(current_group) + current_group = [messages[i]] + + # 添加最后一组 + merged_groups.append(current_group) + + # 过滤掉只有一条消息的组(除非内容较长) + result_groups = [] + for group in merged_groups: + if len(group) > 1 or any(len(msg.get("processed_plain_text", "")) > 100 for msg in group): + result_groups.append(group) + + return result_groups + + async def _build_batch_memory(self, fused_messages: List[List[Dict[str, Any]]], time_samples: List[datetime]) -> Dict[str, Any]: + """批量构建记忆""" + if not fused_messages: + return {"memory_count": 0, "memories": []} + + try: + total_memories = [] + total_memory_count = 0 + + # 构建融合后的文本 + batch_input_text = await self._build_fused_conversation_text(fused_messages) + + if not batch_input_text: + logger.warning("无法构建融合文本,尝试单独构建") + # 备选方案:分别构建 + return await self._fallback_individual_build(fused_messages) + + # 创建批量上下文 + batch_context = { + "user_id": "hippocampus_batch_sampler", + "timestamp": time.time(), + "source": "hippocampus_batch_sampling", + "message_groups_count": len(fused_messages), + "total_messages": sum(len(group) for group in fused_messages), + "sample_count": len(time_samples), + "is_hippocampus_sample": True, + "bypass_value_threshold": True, + "optimization_mode": "batch_fusion", + } + + logger.debug(f"批量构建记忆,文本长度: {len(batch_input_text)}") + + # 一次性构建记忆 + memories = await self.memory_system.build_memory_from_conversation( + conversation_text=batch_input_text, + context=batch_context, + timestamp=time.time(), + bypass_interval=True + ) + + if memories: + memory_count = len(memories) + self.success_count += 1 + total_memory_count += memory_count + total_memories.extend(memories) + + logger.info(f"✅ 批量海马体采样成功构建 {memory_count} 条记忆") + else: + logger.debug("批量海马体采样未生成有效记忆") + + # 记录采样结果 + result = { + "timestamp": time.time(), + "memory_count": total_memory_count, + "message_groups_count": len(fused_messages), + "total_messages": sum(len(group) for group in fused_messages), + "text_preview": batch_input_text[:200] + "..." if len(batch_input_text) > 200 else batch_input_text, + "memory_types": [m.memory_type.value for m in total_memories], + } + + self.last_sample_results.append(result) + + # 限制结果历史长度 + if len(self.last_sample_results) > 10: + self.last_sample_results.pop(0) + + return { + "memory_count": total_memory_count, + "memories": total_memories, + "result": result + } + + except Exception as e: + logger.error(f"批量构建记忆失败: {e}") + return {"memory_count": 0, "error": str(e)} + + async def _build_fused_conversation_text(self, fused_messages: List[List[Dict[str, Any]]]) -> str: + """构建融合后的对话文本""" + try: + # 添加批次标识 + current_date = f"海马体批量采样 - {datetime.now().isoformat()}\n" + conversation_parts = [current_date] + + for group_idx, message_group in enumerate(fused_messages): + if not message_group: + continue + + # 为每个消息组添加分隔符 + group_header = f"\n=== 对话片段 {group_idx + 1} ===" + conversation_parts.append(group_header) + + # 构建可读消息 + group_text = await build_readable_messages( + message_group, + merge_messages=True, + timestamp_mode="normal_no_YMD", + replace_bot_name=False, + ) + + if group_text and len(group_text.strip()) > 10: + conversation_parts.append(group_text.strip()) + + return "\n".join(conversation_parts) + + except Exception as e: + logger.error(f"构建融合文本失败: {e}") + return "" + + async def _fallback_individual_build(self, fused_messages: List[List[Dict[str, Any]]]) -> Dict[str, Any]: + """备选方案:单独构建每个消息组""" + total_memories = [] + total_count = 0 + + for group in fused_messages[:5]: # 限制最多5组 + try: + memories = await self.build_memory_from_samples(group, time.time()) + if memories: + total_memories.extend(memories) + total_count += len(memories) + except Exception as e: + logger.debug(f"单独构建失败: {e}") + + return { + "memory_count": total_count, + "memories": total_memories, + "fallback_mode": True + } + + async def process_sample_timestamp(self, target_timestamp: float) -> Optional[str]: + """处理单个时间戳采样(保留作为备选方法)""" + try: + # 收集消息样本 + messages = await self.collect_message_samples(target_timestamp) + if not messages: + return None + + # 构建记忆 + result = await self.build_memory_from_samples(messages, target_timestamp) + return result + + except Exception as e: + logger.debug(f"处理时间戳采样失败 {target_timestamp}: {e}") + return None + + def should_sample(self) -> bool: + """检查是否应该进行采样""" + current_time = time.time() + + # 检查时间间隔 + if current_time - self.last_sample_time < self.config.sample_interval: + return False + + # 检查是否已初始化 + if not self.memory_builder_model: + logger.warning("海马体采样器未初始化") + return False + + return True + + async def start_background_sampling(self): + """启动后台采样""" + if self.is_running: + logger.warning("海马体后台采样已在运行") + return + + self.is_running = True + logger.info("🚀 启动海马体后台采样任务") + + try: + while self.is_running: + try: + # 执行采样周期 + result = await self.perform_sampling_cycle() + + # 如果是跳过状态,短暂睡眠 + if result.get("status") == "skipped": + await asyncio.sleep(60) # 1分钟后重试 + else: + # 正常等待下一个采样间隔 + await asyncio.sleep(self.config.sample_interval) + + except Exception as e: + logger.error(f"海马体后台采样异常: {e}") + await asyncio.sleep(300) # 异常时等待5分钟 + + except asyncio.CancelledError: + logger.info("海马体后台采样任务被取消") + finally: + self.is_running = False + + def stop_background_sampling(self): + """停止后台采样""" + self.is_running = False + logger.info("🛑 停止海马体后台采样任务") + + def get_sampling_stats(self) -> Dict[str, Any]: + """获取采样统计信息""" + success_rate = (self.success_count / self.sample_count * 100) if self.sample_count > 0 else 0 + + # 计算最近的平均数据 + recent_avg_messages = 0 + recent_avg_memory_count = 0 + if self.last_sample_results: + recent_results = self.last_sample_results[-5:] # 最近5次 + recent_avg_messages = sum(r.get("total_messages", 0) for r in recent_results) / len(recent_results) + recent_avg_memory_count = sum(r.get("memory_count", 0) for r in recent_results) / len(recent_results) + + return { + "is_running": self.is_running, + "sample_count": self.sample_count, + "success_count": self.success_count, + "success_rate": f"{success_rate:.1f}%", + "last_sample_time": self.last_sample_time, + "optimization_mode": "batch_fusion", # 显示优化模式 + "performance_metrics": { + "avg_messages_per_sample": f"{recent_avg_messages:.1f}", + "avg_memories_per_sample": f"{recent_avg_memory_count:.1f}", + "fusion_efficiency": f"{(recent_avg_messages/max(recent_avg_memory_count, 1)):.1f}x" if recent_avg_messages > 0 else "N/A" + }, + "config": { + "sample_interval": self.config.sample_interval, + "total_samples": self.config.total_samples, + "recent_weight": f"{self.config.recent_weight:.1%}", + "distant_weight": f"{self.config.distant_weight:.1%}", + "max_concurrent": 5, # 批量模式并发数 + "fusion_time_gap": "30分钟", # 消息融合时间间隔 + }, + "recent_results": self.last_sample_results[-5:], # 最近5次结果 + } + + +# 全局海马体采样器实例 +_hippocampus_sampler: Optional[HippocampusSampler] = None + + +def get_hippocampus_sampler(memory_system=None) -> HippocampusSampler: + """获取全局海马体采样器实例""" + global _hippocampus_sampler + if _hippocampus_sampler is None: + _hippocampus_sampler = HippocampusSampler(memory_system) + return _hippocampus_sampler + + +async def initialize_hippocampus_sampler(memory_system=None) -> HippocampusSampler: + """初始化全局海马体采样器""" + sampler = get_hippocampus_sampler(memory_system) + await sampler.initialize() + return sampler \ No newline at end of file diff --git a/src/chat/memory_system/memory_system.py b/src/chat/memory_system/memory_system.py index 5236da62a..fc802c5d2 100644 --- a/src/chat/memory_system/memory_system.py +++ b/src/chat/memory_system/memory_system.py @@ -19,6 +19,12 @@ from src.chat.memory_system.memory_builder import MemoryBuilder, MemoryExtractio from src.chat.memory_system.memory_chunk import MemoryChunk from src.chat.memory_system.memory_fusion import MemoryFusionEngine from src.chat.memory_system.memory_query_planner import MemoryQueryPlanner +# 简化的记忆采样模式枚举 +class MemorySamplingMode(Enum): + """记忆采样模式""" + HIPPOCAMPUS = "hippocampus" # 海马体模式:定时任务采样 + IMMEDIATE = "immediate" # 即时模式:回复后立即采样 + ALL = "all" # 所有模式:同时使用海马体和即时采样 from src.common.logger import get_logger from src.config.config import global_config, model_config from src.llm_models.utils_model import LLMRequest @@ -148,6 +154,9 @@ class MemorySystem: # 记忆指纹缓存,用于快速检测重复记忆 self._memory_fingerprints: dict[str, str] = {} + # 海马体采样器 + self.hippocampus_sampler = None + logger.info("MemorySystem 初始化开始") async def initialize(self): @@ -249,6 +258,16 @@ class MemorySystem: self.query_planner = MemoryQueryPlanner(planner_model, default_limit=self.config.final_recall_limit) + # 初始化海马体采样器 + if global_config.memory.enable_hippocampus_sampling: + try: + from .hippocampus_sampler import initialize_hippocampus_sampler + self.hippocampus_sampler = await initialize_hippocampus_sampler(self) + logger.info("✅ 海马体采样器初始化成功") + except Exception as e: + logger.warning(f"海马体采样器初始化失败: {e}") + self.hippocampus_sampler = None + # 统一存储已经自动加载数据,无需额外加载 logger.info("✅ 简化版记忆系统初始化完成") @@ -283,14 +302,14 @@ class MemorySystem: try: # 使用统一存储检索相似记忆 + filters = {"user_id": user_id} if user_id else None search_results = await self.unified_storage.search_similar_memories( - query_text=query_text, limit=limit, scope_id=user_id + query_text=query_text, limit=limit, filters=filters ) # 转换为记忆对象 memories = [] - for memory_id, similarity_score in search_results: - memory = self.unified_storage.get_memory_by_id(memory_id) + for memory, similarity_score in search_results: if memory: memory.update_access() # 更新访问信息 memories.append(memory) @@ -302,7 +321,7 @@ class MemorySystem: return [] async def build_memory_from_conversation( - self, conversation_text: str, context: dict[str, Any], timestamp: float | None = None + self, conversation_text: str, context: dict[str, Any], timestamp: float | None = None, bypass_interval: bool = False ) -> list[MemoryChunk]: """从对话中构建记忆 @@ -310,6 +329,7 @@ class MemorySystem: conversation_text: 对话文本 context: 上下文信息 timestamp: 时间戳,默认为当前时间 + bypass_interval: 是否绕过构建间隔检查(海马体采样器专用) Returns: 构建的记忆块列表 @@ -328,7 +348,8 @@ class MemorySystem: min_interval = max(0.0, getattr(self.config, "min_build_interval_seconds", 0.0)) current_time = time.time() - if build_scope_key and min_interval > 0: + # 构建间隔检查(海马体采样器可以绕过) + if build_scope_key and min_interval > 0 and not bypass_interval: last_time = self._last_memory_build_times.get(build_scope_key) if last_time and (current_time - last_time) < min_interval: remaining = min_interval - (current_time - last_time) @@ -340,18 +361,35 @@ class MemorySystem: build_marker_time = current_time self._last_memory_build_times[build_scope_key] = current_time + elif bypass_interval: + # 海马体采样模式:不更新构建时间记录,避免影响即时模式 + logger.debug("海马体采样模式:绕过构建间隔检查") conversation_text = await self._resolve_conversation_context(conversation_text, normalized_context) logger.debug("开始构建记忆,文本长度: %d", len(conversation_text)) - # 1. 信息价值评估 - value_score = await self._assess_information_value(conversation_text, normalized_context) + # 1. 信息价值评估(海马体采样器可以绕过) + if not bypass_interval and not context.get("bypass_value_threshold", False): + value_score = await self._assess_information_value(conversation_text, normalized_context) - if value_score < self.config.memory_value_threshold: - logger.info(f"信息价值评分 {value_score:.2f} 低于阈值,跳过记忆构建") - self.status = original_status - return [] + if value_score < self.config.memory_value_threshold: + logger.info(f"信息价值评分 {value_score:.2f} 低于阈值,跳过记忆构建") + self.status = original_status + return [] + else: + # 海马体采样器:使用默认价值分数或简单评估 + value_score = 0.6 # 默认中等价值 + if context.get("is_hippocampus_sample", False): + # 对海马体样本进行简单价值评估 + if len(conversation_text) > 100: # 长文本可能有更多信息 + value_score = 0.7 + elif len(conversation_text) > 50: + value_score = 0.6 + else: + value_score = 0.5 + + logger.debug(f"海马体采样模式:使用价值评分 {value_score:.2f}") # 2. 构建记忆块(所有记忆统一使用 global 作用域,实现完全共享) memory_chunks = await self.memory_builder.build_memories( @@ -469,7 +507,7 @@ class MemorySystem: continue search_tasks.append( self.unified_storage.search_similar_memories( - query_text=display_text, limit=8, scope_id=GLOBAL_MEMORY_SCOPE + query_text=display_text, limit=8, filters={"user_id": GLOBAL_MEMORY_SCOPE} ) ) @@ -512,12 +550,70 @@ class MemorySystem: return existing_candidates async def process_conversation_memory(self, context: dict[str, Any]) -> dict[str, Any]: - """对外暴露的对话记忆处理接口,仅依赖上下文信息""" + """对外暴露的对话记忆处理接口,支持海马体、即时、所有三种采样模式""" start_time = time.time() try: context = dict(context or {}) + # 获取配置的采样模式 + sampling_mode = getattr(global_config.memory, 'memory_sampling_mode', 'immediate') + current_mode = MemorySamplingMode(sampling_mode) + + logger.debug(f"使用记忆采样模式: {current_mode.value}") + + # 根据采样模式处理记忆 + if current_mode == MemorySamplingMode.HIPPOCAMPUS: + # 海马体模式:仅后台定时采样,不立即处理 + return { + "success": True, + "created_memories": [], + "memory_count": 0, + "processing_time": time.time() - start_time, + "status": self.status.value, + "processing_mode": "hippocampus", + "message": "海马体模式:记忆将由后台定时任务采样处理", + } + + elif current_mode == MemorySamplingMode.IMMEDIATE: + # 即时模式:立即处理记忆构建 + return await self._process_immediate_memory(context, start_time) + + elif current_mode == MemorySamplingMode.ALL: + # 所有模式:同时进行即时处理和海马体采样 + immediate_result = await self._process_immediate_memory(context, start_time) + + # 海马体采样器会在后台继续处理,这里只是记录 + if self.hippocampus_sampler: + immediate_result["processing_mode"] = "all_modes" + immediate_result["hippocampus_status"] = "background_sampling_enabled" + immediate_result["message"] = "所有模式:即时处理已完成,海马体采样将在后台继续" + else: + immediate_result["processing_mode"] = "immediate_fallback" + immediate_result["hippocampus_status"] = "not_available" + immediate_result["message"] = "海马体采样器不可用,回退到即时模式" + + return immediate_result + + else: + # 默认回退到即时模式 + logger.warning(f"未知的采样模式 {sampling_mode},回退到即时模式") + return await self._process_immediate_memory(context, start_time) + + except Exception as e: + processing_time = time.time() - start_time + logger.error(f"对话记忆处理失败: {e}", exc_info=True) + return { + "success": False, + "error": str(e), + "processing_time": processing_time, + "status": self.status.value, + "processing_mode": "error", + } + + async def _process_immediate_memory(self, context: dict[str, Any], start_time: float) -> dict[str, Any]: + """即时记忆处理的辅助方法""" + try: conversation_candidate = ( context.get("conversation_text") or context.get("message_content") @@ -537,6 +633,23 @@ class MemorySystem: normalized_context = self._normalize_context(context, GLOBAL_MEMORY_SCOPE, timestamp) normalized_context.setdefault("conversation_text", conversation_text) + # 检查信息价值阈值 + value_score = await self._assess_information_value(conversation_text, normalized_context) + threshold = getattr(global_config.memory, 'precision_memory_reply_threshold', 0.5) + + if value_score < threshold: + logger.debug(f"信息价值评分 {value_score:.2f} 低于阈值 {threshold},跳过记忆构建") + return { + "success": True, + "created_memories": [], + "memory_count": 0, + "processing_time": time.time() - start_time, + "status": self.status.value, + "processing_mode": "immediate", + "skip_reason": f"value_score_{value_score:.2f}_below_threshold_{threshold}", + "value_score": value_score, + } + memories = await self.build_memory_from_conversation( conversation_text=conversation_text, context=normalized_context, timestamp=timestamp ) @@ -550,12 +663,20 @@ class MemorySystem: "memory_count": memory_count, "processing_time": processing_time, "status": self.status.value, + "processing_mode": "immediate", + "value_score": value_score, } except Exception as e: processing_time = time.time() - start_time - logger.error(f"对话记忆处理失败: {e}", exc_info=True) - return {"success": False, "error": str(e), "processing_time": processing_time, "status": self.status.value} + logger.error(f"即时记忆处理失败: {e}", exc_info=True) + return { + "success": False, + "error": str(e), + "processing_time": processing_time, + "status": self.status.value, + "processing_mode": "immediate_error", + } async def retrieve_relevant_memories( self, @@ -1372,11 +1493,53 @@ class MemorySystem: except Exception as e: logger.error(f"❌ 记忆系统维护失败: {e}", exc_info=True) + def start_hippocampus_sampling(self): + """启动海马体采样""" + if self.hippocampus_sampler: + asyncio.create_task(self.hippocampus_sampler.start_background_sampling()) + logger.info("🚀 海马体后台采样已启动") + else: + logger.warning("海马体采样器未初始化,无法启动采样") + + def stop_hippocampus_sampling(self): + """停止海马体采样""" + if self.hippocampus_sampler: + self.hippocampus_sampler.stop_background_sampling() + logger.info("🛑 海马体后台采样已停止") + + def get_system_stats(self) -> dict[str, Any]: + """获取系统统计信息""" + base_stats = { + "status": self.status.value, + "total_memories": self.total_memories, + "last_build_time": self.last_build_time, + "last_retrieval_time": self.last_retrieval_time, + "config": asdict(self.config), + } + + # 添加海马体采样器统计 + if self.hippocampus_sampler: + base_stats["hippocampus_sampler"] = self.hippocampus_sampler.get_sampling_stats() + + # 添加存储统计 + if self.unified_storage: + try: + storage_stats = self.unified_storage.get_storage_stats() + base_stats["storage_stats"] = storage_stats + except Exception as e: + logger.debug(f"获取存储统计失败: {e}") + + return base_stats + async def shutdown(self): """关闭系统(简化版)""" try: logger.info("正在关闭简化记忆系统...") + # 停止海马体采样 + if self.hippocampus_sampler: + self.hippocampus_sampler.stop_background_sampling() + # 保存统一存储数据 if self.unified_storage: await self.unified_storage.cleanup() @@ -1456,4 +1619,10 @@ async def initialize_memory_system(llm_model: LLMRequest | None = None): if memory_system is None: memory_system = MemorySystem(llm_model=llm_model) await memory_system.initialize() + + # 根据配置启动海马体采样 + sampling_mode = getattr(global_config.memory, 'memory_sampling_mode', 'immediate') + if sampling_mode in ['hippocampus', 'all']: + memory_system.start_hippocampus_sampling() + return memory_system diff --git a/src/config/official_configs.py b/src/config/official_configs.py index ecdb5d5b5..07fa87091 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -337,6 +337,41 @@ class MemoryConfig(ValidatedConfigBase): # 休眠机制 dormant_threshold_days: int = Field(default=90, description="休眠状态判定天数") + # === 混合记忆系统配置 === + # 采样模式配置 + memory_sampling_mode: Literal["adaptive", "hippocampus", "precision"] = Field( + default="adaptive", description="记忆采样模式:adaptive(自适应),hippocampus(海马体双峰采样),precision(精准记忆)" + ) + + # 海马体双峰采样配置 + enable_hippocampus_sampling: bool = Field(default=True, description="启用海马体双峰采样策略") + hippocampus_sample_interval: int = Field(default=1800, description="海马体采样间隔(秒,默认30分钟)") + hippocampus_sample_size: int = Field(default=30, description="海马体每次采样的消息数量") + hippocampus_batch_size: int = Field(default=5, description="海马体每批处理的记忆数量") + + # 双峰分布配置 [近期均值, 近期标准差, 近期权重, 远期均值, 远期标准差, 远期权重] + hippocampus_distribution_config: list[float] = Field( + default=[12.0, 8.0, 0.7, 48.0, 24.0, 0.3], + description="海马体双峰分布配置:[近期均值(h), 近期标准差(h), 近期权重, 远期均值(h), 远期标准差(h), 远期权重]" + ) + + # 自适应采样配置 + adaptive_sampling_enabled: bool = Field(default=True, description="启用自适应采样策略") + adaptive_sampling_threshold: float = Field(default=0.8, description="自适应采样负载阈值(0-1)") + adaptive_sampling_check_interval: int = Field(default=300, description="自适应采样检查间隔(秒)") + adaptive_sampling_max_concurrent_builds: int = Field(default=3, description="自适应采样最大并发记忆构建数") + + # 精准记忆配置(现有系统的增强版本) + precision_memory_reply_threshold: float = Field( + default=0.6, description="精准记忆回复触发阈值(对话价值评分超过此值时触发记忆构建)" + ) + precision_memory_max_builds_per_hour: int = Field(default=10, description="精准记忆每小时最大构建数量") + + # 混合系统优化配置 + memory_system_load_balancing: bool = Field(default=True, description="启用记忆系统负载均衡") + memory_build_throttling: bool = Field(default=True, description="启用记忆构建节流") + memory_priority_queue_enabled: bool = Field(default=True, description="启用记忆优先级队列") + class MoodConfig(ValidatedConfigBase): """情绪配置类""" diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index e0097e1ad..41a95b6e7 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -1,5 +1,5 @@ [inner] -version = "7.1.5" +version = "7.1.6" #----以下是给开发人员阅读的,如果你只是部署了MoFox-Bot,不需要阅读---- #如果你想要修改配置文件,请递增version的值 @@ -208,6 +208,19 @@ max_context_emojis = 30 # 每次随机传递给LLM的表情包详细描述的最 enable_memory = true # 是否启用记忆系统 memory_build_interval = 600 # 记忆构建间隔(秒)。间隔越低,学习越频繁,但可能产生更多冗余信息 +# === 记忆采样系统配置 === +memory_sampling_mode = "immediate" # 记忆采样模式:hippocampus(海马体定时采样),immediate(即时采样),all(所有模式) + +# 海马体双峰采样配置 +enable_hippocampus_sampling = true # 启用海马体双峰采样策略 +hippocampus_sample_interval = 1800 # 海马体采样间隔(秒,默认30分钟) +hippocampus_sample_size = 30 # 海马体采样样本数量 +hippocampus_batch_size = 10 # 海马体批量处理大小 +hippocampus_distribution_config = [12.0, 8.0, 0.7, 48.0, 24.0, 0.3] # 海马体双峰分布配置:[近期均值(h), 近期标准差(h), 近期权重, 远期均值(h), 远期标准差(h), 远期权重] + +# 即时采样配置 +precision_memory_reply_threshold = 0.5 # 精准记忆回复阈值(0-1),高于此值的对话将立即构建记忆 + min_memory_length = 10 # 最小记忆长度 max_memory_length = 500 # 最大记忆长度 memory_value_threshold = 0.5 # 记忆价值阈值,低于该值的记忆会被丢弃