diff --git a/src/chat/message_manager/distribution_manager.py b/src/chat/message_manager/distribution_manager.py index cb52e5f14..3fe789f2c 100644 --- a/src/chat/message_manager/distribution_manager.py +++ b/src/chat/message_manager/distribution_manager.py @@ -121,7 +121,7 @@ async def conversation_loop( except asyncio.CancelledError: logger.info(f" [生成器] stream={stream_id[:8]}, 被取消") break - except Exception as e: # noqa: BLE001 + except Exception as e: logger.error(f" [生成器] stream={stream_id[:8]}, 出错: {e}") await asyncio.sleep(5.0) @@ -151,10 +151,10 @@ async def run_chat_stream( # 创建生成器 tick_generator = conversation_loop( stream_id=stream_id, - get_context_func=manager._get_stream_context, # noqa: SLF001 - calculate_interval_func=manager._calculate_interval, # noqa: SLF001 - flush_cache_func=manager._flush_cached_messages_to_unread, # noqa: SLF001 - check_force_dispatch_func=manager._needs_force_dispatch_for_context, # noqa: SLF001 + get_context_func=manager._get_stream_context, + calculate_interval_func=manager._calculate_interval, + flush_cache_func=manager._flush_cached_messages_to_unread, + check_force_dispatch_func=manager._needs_force_dispatch_for_context, is_running_func=lambda: manager.is_running, ) @@ -162,13 +162,13 @@ async def run_chat_stream( async for tick in tick_generator: try: # 获取上下文 - context = await manager._get_stream_context(stream_id) # noqa: SLF001 + context = await manager._get_stream_context(stream_id) if not context: continue # 并发保护:检查是否正在处理 if context.is_chatter_processing: - if manager._recover_stale_chatter_state(stream_id, context): # noqa: SLF001 + if manager._recover_stale_chatter_state(stream_id, context): logger.warning(f" [驱动器] stream={stream_id[:8]}, 处理标志残留已修复") else: logger.debug(f" [驱动器] stream={stream_id[:8]}, Chatter正在处理,跳过此Tick") @@ -182,7 +182,7 @@ async def run_chat_stream( # 更新能量值 try: - await manager._update_stream_energy(stream_id, context) # noqa: SLF001 + await manager._update_stream_energy(stream_id, context) except Exception as e: logger.debug(f"更新能量失败: {e}") @@ -191,7 +191,7 @@ async def run_chat_stream( try: async with manager._processing_semaphore: success = await asyncio.wait_for( - manager._process_stream_messages(stream_id, context), # noqa: SLF001 + manager._process_stream_messages(stream_id, context), global_config.chat.thinking_timeout, ) except asyncio.TimeoutError: @@ -209,7 +209,7 @@ async def run_chat_stream( except asyncio.CancelledError: raise - except Exception as e: # noqa: BLE001 + except Exception as e: logger.error(f" [驱动器] stream={stream_id[:8]}, 处理Tick时出错: {e}") manager.stats["total_failures"] += 1 @@ -222,7 +222,7 @@ async def run_chat_stream( if context and context.stream_loop_task: context.stream_loop_task = None logger.debug(f" [驱动器] stream={stream_id[:8]}, 清理任务记录") - except Exception as e: # noqa: BLE001 + except Exception as e: logger.debug(f"清理任务记录失败: {e}") diff --git a/src/chat/message_manager/message_manager.py b/src/chat/message_manager/message_manager.py index 91a9ed5db..252fed382 100644 --- a/src/chat/message_manager/message_manager.py +++ b/src/chat/message_manager/message_manager.py @@ -110,10 +110,10 @@ class MessageManager: if not (context.stream_loop_task and not context.stream_loop_task.done()): # 异步启动驱动器任务;避免在高并发下阻塞消息入队 await stream_loop_manager.start_stream_loop(stream_id) - + # 检查并处理消息打断 await self._check_and_handle_interruption(chat_stream, message) - + # 入队消息 await chat_stream.context.add_message(message) diff --git a/src/config/config.py b/src/config/config.py index efec21705..f1e9c4096 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -1,8 +1,8 @@ import os import shutil import sys -import typing import types +import typing from datetime import datetime from pathlib import Path from typing import Any, get_args, get_origin @@ -30,8 +30,8 @@ from src.config.official_configs import ( ExperimentalConfig, ExpressionConfig, InnerConfig, - LogConfig, KokoroFlowChatterConfig, + LogConfig, LPMMKnowledgeConfig, MemoryConfig, MessageBusConfig, @@ -515,7 +515,7 @@ class Config(ValidatedConfigBase): ) @property - def MMC_VERSION(self) -> str: # noqa: N802 + def MMC_VERSION(self) -> str: return MMC_VERSION diff --git a/src/memory_graph/long_term_manager.py b/src/memory_graph/long_term_manager.py index 91fe1ef9c..c554f8d3d 100644 --- a/src/memory_graph/long_term_manager.py +++ b/src/memory_graph/long_term_manager.py @@ -66,6 +66,13 @@ class LongTermMemoryManager: self._similar_memory_cache: dict[str, list[Memory]] = {} self._cache_max_size = 100 + # 错误/重试统计与配置 + self._max_process_retries = 2 + self._retry_backoff = 0.5 + self._total_processed = 0 + self._failed_single_memory_count = 0 + self._retry_attempts = 0 + logger.info( f"长期记忆管理器已创建 (batch_size={batch_size}, " f"search_top_k={search_top_k}, decay_factor={long_term_decay_factor:.2f})" @@ -202,6 +209,10 @@ class LongTermMemoryManager: else: result["failed_count"] += 1 + # 更新全局计数 + self._total_processed += result["processed_count"] + self._failed_single_memory_count += result["failed_count"] + # 处理完批次后,批量生成embeddings await self._flush_pending_embeddings() @@ -217,26 +228,45 @@ class LongTermMemoryManager: Returns: 处理结果或None(如果失败) """ - try: - # 步骤1: 在长期记忆中检索相似记忆 - similar_memories = await self._search_similar_long_term_memories(stm) + # 增加重试机制以应对 LLM/执行的临时失败 + attempt = 0 + last_exc: Exception | None = None + while attempt <= self._max_process_retries: + try: + # 步骤1: 在长期记忆中检索相似记忆 + similar_memories = await self._search_similar_long_term_memories(stm) - # 步骤2: LLM 决策如何更新图结构 - operations = await self._decide_graph_operations(stm, similar_memories) + # 步骤2: LLM 决策如何更新图结构 + operations = await self._decide_graph_operations(stm, similar_memories) - # 步骤3: 执行图操作 - success = await self._execute_graph_operations(operations, stm) + # 步骤3: 执行图操作 + success = await self._execute_graph_operations(operations, stm) - if success: - return { - "success": True, - "operations": [op.operation_type for op in operations] - } - return None + if success: + return { + "success": True, + "operations": [op.operation_type for op in operations] + } - except Exception as e: - logger.error(f"处理短期记忆 {stm.id} 失败: {e}") - return None + # 如果执行返回 False,视为一次失败,准备重试 + last_exc = RuntimeError("_execute_graph_operations 返回 False") + raise last_exc + + except Exception as e: + last_exc = e + attempt += 1 + if attempt <= self._max_process_retries: + self._retry_attempts += 1 + backoff = self._retry_backoff * attempt + logger.warning( + f"处理短期记忆 {stm.id} 时发生可恢复错误,重试 {attempt}/{self._max_process_retries},等待 {backoff}s: {e}" + ) + await asyncio.sleep(backoff) + continue + # 超过重试次数,记录失败并返回 None + logger.error(f"处理短期记忆 {stm.id} 最终失败: {last_exc}") + self._failed_single_memory_count += 1 + return None async def _search_similar_long_term_memories( self, stm: ShortTermMemory diff --git a/src/memory_graph/short_term_manager.py b/src/memory_graph/short_term_manager.py index 42ab076ef..763f1525d 100644 --- a/src/memory_graph/short_term_manager.py +++ b/src/memory_graph/short_term_manager.py @@ -648,15 +648,15 @@ class ShortTermMemoryManager: else: low_importance_memories.append(mem) - # 如果低重要性记忆数量超过了上限(说明积压严重) - # 我们需要清理掉一部分,而不是转移它们 - if len(low_importance_memories) > self.max_memories: + # 如果总体记忆数量超过了上限,优先清理低重要性最早创建的记忆 + if len(self.memories) > self.max_memories: # 目标保留数量(降至上限的 90%) target_keep_count = int(self.max_memories * 0.9) - num_to_remove = len(low_importance_memories) - target_keep_count + # 需要删除的数量(从当前总数降到 target_keep_count) + num_to_remove = len(self.memories) - target_keep_count - if num_to_remove > 0: - # 按创建时间排序,删除最早的 + if num_to_remove > 0 and low_importance_memories: + # 按创建时间排序,删除最早的低重要性记忆 low_importance_memories.sort(key=lambda x: x.created_at) to_remove = low_importance_memories[:num_to_remove] @@ -664,7 +664,7 @@ class ShortTermMemoryManager: remove_ids = {mem.id for mem in to_remove} self.memories = [mem for mem in self.memories if mem.id not in remove_ids] for mem_id in remove_ids: - del self._memory_id_index[mem_id] + self._memory_id_index.pop(mem_id, None) self._similarity_cache.pop(mem_id, None) logger.info( @@ -675,6 +675,16 @@ class ShortTermMemoryManager: # 触发保存 asyncio.create_task(self._save_to_disk()) + # 优先返回高重要性候选 + if candidates: + return candidates + + # 如果没有高重要性候选但总体超过上限,返回按创建时间最早的低重要性记忆作为后备转移候选 + if len(self.memories) > self.max_memories: + needed = len(self.memories) - self.max_memories + 1 + low_importance_memories.sort(key=lambda x: x.created_at) + return low_importance_memories[:needed] + return candidates async def clear_transferred_memories(self, memory_ids: list[str]) -> None: