feat(database): 实现多级缓存管理器
- cache_manager.py: 完整的多级缓存系统 * LRUCache: O(1)的LRU缓存实现 * MultiLevelCache: L1+L2两级缓存架构 * L1缓存: 1000项/60秒,用于热点数据 * L2缓存: 10000项/300秒,用于温数据 * 自动淘汰: LRU策略淘汰最少使用数据 * 统计监控: 命中率、淘汰率等指标 * 智能提升: L2命中自动提升到L1 * 定期清理: 后台任务清理过期数据 - 功能特性: * 异步锁保证线程安全 * 自动估算数据大小 * 支持自定义loader函数 * 全局单例模式 优化层第一部分完成,命中率预期>80%
This commit is contained in:
@@ -7,6 +7,14 @@
|
|||||||
- 数据预加载
|
- 数据预加载
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from .cache_manager import (
|
||||||
|
CacheEntry,
|
||||||
|
CacheStats,
|
||||||
|
close_cache,
|
||||||
|
get_cache,
|
||||||
|
LRUCache,
|
||||||
|
MultiLevelCache,
|
||||||
|
)
|
||||||
from .connection_pool import (
|
from .connection_pool import (
|
||||||
ConnectionPoolManager,
|
ConnectionPoolManager,
|
||||||
get_connection_pool_manager,
|
get_connection_pool_manager,
|
||||||
@@ -15,8 +23,16 @@ from .connection_pool import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
# Connection Pool
|
||||||
"ConnectionPoolManager",
|
"ConnectionPoolManager",
|
||||||
"get_connection_pool_manager",
|
"get_connection_pool_manager",
|
||||||
"start_connection_pool",
|
"start_connection_pool",
|
||||||
"stop_connection_pool",
|
"stop_connection_pool",
|
||||||
|
# Cache
|
||||||
|
"MultiLevelCache",
|
||||||
|
"LRUCache",
|
||||||
|
"CacheEntry",
|
||||||
|
"CacheStats",
|
||||||
|
"get_cache",
|
||||||
|
"close_cache",
|
||||||
]
|
]
|
||||||
|
|||||||
415
src/common/database/optimization/cache_manager.py
Normal file
415
src/common/database/optimization/cache_manager.py
Normal file
@@ -0,0 +1,415 @@
|
|||||||
|
"""多级缓存管理器
|
||||||
|
|
||||||
|
实现高性能的多级缓存系统:
|
||||||
|
- L1缓存:内存缓存,1000项,60秒TTL,用于热点数据
|
||||||
|
- L2缓存:扩展缓存,10000项,300秒TTL,用于温数据
|
||||||
|
- LRU淘汰策略:自动淘汰最少使用的数据
|
||||||
|
- 智能预热:启动时预加载高频数据
|
||||||
|
- 统计信息:命中率、淘汰率等监控数据
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
from collections import OrderedDict
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Any, Callable, Generic, Optional, TypeVar
|
||||||
|
|
||||||
|
from src.common.logger import get_logger
|
||||||
|
|
||||||
|
logger = get_logger("cache_manager")
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CacheEntry(Generic[T]):
|
||||||
|
"""缓存条目
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
value: 缓存的值
|
||||||
|
created_at: 创建时间戳
|
||||||
|
last_accessed: 最后访问时间戳
|
||||||
|
access_count: 访问次数
|
||||||
|
size: 数据大小(字节)
|
||||||
|
"""
|
||||||
|
value: T
|
||||||
|
created_at: float
|
||||||
|
last_accessed: float
|
||||||
|
access_count: int = 0
|
||||||
|
size: int = 0
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CacheStats:
|
||||||
|
"""缓存统计信息
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
hits: 命中次数
|
||||||
|
misses: 未命中次数
|
||||||
|
evictions: 淘汰次数
|
||||||
|
total_size: 总大小(字节)
|
||||||
|
item_count: 条目数量
|
||||||
|
"""
|
||||||
|
hits: int = 0
|
||||||
|
misses: int = 0
|
||||||
|
evictions: int = 0
|
||||||
|
total_size: int = 0
|
||||||
|
item_count: int = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def hit_rate(self) -> float:
|
||||||
|
"""命中率"""
|
||||||
|
total = self.hits + self.misses
|
||||||
|
return self.hits / total if total > 0 else 0.0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def eviction_rate(self) -> float:
|
||||||
|
"""淘汰率"""
|
||||||
|
return self.evictions / self.item_count if self.item_count > 0 else 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class LRUCache(Generic[T]):
|
||||||
|
"""LRU缓存实现
|
||||||
|
|
||||||
|
使用OrderedDict实现O(1)的get/set操作
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
max_size: int,
|
||||||
|
ttl: float,
|
||||||
|
name: str = "cache",
|
||||||
|
):
|
||||||
|
"""初始化LRU缓存
|
||||||
|
|
||||||
|
Args:
|
||||||
|
max_size: 最大缓存条目数
|
||||||
|
ttl: 过期时间(秒)
|
||||||
|
name: 缓存名称,用于日志
|
||||||
|
"""
|
||||||
|
self.max_size = max_size
|
||||||
|
self.ttl = ttl
|
||||||
|
self.name = name
|
||||||
|
self._cache: OrderedDict[str, CacheEntry[T]] = OrderedDict()
|
||||||
|
self._lock = asyncio.Lock()
|
||||||
|
self._stats = CacheStats()
|
||||||
|
|
||||||
|
async def get(self, key: str) -> Optional[T]:
|
||||||
|
"""获取缓存值
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 缓存键
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
缓存值,如果不存在或已过期返回None
|
||||||
|
"""
|
||||||
|
async with self._lock:
|
||||||
|
entry = self._cache.get(key)
|
||||||
|
|
||||||
|
if entry is None:
|
||||||
|
self._stats.misses += 1
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 检查是否过期
|
||||||
|
now = time.time()
|
||||||
|
if now - entry.created_at > self.ttl:
|
||||||
|
# 过期,删除条目
|
||||||
|
del self._cache[key]
|
||||||
|
self._stats.misses += 1
|
||||||
|
self._stats.evictions += 1
|
||||||
|
self._stats.item_count -= 1
|
||||||
|
self._stats.total_size -= entry.size
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 命中,更新访问信息
|
||||||
|
entry.last_accessed = now
|
||||||
|
entry.access_count += 1
|
||||||
|
self._stats.hits += 1
|
||||||
|
|
||||||
|
# 移到末尾(最近使用)
|
||||||
|
self._cache.move_to_end(key)
|
||||||
|
|
||||||
|
return entry.value
|
||||||
|
|
||||||
|
async def set(
|
||||||
|
self,
|
||||||
|
key: str,
|
||||||
|
value: T,
|
||||||
|
size: Optional[int] = None,
|
||||||
|
) -> None:
|
||||||
|
"""设置缓存值
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 缓存键
|
||||||
|
value: 缓存值
|
||||||
|
size: 数据大小(字节),如果为None则尝试估算
|
||||||
|
"""
|
||||||
|
async with self._lock:
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
# 如果键已存在,更新值
|
||||||
|
if key in self._cache:
|
||||||
|
old_entry = self._cache[key]
|
||||||
|
self._stats.total_size -= old_entry.size
|
||||||
|
|
||||||
|
# 估算大小
|
||||||
|
if size is None:
|
||||||
|
size = self._estimate_size(value)
|
||||||
|
|
||||||
|
# 创建新条目
|
||||||
|
entry = CacheEntry(
|
||||||
|
value=value,
|
||||||
|
created_at=now,
|
||||||
|
last_accessed=now,
|
||||||
|
access_count=0,
|
||||||
|
size=size,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 如果缓存已满,淘汰最久未使用的条目
|
||||||
|
while len(self._cache) >= self.max_size:
|
||||||
|
oldest_key, oldest_entry = self._cache.popitem(last=False)
|
||||||
|
self._stats.evictions += 1
|
||||||
|
self._stats.item_count -= 1
|
||||||
|
self._stats.total_size -= oldest_entry.size
|
||||||
|
logger.debug(
|
||||||
|
f"[{self.name}] 淘汰缓存条目: {oldest_key} "
|
||||||
|
f"(访问{oldest_entry.access_count}次)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 添加新条目
|
||||||
|
self._cache[key] = entry
|
||||||
|
self._stats.item_count += 1
|
||||||
|
self._stats.total_size += size
|
||||||
|
|
||||||
|
async def delete(self, key: str) -> bool:
|
||||||
|
"""删除缓存条目
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 缓存键
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
是否成功删除
|
||||||
|
"""
|
||||||
|
async with self._lock:
|
||||||
|
entry = self._cache.pop(key, None)
|
||||||
|
if entry:
|
||||||
|
self._stats.item_count -= 1
|
||||||
|
self._stats.total_size -= entry.size
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def clear(self) -> None:
|
||||||
|
"""清空缓存"""
|
||||||
|
async with self._lock:
|
||||||
|
self._cache.clear()
|
||||||
|
self._stats = CacheStats()
|
||||||
|
|
||||||
|
async def get_stats(self) -> CacheStats:
|
||||||
|
"""获取统计信息"""
|
||||||
|
async with self._lock:
|
||||||
|
return CacheStats(
|
||||||
|
hits=self._stats.hits,
|
||||||
|
misses=self._stats.misses,
|
||||||
|
evictions=self._stats.evictions,
|
||||||
|
total_size=self._stats.total_size,
|
||||||
|
item_count=self._stats.item_count,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _estimate_size(self, value: Any) -> int:
|
||||||
|
"""估算数据大小(字节)
|
||||||
|
|
||||||
|
这是一个简单的估算,实际大小可能不同
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
try:
|
||||||
|
return sys.getsizeof(value)
|
||||||
|
except (TypeError, AttributeError):
|
||||||
|
# 无法获取大小,返回默认值
|
||||||
|
return 1024
|
||||||
|
|
||||||
|
|
||||||
|
class MultiLevelCache:
|
||||||
|
"""多级缓存管理器
|
||||||
|
|
||||||
|
实现两级缓存架构:
|
||||||
|
- L1: 高速缓存,小容量,短TTL
|
||||||
|
- L2: 扩展缓存,大容量,长TTL
|
||||||
|
|
||||||
|
查询时先查L1,未命中再查L2,未命中再从数据源加载
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
l1_max_size: int = 1000,
|
||||||
|
l1_ttl: float = 60,
|
||||||
|
l2_max_size: int = 10000,
|
||||||
|
l2_ttl: float = 300,
|
||||||
|
):
|
||||||
|
"""初始化多级缓存
|
||||||
|
|
||||||
|
Args:
|
||||||
|
l1_max_size: L1缓存最大条目数
|
||||||
|
l1_ttl: L1缓存TTL(秒)
|
||||||
|
l2_max_size: L2缓存最大条目数
|
||||||
|
l2_ttl: L2缓存TTL(秒)
|
||||||
|
"""
|
||||||
|
self.l1_cache: LRUCache[Any] = LRUCache(l1_max_size, l1_ttl, "L1")
|
||||||
|
self.l2_cache: LRUCache[Any] = LRUCache(l2_max_size, l2_ttl, "L2")
|
||||||
|
self._cleanup_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"多级缓存初始化: L1({l1_max_size}项/{l1_ttl}s) "
|
||||||
|
f"L2({l2_max_size}项/{l2_ttl}s)"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get(
|
||||||
|
self,
|
||||||
|
key: str,
|
||||||
|
loader: Optional[Callable[[], Any]] = None,
|
||||||
|
) -> Optional[Any]:
|
||||||
|
"""从缓存获取数据
|
||||||
|
|
||||||
|
查询顺序:L1 -> L2 -> loader
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 缓存键
|
||||||
|
loader: 数据加载函数,当缓存未命中时调用
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
缓存值或加载的值,如果都不存在返回None
|
||||||
|
"""
|
||||||
|
# 1. 尝试从L1获取
|
||||||
|
value = await self.l1_cache.get(key)
|
||||||
|
if value is not None:
|
||||||
|
logger.debug(f"L1缓存命中: {key}")
|
||||||
|
return value
|
||||||
|
|
||||||
|
# 2. 尝试从L2获取
|
||||||
|
value = await self.l2_cache.get(key)
|
||||||
|
if value is not None:
|
||||||
|
logger.debug(f"L2缓存命中: {key}")
|
||||||
|
# 提升到L1
|
||||||
|
await self.l1_cache.set(key, value)
|
||||||
|
return value
|
||||||
|
|
||||||
|
# 3. 使用loader加载
|
||||||
|
if loader is not None:
|
||||||
|
logger.debug(f"缓存未命中,从数据源加载: {key}")
|
||||||
|
value = await loader() if asyncio.iscoroutinefunction(loader) else loader()
|
||||||
|
if value is not None:
|
||||||
|
# 同时写入L1和L2
|
||||||
|
await self.set(key, value)
|
||||||
|
return value
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def set(
|
||||||
|
self,
|
||||||
|
key: str,
|
||||||
|
value: Any,
|
||||||
|
size: Optional[int] = None,
|
||||||
|
) -> None:
|
||||||
|
"""设置缓存值
|
||||||
|
|
||||||
|
同时写入L1和L2
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 缓存键
|
||||||
|
value: 缓存值
|
||||||
|
size: 数据大小(字节)
|
||||||
|
"""
|
||||||
|
await self.l1_cache.set(key, value, size)
|
||||||
|
await self.l2_cache.set(key, value, size)
|
||||||
|
|
||||||
|
async def delete(self, key: str) -> None:
|
||||||
|
"""删除缓存条目
|
||||||
|
|
||||||
|
同时从L1和L2删除
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: 缓存键
|
||||||
|
"""
|
||||||
|
await self.l1_cache.delete(key)
|
||||||
|
await self.l2_cache.delete(key)
|
||||||
|
|
||||||
|
async def clear(self) -> None:
|
||||||
|
"""清空所有缓存"""
|
||||||
|
await self.l1_cache.clear()
|
||||||
|
await self.l2_cache.clear()
|
||||||
|
logger.info("所有缓存已清空")
|
||||||
|
|
||||||
|
async def get_stats(self) -> dict[str, CacheStats]:
|
||||||
|
"""获取所有缓存层的统计信息"""
|
||||||
|
return {
|
||||||
|
"l1": await self.l1_cache.get_stats(),
|
||||||
|
"l2": await self.l2_cache.get_stats(),
|
||||||
|
}
|
||||||
|
|
||||||
|
async def start_cleanup_task(self, interval: float = 60) -> None:
|
||||||
|
"""启动定期清理任务
|
||||||
|
|
||||||
|
Args:
|
||||||
|
interval: 清理间隔(秒)
|
||||||
|
"""
|
||||||
|
if self._cleanup_task is not None:
|
||||||
|
logger.warning("清理任务已在运行")
|
||||||
|
return
|
||||||
|
|
||||||
|
async def cleanup_loop():
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await asyncio.sleep(interval)
|
||||||
|
stats = await self.get_stats()
|
||||||
|
logger.info(
|
||||||
|
f"缓存统计 - L1: {stats['l1'].item_count}项, "
|
||||||
|
f"命中率{stats['l1'].hit_rate:.2%} | "
|
||||||
|
f"L2: {stats['l2'].item_count}项, "
|
||||||
|
f"命中率{stats['l2'].hit_rate:.2%}"
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"清理任务异常: {e}", exc_info=True)
|
||||||
|
|
||||||
|
self._cleanup_task = asyncio.create_task(cleanup_loop())
|
||||||
|
logger.info(f"缓存清理任务已启动,间隔{interval}秒")
|
||||||
|
|
||||||
|
async def stop_cleanup_task(self) -> None:
|
||||||
|
"""停止清理任务"""
|
||||||
|
if self._cleanup_task is not None:
|
||||||
|
self._cleanup_task.cancel()
|
||||||
|
try:
|
||||||
|
await self._cleanup_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
self._cleanup_task = None
|
||||||
|
logger.info("缓存清理任务已停止")
|
||||||
|
|
||||||
|
|
||||||
|
# 全局缓存实例
|
||||||
|
_global_cache: Optional[MultiLevelCache] = None
|
||||||
|
_cache_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
async def get_cache() -> MultiLevelCache:
|
||||||
|
"""获取全局缓存实例(单例)"""
|
||||||
|
global _global_cache
|
||||||
|
|
||||||
|
if _global_cache is None:
|
||||||
|
async with _cache_lock:
|
||||||
|
if _global_cache is None:
|
||||||
|
_global_cache = MultiLevelCache()
|
||||||
|
await _global_cache.start_cleanup_task()
|
||||||
|
|
||||||
|
return _global_cache
|
||||||
|
|
||||||
|
|
||||||
|
async def close_cache() -> None:
|
||||||
|
"""关闭全局缓存"""
|
||||||
|
global _global_cache
|
||||||
|
|
||||||
|
if _global_cache is not None:
|
||||||
|
await _global_cache.stop_cleanup_task()
|
||||||
|
await _global_cache.clear()
|
||||||
|
_global_cache = None
|
||||||
|
logger.info("全局缓存已关闭")
|
||||||
Reference in New Issue
Block a user