Windpicker-owo committed 2025-12-08 18:11:33 +08:00
8 changed files with 1109 additions and 82 deletions

View File: src/common/database/optimization/__init__.py

@@ -2,7 +2,7 @@
Responsibilities:
- Batch scheduling
- Multi-level caching
- Multi-level caching (in-memory cache + Redis cache)
- Data preloading
"""
@@ -14,6 +14,8 @@ from .batch_scheduler import (
close_batch_scheduler,
get_batch_scheduler,
)
from .cache_backend import CacheBackend
from .cache_backend import CacheStats as BaseCacheStats
from .cache_manager import (
CacheEntry,
CacheStats,
@@ -21,6 +23,7 @@ from .cache_manager import (
MultiLevelCache,
close_cache,
get_cache,
get_cache_backend_type,
)
from .preloader import (
AccessPattern,
@@ -30,27 +33,36 @@ from .preloader import (
get_preloader,
record_preload_access,
)
from .redis_cache import RedisCache, close_redis_cache, get_redis_cache
__all__ = [
"AccessPattern",
# Batch Scheduler
"AdaptiveBatchScheduler",
"BaseCacheStats",
"BatchOperation",
"BatchStats",
# Cache Backend (Abstract)
"CacheBackend",
"CacheEntry",
"CacheStats",
"CommonDataPreloader",
# Preloader
"DataPreloader",
"LRUCache",
# Cache
# Memory Cache
"MultiLevelCache",
"Priority",
# Redis Cache
"RedisCache",
"close_batch_scheduler",
"close_cache",
"close_preloader",
"close_redis_cache",
"get_batch_scheduler",
"get_cache",
"get_cache_backend_type",
"get_preloader",
"record_preload_access",
"get_redis_cache"
]
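The package façade above re-exports both backends behind a single surface. A minimal consumer sketch (names taken from the `__all__` list above; must run inside an async context):

```python
from src.common.database.optimization import get_cache, get_cache_backend_type


async def show_backend() -> None:
    cache = await get_cache()  # backend is chosen by configuration
    await cache.set("greeting", "hello", ttl=30)
    print(await cache.get("greeting"))  # "hello"
    print(get_cache_backend_type())     # "memory" or "redis"
```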

View File: src/common/database/optimization/cache_backend.py (new)

@@ -0,0 +1,210 @@
"""缓存后端抽象基类
定义统一的缓存接口,支持多种缓存后端实现:
- MemoryCache: 内存多级缓存L1 + L2
- RedisCache: Redis 分布式缓存
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class CacheStats:
"""缓存统计信息
Attributes:
hits: 命中次数
misses: 未命中次数
evictions: 淘汰次数
total_size: 总大小(字节)
item_count: 条目数量
"""
hits: int = 0
misses: int = 0
evictions: int = 0
total_size: int = 0
item_count: int = 0
@property
def hit_rate(self) -> float:
"""命中率"""
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
@property
def eviction_rate(self) -> float:
"""淘汰率"""
return self.evictions / self.item_count if self.item_count > 0 else 0.0
class CacheBackend(ABC):
"""缓存后端抽象基类
定义统一的缓存操作接口,所有缓存实现必须继承此类
"""
@abstractmethod
async def get(self, key: str) -> Any | None:
"""从缓存获取数据
Args:
key: 缓存键
Returns:
缓存值,如果不存在返回 None
"""
pass
@abstractmethod
async def set(
self,
key: str,
value: Any,
ttl: float | None = None,
) -> None:
"""设置缓存值
Args:
key: 缓存键
value: 缓存值
ttl: 过期时间None 表示使用默认 TTL
"""
pass
@abstractmethod
async def delete(self, key: str) -> bool:
"""删除缓存条目
Args:
key: 缓存键
Returns:
是否成功删除
"""
pass
@abstractmethod
async def exists(self, key: str) -> bool:
"""检查键是否存在
Args:
key: 缓存键
Returns:
键是否存在
"""
pass
@abstractmethod
async def clear(self) -> None:
"""清空所有缓存"""
pass
@abstractmethod
async def get_stats(self) -> dict[str, Any]:
"""获取缓存统计信息
Returns:
包含命中率、条目数等统计数据的字典
"""
pass
@abstractmethod
async def close(self) -> None:
"""关闭缓存连接/清理资源"""
pass
async def get_or_load(
self,
key: str,
loader: Any,
ttl: float | None = None,
) -> Any | None:
"""获取缓存或通过 loader 加载
Args:
key: 缓存键
loader: 数据加载函数(同步或异步)
ttl: 过期时间(秒)
Returns:
缓存值或加载的值
"""
import asyncio
# Try the cache first
value = await self.get(key)
if value is not None:
return value
# Cache miss: load via the loader
if loader is not None:
if asyncio.iscoroutinefunction(loader):
value = await loader()
else:
value = loader()
if value is not None:
await self.set(key, value, ttl=ttl)
return value
return None
async def delete_pattern(self, pattern: str) -> int:
"""删除匹配模式的所有键(可选实现)
Args:
pattern: 键模式(支持 * 通配符)
Returns:
删除的键数量
"""
# 默认实现:不支持模式删除
raise NotImplementedError("此缓存后端不支持模式删除")
async def mget(self, keys: list[str]) -> dict[str, Any]:
"""批量获取多个键的值(可选实现)
Args:
keys: 键列表
Returns:
键值对字典,不存在的键不包含在结果中
"""
# 默认实现:逐个获取
result = {}
for key in keys:
value = await self.get(key)
if value is not None:
result[key] = value
return result
async def mset(
self,
mapping: dict[str, Any],
ttl: float | None = None,
) -> None:
"""批量设置多个键值对(可选实现)
Args:
mapping: 键值对字典
ttl: 过期时间(秒)
"""
# 默认实现:逐个设置
for key, value in mapping.items():
await self.set(key, value, ttl=ttl)
@property
@abstractmethod
def backend_type(self) -> str:
"""返回缓存后端类型标识"""
pass
@property
def is_distributed(self) -> bool:
"""是否为分布式缓存(默认 False"""
return False
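For reference, a minimal concrete backend might look like the sketch below. It is a toy dict-backed backend useful for unit tests; `SimpleDictCache` and its TTL-ignoring semantics are illustrative and not part of this commit. It also exercises the inherited `get_or_load` helper.

```python
import asyncio
from typing import Any

from src.common.database.optimization.cache_backend import CacheBackend, CacheStats


class SimpleDictCache(CacheBackend):
    """Toy backend: a plain dict with no TTL handling."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._stats = CacheStats()

    async def get(self, key: str) -> Any | None:
        value = self._data.get(key)
        if value is None:
            self._stats.misses += 1
        else:
            self._stats.hits += 1
        return value

    async def set(self, key: str, value: Any, ttl: float | None = None) -> None:
        self._data[key] = value  # ttl is ignored in this toy backend

    async def delete(self, key: str) -> bool:
        return self._data.pop(key, None) is not None

    async def exists(self, key: str) -> bool:
        return key in self._data

    async def clear(self) -> None:
        self._data.clear()

    async def get_stats(self) -> dict[str, Any]:
        return {"backend": self.backend_type, "hit_rate": self._stats.hit_rate,
                "item_count": len(self._data)}

    async def close(self) -> None:
        self._data.clear()

    @property
    def backend_type(self) -> str:
        return "dict"


async def demo() -> None:
    cache = SimpleDictCache()
    # get_or_load is inherited from CacheBackend: miss -> loader() -> set
    value = await cache.get_or_load("answer", loader=lambda: 42, ttl=60)
    assert value == 42 and await cache.exists("answer")


asyncio.run(demo())
```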

View File: src/common/database/optimization/cache_manager.py

@@ -6,6 +6,10 @@
- LRU eviction: the least-recently-used entries are evicted automatically
- Smart warm-up: high-frequency data is preloaded at startup
- Statistics: hit rate, eviction rate, and other monitoring metrics
Supported cache backends:
- memory: in-memory multi-level cache (default)
- redis: Redis distributed cache
"""
import asyncio
@@ -16,6 +20,7 @@ from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Generic, TypeVar
from src.common.database.optimization.cache_backend import CacheBackend
from src.common.logger import get_logger
from src.common.memory_utils import estimate_cache_item_size
@@ -243,7 +248,7 @@ class LRUCache(Generic[T]):
return 1024
class MultiLevelCache:
class MultiLevelCache(CacheBackend):
"""多级缓存管理器
实现两级缓存架构:
@@ -251,6 +256,8 @@ class MultiLevelCache:
- L2: extended cache (large capacity, long TTL)
Lookups check L1 first; on a miss, L2; on another miss, the data source is queried
Implements the CacheBackend interface, so it is interchangeable with the Redis cache
"""
def __init__(
@@ -328,8 +335,8 @@ class MultiLevelCache:
self,
key: str,
value: Any,
size: int | None = None,
ttl: float | None = None,
size: int | None = None,
) -> None:
"""设置缓存值
@@ -338,8 +345,8 @@ class MultiLevelCache:
Args:
key: cache key
value: value to cache
size: data size in bytes
ttl: custom expiration time; None means use the default TTL
size: data size in bytes
"""
# Estimate the data size if it was not provided
if size is None:
@@ -372,16 +379,53 @@ class MultiLevelCache:
await self.l1_cache.set(key, value, size)
await self.l2_cache.set(key, value, size)
async def delete(self, key: str) -> None:
async def delete(self, key: str) -> bool:
"""删除缓存条目
同时从L1和L2删除
Args:
key: 缓存键
Returns:
是否有条目被删除
"""
await self.l1_cache.delete(key)
await self.l2_cache.delete(key)
l1_deleted = await self.l1_cache.delete(key)
l2_deleted = await self.l2_cache.delete(key)
return l1_deleted or l2_deleted
async def exists(self, key: str) -> bool:
"""检查键是否存在于缓存中
Args:
key: 缓存键
Returns:
键是否存在
"""
# Check L1
if await self.l1_cache.get(key) is not None:
return True
# Check L2
if await self.l2_cache.get(key) is not None:
return True
return False
async def close(self) -> None:
"""关闭缓存(停止清理任务并清空)"""
await self.stop_cleanup_task()
await self.clear()
logger.info("多级缓存已关闭")
@property
def backend_type(self) -> str:
"""返回缓存后端类型标识"""
return "memory"
@property
def is_distributed(self) -> bool:
"""内存缓存不是分布式的"""
return False
async def clear(self) -> None:
"""清空所有缓存"""
@@ -440,8 +484,8 @@ class MultiLevelCache:
# Compute shared and exclusive keys
shared_keys = l1_keys & l2_keys
l1_keys - l2_keys
l2_keys - l1_keys
l1_only_keys = l1_keys - l2_keys # noqa: F841
l2_only_keys = l2_keys - l1_keys # noqa: F841
# 🔧 Fix: compute memory usage in parallel to avoid nested locks
l1_size_task = asyncio.create_task(self._calculate_memory_usage_safe(self.l1_cache, l1_keys))
@@ -749,18 +793,22 @@ class MultiLevelCache:
return cleaned_count
# Global cache instance
_global_cache: MultiLevelCache | None = None
# Global cache instance (supports multiple backend types)
_global_cache: CacheBackend | None = None
_cache_lock = asyncio.Lock()
_cache_backend_type: str = "memory" # 记录当前使用的后端类型
async def get_cache() -> MultiLevelCache:
async def get_cache() -> CacheBackend:
"""获取全局缓存实例(单例)
从配置文件读取缓存参数,如果配置未加载则使用默认值
如果配置中禁用了缓存返回一个最小化的缓存实例容量为1
根据配置自动选择缓存后端:
- cache_backend = "memory": 使用内存多级缓存(默认
- cache_backend = "redis": 使用 Redis 分布式缓存
如果配置中禁用了缓存,返回一个最小化的缓存实例
"""
global _global_cache
global _global_cache, _cache_backend_type
if _global_cache is None:
async with _cache_lock:
@@ -774,7 +822,7 @@ async def get_cache() -> MultiLevelCache:
# Check whether caching is enabled
if not db_config.enable_database_cache:
logger.info("数据库缓存已禁用,使用最小化缓存实例")
logger.info("数据库缓存已禁用,使用最小化内存缓存实例")
_global_cache = MultiLevelCache(
l1_max_size=1,
l1_ttl=1,
@@ -782,51 +830,109 @@ async def get_cache() -> MultiLevelCache:
l2_ttl=1,
max_memory_mb=1,
)
_cache_backend_type = "memory"
return _global_cache
l1_max_size = db_config.cache_l1_max_size
l1_ttl = db_config.cache_l1_ttl
l2_max_size = db_config.cache_l2_max_size
l2_ttl = db_config.cache_l2_ttl
max_memory_mb = db_config.cache_max_memory_mb
max_item_size_mb = db_config.cache_max_item_size_mb
cleanup_interval = db_config.cache_cleanup_interval
# Select the cache backend from config
backend = db_config.cache_backend.lower()
_cache_backend_type = backend
if backend == "redis":
# Use the Redis cache
_global_cache = await _create_redis_cache(db_config)
else:
# Default to the in-memory cache
_global_cache = await _create_memory_cache(db_config)
logger.info(
f"从配置加载缓存参数: L1({l1_max_size}/{l1_ttl}s), "
f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB), "
f"单项限制({max_item_size_mb}MB)"
)
except Exception as e:
# Config not loaded; fall back to defaults
logger.warning(f"Failed to load cache parameters from config; using defaults: {e}")
l1_max_size = 1000
l1_ttl = 60
l2_max_size = 10000
l2_ttl = 300
max_memory_mb = 100
max_item_size_mb = 1
cleanup_interval = 60
_global_cache = MultiLevelCache(
l1_max_size=l1_max_size,
l1_ttl=l1_ttl,
l2_max_size=l2_max_size,
l2_ttl=l2_ttl,
max_memory_mb=max_memory_mb,
max_item_size_mb=max_item_size_mb,
)
await _global_cache.start_cleanup_task(interval=cleanup_interval)
# Config not loaded; fall back to the default in-memory cache
logger.warning(f"Failed to load cache parameters from config; using the default in-memory cache: {e}")
_global_cache = MultiLevelCache()
_cache_backend_type = "memory"
await _global_cache.start_cleanup_task(interval=60)
return _global_cache
async def _create_memory_cache(db_config: Any) -> MultiLevelCache:
"""创建内存多级缓存"""
l1_max_size = db_config.cache_l1_max_size
l1_ttl = db_config.cache_l1_ttl
l2_max_size = db_config.cache_l2_max_size
l2_ttl = db_config.cache_l2_ttl
max_memory_mb = db_config.cache_max_memory_mb
max_item_size_mb = db_config.cache_max_item_size_mb
cleanup_interval = db_config.cache_cleanup_interval
logger.info(
f"创建内存缓存: L1({l1_max_size}/{l1_ttl}s), "
f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB)"
)
cache = MultiLevelCache(
l1_max_size=l1_max_size,
l1_ttl=l1_ttl,
l2_max_size=l2_max_size,
l2_ttl=l2_ttl,
max_memory_mb=max_memory_mb,
max_item_size_mb=max_item_size_mb,
)
await cache.start_cleanup_task(interval=cleanup_interval)
return cache
async def _create_redis_cache(db_config: Any) -> CacheBackend:
"""创建 Redis 缓存
Raises:
RuntimeError: Redis 连接失败时抛出异常
"""
from src.common.database.optimization.redis_cache import RedisCache
logger.info(
f"创建 Redis 缓存: {db_config.redis_host}:{db_config.redis_port}/{db_config.redis_db}, "
f"前缀={db_config.redis_key_prefix}, TTL={db_config.redis_default_ttl}s"
)
cache = RedisCache(
host=db_config.redis_host,
port=db_config.redis_port,
password=db_config.redis_password or None,
db=db_config.redis_db,
key_prefix=db_config.redis_key_prefix,
default_ttl=db_config.redis_default_ttl,
pool_size=db_config.redis_connection_pool_size,
socket_timeout=db_config.redis_socket_timeout,
ssl=db_config.redis_ssl,
)
# Test the connection
if await cache.health_check():
logger.info("Redis 缓存连接成功")
return cache
else:
await cache.close()
raise RuntimeError(
f"Redis 连接测试失败: {db_config.redis_host}:{db_config.redis_port}"
"请检查 Redis 服务是否运行,或将 cache_backend 改为 'memory'"
)
def get_cache_backend_type() -> str:
"""获取当前使用的缓存后端类型
Returns:
"memory""redis"
"""
return _cache_backend_type
async def close_cache() -> None:
"""关闭全局缓存"""
global _global_cache
global _global_cache, _cache_backend_type
if _global_cache is not None:
await _global_cache.stop_cleanup_task()
await _global_cache.clear()
await _global_cache.close()
logger.info(f"全局缓存已关闭 (后端: {_cache_backend_type})")
_global_cache = None
logger.info("全局缓存已关闭")
_cache_backend_type = "memory"

View File: src/common/database/optimization/redis_cache.py (new)

@@ -0,0 +1,554 @@
"""Redis 缓存后端实现
基于 redis-py 的异步 Redis 缓存实现,支持:
- 异步连接池
- 自动序列化/反序列化
- TTL 过期管理
- 模式删除
- 批量操作
- 统计信息
"""
import asyncio
import json
import pickle
from typing import Any
from src.common.database.optimization.cache_backend import CacheBackend, CacheStats
from src.common.logger import get_logger
logger = get_logger("redis_cache")
import redis.asyncio as aioredis
class RedisCache(CacheBackend):
"""Redis 缓存后端
特性:
- 分布式缓存:支持多实例共享
- 自动序列化:支持 JSON 和 Pickle
- TTL 管理Redis 原生过期机制
- 模式删除:支持通配符删除
- 连接池:高效连接复用
"""
def __init__(
self,
host: str = "localhost",
port: int = 6379,
password: str | None = None,
db: int = 0,
key_prefix: str = "mofox:",
default_ttl: int = 600,
pool_size: int = 10,
socket_timeout: float = 5.0,
ssl: bool = False,
serializer: str = "json", # "json" 或 "pickle"
):
"""初始化 Redis 缓存
Args:
host: Redis 服务器地址
port: Redis 服务器端口
password: Redis 密码(可选)
db: Redis 数据库编号
key_prefix: 缓存键前缀
default_ttl: 默认过期时间(秒)
pool_size: 连接池大小
socket_timeout: socket 超时时间(秒)
ssl: 是否启用 SSL
serializer: 序列化方式json 或 pickle
"""
self.host = host
self.port = port
self.password = password if password else None
self.db = db
self.key_prefix = key_prefix
self.default_ttl = default_ttl
self.pool_size = pool_size
self.socket_timeout = socket_timeout
self.ssl = ssl
self.serializer = serializer
# Connection pool and client (lazily initialized)
self._pool: Any = None
self._client: Any = None
self._lock = asyncio.Lock()
self._is_closing = False
# Statistics
self._stats = CacheStats()
self._stats_lock = asyncio.Lock()
logger.info(
f"Redis 缓存初始化: {host}:{port}/{db}, "
f"前缀={key_prefix}, TTL={default_ttl}s, "
f"序列化={serializer}"
)
async def _ensure_connection(self) -> Any:
"""确保 Redis 连接已建立"""
if self._client is not None:
return self._client
async with self._lock:
if self._client is not None:
return self._client
try:
# Create the connection pool (via the aioredis module for type safety)
self._pool = aioredis.ConnectionPool(
host=self.host,
port=self.port,
password=self.password,
db=self.db,
max_connections=self.pool_size,
socket_timeout=self.socket_timeout,
socket_connect_timeout=self.socket_timeout,
decode_responses=False,  # serialization is handled by this class
ssl=self.ssl,
)
# Create the client
self._client = aioredis.Redis(connection_pool=self._pool)
# Test the connection
await self._client.ping()
logger.info(f"Redis 连接成功: {self.host}:{self.port}/{self.db}")
return self._client
except Exception as e:
logger.error(f"Redis 连接失败: {e}")
self._client = None
self._pool = None
raise
def _make_key(self, key: str) -> str:
"""生成带前缀的完整键名"""
return f"{self.key_prefix}{key}"
def _serialize(self, value: Any) -> bytes:
"""序列化值"""
if self.serializer == "json":
try:
return json.dumps(value, ensure_ascii=False, default=str).encode("utf-8")
except (TypeError, ValueError):
# JSON serialization failed; fall back to pickle
return pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
else:
return pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
def _deserialize(self, data: bytes) -> Any:
"""反序列化值"""
if self.serializer == "json":
try:
return json.loads(data.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
# JSON deserialization failed; try pickle
try:
return pickle.loads(data)
except Exception:
return None
else:
try:
return pickle.loads(data)
except Exception:
return None
async def get(self, key: str) -> Any | None:
"""从缓存获取数据"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
data = await client.get(full_key)
async with self._stats_lock:
if data is not None:
self._stats.hits += 1
return self._deserialize(data)
else:
self._stats.misses += 1
return None
except Exception as e:
logger.error(f"Redis GET 失败 [{key}]: {e}")
async with self._stats_lock:
self._stats.misses += 1
return None
async def set(
self,
key: str,
value: Any,
ttl: float | None = None,
) -> None:
"""设置缓存值"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
data = self._serialize(value)
# Apply the TTL
expire_time = int(ttl) if ttl is not None else self.default_ttl
await client.setex(full_key, expire_time, data)
logger.debug(f"Redis SET: {key} (TTL={expire_time}s)")
except Exception as e:
logger.error(f"Redis SET 失败 [{key}]: {e}")
async def delete(self, key: str) -> bool:
"""删除缓存条目"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
result = await client.delete(full_key)
if result > 0:
async with self._stats_lock:
self._stats.evictions += 1
logger.debug(f"Redis DEL: {key}")
return True
return False
except Exception as e:
logger.error(f"Redis DEL 失败 [{key}]: {e}")
return False
async def exists(self, key: str) -> bool:
"""检查键是否存在"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
return bool(await client.exists(full_key))
except Exception as e:
logger.error(f"Redis EXISTS 失败 [{key}]: {e}")
return False
async def clear(self) -> None:
"""清空所有带前缀的缓存"""
try:
client = await self._ensure_connection()
pattern = self._make_key("*")
# Use SCAN to avoid blocking the server
cursor = 0
deleted_count = 0
while True:
cursor, keys = await client.scan(cursor, match=pattern, count=100)
if keys:
await client.delete(*keys)
deleted_count += len(keys)
if cursor == 0:
break
async with self._stats_lock:
self._stats = CacheStats()
logger.info(f"Redis 缓存已清空: 删除 {deleted_count} 个键")
except Exception as e:
logger.error(f"Redis CLEAR 失败: {e}")
async def delete_pattern(self, pattern: str) -> int:
"""删除匹配模式的所有键
Args:
pattern: 键模式(支持 * 通配符)
Returns:
删除的键数量
"""
try:
client = await self._ensure_connection()
full_pattern = self._make_key(pattern)
# Use SCAN to avoid blocking the server
cursor = 0
deleted_count = 0
while True:
cursor, keys = await client.scan(cursor, match=full_pattern, count=100)
if keys:
await client.delete(*keys)
deleted_count += len(keys)
if cursor == 0:
break
async with self._stats_lock:
self._stats.evictions += deleted_count
logger.debug(f"Redis 模式删除: {pattern} -> {deleted_count} 个键")
return deleted_count
except Exception as e:
logger.error(f"Redis 模式删除失败 [{pattern}]: {e}")
return 0
async def mget(self, keys: list[str]) -> dict[str, Any]:
"""批量获取多个键的值"""
if not keys:
return {}
try:
client = await self._ensure_connection()
full_keys = [self._make_key(k) for k in keys]
values = await client.mget(full_keys)
result = {}
hits = 0
misses = 0
for key, value in zip(keys, values):
if value is not None:
result[key] = self._deserialize(value)
hits += 1
else:
misses += 1
async with self._stats_lock:
self._stats.hits += hits
self._stats.misses += misses
return result
except Exception as e:
logger.error(f"Redis MGET 失败: {e}")
return {}
async def mset(
self,
mapping: dict[str, Any],
ttl: float | None = None,
) -> None:
"""批量设置多个键值对"""
if not mapping:
return
try:
client = await self._ensure_connection()
expire_time = int(ttl) if ttl is not None else self.default_ttl
# Use a pipeline for efficiency
async with client.pipeline(transaction=False) as pipe:
for key, value in mapping.items():
full_key = self._make_key(key)
data = self._serialize(value)
pipe.setex(full_key, expire_time, data)
await pipe.execute()
logger.debug(f"Redis MSET: {len(mapping)} 个键")
except Exception as e:
logger.error(f"Redis MSET 失败: {e}")
async def get_stats(self) -> dict[str, Any]:
"""获取缓存统计信息"""
try:
client = await self._ensure_connection()
# Fetch Redis server info (all sections; "connected_clients" lives outside the memory section)
info = await client.info()
# keyspace info could extend these stats; skipped for now to avoid the overhead
# keyspace_info = await client.info("keyspace")
# Count the keys under this prefix
pattern = self._make_key("*")
key_count = 0
cursor = 0
while True:
cursor, keys = await client.scan(cursor, match=pattern, count=1000)
key_count += len(keys)
if cursor == 0:
break
async with self._stats_lock:
return {
"backend": "redis",
"hits": self._stats.hits,
"misses": self._stats.misses,
"hit_rate": self._stats.hit_rate,
"evictions": self._stats.evictions,
"key_count": key_count,
"redis_memory_used_mb": info.get("used_memory", 0) / (1024 * 1024),
"redis_memory_peak_mb": info.get("used_memory_peak", 0) / (1024 * 1024),
"redis_connected_clients": info.get("connected_clients", 0),
"key_prefix": self.key_prefix,
"default_ttl": self.default_ttl,
}
except Exception as e:
logger.error(f"获取 Redis 统计信息失败: {e}")
async with self._stats_lock:
return {
"backend": "redis",
"hits": self._stats.hits,
"misses": self._stats.misses,
"hit_rate": self._stats.hit_rate,
"evictions": self._stats.evictions,
"error": str(e),
}
async def close(self) -> None:
"""关闭 Redis 连接"""
self._is_closing = True
if self._client is not None:
try:
await self._client.aclose()
logger.info("Redis 连接已关闭")
except Exception as e:
logger.error(f"关闭 Redis 连接失败: {e}")
finally:
self._client = None
self._pool = None
@property
def backend_type(self) -> str:
"""返回缓存后端类型标识"""
return "redis"
@property
def is_distributed(self) -> bool:
"""Redis 是分布式缓存"""
return True
async def health_check(self) -> bool:
"""健康检查"""
try:
client = await self._ensure_connection()
await client.ping()
return True
except Exception:
return False
async def ttl(self, key: str) -> int:
"""获取键的剩余 TTL
Args:
key: 缓存键
Returns:
剩余秒数,-1 表示无过期时间,-2 表示键不存在
"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
return await client.ttl(full_key)
except Exception as e:
logger.error(f"Redis TTL 失败 [{key}]: {e}")
return -2
async def expire(self, key: str, ttl: int) -> bool:
"""更新键的 TTL
Args:
key: 缓存键
ttl: 新的过期时间(秒)
Returns:
是否成功
"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
return bool(await client.expire(full_key, ttl))
except Exception as e:
logger.error(f"Redis EXPIRE 失败 [{key}]: {e}")
return False
async def incr(self, key: str, amount: int = 1) -> int:
"""原子递增
Args:
key: 缓存键
amount: 递增量
Returns:
递增后的值
"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
return await client.incrby(full_key, amount)
except Exception as e:
logger.error(f"Redis INCR 失败 [{key}]: {e}")
return 0
async def decr(self, key: str, amount: int = 1) -> int:
"""原子递减
Args:
key: 缓存键
amount: 递减量
Returns:
递减后的值
"""
try:
client = await self._ensure_connection()
full_key = self._make_key(key)
return await client.decrby(full_key, amount)
except Exception as e:
logger.error(f"Redis DECR 失败 [{key}]: {e}")
return 0
# Global Redis cache instance
_global_redis_cache: RedisCache | None = None
_redis_cache_lock = asyncio.Lock()
async def get_redis_cache() -> RedisCache:
"""获取全局 Redis 缓存实例(单例)"""
global _global_redis_cache
if _global_redis_cache is None:
async with _redis_cache_lock:
if _global_redis_cache is None:
# Load parameters from config
try:
from src.config.config import global_config
assert global_config is not None
db_config = global_config.database
_global_redis_cache = RedisCache(
host=db_config.redis_host,
port=db_config.redis_port,
password=db_config.redis_password or None,
db=db_config.redis_db,
key_prefix=db_config.redis_key_prefix,
default_ttl=db_config.redis_default_ttl,
pool_size=db_config.redis_connection_pool_size,
socket_timeout=db_config.redis_socket_timeout,
ssl=db_config.redis_ssl,
)
except Exception as e:
logger.warning(f"无法从配置加载 Redis 参数,使用默认值: {e}")
_global_redis_cache = RedisCache()
return _global_redis_cache
async def close_redis_cache() -> None:
"""关闭全局 Redis 缓存"""
global _global_redis_cache
if _global_redis_cache is not None:
await _global_redis_cache.close()
_global_redis_cache = None
logger.info("全局 Redis 缓存已关闭")

View File

@@ -44,6 +44,12 @@ class DatabaseConfig(ValidatedConfigBase):
# Database cache configuration
enable_database_cache: bool = Field(default=True, description="Enable the database query cache system")
cache_backend: str = Field(
default="memory",
description="Cache backend type: memory (in-memory cache) or redis (Redis cache)",
)
# In-memory cache settings (effective when cache_backend = "memory")
cache_l1_max_size: int = Field(default=1000, ge=100, le=50000, description="Maximum number of L1 cache entries (hot data, roughly 1-5 MB of memory)")
cache_l1_ttl: int = Field(default=300, ge=10, le=3600, description="L1 cache TTL in seconds")
cache_l2_max_size: int = Field(default=10000, ge=1000, le=100000, description="Maximum number of L2 cache entries (warm data, roughly 10-50 MB of memory)")
@@ -52,6 +58,17 @@ class DatabaseConfig(ValidatedConfigBase):
cache_max_memory_mb: int = Field(default=100, ge=10, le=1000, description="Maximum cache memory usage in MB; exceeding it triggers a forced cleanup")
cache_max_item_size_mb: int = Field(default=1, ge=1, le=100, description="Maximum size of a single cache entry in MB; larger items are not cached")
# Redis cache settings (effective when cache_backend = "redis")
redis_host: str = Field(default="localhost", description="Redis server address")
redis_port: int = Field(default=6379, ge=1, le=65535, description="Redis server port")
redis_password: str = Field(default="", description="Redis password (optional)")
redis_db: int = Field(default=0, ge=0, le=15, description="Redis database number")
redis_key_prefix: str = Field(default="mofox:", description="Redis cache key prefix")
redis_default_ttl: int = Field(default=600, ge=60, le=86400, description="Default Redis cache TTL in seconds")
redis_connection_pool_size: int = Field(default=10, ge=1, le=100, description="Redis connection pool size")
redis_socket_timeout: float = Field(default=5.0, ge=1.0, le=30.0, description="Redis socket timeout in seconds")
redis_ssl: bool = Field(default=False, description="Enable SSL for the Redis connection")
class BotConfig(ValidatedConfigBase):
"""QQ机器人配置类"""