From 2aeb06f70894878b7bf076c04914e204ae07d7cc Mon Sep 17 00:00:00 2001 From: Windpicker-owo <3431391539@qq.com> Date: Sat, 1 Nov 2025 17:31:31 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=89=B9=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=B8=AD=E7=9A=84=E5=A4=9A=E6=AC=A1commit=E9=97=AE?= =?UTF-8?q?=E9=A2=98=EF=BC=8Cbulk=5Fcreate=E5=90=8E=E6=B8=85=E9=99=A4?= =?UTF-8?q?=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/database/api/crud.py | 20 ++++++- .../database/optimization/batch_scheduler.py | 60 ++++++++++++------- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/common/database/api/crud.py b/src/common/database/api/crud.py index a82b2a3a5..8a9a75de6 100644 --- a/src/common/database/api/crud.py +++ b/src/common/database/api/crud.py @@ -266,7 +266,14 @@ class CRUDBase: await session.refresh(instance) # 注意:commit在get_db_session的context manager退出时自动执行 # 但为了明确性,这里不需要显式commit - return instance + + # 注意:create不清除缓存,因为: + # 1. 新记录不会影响已有的单条查询缓存(get/get_by) + # 2. get_multi的缓存会自然过期(TTL机制) + # 3. 清除所有缓存代价太大,影响性能 + # 如果需要强一致性,应该在查询时设置use_cache=False + + return instance async def update( self, @@ -459,8 +466,15 @@ class CRUDBase: for instance in instances: await session.refresh(instance) - - return instances + + # 批量创建的缓存策略: + # bulk_create通常用于批量导入场景,此时清除缓存是合理的 + # 因为可能创建大量记录,缓存的列表查询会明显过期 + cache = await get_cache() + await cache.clear() + logger.info(f"批量创建{len(instances)}条{self.model_name}记录后已清除缓存") + + return instances async def bulk_update( self, diff --git a/src/common/database/optimization/batch_scheduler.py b/src/common/database/optimization/batch_scheduler.py index e5d6bd23a..7498a7b16 100644 --- a/src/common/database/optimization/batch_scheduler.py +++ b/src/common/database/optimization/batch_scheduler.py @@ -393,8 +393,10 @@ class AdaptiveBatchScheduler: ) -> None: """批量执行更新操作""" async with get_db_session() as session: - for op in operations: - try: + results = [] + try: + # 🔧 修复:收集所有操作后一次性commit,而不是循环中多次commit + for op in operations: # 构建更新语句 stmt = update(op.model_class) for key, value in op.conditions.items(): @@ -404,23 +406,29 @@ class AdaptiveBatchScheduler: if op.data: stmt = stmt.values(**op.data) - # 执行更新 + # 执行更新(但不commit) result = await session.execute(stmt) - await session.commit() - - # 设置结果 + results.append((op, result.rowcount)) + + # 所有操作成功后,一次性commit + await session.commit() + + # 设置所有操作的结果 + for op, rowcount in results: if op.future and not op.future.done(): - op.future.set_result(result.rowcount) + op.future.set_result(rowcount) if op.callback: try: - op.callback(result.rowcount) + op.callback(rowcount) except Exception as e: logger.warning(f"回调执行失败: {e}") - except Exception as e: - logger.error(f"更新失败: {e}", exc_info=True) - await session.rollback() + except Exception as e: + logger.error(f"批量更新失败: {e}", exc_info=True) + await session.rollback() + # 所有操作都失败 + for op in operations: if op.future and not op.future.done(): op.future.set_exception(e) @@ -430,31 +438,39 @@ class AdaptiveBatchScheduler: ) -> None: """批量执行删除操作""" async with get_db_session() as session: - for op in operations: - try: + results = [] + try: + # 🔧 修复:收集所有操作后一次性commit,而不是循环中多次commit + for op in operations: # 构建删除语句 stmt = delete(op.model_class) for key, value in op.conditions.items(): attr = getattr(op.model_class, key) stmt = stmt.where(attr == value) - # 执行删除 + # 执行删除(但不commit) result = await session.execute(stmt) - await session.commit() - - # 设置结果 + results.append((op, result.rowcount)) + + # 所有操作成功后,一次性commit + await session.commit() + + # 设置所有操作的结果 + for op, rowcount in results: if op.future and not op.future.done(): - op.future.set_result(result.rowcount) + op.future.set_result(rowcount) if op.callback: try: - op.callback(result.rowcount) + op.callback(rowcount) except Exception as e: logger.warning(f"回调执行失败: {e}") - except Exception as e: - logger.error(f"删除失败: {e}", exc_info=True) - await session.rollback() + except Exception as e: + logger.error(f"批量删除失败: {e}", exc_info=True) + await session.rollback() + # 所有操作都失败 + for op in operations: if op.future and not op.future.done(): op.future.set_exception(e)