diff --git a/src/common/database/api/crud.py b/src/common/database/api/crud.py index a82b2a3a5..8a9a75de6 100644 --- a/src/common/database/api/crud.py +++ b/src/common/database/api/crud.py @@ -266,7 +266,14 @@ class CRUDBase: await session.refresh(instance) # 注意:commit在get_db_session的context manager退出时自动执行 # 但为了明确性,这里不需要显式commit - return instance + + # 注意:create不清除缓存,因为: + # 1. 新记录不会影响已有的单条查询缓存(get/get_by) + # 2. get_multi的缓存会自然过期(TTL机制) + # 3. 清除所有缓存代价太大,影响性能 + # 如果需要强一致性,应该在查询时设置use_cache=False + + return instance async def update( self, @@ -459,8 +466,15 @@ class CRUDBase: for instance in instances: await session.refresh(instance) - - return instances + + # 批量创建的缓存策略: + # bulk_create通常用于批量导入场景,此时清除缓存是合理的 + # 因为可能创建大量记录,缓存的列表查询会明显过期 + cache = await get_cache() + await cache.clear() + logger.info(f"批量创建{len(instances)}条{self.model_name}记录后已清除缓存") + + return instances async def bulk_update( self, diff --git a/src/common/database/optimization/batch_scheduler.py b/src/common/database/optimization/batch_scheduler.py index e5d6bd23a..7498a7b16 100644 --- a/src/common/database/optimization/batch_scheduler.py +++ b/src/common/database/optimization/batch_scheduler.py @@ -393,8 +393,10 @@ class AdaptiveBatchScheduler: ) -> None: """批量执行更新操作""" async with get_db_session() as session: - for op in operations: - try: + results = [] + try: + # 🔧 修复:收集所有操作后一次性commit,而不是循环中多次commit + for op in operations: # 构建更新语句 stmt = update(op.model_class) for key, value in op.conditions.items(): @@ -404,23 +406,29 @@ class AdaptiveBatchScheduler: if op.data: stmt = stmt.values(**op.data) - # 执行更新 + # 执行更新(但不commit) result = await session.execute(stmt) - await session.commit() - - # 设置结果 + results.append((op, result.rowcount)) + + # 所有操作成功后,一次性commit + await session.commit() + + # 设置所有操作的结果 + for op, rowcount in results: if op.future and not op.future.done(): - op.future.set_result(result.rowcount) + op.future.set_result(rowcount) if op.callback: try: - op.callback(result.rowcount) + op.callback(rowcount) except Exception as e: logger.warning(f"回调执行失败: {e}") - except Exception as e: - logger.error(f"更新失败: {e}", exc_info=True) - await session.rollback() + except Exception as e: + logger.error(f"批量更新失败: {e}", exc_info=True) + await session.rollback() + # 所有操作都失败 + for op in operations: if op.future and not op.future.done(): op.future.set_exception(e) @@ -430,31 +438,39 @@ class AdaptiveBatchScheduler: ) -> None: """批量执行删除操作""" async with get_db_session() as session: - for op in operations: - try: + results = [] + try: + # 🔧 修复:收集所有操作后一次性commit,而不是循环中多次commit + for op in operations: # 构建删除语句 stmt = delete(op.model_class) for key, value in op.conditions.items(): attr = getattr(op.model_class, key) stmt = stmt.where(attr == value) - # 执行删除 + # 执行删除(但不commit) result = await session.execute(stmt) - await session.commit() - - # 设置结果 + results.append((op, result.rowcount)) + + # 所有操作成功后,一次性commit + await session.commit() + + # 设置所有操作的结果 + for op, rowcount in results: if op.future and not op.future.done(): - op.future.set_result(result.rowcount) + op.future.set_result(rowcount) if op.callback: try: - op.callback(result.rowcount) + op.callback(rowcount) except Exception as e: logger.warning(f"回调执行失败: {e}") - except Exception as e: - logger.error(f"删除失败: {e}", exc_info=True) - await session.rollback() + except Exception as e: + logger.error(f"批量删除失败: {e}", exc_info=True) + await session.rollback() + # 所有操作都失败 + for op in operations: if op.future and not op.future.done(): op.future.set_exception(e)