refactor(memory): 移除废弃的记忆系统备份文件,优化消息管理器架构

移除了deprecated_backup目录下的所有废弃记忆系统文件,包括增强记忆适配器、钩子、集成层、重排序器、元数据索引、多阶段检索和向量存储等模块。同时优化了消息管理器,集成了批量数据库写入器、流缓存管理器和自适应流管理器,提升了系统性能和可维护性。
This commit is contained in:
Windpicker-owo
2025-10-04 01:38:41 +08:00
parent 1eb41f8372
commit 04e7776a45
16 changed files with 1975 additions and 5203 deletions

View File

@@ -23,6 +23,8 @@ class StreamLoopManager:
def __init__(self, max_concurrent_streams: int | None = None):
# 流循环任务管理
self.stream_loops: dict[str, asyncio.Task] = {}
# 跟踪流使用的管理器类型
self.stream_management_type: dict[str, str] = {} # stream_id -> "adaptive" or "fallback"
# 统计信息
self.stats: dict[str, Any] = {
@@ -99,7 +101,7 @@ class StreamLoopManager:
logger.info("流循环管理器已停止")
async def start_stream_loop(self, stream_id: str, force: bool = False) -> bool:
"""启动指定流的循环任务
"""启动指定流的循环任务 - 优化版本使用自适应管理器
Args:
stream_id: 流ID
@@ -113,6 +115,71 @@ class StreamLoopManager:
logger.debug(f"{stream_id} 循环已在运行")
return True
# 使用自适应流管理器获取槽位
use_adaptive = False
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager, StreamPriority
adaptive_manager = get_adaptive_stream_manager()
if adaptive_manager.is_running:
# 确定流优先级
priority = self._determine_stream_priority(stream_id)
# 获取处理槽位
slot_acquired = await adaptive_manager.acquire_stream_slot(
stream_id=stream_id,
priority=priority,
force=force
)
if slot_acquired:
use_adaptive = True
logger.debug(f"成功获取流处理槽位: {stream_id} (优先级: {priority.name})")
else:
logger.debug(f"自适应管理器拒绝槽位请求: {stream_id},尝试回退方案")
else:
logger.debug(f"自适应管理器未运行,使用原始方法")
except Exception as e:
logger.debug(f"自适应管理器获取槽位失败,使用原始方法: {e}")
# 如果自适应管理器失败或未运行,使用回退方案
if not use_adaptive:
if not await self._fallback_acquire_slot(stream_id, force):
logger.debug(f"回退方案也失败: {stream_id}")
return False
# 创建流循环任务
try:
loop_task = asyncio.create_task(
self._stream_loop_worker(stream_id),
name=f"stream_loop_{stream_id}"
)
self.stream_loops[stream_id] = loop_task
# 记录管理器类型
self.stream_management_type[stream_id] = "adaptive" if use_adaptive else "fallback"
# 更新统计信息
self.stats["active_streams"] += 1
self.stats["total_loops"] += 1
logger.info(f"启动流循环任务: {stream_id} (管理器: {'adaptive' if use_adaptive else 'fallback'})")
return True
except Exception as e:
logger.error(f"启动流循环任务失败 {stream_id}: {e}")
# 释放槽位
if use_adaptive:
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
except:
pass
return False
async def _fallback_acquire_slot(self, stream_id: str, force: bool) -> bool:
"""回退方案:获取槽位(原始方法)"""
# 判断是否需要强制分发
should_force = force or self._should_force_dispatch_for_stream(stream_id)
@@ -149,6 +216,28 @@ class StreamLoopManager:
del self.stream_loops[stream_id]
current_streams -= 1 # 更新当前流数量
return True
def _determine_stream_priority(self, stream_id: str) -> "StreamPriority":
"""确定流优先级"""
try:
from src.chat.message_manager.adaptive_stream_manager import StreamPriority
# 这里可以基于流的历史数据、用户身份等确定优先级
# 简化版本基于流ID的哈希值分配优先级
hash_value = hash(stream_id) % 10
if hash_value >= 8: # 20% 高优先级
return StreamPriority.HIGH
elif hash_value >= 5: # 30% 中等优先级
return StreamPriority.NORMAL
else: # 50% 低优先级
return StreamPriority.LOW
except Exception:
from src.chat.message_manager.adaptive_stream_manager import StreamPriority
return StreamPriority.NORMAL
# 创建流循环任务
try:
task = asyncio.create_task(
@@ -201,13 +290,13 @@ class StreamLoopManager:
logger.info(f"停止流循环: {stream_id} (剩余: {len(self.stream_loops)})")
return True
async def _stream_loop(self, stream_id: str) -> None:
"""单个流的无限循环
async def _stream_loop_worker(self, stream_id: str) -> None:
"""单个流的工作循环 - 优化版本
Args:
stream_id: 流ID
"""
logger.info(f"流循环开始: {stream_id}")
logger.info(f"流循环工作器启动: {stream_id}")
try:
while self.is_running:
@@ -223,6 +312,18 @@ class StreamLoopManager:
unread_count = self._get_unread_count(context)
force_dispatch = self._needs_force_dispatch_for_context(context, unread_count)
# 3. 更新自适应管理器指标
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.update_stream_metrics(
stream_id,
message_rate=unread_count / 5.0 if unread_count > 0 else 0.0, # 简化计算
last_activity=time.time()
)
except Exception as e:
logger.debug(f"更新流指标失败: {e}")
has_messages = force_dispatch or await self._has_messages_to_process(context)
if has_messages:
@@ -278,6 +379,24 @@ class StreamLoopManager:
del self.stream_loops[stream_id]
logger.debug(f"清理流循环标记: {stream_id}")
# 根据管理器类型释放相应的槽位
management_type = self.stream_management_type.get(stream_id, "fallback")
if management_type == "adaptive":
# 释放自适应管理器的槽位
try:
from src.chat.message_manager.adaptive_stream_manager import get_adaptive_stream_manager
adaptive_manager = get_adaptive_stream_manager()
adaptive_manager.release_stream_slot(stream_id)
logger.debug(f"释放自适应流处理槽位: {stream_id}")
except Exception as e:
logger.debug(f"释放自适应流处理槽位失败: {e}")
else:
logger.debug(f"{stream_id} 使用回退方案,无需释放自适应槽位")
# 清理管理器类型记录
if stream_id in self.stream_management_type:
del self.stream_management_type[stream_id]
logger.info(f"流循环结束: {stream_id}")
async def _get_stream_context(self, stream_id: str) -> Any | None: