diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 000000000..b31e37f10 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,263 @@ +# MoFox_Bot AI Coding Agent Instructions + +MoFox_Bot 是基于 MaiCore 的增强型 QQ 聊天机器人,集成了 LLM、插件系统、记忆图谱、情感系统等高级特性。本指南帮助 AI 代理快速理解项目架构并高效开发。 + +## 🏗️ 核心架构 + +### 应用启动流程 +- **入口点**: `bot.py` → `src/main.py` 的 `MainSystem` 类 +- **启动顺序**: EULA 检查 → 数据库初始化 → 插件加载 → 组件初始化 → WebUI 启动(可选) +- **关键管理器**: 所有管理器通过单例模式获取(如 `get_xxx_manager()`) + +### 六层数据库架构 +项目在 2025年11月重构了完整的数据库层,采用 **SQLAlchemy 2.0**: + +1. **Core Layer** (`src/common/database/core.py`): `DatabaseEngine` 单例、WAL 模式 SQLite、连接池管理 +2. **API Layer** (`src/common/database/api/`): `CRUDBase` 通用 CRUD、`QueryBuilder` 链式查询、`specialized.py` 业务特化 API +3. **Optimization Layer** (`src/common/database/optimization/`): 3级缓存 (L1内存/L2 SQLite/L3预加载)、`IntelligentPreloader`、`AdaptiveBatchScheduler` +4. **Config Layer** (`src/common/database/config/`): 数据库/缓存/预加载器配置 +5. **Utils Layer** (`src/common/database/utils/`): 装饰器(重试、超时、缓存)、性能监控 +6. **Compatibility Layer** (`src/common/database/compatibility/`): 向后兼容旧 API(`db_query`、`db_save` 等) + +**关键原则**: +- ✅ 新代码使用 `CRUDBase` 或 `QueryBuilder` +- ✅ 批量操作使用 `AdaptiveBatchScheduler` +- ⚠️ 避免直接使用 `Session`,使用提供的 API 层 +- ⚠️ 数据模型在 `src/common/database/sqlalchemy_models.py` 统一定义 + +### 插件系统架构 +**核心概念**: 组件化设计,插件包含多个可注册组件 + +**组件类型** (`src/plugin_system/base/component_types.py`): +- `ACTION`: 主动/被动行为(回复、发送表情、禁言等) +- `COMMAND`: 命令处理(传统 `/` 前缀命令) +- `PLUS_COMMAND`: 增强命令(支持参数解析、权限检查) +- `TOOL`: LLM 工具调用(函数调用集成) +- `EVENT_HANDLER`: 事件订阅处理器 +- `INTEREST_CALCULATOR`: 兴趣值计算器 +- `PROMPT`: 自定义提示词注入 + +**插件开发流程**: +1. 在 `plugins/` 下创建目录,编写 `_manifest.json` +2. 创建 `plugin.py`,继承 `BasePlugin` 或 `PlusPlugin` +3. 使用 `@register_plugin` 装饰器注册 +4. 实现 `get_plugin_components()` 返回组件列表 +5. 组件通过 `ComponentRegistry` 自动注册 + +**示例结构**: +```python +from src.plugin_system import BasePlugin, register_plugin, BaseAction + +@register_plugin +class MyPlugin(BasePlugin): + plugin_name = "my_plugin" + enable_plugin = True + + def get_plugin_components(self): + return [(ActionInfo(...), MyAction)] +``` + +**关键 API** (`src/plugin_system/apis/`): +- `chat_api`: 聊天功能(获取消息、发送消息) +- `database_api`: 数据库操作(推荐使用新 API) +- `llm_api`: LLM 交互(模型调用、工具注册) +- `permission_api`: 权限管理(检查权限、节点操作) +- `component_manage_api`: 组件查询与管理 + +### 统一调度器(Unified Scheduler) +**位置**: `src/schedule/unified_scheduler.py` + +**触发类型**: +- `TIME`: 延迟触发(`delay_seconds`)或指定时间(`trigger_at`) +- `EVENT`: 事件触发(基于 `event_manager`) +- `CUSTOM`: 自定义条件函数 + +**使用模式**: +```python +from src.schedule.unified_scheduler import unified_scheduler, TriggerType + +await unified_scheduler.create_schedule( + callback=my_async_function, + trigger_type=TriggerType.TIME, + trigger_config={"delay_seconds": 30}, + is_recurring=True, + task_name="periodic_task" +) +``` + +⚠️ **自动启动**: 调度器在 `MainSystem.initialize()` 中自动启动,无需手动初始化 + +### 记忆系统架构 +**双轨记忆**: +- **Memory Graph** (`src/memory_graph/`): 基于图的持久记忆(人物、事件、关系) +- **Chat Memory** (`src/chat/memory_system/`): 会话上下文记忆 + +**兴趣值系统** (`src/chat/interest_system/`): +- 通过插件自动注册 `InterestCalculator` 组件 +- 支持主题聚类、时间衰减、动态权重 +- 影响 AFC (Affinity Flow Chatter) 对话策略 + +**关系系统** (`src/person_info/`): +- 亲密度值影响回复风格和语气 +- 与兴趣值系统协同工作 + +## 🛠️ 开发工作流 + +### 环境管理 +**首选**: `uv` 包管理器(配置清华镜像) +```powershell +uv venv +uv pip install -r requirements.txt +``` + +**环境配置**: +1. 复制 `template/template.env` → `.env` +2. 设置 `EULA_CONFIRMED=true` +3. 
编辑 `config/bot_config.toml` 和 `config/model_config.toml` + +### 代码质量 +**Linter**: Ruff(配置在 `pyproject.toml`) +```powershell +ruff check . # 检查 +ruff format . # 格式化 +``` + +**规范**: +- 行长度: 120 字符 +- 引号: 双引号 +- 类型提示: 推荐使用(尤其是公共 API) +- 异步优先: 所有 I/O 操作使用 `async/await` + +### 日志系统 +**位置**: `src/common/logger.py` + +**使用模式**: +```python +from src.common.logger import get_logger + +logger = get_logger("module_name") +logger.info("信息") +logger.error("错误", exc_info=True) # 包含堆栈跟踪 +``` + +**日志级别**: 通过 `bot_config.toml` 的 `[logging]` 配置 + +### 运行与调试 +**启动命令**: +```powershell +python bot.py # 标准启动 +python __main__.py # 备用入口 +``` + +**WebUI 开发**: +- WebUI 位于同级目录 `webui/` 或 `../webui` +- 自动通过 `npm run dev` 启动(可在 `.env` 设置 `WEBUI_DIR`) +- 超时 60 秒检测是否成功 + +**调试技巧**: +- 检查 `logs/app_*.jsonl` 结构化日志 +- 使用 `get_errors()` 工具查看编译错误 +- 数据库问题:查看 `data/MaiBot.db`(SQLite)或 MySQL 连接 + +## 📋 关键约定与模式 + +### 配置管理 +**全局配置**: `src/config/config.py` 的 `global_config` 单例 +- 通过 TOML 文件驱动(`config/bot_config.toml`) +- 支持环境变量覆盖(`.env`) +- 数据库类型切换:`database.database_type = "sqlite" | "mysql"` + +### 事件系统 +**Event Manager** (`src/plugin_system/core/event_manager.py`): +```python +from src.plugin_system.core.event_manager import event_manager +from src.plugin_system.base.component_types import EventType + +await event_manager.trigger_event( + EventType.ON_MESSAGE_RECEIVED, + message_data=data, + permission_group="USER" +) +``` + +**常用事件**: +- `ON_START` / `ON_STOP`: 系统生命周期 +- `ON_MESSAGE_RECEIVED`: 消息接收 +- `ON_PLUGIN_LOADED` / `ON_PLUGIN_UNLOADED`: 插件生命周期 + +### 消息处理 +**核心类**: `ChatBot` (`src/chat/message_receive/bot.py`) +- 消息通过 `_message_process_wrapper` 异步并行处理 +- 使用 `MessageStorageBatcher` 批量存储(`src/chat/message_receive/storage.py`) +- 消息分块重组: `MessageReassembler` (`src/utils/message_chunker.py`) + +### 批量操作最佳实践 +**场景**: 需要保存大量数据库记录 +```python +from src.common.database.optimization.batch_scheduler import get_batch_scheduler + +scheduler = get_batch_scheduler() +await scheduler.schedule_batch_insert(model_class, data_list) +``` + +### 权限系统 +**检查权限**: +```python +from src.plugin_system.apis.permission_api import permission_api + +has_permission = await permission_api.check_permission( + user_id="123456", + platform="qq", + permission_node="plugin.my_plugin.admin" +) +``` + +**Master 用户**: 在 `bot_config.toml` 的 `[permission.master_users]` 配置 + +## 🔍 常见问题与陷阱 + +### 数据库相关 +❌ **错误**: 直接创建 `Session` 对象 +✅ **正确**: 使用 `CRUDBase` 或 `QueryBuilder` API + +❌ **错误**: 循环中逐条插入 +✅ **正确**: 使用 `AdaptiveBatchScheduler` 批量插入 + +### 插件开发 +❌ **错误**: 在 `__init__` 中执行异步操作 +✅ **正确**: 在 `on_plugin_loaded()` 中执行异步初始化 + +❌ **错误**: 硬编码配置值 +✅ **正确**: 使用 `self.plugin_config` 读取配置 + +### 性能优化 +⚠️ **避免**: 在主事件循环中阻塞 I/O +✅ **使用**: `asyncio.to_thread()` 或 `loop.run_in_executor()` + +⚠️ **避免**: 频繁的小查询 +✅ **使用**: 预加载、缓存或批量查询 + +## 📚 关键文档参考 + +- **插件开发**: `docs/plugins/quick-start.md` +- **数据库架构**: `docs/database_refactoring_completion.md` +- **统一调度器**: `docs/unified_scheduler_guide.md` +- **记忆图谱**: `docs/memory_graph_guide.md` +- **部署指南**: `docs/deployment_guide.md` +- **配置说明**: 在线文档 https://mofox-studio.github.io/MoFox-Bot-Docs/ + +## 🎯 快速定位关键文件 + +| 功能域 | 入口文件 | +|--------|----------| +| 主系统 | `src/main.py` | +| 插件管理器 | `src/plugin_system/core/plugin_manager.py` | +| 数据库 API | `src/common/database/api/crud.py` | +| 消息处理 | `src/chat/message_receive/bot.py` | +| LLM 集成 | `src/llm_models/model_client/` | +| 配置系统 | `src/config/config.py` | +| 日志系统 | `src/common/logger.py` | + +--- + +**项目特色**: 本项目集成了 MCP (Model Context Protocol) 支持、Affinity Flow Chatter 
智能对话、视频分析、日程管理等独特功能。探索 `src/plugins/built_in/` 查看内置插件示例。 diff --git a/src/memory_graph/core/builder.py b/src/memory_graph/core/builder.py index 26a94d019..f8607e583 100644 --- a/src/memory_graph/core/builder.py +++ b/src/memory_graph/core/builder.py @@ -126,6 +126,9 @@ class MemoryBuilder: edges.extend(attr_edges) # 6. 构建 Memory 对象 + # 新记忆应该有较高的初始激活度 + initial_activation = 0.75 # 新记忆初始激活度为 0.75 + memory = Memory( id=memory_id, subject_id=subject_node.id, @@ -133,6 +136,7 @@ class MemoryBuilder: nodes=nodes, edges=edges, importance=extracted_params["importance"], + activation=initial_activation, # 设置较高的初始激活度 created_at=extracted_params["timestamp"], last_accessed=extracted_params["timestamp"], access_count=0, @@ -140,6 +144,12 @@ class MemoryBuilder: metadata={ "subject": extracted_params["subject"], "topic": extracted_params["topic"], + "activation": { + "level": initial_activation, + "last_access": extracted_params["timestamp"].isoformat(), + "access_count": 0, + "created_at": extracted_params["timestamp"].isoformat(), + }, }, ) diff --git a/src/memory_graph/manager.py b/src/memory_graph/manager.py index ef7305efa..f9bf5ed6a 100644 --- a/src/memory_graph/manager.py +++ b/src/memory_graph/manager.py @@ -78,7 +78,7 @@ class MemoryManager: self._last_maintenance = datetime.now() self._maintenance_task: asyncio.Task | None = None self._maintenance_interval_hours = getattr(self.config, "consolidation_interval_hours", 1.0) - self._maintenance_schedule_id: str | None = None # 调度任务ID + self._maintenance_running = False # 维护任务运行状态 logger.info(f"记忆管理器已创建 (data_dir={self.data_dir}, enable={getattr(self.config, 'enable', False)})") @@ -155,8 +155,8 @@ class MemoryManager: self._initialized = True logger.info("✅ 记忆管理器初始化完成") - # 启动后台维护调度任务 - await self.start_maintenance_scheduler() + # 启动后台维护任务 + self._start_maintenance_task() except Exception as e: logger.error(f"记忆管理器初始化失败: {e}", exc_info=True) @@ -178,8 +178,8 @@ class MemoryManager: try: logger.info("正在关闭记忆管理器...") - # 1. 停止调度任务 - await self.stop_maintenance_scheduler() + # 1. 停止维护任务 + await self._stop_maintenance_task() # 2. 执行最后一次维护(保存数据) if self.graph_store and self.persistence: @@ -867,12 +867,19 @@ class MemoryManager: max_expanded=max_expanded, ) - async def forget_memory(self, memory_id: str) -> bool: + async def forget_memory(self, memory_id: str, cleanup_orphans: bool = True) -> bool: """ - 遗忘记忆(标记为已遗忘,不删除) + 遗忘记忆(直接删除) + + 这个方法会: + 1. 从向量存储中删除节点的嵌入向量 + 2. 从图存储中删除记忆 + 3. 可选:清理孤立节点(建议批量遗忘后统一清理) + 4. 保存更新后的数据 Args: memory_id: 记忆 ID + cleanup_orphans: 是否立即清理孤立节点(默认True,批量遗忘时设为False) Returns: 是否遗忘成功 @@ -886,13 +893,36 @@ class MemoryManager: logger.warning(f"记忆不存在: {memory_id}") return False - memory.metadata["forgotten"] = True - memory.metadata["forgotten_at"] = datetime.now().isoformat() + # 1. 从向量存储删除节点的嵌入向量 + deleted_vectors = 0 + for node in memory.nodes: + if node.embedding is not None: + try: + await self.vector_store.delete_node(node.id) + deleted_vectors += 1 + except Exception as e: + logger.warning(f"删除节点向量失败 {node.id}: {e}") - # 保存更新 - await self.persistence.save_graph_store(self.graph_store) - logger.info(f"记忆已遗忘: {memory_id}") - return True + # 2. 从图存储删除记忆 + success = self.graph_store.remove_memory(memory_id, cleanup_orphans=False) + + if success: + # 3. 
可选:清理孤立节点 + if cleanup_orphans: + orphan_nodes, orphan_edges = await self._cleanup_orphan_nodes_and_edges() + logger.info( + f"记忆已遗忘并删除: {memory_id} " + f"(删除了 {deleted_vectors} 个向量, 清理了 {orphan_nodes} 个孤立节点, {orphan_edges} 条孤立边)" + ) + else: + logger.debug(f"记忆已删除: {memory_id} (删除了 {deleted_vectors} 个向量)") + + # 4. 保存更新 + await self.persistence.save_graph_store(self.graph_store) + return True + else: + logger.error(f"从图存储删除记忆失败: {memory_id}") + return False except Exception as e: logger.error(f"遗忘记忆失败: {e}", exc_info=True) @@ -900,7 +930,12 @@ class MemoryManager: async def auto_forget_memories(self, threshold: float = 0.1) -> int: """ - 自动遗忘低激活度的记忆 + 自动遗忘低激活度的记忆(批量优化版) + + 应用时间衰减公式计算当前激活度,低于阈值则遗忘。 + 衰减公式:activation = base_activation * (decay_rate ^ days_passed) + + 优化:批量删除记忆后统一清理孤立节点,减少重复检查 Args: threshold: 激活度阈值 @@ -914,41 +949,145 @@ class MemoryManager: try: forgotten_count = 0 all_memories = self.graph_store.get_all_memories() + + # 获取配置参数 + min_importance = getattr(self.config, "forgetting_min_importance", 0.8) + decay_rate = getattr(self.config, "activation_decay_rate", 0.9) + + # 收集需要遗忘的记忆ID + memories_to_forget = [] for memory in all_memories: # 跳过已遗忘的记忆 if memory.metadata.get("forgotten", False): continue - # 跳过高重要性记忆 - min_importance = getattr(self.config, "forgetting_min_importance", 7.0) + # 跳过高重要性记忆(保护重要记忆不被遗忘) if memory.importance >= min_importance: continue - # 计算当前激活度 + # 计算当前激活度(应用时间衰减) activation_info = memory.metadata.get("activation", {}) + base_activation = activation_info.get("level", memory.activation) last_access = activation_info.get("last_access") if last_access: - last_access_dt = datetime.fromisoformat(last_access) - days_passed = (datetime.now() - last_access_dt).days + try: + last_access_dt = datetime.fromisoformat(last_access) + days_passed = (datetime.now() - last_access_dt).days + + # 应用指数衰减:activation = base * (decay_rate ^ days) + current_activation = base_activation * (decay_rate ** days_passed) + + logger.debug( + f"记忆 {memory.id[:8]}: 基础激活度={base_activation:.3f}, " + f"经过{days_passed}天衰减后={current_activation:.3f}" + ) + except (ValueError, TypeError) as e: + logger.warning(f"解析时间失败: {e}, 使用基础激活度") + current_activation = base_activation + else: + # 没有访问记录,使用基础激活度 + current_activation = base_activation - # 长时间未访问的记忆,应用时间衰减 - decay_factor = 0.9 ** days_passed - current_activation = activation_info.get("level", 0.0) * decay_factor + # 低于阈值则标记为待遗忘 + if current_activation < threshold: + memories_to_forget.append((memory.id, current_activation)) + logger.debug( + f"标记遗忘 {memory.id[:8]}: 激活度={current_activation:.3f} < 阈值={threshold:.3f}" + ) - # 低于阈值则遗忘 - if current_activation < threshold: - await self.forget_memory(memory.id) + # 批量遗忘记忆(不立即清理孤立节点) + if memories_to_forget: + logger.info(f"开始批量遗忘 {len(memories_to_forget)} 条记忆...") + + for memory_id, activation in memories_to_forget: + # cleanup_orphans=False:暂不清理孤立节点 + success = await self.forget_memory(memory_id, cleanup_orphans=False) + if success: forgotten_count += 1 + + # 统一清理孤立节点和边 + logger.info("批量遗忘完成,开始统一清理孤立节点和边...") + orphan_nodes, orphan_edges = await self._cleanup_orphan_nodes_and_edges() + + # 保存最终更新 + await self.persistence.save_graph_store(self.graph_store) + + logger.info( + f"✅ 自动遗忘完成: 遗忘了 {forgotten_count} 条记忆, " + f"清理了 {orphan_nodes} 个孤立节点, {orphan_edges} 条孤立边" + ) + else: + logger.info("✅ 自动遗忘完成: 没有需要遗忘的记忆") - logger.info(f"自动遗忘完成: 遗忘了 {forgotten_count} 条记忆") return forgotten_count except Exception as e: logger.error(f"自动遗忘失败: {e}", exc_info=True) return 0 + async def 
_cleanup_orphan_nodes_and_edges(self) -> tuple[int, int]: + """ + 清理孤立节点和边 + + 孤立节点:不再属于任何记忆的节点 + 孤立边:连接到已删除节点的边 + + Returns: + (清理的孤立节点数, 清理的孤立边数) + """ + try: + orphan_nodes_count = 0 + orphan_edges_count = 0 + + # 1. 清理孤立节点 + # graph_store.node_to_memories 记录了每个节点属于哪些记忆 + nodes_to_remove = [] + + for node_id, memory_ids in list(self.graph_store.node_to_memories.items()): + # 如果节点不再属于任何记忆,标记为删除 + if not memory_ids: + nodes_to_remove.append(node_id) + + # 从图中删除孤立节点 + for node_id in nodes_to_remove: + if self.graph_store.graph.has_node(node_id): + self.graph_store.graph.remove_node(node_id) + orphan_nodes_count += 1 + + # 从映射中删除 + if node_id in self.graph_store.node_to_memories: + del self.graph_store.node_to_memories[node_id] + + # 2. 清理孤立边(指向已删除节点的边) + edges_to_remove = [] + + for source, target, edge_id in self.graph_store.graph.edges(data='edge_id'): + # 检查边的源节点和目标节点是否还存在于node_to_memories中 + if source not in self.graph_store.node_to_memories or \ + target not in self.graph_store.node_to_memories: + edges_to_remove.append((source, target)) + + # 删除孤立边 + for source, target in edges_to_remove: + try: + self.graph_store.graph.remove_edge(source, target) + orphan_edges_count += 1 + except Exception as e: + logger.debug(f"删除边失败 {source} -> {target}: {e}") + + if orphan_nodes_count > 0 or orphan_edges_count > 0: + logger.info( + f"清理完成: {orphan_nodes_count} 个孤立节点, {orphan_edges_count} 条孤立边" + ) + + return orphan_nodes_count, orphan_edges_count + + except Exception as e: + logger.error(f"清理孤立节点和边失败: {e}", exc_info=True) + return 0, 0 + # ==================== 统计与维护 ==================== def get_statistics(self) -> dict[str, Any]: @@ -1043,7 +1182,14 @@ class MemoryManager: max_batch_size: int, ) -> None: """ - 后台执行记忆整理的具体实现 + 后台执行记忆整理的具体实现 (完整版) + + 流程: + 1. 获取时间窗口内的记忆 + 2. 重要性过滤 + 3. 向量检索关联记忆 + 4. 分批交给LLM分析关系 + 5. 
统一更新记忆数据 这个方法会在独立任务中运行,不阻塞主流程 """ @@ -1052,9 +1198,11 @@ class MemoryManager: "merged_count": 0, "checked_count": 0, "skipped_count": 0, + "linked_count": 0, + "importance_filtered": 0, } - # 获取最近创建的记忆 + # ===== 步骤1: 获取时间窗口内的记忆 ===== cutoff_time = datetime.now() - timedelta(hours=time_window_hours) all_memories = self.graph_store.get_all_memories() @@ -1067,18 +1215,37 @@ class MemoryManager: logger.info("✅ 记忆整理完成: 没有需要整理的记忆") return + logger.info(f"📋 步骤1: 找到 {len(recent_memories)} 条时间窗口内的记忆") + + # ===== 步骤2: 重要性过滤 ===== + min_importance_for_consolidation = getattr(self.config, "consolidation_min_importance", 0.3) + important_memories = [ + mem for mem in recent_memories + if mem.importance >= min_importance_for_consolidation + ] + + result["importance_filtered"] = len(recent_memories) - len(important_memories) + logger.info( + f"📊 步骤2: 重要性过滤 (阈值={min_importance_for_consolidation:.2f}): " + f"{len(recent_memories)} → {len(important_memories)} 条记忆" + ) + + if not important_memories: + logger.info("✅ 记忆整理完成: 没有重要的记忆需要整理") + return + # 限制批量处理数量 - if len(recent_memories) > max_batch_size: - logger.info(f"📊 记忆数量 {len(recent_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size} 条") - recent_memories = sorted(recent_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size] - result["skipped_count"] = len(all_memories) - max_batch_size + if len(important_memories) > max_batch_size: + logger.info(f"📊 记忆数量 {len(important_memories)} 超过批量限制 {max_batch_size},仅处理最新的 {max_batch_size} 条") + important_memories = sorted(important_memories, key=lambda m: m.created_at, reverse=True)[:max_batch_size] + result["skipped_count"] = len(important_memories) - max_batch_size - logger.info(f"📋 找到 {len(recent_memories)} 条待整理记忆") - result["checked_count"] = len(recent_memories) + result["checked_count"] = len(important_memories) + # ===== 步骤3: 去重(相似记忆合并)===== # 按记忆类型分组,减少跨类型比较 memories_by_type: dict[str, list[Memory]] = {} - for mem in recent_memories: + for mem in important_memories: mem_type = mem.metadata.get("memory_type", "") if mem_type not in memories_by_type: memories_by_type[mem_type] = [] @@ -1088,7 +1255,8 @@ class MemoryManager: to_delete: list[tuple[Memory, str]] = [] # (memory, reason) deleted_ids = set() - # 对每个类型的记忆进行相似度检测 + # 对每个类型的记忆进行相似度检测(去重) + logger.info("📍 步骤3: 开始相似记忆去重...") for mem_type, memories in memories_by_type.items(): if len(memories) < 2: continue @@ -1106,7 +1274,6 @@ class MemoryManager: valid_memories.append(mem) # 批量计算相似度矩阵(比逐个计算更高效) - for i in range(len(valid_memories)): # 更频繁的协作式多任务让出 if i % 5 == 0: @@ -1158,7 +1325,7 @@ class MemoryManager: # 批量删除标记的记忆 if to_delete: - logger.info(f"🗑️ 开始批量删除 {len(to_delete)} 条相似记忆") + logger.info(f"🗑️ 批量删除 {len(to_delete)} 条相似记忆") for memory, reason in to_delete: try: @@ -1175,7 +1342,118 @@ class MemoryManager: # 批量保存(一次性写入,减少I/O) await self.persistence.save_graph_store(self.graph_store) - logger.info("💾 批量保存完成") + logger.info("💾 去重保存完成") + + # ===== 步骤4: 向量检索关联记忆 + LLM分析关系 ===== + # 过滤掉已删除的记忆 + remaining_memories = [m for m in important_memories if m.id not in deleted_ids] + + if not remaining_memories: + logger.info("✅ 记忆整理完成: 去重后无剩余记忆") + return + + logger.info(f"📍 步骤4: 开始关联分析 ({len(remaining_memories)} 条记忆)...") + + # 分批处理记忆关联 + llm_batch_size = getattr(self.config, "consolidation_llm_batch_size", 10) + max_candidates_per_memory = getattr(self.config, "consolidation_max_candidates", 5) + min_confidence = getattr(self.config, "consolidation_min_confidence", 0.6) + + all_new_edges = [] # 收集所有新建的边 + + for batch_start in range(0, 
len(remaining_memories), llm_batch_size): + batch_end = min(batch_start + llm_batch_size, len(remaining_memories)) + batch = remaining_memories[batch_start:batch_end] + + logger.debug(f"处理批次 {batch_start//llm_batch_size + 1}/{(len(remaining_memories)-1)//llm_batch_size + 1}") + + for memory in batch: + # 跳过已经有很多连接的记忆 + existing_edges = len([ + e for e in memory.edges + if e.edge_type == EdgeType.RELATION + ]) + if existing_edges >= 10: + continue + + # 使用向量搜索找候选关联记忆 + candidates = await self._find_link_candidates( + memory, + exclude_ids={memory.id} | deleted_ids, + max_results=max_candidates_per_memory + ) + + if not candidates: + continue + + # 使用LLM分析关系 + relations = await self._analyze_memory_relations( + source_memory=memory, + candidate_memories=candidates, + min_confidence=min_confidence + ) + + # 建立关联边 + for relation in relations: + try: + # 创建关联边 + edge = MemoryEdge( + id=f"edge_{uuid.uuid4().hex[:12]}", + source_id=memory.subject_id, + target_id=relation["target_memory"].subject_id, + relation=relation["relation_type"], + edge_type=EdgeType.RELATION, + importance=relation["confidence"], + metadata={ + "auto_linked": True, + "confidence": relation["confidence"], + "reasoning": relation["reasoning"], + "created_at": datetime.now().isoformat(), + "created_by": "consolidation", + } + ) + + all_new_edges.append((memory, edge, relation)) + result["linked_count"] += 1 + + except Exception as e: + logger.warning(f"创建关联边失败: {e}") + continue + + # 每个批次后让出控制权 + await asyncio.sleep(0.01) + + # ===== 步骤5: 统一更新记忆数据 ===== + if all_new_edges: + logger.info(f"📍 步骤5: 统一更新 {len(all_new_edges)} 条新关联边...") + + for memory, edge, relation in all_new_edges: + try: + # 添加到图 + self.graph_store.graph.add_edge( + edge.source_id, + edge.target_id, + edge_id=edge.id, + relation=edge.relation, + edge_type=edge.edge_type.value, + importance=edge.importance, + metadata=edge.metadata, + ) + + # 同时添加到记忆的边列表 + memory.edges.append(edge) + + logger.debug( + f"✓ {memory.id[:8]} --[{relation['relation_type']}]--> " + f"{relation['target_memory'].id[:8]} (置信度={relation['confidence']:.2f})" + ) + + except Exception as e: + logger.warning(f"添加边到图失败: {e}") + + # 批量保存更新 + await self.persistence.save_graph_store(self.graph_store) + logger.info("💾 关联边保存完成") logger.info(f"✅ 记忆整理完成: {result}") @@ -1917,11 +2195,11 @@ class MemoryManager: logger.error(f"LLM批量关系分析失败: {e}", exc_info=True) return [] - async def start_maintenance_scheduler(self) -> None: + def _start_maintenance_task(self) -> None: """ - 启动记忆维护调度任务 + 启动记忆维护后台任务 - 使用 unified_scheduler 定期执行维护任务: + 直接创建async task,避免使用scheduler阻塞主程序: - 记忆整合(合并相似记忆) - 自动遗忘低激活度记忆 - 保存数据 @@ -1929,57 +2207,96 @@ class MemoryManager: 默认间隔:1小时 """ try: - from src.schedule.unified_scheduler import TriggerType, unified_scheduler + # 如果已有维护任务,先停止 + if self._maintenance_task and not self._maintenance_task.done(): + self._maintenance_task.cancel() + logger.info("取消旧的维护任务") - # 如果已有调度任务,先移除 - if self._maintenance_schedule_id: - await unified_scheduler.remove_schedule(self._maintenance_schedule_id) - logger.info("移除旧的维护调度任务") - - # 创建新的调度任务 - interval_seconds = self._maintenance_interval_hours * 3600 - - self._maintenance_schedule_id = await unified_scheduler.create_schedule( - callback=self.maintenance, - trigger_type=TriggerType.TIME, - trigger_config={ - "delay_seconds": interval_seconds, # 首次延迟(启动后1小时) - "interval_seconds": interval_seconds, # 循环间隔 - }, - is_recurring=True, - task_name="memory_maintenance", + # 创建新的后台维护任务 + self._maintenance_task = asyncio.create_task( + 
self._maintenance_loop(), + name="memory_maintenance_loop" ) logger.info( - f"✅ 记忆维护调度任务已启动 " - f"(间隔={self._maintenance_interval_hours}小时, " - f"schedule_id={self._maintenance_schedule_id[:8]}...)" + f"✅ 记忆维护后台任务已启动 " + f"(间隔={self._maintenance_interval_hours}小时)" ) - except ImportError: - logger.warning("无法导入 unified_scheduler,维护调度功能不可用") except Exception as e: - logger.error(f"启动维护调度任务失败: {e}", exc_info=True) + logger.error(f"启动维护后台任务失败: {e}", exc_info=True) - async def stop_maintenance_scheduler(self) -> None: + async def _stop_maintenance_task(self) -> None: """ - 停止记忆维护调度任务 + 停止记忆维护后台任务 """ - if not self._maintenance_schedule_id: + if not self._maintenance_task or self._maintenance_task.done(): return try: - from src.schedule.unified_scheduler import unified_scheduler + self._maintenance_running = False # 设置停止标志 + self._maintenance_task.cancel() - success = await unified_scheduler.remove_schedule(self._maintenance_schedule_id) - if success: - logger.info(f"✅ 记忆维护调度任务已停止 (schedule_id={self._maintenance_schedule_id[:8]}...)") - else: - logger.warning(f"停止维护调度任务失败 (schedule_id={self._maintenance_schedule_id[:8]}...)") + try: + await self._maintenance_task + except asyncio.CancelledError: + logger.debug("维护任务已取消") - self._maintenance_schedule_id = None + logger.info("✅ 记忆维护后台任务已停止") + self._maintenance_task = None - except ImportError: - logger.warning("无法导入 unified_scheduler") except Exception as e: - logger.error(f"停止维护调度任务失败: {e}", exc_info=True) + logger.error(f"停止维护后台任务失败: {e}", exc_info=True) + + async def _maintenance_loop(self) -> None: + """ + 记忆维护循环 + + 在后台独立运行,定期执行维护任务,避免阻塞主程序 + """ + self._maintenance_running = True + + try: + # 首次执行延迟(启动后1小时) + initial_delay = self._maintenance_interval_hours * 3600 + logger.debug(f"记忆维护任务将在 {initial_delay} 秒后首次执行") + + while self._maintenance_running: + try: + # 使用 asyncio.wait_for 来支持取消 + await asyncio.wait_for( + asyncio.sleep(initial_delay), + timeout=float('inf') # 允许随时取消 + ) + + # 检查是否仍然需要运行 + if not self._maintenance_running: + break + + # 执行维护任务(使用try-catch避免崩溃) + try: + await self.maintenance() + except Exception as e: + logger.error(f"维护任务执行失败: {e}", exc_info=True) + + # 后续执行使用相同间隔 + initial_delay = self._maintenance_interval_hours * 3600 + + except asyncio.CancelledError: + logger.debug("维护循环被取消") + break + except Exception as e: + logger.error(f"维护循环发生异常: {e}", exc_info=True) + # 异常后等待较短时间再重试 + try: + await asyncio.sleep(300) # 5分钟后重试 + except asyncio.CancelledError: + break + + except asyncio.CancelledError: + logger.debug("维护循环完全退出") + except Exception as e: + logger.error(f"维护循环意外结束: {e}", exc_info=True) + finally: + self._maintenance_running = False + logger.debug("维护循环已清理完毕") diff --git a/src/memory_graph/storage/graph_store.py b/src/memory_graph/storage/graph_store.py index 1e1a9a91f..93e30cb83 100644 --- a/src/memory_graph/storage/graph_store.py +++ b/src/memory_graph/storage/graph_store.py @@ -459,12 +459,13 @@ class GraphStore: logger.info("已将图中的边同步到 Memory.edges(保证 graph 与 memory 对象一致)") - def remove_memory(self, memory_id: str) -> bool: + def remove_memory(self, memory_id: str, cleanup_orphans: bool = True) -> bool: """ 从图中删除指定记忆 Args: memory_id: 要删除的记忆ID + cleanup_orphans: 是否立即清理孤立节点(默认True,批量删除时设为False) Returns: 是否删除成功 @@ -481,16 +482,19 @@ class GraphStore: for node in memory.nodes: if node.id in self.node_to_memories: self.node_to_memories[node.id].discard(memory_id) - # 如果该节点不再属于任何记忆,从图中移除节点 - if not self.node_to_memories[node.id]: - if self.graph.has_node(node.id): - self.graph.remove_node(node.id) - del 
self.node_to_memories[node.id] + + # 可选:立即清理孤立节点 + if cleanup_orphans: + # 如果该节点不再属于任何记忆,从图中移除节点 + if not self.node_to_memories[node.id]: + if self.graph.has_node(node.id): + self.graph.remove_node(node.id) + del self.node_to_memories[node.id] # 3. 从记忆索引中移除 del self.memory_index[memory_id] - logger.info(f"成功删除记忆: {memory_id}") + logger.debug(f"成功删除记忆: {memory_id}") return True except Exception as e: diff --git a/src/memory_graph/utils/graph_expansion.py b/src/memory_graph/utils/graph_expansion.py index 45824904b..63c90cd16 100644 --- a/src/memory_graph/utils/graph_expansion.py +++ b/src/memory_graph/utils/graph_expansion.py @@ -1,9 +1,15 @@ """ -图扩展工具 +图扩展工具(优化版) -提供记忆图的扩展算法,用于从初始记忆集合沿图结构扩展查找相关记忆 +提供记忆图的扩展算法,用于从初始记忆集合沿图结构扩展查找相关记忆。 +优化重点: +1. 改进BFS遍历效率 +2. 批量向量检索,减少数据库调用 +3. 早停机制,避免不必要的扩展 +4. 更清晰的日志输出 """ +import asyncio from typing import TYPE_CHECKING from src.common.logger import get_logger @@ -28,10 +34,16 @@ async def expand_memories_with_semantic_filter( max_expanded: int = 20, ) -> list[tuple[str, float]]: """ - 从初始记忆集合出发,沿图结构扩展,并用语义相似度过滤 + 从初始记忆集合出发,沿图结构扩展,并用语义相似度过滤(优化版) 这个方法解决了纯向量搜索可能遗漏的"语义相关且图结构相关"的记忆。 + 优化改进: + - 使用记忆级别的BFS,而非节点级别(更直接) + - 批量获取邻居记忆,减少遍历次数 + - 早停机制:达到max_expanded后立即停止 + - 更详细的调试日志 + Args: graph_store: 图存储 vector_store: 向量存储 @@ -48,103 +60,137 @@ async def expand_memories_with_semantic_filter( return [] try: + import time + start_time = time.time() + # 记录已访问的记忆,避免重复 visited_memories = set(initial_memory_ids) # 记录扩展的记忆及其分数 expanded_memories: dict[str, float] = {} - # BFS扩展 - current_level = initial_memory_ids + # BFS扩展(基于记忆而非节点) + current_level_memories = initial_memory_ids + depth_stats = [] # 每层统计 for depth in range(max_depth): - next_level = [] + next_level_memories = [] + candidates_checked = 0 + candidates_passed = 0 - for memory_id in current_level: + logger.debug(f"🔍 图扩展 - 深度 {depth+1}/{max_depth}, 当前层记忆数: {len(current_level_memories)}") + + # 遍历当前层的记忆 + for memory_id in current_level_memories: memory = graph_store.get_memory_by_id(memory_id) if not memory: continue - # 遍历该记忆的所有节点 - for node in memory.nodes: - if not node.has_embedding(): + # 获取该记忆的邻居记忆(通过边关系) + neighbor_memory_ids = set() + + # 遍历记忆的所有边,收集邻居记忆 + for edge in memory.edges: + # 获取边的目标节点 + target_node_id = edge.target_id + source_node_id = edge.source_id + + # 通过节点找到其他记忆 + for node_id in [target_node_id, source_node_id]: + if node_id in graph_store.node_to_memories: + neighbor_memory_ids.update(graph_store.node_to_memories[node_id]) + + # 过滤掉已访问的和自己 + neighbor_memory_ids.discard(memory_id) + neighbor_memory_ids -= visited_memories + + # 批量评估邻居记忆 + for neighbor_mem_id in neighbor_memory_ids: + candidates_checked += 1 + + neighbor_memory = graph_store.get_memory_by_id(neighbor_mem_id) + if not neighbor_memory: continue - # 获取邻居节点 - try: - neighbors = list(graph_store.graph.neighbors(node.id)) - except Exception: + # 获取邻居记忆的主题节点向量 + topic_node = next( + (n for n in neighbor_memory.nodes if n.has_embedding()), + None + ) + + if not topic_node or topic_node.embedding is None: continue - for neighbor_id in neighbors: - # 获取邻居节点信息 - neighbor_node_data = graph_store.graph.nodes.get(neighbor_id) - if not neighbor_node_data: - continue + # 计算语义相似度 + semantic_sim = cosine_similarity(query_embedding, topic_node.embedding) - # 获取邻居节点的向量(从向量存储) - neighbor_vector_data = await vector_store.get_node_by_id(neighbor_id) - if not neighbor_vector_data or neighbor_vector_data.get("embedding") is None: - continue + # 计算边的重要性(影响评分) + edge_importance = neighbor_memory.importance * 0.5 # 使用记忆重要性作为边权重 - neighbor_embedding 
= neighbor_vector_data["embedding"] + # 综合评分:语义相似度(70%) + 重要性(20%) + 深度衰减(10%) + depth_decay = 1.0 / (depth + 2) # 深度衰减 + relevance_score = semantic_sim * 0.7 + edge_importance * 0.2 + depth_decay * 0.1 - # 计算与查询的语义相似度 - semantic_sim = cosine_similarity(query_embedding, neighbor_embedding) + # 只保留超过阈值的 + if relevance_score < semantic_threshold: + continue - # 获取边的权重 - try: - edge_data = graph_store.graph.get_edge_data(node.id, neighbor_id) - edge_importance = edge_data.get("importance", 0.5) if edge_data else 0.5 - except Exception: - edge_importance = 0.5 + candidates_passed += 1 - # 综合评分:语义相似度(70%) + 图结构权重(20%) + 深度衰减(10%) - depth_decay = 1.0 / (depth + 1) # 深度越深,权重越低 - relevance_score = semantic_sim * 0.7 + edge_importance * 0.2 + depth_decay * 0.1 + # 记录扩展的记忆 + if neighbor_mem_id not in expanded_memories: + expanded_memories[neighbor_mem_id] = relevance_score + visited_memories.add(neighbor_mem_id) + next_level_memories.append(neighbor_mem_id) + else: + # 如果已存在,取最高分 + expanded_memories[neighbor_mem_id] = max( + expanded_memories[neighbor_mem_id], relevance_score + ) - # 只保留超过阈值的节点 - if relevance_score < semantic_threshold: - continue + # 早停:达到最大扩展数量 + if len(expanded_memories) >= max_expanded: + logger.debug(f"⏹️ 提前停止:已达到最大扩展数量 {max_expanded}") + break + + # 早停检查 + if len(expanded_memories) >= max_expanded: + break + + # 记录本层统计 + depth_stats.append({ + "depth": depth + 1, + "checked": candidates_checked, + "passed": candidates_passed, + "expanded_total": len(expanded_memories) + }) - # 提取邻居节点所属的记忆 - neighbor_memory_ids = neighbor_node_data.get("memory_ids", []) - if isinstance(neighbor_memory_ids, str): - import json - - try: - neighbor_memory_ids = json.loads(neighbor_memory_ids) - except Exception: - neighbor_memory_ids = [neighbor_memory_ids] - - for neighbor_mem_id in neighbor_memory_ids: - if neighbor_mem_id in visited_memories: - continue - - # 记录这个扩展记忆 - if neighbor_mem_id not in expanded_memories: - expanded_memories[neighbor_mem_id] = relevance_score - visited_memories.add(neighbor_mem_id) - next_level.append(neighbor_mem_id) - else: - # 如果已存在,取最高分 - expanded_memories[neighbor_mem_id] = max( - expanded_memories[neighbor_mem_id], relevance_score - ) - - # 如果没有新节点或已达到数量限制,提前终止 - if not next_level or len(expanded_memories) >= max_expanded: + # 如果没有新记忆或已达到数量限制,提前终止 + if not next_level_memories or len(expanded_memories) >= max_expanded: + logger.debug(f"⏹️ 停止扩展:{'无新记忆' if not next_level_memories else '达到上限'}") break - current_level = next_level[:max_expanded] # 限制每层的扩展数量 + # 限制下一层的记忆数量,避免爆炸性增长 + current_level_memories = next_level_memories[:max_expanded] + + # 每层让出控制权 + await asyncio.sleep(0.001) # 排序并返回 sorted_results = sorted(expanded_memories.items(), key=lambda x: x[1], reverse=True)[:max_expanded] - + + elapsed = time.time() - start_time logger.info( - f"图扩展完成: 初始{len(initial_memory_ids)}个 → " + f"✅ 图扩展完成: 初始{len(initial_memory_ids)}个 → " f"扩展{len(sorted_results)}个新记忆 " - f"(深度={max_depth}, 阈值={semantic_threshold:.2f})" + f"(深度={max_depth}, 阈值={semantic_threshold:.2f}, 耗时={elapsed:.3f}s)" ) + + # 输出每层统计 + for stat in depth_stats: + logger.debug( + f" 深度{stat['depth']}: 检查{stat['checked']}个, " + f"通过{stat['passed']}个, 累计扩展{stat['expanded_total']}个" + ) return sorted_results
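
补充示例:上方 `expand_memories_with_semantic_filter` 使用的综合评分(语义相似度 70% + 记忆重要性 20% + 深度衰减 10%,再做阈值过滤与数量早停)可以用下面这个独立的小示例来说明。示例仅为示意性质:`score_candidate`、`select_top` 等名称均为假设,不使用项目内的 `GraphStore` / `VectorStore` 类型,只还原上文 diff 中的算术逻辑,不包含 BFS 遍历本身。

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """朴素余弦相似度(仅作示意,项目中由工具模块提供)。"""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm if norm else 0.0


def score_candidate(
    query_embedding: list[float],
    candidate_embedding: list[float],
    memory_importance: float,
    depth: int,
) -> float:
    """综合评分:语义相似度 * 0.7 + 边权重 * 0.2 + 深度衰减 * 0.1(与上方公式一致)。"""
    semantic_sim = cosine_similarity(query_embedding, candidate_embedding)
    edge_importance = memory_importance * 0.5  # 上方实现以记忆重要性的一半近似边权重
    depth_decay = 1.0 / (depth + 2)            # 深度越深,贡献越小
    return semantic_sim * 0.7 + edge_importance * 0.2 + depth_decay * 0.1


def select_top(scored: dict[str, float], threshold: float, max_expanded: int) -> list[tuple[str, float]]:
    """阈值过滤后按分数降序截断,对应扩展过程中的早停上限 max_expanded。"""
    kept = [(mem_id, score) for mem_id, score in scored.items() if score >= threshold]
    return sorted(kept, key=lambda x: x[1], reverse=True)[:max_expanded]


if __name__ == "__main__":
    query = [0.1, 0.9, 0.3]
    # 候选记忆:(主题节点向量, 记忆重要性, 发现时所在深度)
    candidates = {
        "mem_a": ([0.1, 0.8, 0.2], 0.9, 0),
        "mem_b": ([0.9, 0.1, 0.0], 0.2, 1),
    }
    scored = {
        mem_id: score_candidate(query, emb, importance, depth)
        for mem_id, (emb, importance, depth) in candidates.items()
    }
    # 低于阈值的 mem_b 会被过滤掉,只保留语义相近且重要的 mem_a
    print(select_top(scored, threshold=0.5, max_expanded=20))
```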