优化缓存条目大小估算,添加向量存储标记,清理待处理消息逻辑
This commit is contained in:
@@ -10,6 +10,7 @@
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
@@ -40,6 +41,9 @@ class PerceptualMemoryManager:
|
||||
activation_threshold: int = 3,
|
||||
recall_top_k: int = 5,
|
||||
recall_similarity_threshold: float = 0.55,
|
||||
pending_message_ttl: int = 600,
|
||||
max_pending_per_stream: int = 50,
|
||||
max_pending_messages: int = 2000,
|
||||
):
|
||||
"""
|
||||
初始化感知记忆层管理器
|
||||
@@ -51,6 +55,9 @@ class PerceptualMemoryManager:
|
||||
activation_threshold: 激活阈值(召回次数)
|
||||
recall_top_k: 召回时返回的最大块数
|
||||
recall_similarity_threshold: 召回的相似度阈值
|
||||
pending_message_ttl: 待组块消息最大保留时间(秒)
|
||||
max_pending_per_stream: 单个流允许的待组块消息上限
|
||||
max_pending_messages: 全部流的待组块消息总上限
|
||||
"""
|
||||
self.data_dir = data_dir or Path("data/memory_graph")
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
@@ -61,6 +68,9 @@ class PerceptualMemoryManager:
|
||||
self.activation_threshold = activation_threshold
|
||||
self.recall_top_k = recall_top_k
|
||||
self.recall_similarity_threshold = recall_similarity_threshold
|
||||
self.pending_message_ttl = max(0, pending_message_ttl)
|
||||
self.max_pending_per_stream = max(0, max_pending_per_stream)
|
||||
self.max_pending_messages = max(0, max_pending_messages)
|
||||
|
||||
# 核心数据
|
||||
self.perceptual_memory: PerceptualMemory | None = None
|
||||
@@ -104,6 +114,8 @@ class PerceptualMemoryManager:
|
||||
max_blocks=self.max_blocks,
|
||||
block_size=self.block_size,
|
||||
)
|
||||
else:
|
||||
self._cleanup_pending_messages()
|
||||
|
||||
self._initialized = True
|
||||
logger.info(
|
||||
@@ -138,18 +150,28 @@ class PerceptualMemoryManager:
|
||||
await self.initialize()
|
||||
|
||||
try:
|
||||
# 添加到待处理消息队列
|
||||
self.perceptual_memory.pending_messages.append(message)
|
||||
|
||||
if not hasattr(self.perceptual_memory, "pending_messages"):
|
||||
self.perceptual_memory.pending_messages = []
|
||||
|
||||
self._cleanup_pending_messages()
|
||||
|
||||
stream_id = message.get("stream_id", "unknown")
|
||||
self._normalize_message_timestamp(message)
|
||||
self.perceptual_memory.pending_messages.append(message)
|
||||
self._enforce_pending_limits(stream_id)
|
||||
|
||||
logger.debug(
|
||||
f"消息已添加到待处理队列 (stream={stream_id[:8]}, "
|
||||
f"总数={len(self.perceptual_memory.pending_messages)})"
|
||||
)
|
||||
|
||||
# 按 stream_id 检查是否达到创建块的条件
|
||||
stream_messages = [msg for msg in self.perceptual_memory.pending_messages if msg.get("stream_id") == stream_id]
|
||||
|
||||
stream_messages = [
|
||||
msg
|
||||
for msg in self.perceptual_memory.pending_messages
|
||||
if msg.get("stream_id") == stream_id
|
||||
]
|
||||
|
||||
if len(stream_messages) >= self.block_size:
|
||||
new_block = await self._create_memory_block(stream_id)
|
||||
return new_block
|
||||
@@ -171,6 +193,7 @@ class PerceptualMemoryManager:
|
||||
新创建的记忆块,失败返回 None
|
||||
"""
|
||||
try:
|
||||
self._cleanup_pending_messages()
|
||||
# 只取出指定 stream_id 的 block_size 条消息
|
||||
stream_messages = [msg for msg in self.perceptual_memory.pending_messages if msg.get("stream_id") == stream_id]
|
||||
|
||||
@@ -227,6 +250,82 @@ class PerceptualMemoryManager:
|
||||
logger.error(f"创建记忆块失败: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
def _normalize_message_timestamp(self, message: dict[str, Any]) -> float:
|
||||
"""确保消息包含 timestamp 字段并返回其值。"""
|
||||
raw_ts = message.get("timestamp", message.get("time"))
|
||||
try:
|
||||
timestamp = float(raw_ts)
|
||||
except (TypeError, ValueError):
|
||||
timestamp = time.time()
|
||||
message["timestamp"] = timestamp
|
||||
return timestamp
|
||||
|
||||
def _cleanup_pending_messages(self) -> None:
|
||||
"""移除过期/超限的待组块消息,避免内存无限增长。"""
|
||||
if not self.perceptual_memory or not getattr(self.perceptual_memory, "pending_messages", None):
|
||||
return
|
||||
|
||||
pending = self.perceptual_memory.pending_messages
|
||||
now = time.time()
|
||||
removed = 0
|
||||
|
||||
if self.pending_message_ttl > 0:
|
||||
filtered: list[dict[str, Any]] = []
|
||||
ttl = float(self.pending_message_ttl)
|
||||
for msg in pending:
|
||||
ts = msg.get("timestamp") or msg.get("time")
|
||||
try:
|
||||
ts_value = float(ts)
|
||||
except (TypeError, ValueError):
|
||||
ts_value = time.time()
|
||||
msg["timestamp"] = ts_value
|
||||
if now - ts_value <= ttl:
|
||||
filtered.append(msg)
|
||||
else:
|
||||
removed += 1
|
||||
|
||||
if removed:
|
||||
pending[:] = filtered
|
||||
|
||||
# 全局上限,按 FIFO 丢弃最旧的消息
|
||||
if self.max_pending_messages > 0 and len(pending) > self.max_pending_messages:
|
||||
overflow = len(pending) - self.max_pending_messages
|
||||
del pending[:overflow]
|
||||
removed += overflow
|
||||
|
||||
if removed:
|
||||
logger.debug(f"清理待组块消息 {removed} 条 (剩余 {len(pending)})")
|
||||
|
||||
def _enforce_pending_limits(self, stream_id: str) -> None:
|
||||
"""保证单个 stream 的待组块消息不超过限制。"""
|
||||
if (
|
||||
not self.perceptual_memory
|
||||
or not getattr(self.perceptual_memory, "pending_messages", None)
|
||||
or self.max_pending_per_stream <= 0
|
||||
):
|
||||
return
|
||||
|
||||
pending = self.perceptual_memory.pending_messages
|
||||
indexes = [
|
||||
idx
|
||||
for idx, msg in enumerate(pending)
|
||||
if msg.get("stream_id") == stream_id
|
||||
]
|
||||
|
||||
overflow = len(indexes) - self.max_pending_per_stream
|
||||
if overflow <= 0:
|
||||
return
|
||||
|
||||
for idx in reversed(indexes[:overflow]):
|
||||
pending.pop(idx)
|
||||
|
||||
logger.warning(
|
||||
"stream %s 待组块消息过多,丢弃 %d 条旧消息 (保留 %d 条)",
|
||||
stream_id,
|
||||
overflow,
|
||||
self.max_pending_per_stream,
|
||||
)
|
||||
|
||||
def _combine_messages(self, messages: list[dict[str, Any]]) -> str:
|
||||
"""
|
||||
合并多条消息为单一文本
|
||||
@@ -508,6 +607,8 @@ class PerceptualMemoryManager:
|
||||
if not self.perceptual_memory:
|
||||
return
|
||||
|
||||
self._cleanup_pending_messages()
|
||||
|
||||
# 保存到 JSON 文件
|
||||
import orjson
|
||||
|
||||
|
||||
Reference in New Issue
Block a user