feat: 实现方案A - 缓存字典而非SQLAlchemy对象

核心改进:
- 添加 _model_to_dict() 和 _dict_to_model() 辅助函数
- CRUD.get/get_by/get_multi 现在缓存字典而非对象
- QueryBuilder.first/all 现在缓存字典而非对象
- 从缓存恢复时重建detached对象,所有字段已加载

优势:
- 彻底避免'not bound to Session'错误
- 缓存数据独立于Session生命周期
- 对象反序列化后所有字段可直接访问
- 提高缓存可靠性和数据可用性

技术细节:
- 缓存层存储纯字典数据(可序列化)
- 查询时在session内预加载所有列
- 返回前转换为字典并缓存
- 缓存命中时从字典重建对象
- 重建的对象虽然detached但所有字段已填充
This commit is contained in:
Windpicker-owo
2025-11-01 16:50:50 +08:00
parent 87dc72c837
commit d187174353
2 changed files with 91 additions and 33 deletions

View File

@@ -10,6 +10,7 @@ from typing import Any, Optional, Type, TypeVar
from sqlalchemy import and_, delete, func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.inspection import inspect
from src.common.database.core.models import Base
from src.common.database.core.session import get_db_session
@@ -27,6 +28,42 @@ logger = get_logger("database.crud")
T = TypeVar("T", bound=Base)
def _model_to_dict(instance: Base) -> dict[str, Any]:
"""将 SQLAlchemy 模型实例转换为字典
Args:
instance: SQLAlchemy 模型实例
Returns:
字典表示,包含所有列的值
"""
result = {}
for column in instance.__table__.columns:
try:
result[column.name] = getattr(instance, column.name)
except Exception as e:
logger.warning(f"无法访问字段 {column.name}: {e}")
result[column.name] = None
return result
def _dict_to_model(model_class: Type[T], data: dict[str, Any]) -> T:
"""从字典创建 SQLAlchemy 模型实例 (detached状态)
Args:
model_class: SQLAlchemy 模型类
data: 字典数据
Returns:
模型实例 (detached, 所有字段已加载)
"""
instance = model_class()
for key, value in data.items():
if hasattr(instance, key):
setattr(instance, key, value)
return instance
class CRUDBase:
"""基础CRUD操作类
@@ -58,13 +95,14 @@ class CRUDBase:
"""
cache_key = f"{self.model_name}:id:{id}"
# 尝试从缓存获取
# 尝试从缓存获取 (缓存的是字典)
if use_cache:
cache = await get_cache()
cached = await cache.get(cache_key)
if cached is not None:
cached_dict = await cache.get(cache_key)
if cached_dict is not None:
logger.debug(f"缓存命中: {cache_key}")
return cached
# 从字典恢复对象
return _dict_to_model(self.model, cached_dict)
# 从数据库查询
async with get_db_session() as session:
@@ -72,10 +110,19 @@ class CRUDBase:
result = await session.execute(stmt)
instance = result.scalar_one_or_none()
# 写入缓存
if instance is not None and use_cache:
cache = await get_cache()
await cache.set(cache_key, instance)
if instance is not None:
# 预加载所有字段
for column in self.model.__table__.columns:
try:
getattr(instance, column.name)
except Exception:
pass
# 转换为字典并写入缓存
if use_cache:
instance_dict = _model_to_dict(instance)
cache = await get_cache()
await cache.set(cache_key, instance_dict)
return instance
@@ -95,13 +142,14 @@ class CRUDBase:
"""
cache_key = f"{self.model_name}:filter:{str(sorted(filters.items()))}"
# 尝试从缓存获取
# 尝试从缓存获取 (缓存的是字典)
if use_cache:
cache = await get_cache()
cached = await cache.get(cache_key)
if cached is not None:
cached_dict = await cache.get(cache_key)
if cached_dict is not None:
logger.debug(f"缓存命中: {cache_key}")
return cached
# 从字典恢复对象
return _dict_to_model(self.model, cached_dict)
# 从数据库查询
async with get_db_session() as session:
@@ -122,10 +170,11 @@ class CRUDBase:
except Exception:
pass # 忽略访问错误
# 写入缓存
# 转换为字典并写入缓存
if use_cache:
instance_dict = _model_to_dict(instance)
cache = await get_cache()
await cache.set(cache_key, instance)
await cache.set(cache_key, instance_dict)
return instance
@@ -149,13 +198,14 @@ class CRUDBase:
"""
cache_key = f"{self.model_name}:multi:{skip}:{limit}:{str(sorted(filters.items()))}"
# 尝试从缓存获取
# 尝试从缓存获取 (缓存的是字典列表)
if use_cache:
cache = await get_cache()
cached = await cache.get(cache_key)
if cached is not None:
cached_dicts = await cache.get(cache_key)
if cached_dicts is not None:
logger.debug(f"缓存命中: {cache_key}")
return cached
# 从字典列表恢复对象列表
return [_dict_to_model(self.model, d) for d in cached_dicts]
# 从数据库查询
async with get_db_session() as session:
@@ -173,7 +223,7 @@ class CRUDBase:
stmt = stmt.offset(skip).limit(limit)
result = await session.execute(stmt)
instances = result.scalars().all()
instances = list(result.scalars().all())
# 触发所有实例的列加载,避免 detached 后的延迟加载问题
for instance in instances:
@@ -183,10 +233,11 @@ class CRUDBase:
except Exception:
pass # 忽略访问错误
# 写入缓存
# 转换为字典列表并写入缓存
if use_cache:
instances_dicts = [_model_to_dict(inst) for inst in instances]
cache = await get_cache()
await cache.set(cache_key, instances)
await cache.set(cache_key, instances_dicts)
return instances

View File

@@ -18,6 +18,9 @@ from src.common.database.core.session import get_db_session
from src.common.database.optimization import get_cache, get_preloader
from src.common.logger import get_logger
# 导入 CRUD 辅助函数以避免重复定义
from src.common.database.api.crud import _dict_to_model, _model_to_dict
logger = get_logger("database.query")
T = TypeVar("T", bound="Base")
@@ -191,13 +194,14 @@ class QueryBuilder(Generic[T]):
"""
cache_key = ":".join(self._cache_key_parts) + ":all"
# 尝试从缓存获取
# 尝试从缓存获取 (缓存的是字典列表)
if self._use_cache:
cache = await get_cache()
cached = await cache.get(cache_key)
if cached is not None:
cached_dicts = await cache.get(cache_key)
if cached_dicts is not None:
logger.debug(f"缓存命中: {cache_key}")
return cached
# 从字典列表恢复对象列表
return [_dict_to_model(self.model, d) for d in cached_dicts]
# 从数据库查询
async with get_db_session() as session:
@@ -212,10 +216,11 @@ class QueryBuilder(Generic[T]):
except Exception:
pass
# 写入缓存
# 转换为字典列表并写入缓存
if self._use_cache:
instances_dicts = [_model_to_dict(inst) for inst in instances]
cache = await get_cache()
await cache.set(cache_key, instances)
await cache.set(cache_key, instances_dicts)
return instances
@@ -227,13 +232,14 @@ class QueryBuilder(Generic[T]):
"""
cache_key = ":".join(self._cache_key_parts) + ":first"
# 尝试从缓存获取
# 尝试从缓存获取 (缓存的是字典)
if self._use_cache:
cache = await get_cache()
cached = await cache.get(cache_key)
if cached is not None:
cached_dict = await cache.get(cache_key)
if cached_dict is not None:
logger.debug(f"缓存命中: {cache_key}")
return cached
# 从字典恢复对象
return _dict_to_model(self.model, cached_dict)
# 从数据库查询
async with get_db_session() as session:
@@ -248,10 +254,11 @@ class QueryBuilder(Generic[T]):
except Exception:
pass
# 写入缓存
# 转换为字典并写入缓存
if instance is not None and self._use_cache:
instance_dict = _model_to_dict(instance)
cache = await get_cache()
await cache.set(cache_key, instance)
await cache.set(cache_key, instance_dict)
return instance