增强聊天管理器和数据库API,添加自动注册和异步清理功能,优化模型转换为字典的逻辑

This commit is contained in:
Windpicker-owo
2025-11-20 16:48:18 +08:00
parent 03c80a08fb
commit 6ecf5a36f2
5 changed files with 215 additions and 74 deletions

View File

@@ -6,6 +6,9 @@
- 智能预加载:关联数据自动预加载
"""
import operator
from collections.abc import Callable
from functools import lru_cache
from typing import Any, TypeVar
from sqlalchemy import delete, func, select, update
@@ -25,6 +28,43 @@ logger = get_logger("database.crud")
T = TypeVar("T", bound=Base)
@lru_cache(maxsize=256)
def _get_model_column_names(model: type[Base]) -> tuple[str, ...]:
"""获取模型的列名称列表"""
return tuple(column.name for column in model.__table__.columns)
@lru_cache(maxsize=256)
def _get_model_field_set(model: type[Base]) -> frozenset[str]:
"""获取模型的有效字段集合"""
return frozenset(_get_model_column_names(model))
@lru_cache(maxsize=256)
def _get_model_value_fetcher(model: type[Base]) -> Callable[[Base], tuple[Any, ...]]:
"""为模型准备attrgetter用于批量获取属性值"""
column_names = _get_model_column_names(model)
if not column_names:
return lambda _: ()
if len(column_names) == 1:
attr_name = column_names[0]
def _single(instance: Base) -> tuple[Any, ...]:
return (getattr(instance, attr_name),)
return _single
getter = operator.attrgetter(*column_names)
def _multi(instance: Base) -> tuple[Any, ...]:
values = getter(instance)
return values if isinstance(values, tuple) else (values,)
return _multi
def _model_to_dict(instance: Base) -> dict[str, Any]:
"""将 SQLAlchemy 模型实例转换为字典
@@ -32,16 +72,27 @@ def _model_to_dict(instance: Base) -> dict[str, Any]:
instance: SQLAlchemy 模型实例
Returns:
字典表示,包含所有列的
字典表示的模型实例的字段
"""
result = {}
for column in instance.__table__.columns:
try:
result[column.name] = getattr(instance, column.name)
except Exception as e:
logger.warning(f"无法访问字段 {column.name}: {e}")
result[column.name] = None
return result
if instance is None:
return {}
model = type(instance)
column_names = _get_model_column_names(model)
fetch_values = _get_model_value_fetcher(model)
try:
values = fetch_values(instance)
return dict(zip(column_names, values))
except Exception as exc:
logger.warning(f"无法转换模型 {model.__name__}: {exc}")
fallback = {}
for column in column_names:
try:
fallback[column] = getattr(instance, column)
except Exception:
fallback[column] = None
return fallback
def _dict_to_model(model_class: type[T], data: dict[str, Any]) -> T:
@@ -55,8 +106,9 @@ def _dict_to_model(model_class: type[T], data: dict[str, Any]) -> T:
模型实例 (detached, 所有字段已加载)
"""
instance = model_class()
valid_fields = _get_model_field_set(model_class)
for key, value in data.items():
if hasattr(instance, key):
if key in valid_fields:
setattr(instance, key, value)
return instance