From da27c865d0c30d32d611772be62209cf2f34b456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9B=85=E8=AF=BA=E7=8B=90?= <212194964+foxcyber907@users.noreply.github.com> Date: Mon, 8 Dec 2025 17:42:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=95=E5=85=A5Redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/database_cache_guide.md | 171 +++++- requirements.txt | 1 + src/common/database/optimization/__init__.py | 16 +- .../database/optimization/cache_backend.py | 210 +++++++ .../database/optimization/cache_manager.py | 206 +++++-- .../database/optimization/redis_cache.py | 554 ++++++++++++++++++ src/config/official_configs.py | 17 + template/bot_config_template.toml | 16 +- 8 files changed, 1109 insertions(+), 82 deletions(-) create mode 100644 src/common/database/optimization/cache_backend.py create mode 100644 src/common/database/optimization/redis_cache.py diff --git a/docs/database_cache_guide.md b/docs/database_cache_guide.md index 29fccd4e6..852e3a3ce 100644 --- a/docs/database_cache_guide.md +++ b/docs/database_cache_guide.md @@ -2,20 +2,45 @@ ## 概述 -MoFox Bot 数据库系统集成了多级缓存架构,用于优化高频查询性能,减少数据库压力。 +MoFox Bot 数据库系统集成了可插拔的缓存架构,支持多种缓存后端: -## 缓存架构 +- **内存缓存(Memory)**: 多级 LRU 缓存,适合单机部署 +- **Redis 缓存**: 分布式缓存,适合多实例部署或需要持久化缓存的场景 + +## 缓存后端选择 + +在 `bot_config.toml` 中配置: + +```toml +[database] +enable_database_cache = true # 是否启用缓存 +cache_backend = "memory" # 缓存后端: "memory" 或 "redis" +``` + +### 后端对比 + +| 特性 | 内存缓存 (memory) | Redis 缓存 (redis) | +|------|-------------------|-------------------| +| 部署复杂度 | 低(无额外依赖) | 中(需要 Redis 服务) | +| 分布式支持 | ❌ | ✅ | +| 持久化 | ❌ | ✅ | +| 性能 | 极高(本地内存) | 高(网络开销) | +| 适用场景 | 单机部署 | 多实例/集群部署 | + +--- + +## 内存缓存架构 ### 多级缓存(Multi-Level Cache) - **L1 缓存(热数据)** - - 容量:1000 项 - - TTL:60 秒 + - 容量:1000 项(可配置) + - TTL:300 秒(可配置) - 用途:最近访问的热点数据 - **L2 缓存(温数据)** - - 容量:10000 项 - - TTL:300 秒 + - 容量:10000 项(可配置) + - TTL:1800 秒(可配置) - 用途:较常访问但不是最热的数据 ### LRU 驱逐策略 @@ -24,11 +49,45 @@ MoFox Bot 数据库系统集成了多级缓存架构,用于优化高频查询 - 缓存满时自动驱逐最少使用的项 - 保证最常用数据始终在缓存中 +--- + +## Redis 缓存架构 + +### 特性 + +- **分布式**: 多个 Bot 实例可共享缓存 +- **持久化**: Redis 支持 RDB/AOF 持久化 +- **TTL 管理**: 使用 Redis 原生过期机制 +- **模式删除**: 支持通配符批量删除缓存 +- **原子操作**: 支持 INCR/DECR 等原子操作 + +### 配置参数 + +```toml +[database] +# Redis缓存配置(cache_backend = "redis" 时生效) +redis_host = "localhost" # Redis服务器地址 +redis_port = 6379 # Redis服务器端口 +redis_password = "" # Redis密码(留空表示无密码) +redis_db = 0 # Redis数据库编号 (0-15) +redis_key_prefix = "mofox:" # 缓存键前缀 +redis_default_ttl = 600 # 默认过期时间(秒) +redis_connection_pool_size = 10 # 连接池大小 +``` + +### 安装 Redis 依赖 + +```bash +pip install redis +``` + +--- + ## 使用方法 ### 1. 使用 @cached 装饰器(推荐) -最简单的方式是使用 `@cached` 装饰器: +最简单的方式,自动适配所有缓存后端: ```python from src.common.database.utils.decorators import cached @@ -54,7 +113,7 @@ async def get_person_info(platform: str, person_id: str): 需要更精细控制时,可以手动管理缓存: ```python -from src.common.database.optimization.cache_manager import get_cache +from src.common.database.optimization import get_cache async def custom_query(): cache = await get_cache() @@ -67,18 +126,33 @@ async def custom_query(): # 缓存未命中,执行查询 result = await execute_database_query() - # 写入缓存 - await cache.set("my_key", result) + # 写入缓存(可指定自定义 TTL) + await cache.set("my_key", result, ttl=300) return result ``` -### 3. 缓存失效 +### 3. 使用 get_or_load 方法 + +简化的缓存加载模式: + +```python +cache = await get_cache() + +# 自动处理:缓存命中返回,未命中则执行 loader 并缓存结果 +result = await cache.get_or_load( + "my_key", + loader=lambda: fetch_data_from_db(), + ttl=600 +) +``` + +### 4. 
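批量读写(mget / mset)
+
+两种缓存后端都实现了 `mget` / `mset` 批量接口(内存后端逐键处理,Redis 后端使用 MGET 和 pipeline)。下面是一个最小示例;其中的键名和数据仅为演示假设,实际键应按自己的缓存键规则生成:
+
+```python
+cache = await get_cache()
+
+# 批量写入,统一指定 TTL(秒)
+await cache.mset({"demo:a": {"x": 1}, "demo:b": {"x": 2}}, ttl=600)
+
+# 批量读取,返回 {键: 值} 字典;未命中的键不会出现在结果中
+found = await cache.mget(["demo:a", "demo:b", "demo:c"])
+assert "demo:c" not in found
+```
+
+### 5. 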
缓存失效 更新数据后需要主动使缓存失效: ```python -from src.common.database.optimization.cache_manager import get_cache +from src.common.database.optimization import get_cache from src.common.database.utils.decorators import generate_cache_key async def update_person_affinity(platform: str, person_id: str, affinity_delta: float): @@ -91,6 +165,8 @@ async def update_person_affinity(platform: str, person_id: str, affinity_delta: await cache.delete(cache_key) ``` +--- + ## 已缓存的查询 ### PersonInfo(人员信息) @@ -116,17 +192,35 @@ async def update_person_affinity(platform: str, person_id: str, affinity_delta: ## 缓存统计 -查看缓存性能统计: +### 内存缓存统计 ```python cache = await get_cache() stats = await cache.get_stats() -print(f"L1 命中率: {stats['l1_hits']}/{stats['l1_hits'] + stats['l1_misses']}") -print(f"L2 命中率: {stats['l2_hits']}/{stats['l2_hits'] + stats['l2_misses']}") -print(f"总命中率: {stats['total_hits']}/{stats['total_requests']}") +if cache.backend_type == "memory": + print(f"L1: {stats['l1'].item_count}项, 命中率 {stats['l1'].hit_rate:.2%}") + print(f"L2: {stats['l2'].item_count}项, 命中率 {stats['l2'].hit_rate:.2%}") ``` +### Redis 缓存统计 + +```python +if cache.backend_type == "redis": + print(f"命中率: {stats['hit_rate']:.2%}") + print(f"键数量: {stats['key_count']}") +``` + +### 检查当前后端类型 + +```python +from src.common.database.optimization import get_cache_backend_type + +backend = get_cache_backend_type() # "memory" 或 "redis" +``` + +--- + ## 最佳实践 ### 1. 选择合适的 TTL @@ -150,9 +244,12 @@ print(f"总命中率: {stats['total_hits']}/{stats['total_requests']}") ### 4. 监控缓存效果 定期检查缓存统计: -- 命中率 > 70% - 缓存效果良好 -- 命中率 50-70% - 可以优化 TTL 或缓存策略 -- 命中率 < 50% - 考虑是否需要缓存该查询 + +- 命中率 > 70% - 缓存效果良好 ✅ +- 命中率 50-70% - 可以优化 TTL 或缓存策略 ⚠️ +- 命中率 < 50% - 考虑是否需要缓存该查询 ❌ + +--- ## 性能提升数据 @@ -166,16 +263,22 @@ print(f"总命中率: {stats['total_hits']}/{stats['total_requests']}") 1. **缓存一致性**: 更新数据后务必使缓存失效 2. **内存占用**: 监控缓存大小,避免占用过多内存 -3. **序列化**: 缓存的对象需要可序列化(SQLAlchemy 模型实例可能需要特殊处理) -4. **并发安全**: MultiLevelCache 是线程安全和协程安全的 +3. **序列化**: 缓存的对象需要可序列化 + - 内存缓存:直接存储 Python 对象 + - Redis 缓存:默认使用 JSON,复杂对象自动回退到 Pickle +4. **并发安全**: 两种后端都是协程安全的 +5. **无自动回退**: Redis 连接失败时会抛出异常,不会自动回退到内存缓存(确保配置正确) + +--- ## 故障排除 ### 缓存未生效 -1. 检查是否正确导入装饰器 -2. 确认 TTL 设置合理 -3. 查看日志中的 "缓存命中" 消息 +1. 检查 `enable_database_cache = true` +2. 检查是否正确导入装饰器 +3. 确认 TTL 设置合理 +4. 查看日志中的缓存消息 ### 数据不一致 @@ -183,14 +286,24 @@ print(f"总命中率: {stats['total_hits']}/{stats['total_requests']}") 2. 确认缓存键生成逻辑一致 3. 考虑缩短 TTL 时间 -### 内存占用过高 +### 内存占用过高(内存缓存) 1. 检查缓存统计中的项数 -2. 调整 L1/L2 缓存大小(在 cache_manager.py 中配置) +2. 调整 L1/L2 缓存大小 3. 缩短 TTL 加快驱逐 +### Redis 连接失败 + +1. 检查 Redis 服务是否运行 +2. 确认连接参数(host/port/password) +3. 检查防火墙/网络设置 +4. 
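用 redis-cli 验证连通性(示例命令,按实际 host/port 调整;有密码时加 -a 参数):
+
+   ```bash
+   redis-cli -h localhost -p 6379 ping   # 正常应返回 PONG
+   ```
+
+5. 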
查看日志中的错误信息 + +--- + ## 扩展阅读 -- [数据库优化指南](./database_optimization_guide.md) -- [多级缓存实现](../src/common/database/optimization/cache_manager.py) -- [装饰器文档](../src/common/database/utils/decorators.py) +- [缓存后端抽象](../src/common/database/optimization/cache_backend.py) +- [内存缓存实现](../src/common/database/optimization/cache_manager.py) +- [Redis 缓存实现](../src/common/database/optimization/redis_cache.py) +- [缓存装饰器](../src/common/database/utils/decorators.py) diff --git a/requirements.txt b/requirements.txt index 2b91df142..0641fc92a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,6 +34,7 @@ python-dateutil python-dotenv python-igraph pymongo +redis requests ruff scipy diff --git a/src/common/database/optimization/__init__.py b/src/common/database/optimization/__init__.py index 429f9cb78..245e83cc3 100644 --- a/src/common/database/optimization/__init__.py +++ b/src/common/database/optimization/__init__.py @@ -2,7 +2,7 @@ 职责: - 批量调度 -- 多级缓存 +- 多级缓存(内存缓存 + Redis缓存) - 数据预加载 """ @@ -14,6 +14,8 @@ from .batch_scheduler import ( close_batch_scheduler, get_batch_scheduler, ) +from .cache_backend import CacheBackend +from .cache_backend import CacheStats as BaseCacheStats from .cache_manager import ( CacheEntry, CacheStats, @@ -21,6 +23,7 @@ from .cache_manager import ( MultiLevelCache, close_cache, get_cache, + get_cache_backend_type, ) from .preloader import ( AccessPattern, @@ -29,26 +32,35 @@ from .preloader import ( close_preloader, get_preloader, ) +from .redis_cache import RedisCache, close_redis_cache, get_redis_cache __all__ = [ "AccessPattern", # Batch Scheduler "AdaptiveBatchScheduler", + "BaseCacheStats", "BatchOperation", "BatchStats", + # Cache Backend (Abstract) + "CacheBackend", "CacheEntry", "CacheStats", "CommonDataPreloader", # Preloader "DataPreloader", "LRUCache", - # Cache + # Memory Cache "MultiLevelCache", "Priority", + # Redis Cache + "RedisCache", "close_batch_scheduler", "close_cache", "close_preloader", + "close_redis_cache", "get_batch_scheduler", "get_cache", + "get_cache_backend_type", "get_preloader", + "get_redis_cache" ] diff --git a/src/common/database/optimization/cache_backend.py b/src/common/database/optimization/cache_backend.py new file mode 100644 index 000000000..673b9c825 --- /dev/null +++ b/src/common/database/optimization/cache_backend.py @@ -0,0 +1,210 @@ +"""缓存后端抽象基类 + +定义统一的缓存接口,支持多种缓存后端实现: +- MemoryCache: 内存多级缓存(L1 + L2) +- RedisCache: Redis 分布式缓存 +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any + + +@dataclass +class CacheStats: + """缓存统计信息 + + Attributes: + hits: 命中次数 + misses: 未命中次数 + evictions: 淘汰次数 + total_size: 总大小(字节) + item_count: 条目数量 + """ + + hits: int = 0 + misses: int = 0 + evictions: int = 0 + total_size: int = 0 + item_count: int = 0 + + @property + def hit_rate(self) -> float: + """命中率""" + total = self.hits + self.misses + return self.hits / total if total > 0 else 0.0 + + @property + def eviction_rate(self) -> float: + """淘汰率""" + return self.evictions / self.item_count if self.item_count > 0 else 0.0 + + +class CacheBackend(ABC): + """缓存后端抽象基类 + + 定义统一的缓存操作接口,所有缓存实现必须继承此类 + """ + + @abstractmethod + async def get(self, key: str) -> Any | None: + """从缓存获取数据 + + Args: + key: 缓存键 + + Returns: + 缓存值,如果不存在返回 None + """ + pass + + @abstractmethod + async def set( + self, + key: str, + value: Any, + ttl: float | None = None, + ) -> None: + """设置缓存值 + + Args: + key: 缓存键 + value: 缓存值 + ttl: 过期时间(秒),None 表示使用默认 TTL + """ + pass + + @abstractmethod + async def delete(self, key: str) -> 
bool: + """删除缓存条目 + + Args: + key: 缓存键 + + Returns: + 是否成功删除 + """ + pass + + @abstractmethod + async def exists(self, key: str) -> bool: + """检查键是否存在 + + Args: + key: 缓存键 + + Returns: + 键是否存在 + """ + pass + + @abstractmethod + async def clear(self) -> None: + """清空所有缓存""" + pass + + @abstractmethod + async def get_stats(self) -> dict[str, Any]: + """获取缓存统计信息 + + Returns: + 包含命中率、条目数等统计数据的字典 + """ + pass + + @abstractmethod + async def close(self) -> None: + """关闭缓存连接/清理资源""" + pass + + async def get_or_load( + self, + key: str, + loader: Any, + ttl: float | None = None, + ) -> Any | None: + """获取缓存或通过 loader 加载 + + Args: + key: 缓存键 + loader: 数据加载函数(同步或异步) + ttl: 过期时间(秒) + + Returns: + 缓存值或加载的值 + """ + import asyncio + + # 尝试从缓存获取 + value = await self.get(key) + if value is not None: + return value + + # 缓存未命中,使用 loader 加载 + if loader is not None: + if asyncio.iscoroutinefunction(loader): + value = await loader() + else: + value = loader() + + if value is not None: + await self.set(key, value, ttl=ttl) + + return value + + return None + + async def delete_pattern(self, pattern: str) -> int: + """删除匹配模式的所有键(可选实现) + + Args: + pattern: 键模式(支持 * 通配符) + + Returns: + 删除的键数量 + """ + # 默认实现:不支持模式删除 + raise NotImplementedError("此缓存后端不支持模式删除") + + async def mget(self, keys: list[str]) -> dict[str, Any]: + """批量获取多个键的值(可选实现) + + Args: + keys: 键列表 + + Returns: + 键值对字典,不存在的键不包含在结果中 + """ + # 默认实现:逐个获取 + result = {} + for key in keys: + value = await self.get(key) + if value is not None: + result[key] = value + return result + + async def mset( + self, + mapping: dict[str, Any], + ttl: float | None = None, + ) -> None: + """批量设置多个键值对(可选实现) + + Args: + mapping: 键值对字典 + ttl: 过期时间(秒) + """ + # 默认实现:逐个设置 + for key, value in mapping.items(): + await self.set(key, value, ttl=ttl) + + @property + @abstractmethod + def backend_type(self) -> str: + """返回缓存后端类型标识""" + pass + + @property + def is_distributed(self) -> bool: + """是否为分布式缓存(默认 False)""" + return False diff --git a/src/common/database/optimization/cache_manager.py b/src/common/database/optimization/cache_manager.py index 57ce4650c..e22797af1 100644 --- a/src/common/database/optimization/cache_manager.py +++ b/src/common/database/optimization/cache_manager.py @@ -6,6 +6,10 @@ - LRU淘汰策略:自动淘汰最少使用的数据 - 智能预热:启动时预加载高频数据 - 统计信息:命中率、淘汰率等监控数据 + +支持多种缓存后端: +- memory: 内存多级缓存(默认) +- redis: Redis 分布式缓存 """ import asyncio @@ -16,6 +20,7 @@ from collections.abc import Callable from dataclasses import dataclass from typing import Any, Generic, TypeVar +from src.common.database.optimization.cache_backend import CacheBackend from src.common.logger import get_logger from src.common.memory_utils import estimate_cache_item_size @@ -243,7 +248,7 @@ class LRUCache(Generic[T]): return 1024 -class MultiLevelCache: +class MultiLevelCache(CacheBackend): """多级缓存管理器 实现两级缓存架构: @@ -251,6 +256,8 @@ class MultiLevelCache: - L2: 扩展缓存,大容量,长TTL 查询时先查L1,未命中再查L2,未命中再从数据源加载 + + 实现 CacheBackend 接口,可与 Redis 缓存互换使用 """ def __init__( @@ -328,8 +335,8 @@ class MultiLevelCache: self, key: str, value: Any, - size: int | None = None, ttl: float | None = None, + size: int | None = None, ) -> None: """设置缓存值 @@ -338,8 +345,8 @@ class MultiLevelCache: Args: key: 缓存键 value: 缓存值 - size: 数据大小(字节) ttl: 自定义过期时间(秒),如果为None则使用默认TTL + size: 数据大小(字节) """ # 估算数据大小(如果未提供) if size is None: @@ -372,16 +379,53 @@ class MultiLevelCache: await self.l1_cache.set(key, value, size) await self.l2_cache.set(key, value, size) - async def delete(self, key: str) -> None: + async def delete(self, key: str) -> bool: """删除缓存条目 
同时从L1和L2删除 Args: key: 缓存键 + + Returns: + 是否有条目被删除 """ - await self.l1_cache.delete(key) - await self.l2_cache.delete(key) + l1_deleted = await self.l1_cache.delete(key) + l2_deleted = await self.l2_cache.delete(key) + return l1_deleted or l2_deleted + + async def exists(self, key: str) -> bool: + """检查键是否存在于缓存中 + + Args: + key: 缓存键 + + Returns: + 键是否存在 + """ + # 检查 L1 + if await self.l1_cache.get(key) is not None: + return True + # 检查 L2 + if await self.l2_cache.get(key) is not None: + return True + return False + + async def close(self) -> None: + """关闭缓存(停止清理任务并清空)""" + await self.stop_cleanup_task() + await self.clear() + logger.info("多级缓存已关闭") + + @property + def backend_type(self) -> str: + """返回缓存后端类型标识""" + return "memory" + + @property + def is_distributed(self) -> bool: + """内存缓存不是分布式的""" + return False async def clear(self) -> None: """清空所有缓存""" @@ -440,8 +484,8 @@ class MultiLevelCache: # 计算共享键和独占键 shared_keys = l1_keys & l2_keys - l1_keys - l2_keys - l2_keys - l1_keys + l1_only_keys = l1_keys - l2_keys # noqa: F841 + l2_only_keys = l2_keys - l1_keys # noqa: F841 # 🔧 修复:并行计算内存使用,避免锁嵌套 l1_size_task = asyncio.create_task(self._calculate_memory_usage_safe(self.l1_cache, l1_keys)) @@ -749,18 +793,22 @@ class MultiLevelCache: return cleaned_count -# 全局缓存实例 -_global_cache: MultiLevelCache | None = None +# 全局缓存实例(支持多种后端类型) +_global_cache: CacheBackend | None = None _cache_lock = asyncio.Lock() +_cache_backend_type: str = "memory" # 记录当前使用的后端类型 -async def get_cache() -> MultiLevelCache: +async def get_cache() -> CacheBackend: """获取全局缓存实例(单例) - 从配置文件读取缓存参数,如果配置未加载则使用默认值 - 如果配置中禁用了缓存,返回一个最小化的缓存实例(容量为1) + 根据配置自动选择缓存后端: + - cache_backend = "memory": 使用内存多级缓存(默认) + - cache_backend = "redis": 使用 Redis 分布式缓存 + + 如果配置中禁用了缓存,返回一个最小化的缓存实例 """ - global _global_cache + global _global_cache, _cache_backend_type if _global_cache is None: async with _cache_lock: @@ -774,7 +822,7 @@ async def get_cache() -> MultiLevelCache: # 检查是否启用缓存 if not db_config.enable_database_cache: - logger.info("数据库缓存已禁用,使用最小化缓存实例") + logger.info("数据库缓存已禁用,使用最小化内存缓存实例") _global_cache = MultiLevelCache( l1_max_size=1, l1_ttl=1, @@ -782,51 +830,109 @@ async def get_cache() -> MultiLevelCache: l2_ttl=1, max_memory_mb=1, ) + _cache_backend_type = "memory" return _global_cache - l1_max_size = db_config.cache_l1_max_size - l1_ttl = db_config.cache_l1_ttl - l2_max_size = db_config.cache_l2_max_size - l2_ttl = db_config.cache_l2_ttl - max_memory_mb = db_config.cache_max_memory_mb - max_item_size_mb = db_config.cache_max_item_size_mb - cleanup_interval = db_config.cache_cleanup_interval + # 根据配置选择缓存后端 + backend = db_config.cache_backend.lower() + _cache_backend_type = backend + + if backend == "redis": + # 使用 Redis 缓存 + _global_cache = await _create_redis_cache(db_config) + else: + # 默认使用内存缓存 + _global_cache = await _create_memory_cache(db_config) - logger.info( - f"从配置加载缓存参数: L1({l1_max_size}/{l1_ttl}s), " - f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB), " - f"单项限制({max_item_size_mb}MB)" - ) except Exception as e: - # 配置未加载,使用默认值 - logger.warning(f"无法从配置加载缓存参数,使用默认值: {e}") - l1_max_size = 1000 - l1_ttl = 60 - l2_max_size = 10000 - l2_ttl = 300 - max_memory_mb = 100 - max_item_size_mb = 1 - cleanup_interval = 60 - - _global_cache = MultiLevelCache( - l1_max_size=l1_max_size, - l1_ttl=l1_ttl, - l2_max_size=l2_max_size, - l2_ttl=l2_ttl, - max_memory_mb=max_memory_mb, - max_item_size_mb=max_item_size_mb, - ) - await _global_cache.start_cleanup_task(interval=cleanup_interval) + # 配置未加载,使用默认内存缓存 + 
logger.warning(f"无法从配置加载缓存参数,使用默认内存缓存: {e}") + _global_cache = MultiLevelCache() + _cache_backend_type = "memory" + await _global_cache.start_cleanup_task(interval=60) return _global_cache +async def _create_memory_cache(db_config: Any) -> MultiLevelCache: + """创建内存多级缓存""" + l1_max_size = db_config.cache_l1_max_size + l1_ttl = db_config.cache_l1_ttl + l2_max_size = db_config.cache_l2_max_size + l2_ttl = db_config.cache_l2_ttl + max_memory_mb = db_config.cache_max_memory_mb + max_item_size_mb = db_config.cache_max_item_size_mb + cleanup_interval = db_config.cache_cleanup_interval + + logger.info( + f"创建内存缓存: L1({l1_max_size}/{l1_ttl}s), " + f"L2({l2_max_size}/{l2_ttl}s), 内存限制({max_memory_mb}MB)" + ) + + cache = MultiLevelCache( + l1_max_size=l1_max_size, + l1_ttl=l1_ttl, + l2_max_size=l2_max_size, + l2_ttl=l2_ttl, + max_memory_mb=max_memory_mb, + max_item_size_mb=max_item_size_mb, + ) + await cache.start_cleanup_task(interval=cleanup_interval) + return cache + + +async def _create_redis_cache(db_config: Any) -> CacheBackend: + """创建 Redis 缓存 + + Raises: + RuntimeError: Redis 连接失败时抛出异常 + """ + from src.common.database.optimization.redis_cache import RedisCache + + logger.info( + f"创建 Redis 缓存: {db_config.redis_host}:{db_config.redis_port}/{db_config.redis_db}, " + f"前缀={db_config.redis_key_prefix}, TTL={db_config.redis_default_ttl}s" + ) + + cache = RedisCache( + host=db_config.redis_host, + port=db_config.redis_port, + password=db_config.redis_password or None, + db=db_config.redis_db, + key_prefix=db_config.redis_key_prefix, + default_ttl=db_config.redis_default_ttl, + pool_size=db_config.redis_connection_pool_size, + socket_timeout=db_config.redis_socket_timeout, + ssl=db_config.redis_ssl, + ) + + # 测试连接 + if await cache.health_check(): + logger.info("Redis 缓存连接成功") + return cache + else: + await cache.close() + raise RuntimeError( + f"Redis 连接测试失败: {db_config.redis_host}:{db_config.redis_port}," + "请检查 Redis 服务是否运行,或将 cache_backend 改为 'memory'" + ) + + +def get_cache_backend_type() -> str: + """获取当前使用的缓存后端类型 + + Returns: + "memory" 或 "redis" + """ + return _cache_backend_type + + async def close_cache() -> None: """关闭全局缓存""" - global _global_cache + global _global_cache, _cache_backend_type if _global_cache is not None: - await _global_cache.stop_cleanup_task() - await _global_cache.clear() + await _global_cache.close() + logger.info(f"全局缓存已关闭 (后端: {_cache_backend_type})") _global_cache = None - logger.info("全局缓存已关闭") + _cache_backend_type = "memory" diff --git a/src/common/database/optimization/redis_cache.py b/src/common/database/optimization/redis_cache.py new file mode 100644 index 000000000..8dd00a44c --- /dev/null +++ b/src/common/database/optimization/redis_cache.py @@ -0,0 +1,554 @@ +"""Redis 缓存后端实现 + +基于 redis-py 的异步 Redis 缓存实现,支持: +- 异步连接池 +- 自动序列化/反序列化 +- TTL 过期管理 +- 模式删除 +- 批量操作 +- 统计信息 +""" + +import asyncio +import json +import pickle +from typing import Any + +from src.common.database.optimization.cache_backend import CacheBackend, CacheStats +from src.common.logger import get_logger + +logger = get_logger("redis_cache") + +import redis.asyncio as aioredis + + +class RedisCache(CacheBackend): + """Redis 缓存后端 + + 特性: + - 分布式缓存:支持多实例共享 + - 自动序列化:支持 JSON 和 Pickle + - TTL 管理:Redis 原生过期机制 + - 模式删除:支持通配符删除 + - 连接池:高效连接复用 + """ + + def __init__( + self, + host: str = "localhost", + port: int = 6379, + password: str | None = None, + db: int = 0, + key_prefix: str = "mofox:", + default_ttl: int = 600, + pool_size: int = 10, + socket_timeout: float = 5.0, + ssl: bool = False, + 
serializer: str = "json", # "json" 或 "pickle" + ): + """初始化 Redis 缓存 + + Args: + host: Redis 服务器地址 + port: Redis 服务器端口 + password: Redis 密码(可选) + db: Redis 数据库编号 + key_prefix: 缓存键前缀 + default_ttl: 默认过期时间(秒) + pool_size: 连接池大小 + socket_timeout: socket 超时时间(秒) + ssl: 是否启用 SSL + serializer: 序列化方式(json 或 pickle) + """ + + self.host = host + self.port = port + self.password = password if password else None + self.db = db + self.key_prefix = key_prefix + self.default_ttl = default_ttl + self.pool_size = pool_size + self.socket_timeout = socket_timeout + self.ssl = ssl + self.serializer = serializer + + # 连接池和客户端(延迟初始化) + self._pool: Any = None + self._client: Any = None + self._lock = asyncio.Lock() + self._is_closing = False + + # 统计信息 + self._stats = CacheStats() + self._stats_lock = asyncio.Lock() + + logger.info( + f"Redis 缓存初始化: {host}:{port}/{db}, " + f"前缀={key_prefix}, TTL={default_ttl}s, " + f"序列化={serializer}" + ) + + async def _ensure_connection(self) -> Any: + """确保 Redis 连接已建立""" + if self._client is not None: + return self._client + + async with self._lock: + if self._client is not None: + return self._client + + try: + # 创建连接池 (使用 aioredis 模块确保类型安全) + self._pool = aioredis.ConnectionPool( + host=self.host, + port=self.port, + password=self.password, + db=self.db, + max_connections=self.pool_size, + socket_timeout=self.socket_timeout, + socket_connect_timeout=self.socket_timeout, + decode_responses=False, # 我们自己处理序列化 + ssl=self.ssl, + ) + + # 创建客户端 + self._client = aioredis.Redis(connection_pool=self._pool) + + # 测试连接 + await self._client.ping() + logger.info(f"Redis 连接成功: {self.host}:{self.port}/{self.db}") + + return self._client + + except Exception as e: + logger.error(f"Redis 连接失败: {e}") + self._client = None + self._pool = None + raise + + def _make_key(self, key: str) -> str: + """生成带前缀的完整键名""" + return f"{self.key_prefix}{key}" + + def _serialize(self, value: Any) -> bytes: + """序列化值""" + if self.serializer == "json": + try: + return json.dumps(value, ensure_ascii=False, default=str).encode("utf-8") + except (TypeError, ValueError): + # JSON 序列化失败,回退到 pickle + return pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL) + else: + return pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL) + + def _deserialize(self, data: bytes) -> Any: + """反序列化值""" + if self.serializer == "json": + try: + return json.loads(data.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError): + # JSON 反序列化失败,尝试 pickle + try: + return pickle.loads(data) + except Exception: + return None + else: + try: + return pickle.loads(data) + except Exception: + return None + + async def get(self, key: str) -> Any | None: + """从缓存获取数据""" + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + + data = await client.get(full_key) + + async with self._stats_lock: + if data is not None: + self._stats.hits += 1 + return self._deserialize(data) + else: + self._stats.misses += 1 + return None + + except Exception as e: + logger.error(f"Redis GET 失败 [{key}]: {e}") + async with self._stats_lock: + self._stats.misses += 1 + return None + + async def set( + self, + key: str, + value: Any, + ttl: float | None = None, + ) -> None: + """设置缓存值""" + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + data = self._serialize(value) + + # 使用 TTL + expire_time = int(ttl) if ttl is not None else self.default_ttl + + await client.setex(full_key, expire_time, data) + + logger.debug(f"Redis SET: {key} (TTL={expire_time}s)") + + except Exception as e: + 
logger.error(f"Redis SET 失败 [{key}]: {e}") + + async def delete(self, key: str) -> bool: + """删除缓存条目""" + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + + result = await client.delete(full_key) + + if result > 0: + async with self._stats_lock: + self._stats.evictions += 1 + logger.debug(f"Redis DEL: {key}") + return True + return False + + except Exception as e: + logger.error(f"Redis DEL 失败 [{key}]: {e}") + return False + + async def exists(self, key: str) -> bool: + """检查键是否存在""" + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + return bool(await client.exists(full_key)) + except Exception as e: + logger.error(f"Redis EXISTS 失败 [{key}]: {e}") + return False + + async def clear(self) -> None: + """清空所有带前缀的缓存""" + try: + client = await self._ensure_connection() + pattern = self._make_key("*") + + # 使用 SCAN 避免阻塞 + cursor = 0 + deleted_count = 0 + + while True: + cursor, keys = await client.scan(cursor, match=pattern, count=100) + if keys: + await client.delete(*keys) + deleted_count += len(keys) + + if cursor == 0: + break + + async with self._stats_lock: + self._stats = CacheStats() + + logger.info(f"Redis 缓存已清空: 删除 {deleted_count} 个键") + + except Exception as e: + logger.error(f"Redis CLEAR 失败: {e}") + + async def delete_pattern(self, pattern: str) -> int: + """删除匹配模式的所有键 + + Args: + pattern: 键模式(支持 * 通配符) + + Returns: + 删除的键数量 + """ + try: + client = await self._ensure_connection() + full_pattern = self._make_key(pattern) + + # 使用 SCAN 避免阻塞 + cursor = 0 + deleted_count = 0 + + while True: + cursor, keys = await client.scan(cursor, match=full_pattern, count=100) + if keys: + await client.delete(*keys) + deleted_count += len(keys) + + if cursor == 0: + break + + async with self._stats_lock: + self._stats.evictions += deleted_count + + logger.debug(f"Redis 模式删除: {pattern} -> {deleted_count} 个键") + return deleted_count + + except Exception as e: + logger.error(f"Redis 模式删除失败 [{pattern}]: {e}") + return 0 + + async def mget(self, keys: list[str]) -> dict[str, Any]: + """批量获取多个键的值""" + if not keys: + return {} + + try: + client = await self._ensure_connection() + full_keys = [self._make_key(k) for k in keys] + + values = await client.mget(full_keys) + + result = {} + hits = 0 + misses = 0 + + for key, value in zip(keys, values): + if value is not None: + result[key] = self._deserialize(value) + hits += 1 + else: + misses += 1 + + async with self._stats_lock: + self._stats.hits += hits + self._stats.misses += misses + + return result + + except Exception as e: + logger.error(f"Redis MGET 失败: {e}") + return {} + + async def mset( + self, + mapping: dict[str, Any], + ttl: float | None = None, + ) -> None: + """批量设置多个键值对""" + if not mapping: + return + + try: + client = await self._ensure_connection() + expire_time = int(ttl) if ttl is not None else self.default_ttl + + # 使用 pipeline 提高效率 + async with client.pipeline(transaction=False) as pipe: + for key, value in mapping.items(): + full_key = self._make_key(key) + data = self._serialize(value) + pipe.setex(full_key, expire_time, data) + + await pipe.execute() + + logger.debug(f"Redis MSET: {len(mapping)} 个键") + + except Exception as e: + logger.error(f"Redis MSET 失败: {e}") + + async def get_stats(self) -> dict[str, Any]: + """获取缓存统计信息""" + try: + client = await self._ensure_connection() + + # 获取 Redis 服务器信息 + info = await client.info("memory") + # keyspace_info 可用于扩展统计, 暂时不获取避免开销 + # keyspace_info = await client.info("keyspace") + + # 统计带前缀的键数量 + pattern = self._make_key("*") 
+ key_count = 0 + cursor = 0 + + while True: + cursor, keys = await client.scan(cursor, match=pattern, count=1000) + key_count += len(keys) + if cursor == 0: + break + + async with self._stats_lock: + return { + "backend": "redis", + "hits": self._stats.hits, + "misses": self._stats.misses, + "hit_rate": self._stats.hit_rate, + "evictions": self._stats.evictions, + "key_count": key_count, + "redis_memory_used_mb": info.get("used_memory", 0) / (1024 * 1024), + "redis_memory_peak_mb": info.get("used_memory_peak", 0) / (1024 * 1024), + "redis_connected_clients": info.get("connected_clients", 0), + "key_prefix": self.key_prefix, + "default_ttl": self.default_ttl, + } + + except Exception as e: + logger.error(f"获取 Redis 统计信息失败: {e}") + async with self._stats_lock: + return { + "backend": "redis", + "hits": self._stats.hits, + "misses": self._stats.misses, + "hit_rate": self._stats.hit_rate, + "evictions": self._stats.evictions, + "error": str(e), + } + + async def close(self) -> None: + """关闭 Redis 连接""" + self._is_closing = True + + if self._client is not None: + try: + await self._client.aclose() + logger.info("Redis 连接已关闭") + except Exception as e: + logger.error(f"关闭 Redis 连接失败: {e}") + finally: + self._client = None + self._pool = None + + @property + def backend_type(self) -> str: + """返回缓存后端类型标识""" + return "redis" + + @property + def is_distributed(self) -> bool: + """Redis 是分布式缓存""" + return True + + async def health_check(self) -> bool: + """健康检查""" + try: + client = await self._ensure_connection() + await client.ping() + return True + except Exception: + return False + + async def ttl(self, key: str) -> int: + """获取键的剩余 TTL + + Args: + key: 缓存键 + + Returns: + 剩余秒数,-1 表示无过期时间,-2 表示键不存在 + """ + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + return await client.ttl(full_key) + except Exception as e: + logger.error(f"Redis TTL 失败 [{key}]: {e}") + return -2 + + async def expire(self, key: str, ttl: int) -> bool: + """更新键的 TTL + + Args: + key: 缓存键 + ttl: 新的过期时间(秒) + + Returns: + 是否成功 + """ + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + return bool(await client.expire(full_key, ttl)) + except Exception as e: + logger.error(f"Redis EXPIRE 失败 [{key}]: {e}") + return False + + async def incr(self, key: str, amount: int = 1) -> int: + """原子递增 + + Args: + key: 缓存键 + amount: 递增量 + + Returns: + 递增后的值 + """ + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + return await client.incrby(full_key, amount) + except Exception as e: + logger.error(f"Redis INCR 失败 [{key}]: {e}") + return 0 + + async def decr(self, key: str, amount: int = 1) -> int: + """原子递减 + + Args: + key: 缓存键 + amount: 递减量 + + Returns: + 递减后的值 + """ + try: + client = await self._ensure_connection() + full_key = self._make_key(key) + return await client.decrby(full_key, amount) + except Exception as e: + logger.error(f"Redis DECR 失败 [{key}]: {e}") + return 0 + + +# 全局 Redis 缓存实例 +_global_redis_cache: RedisCache | None = None +_redis_cache_lock = asyncio.Lock() + + +async def get_redis_cache() -> RedisCache: + """获取全局 Redis 缓存实例(单例)""" + global _global_redis_cache + + if _global_redis_cache is None: + async with _redis_cache_lock: + if _global_redis_cache is None: + # 从配置加载参数 + try: + from src.config.config import global_config + + assert global_config is not None + db_config = global_config.database + + _global_redis_cache = RedisCache( + host=db_config.redis_host, + port=db_config.redis_port, + password=db_config.redis_password or 
None, + db=db_config.redis_db, + key_prefix=db_config.redis_key_prefix, + default_ttl=db_config.redis_default_ttl, + pool_size=db_config.redis_connection_pool_size, + socket_timeout=db_config.redis_socket_timeout, + ssl=db_config.redis_ssl, + ) + except Exception as e: + logger.warning(f"无法从配置加载 Redis 参数,使用默认值: {e}") + _global_redis_cache = RedisCache() + + return _global_redis_cache + + +async def close_redis_cache() -> None: + """关闭全局 Redis 缓存""" + global _global_redis_cache + + if _global_redis_cache is not None: + await _global_redis_cache.close() + _global_redis_cache = None + logger.info("全局 Redis 缓存已关闭") + diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 297bc8b9b..9fa1da378 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -44,6 +44,12 @@ class DatabaseConfig(ValidatedConfigBase): # 数据库缓存配置 enable_database_cache: bool = Field(default=True, description="是否启用数据库查询缓存系统") + cache_backend: str = Field( + default="memory", + description="缓存后端类型: memory(内存缓存) 或 redis(Redis缓存)", + ) + + # 内存缓存配置 (cache_backend = "memory" 时生效) cache_l1_max_size: int = Field(default=1000, ge=100, le=50000, description="L1缓存最大条目数(热数据,内存占用约1-5MB)") cache_l1_ttl: int = Field(default=300, ge=10, le=3600, description="L1缓存生存时间(秒)") cache_l2_max_size: int = Field(default=10000, ge=1000, le=100000, description="L2缓存最大条目数(温数据,内存占用约10-50MB)") @@ -52,6 +58,17 @@ class DatabaseConfig(ValidatedConfigBase): cache_max_memory_mb: int = Field(default=100, ge=10, le=1000, description="缓存最大内存占用(MB),超过此值将触发强制清理") cache_max_item_size_mb: int = Field(default=1, ge=1, le=100, description="单个缓存条目最大大小(MB),超过此值将不缓存") + # Redis缓存配置 (cache_backend = "redis" 时生效) + redis_host: str = Field(default="localhost", description="Redis服务器地址") + redis_port: int = Field(default=6379, ge=1, le=65535, description="Redis服务器端口") + redis_password: str = Field(default="", description="Redis密码(可选)") + redis_db: int = Field(default=0, ge=0, le=15, description="Redis数据库编号") + redis_key_prefix: str = Field(default="mofox:", description="Redis缓存键前缀") + redis_default_ttl: int = Field(default=600, ge=60, le=86400, description="Redis默认缓存过期时间(秒)") + redis_connection_pool_size: int = Field(default=10, ge=1, le=100, description="Redis连接池大小") + redis_socket_timeout: float = Field(default=5.0, ge=1.0, le=30.0, description="Redis socket超时时间(秒)") + redis_ssl: bool = Field(default=False, description="是否启用Redis SSL连接") + class BotConfig(ValidatedConfigBase): """QQ机器人配置类""" diff --git a/template/bot_config_template.toml b/template/bot_config_template.toml index 110115ae5..91d478d64 100644 --- a/template/bot_config_template.toml +++ b/template/bot_config_template.toml @@ -38,8 +38,11 @@ connection_timeout = 10 # 连接超时时间(秒) # 批量动作记录存储配置 batch_action_storage_enabled = true # 是否启用批量保存动作记录(开启后将多个动作一次性写入数据库,提升性能) -# 数据库缓存配置(防止内存溢出) +# 数据库缓存配置 enable_database_cache = true # 是否启用数据库查询缓存系统 +cache_backend = "memory" # 缓存后端类型: "memory"(内存缓存) 或 "redis"(Redis缓存) + +# 内存缓存配置(cache_backend = "memory" 时生效) cache_l1_max_size = 1000 # L1缓存最大条目数(热数据,内存占用约1-5MB) cache_l1_ttl = 300 # L1缓存生存时间(秒) cache_l2_max_size = 10000 # L2缓存最大条目数(温数据,内存占用约10-50MB) @@ -48,6 +51,17 @@ cache_cleanup_interval = 60 # 缓存清理任务执行间隔(秒) cache_max_memory_mb = 10 # 缓存最大内存占用(MB),超过此值将触发强制清理 cache_max_item_size_mb = 1 # 单个缓存条目最大大小(MB),超过此值将不缓存 +# Redis缓存配置(cache_backend = "redis" 时生效) +redis_host = "localhost" # Redis服务器地址 +redis_port = 6379 # Redis服务器端口 +redis_password = "" # Redis密码(留空表示无密码) +redis_db = 0 # Redis数据库编号 (0-15) +redis_key_prefix = 
"mofox:" # Redis缓存键前缀(用于区分不同应用) +redis_default_ttl = 600 # Redis默认缓存过期时间(秒) +redis_connection_pool_size = 10 # Redis连接池大小 +redis_socket_timeout = 5.0 # Redis socket超时时间(秒) +redis_ssl = false # 是否启用Redis SSL连接 + [permission] # 权限系统配置 # Master用户配置(拥有最高权限,无视所有权限节点) # 格式:[[platform, user_id], ...]