fix: 修复批处理中的多次commit问题,bulk_create后清除缓存

This commit is contained in:
Windpicker-owo
2025-11-01 17:31:31 +08:00
parent 1857754bbe
commit f872a1ff82
2 changed files with 55 additions and 25 deletions

View File

@@ -393,8 +393,10 @@ class AdaptiveBatchScheduler:
) -> None:
"""批量执行更新操作"""
async with get_db_session() as session:
for op in operations:
try:
results = []
try:
# 🔧 修复收集所有操作后一次性commit而不是循环中多次commit
for op in operations:
# 构建更新语句
stmt = update(op.model_class)
for key, value in op.conditions.items():
@@ -404,23 +406,29 @@ class AdaptiveBatchScheduler:
if op.data:
stmt = stmt.values(**op.data)
# 执行更新
# 执行更新但不commit
result = await session.execute(stmt)
await session.commit()
# 设置结果
results.append((op, result.rowcount))
# 所有操作成功后一次性commit
await session.commit()
# 设置所有操作的结果
for op, rowcount in results:
if op.future and not op.future.done():
op.future.set_result(result.rowcount)
op.future.set_result(rowcount)
if op.callback:
try:
op.callback(result.rowcount)
op.callback(rowcount)
except Exception as e:
logger.warning(f"回调执行失败: {e}")
except Exception as e:
logger.error(f"更新失败: {e}", exc_info=True)
await session.rollback()
except Exception as e:
logger.error(f"批量更新失败: {e}", exc_info=True)
await session.rollback()
# 所有操作都失败
for op in operations:
if op.future and not op.future.done():
op.future.set_exception(e)
@@ -430,31 +438,39 @@ class AdaptiveBatchScheduler:
) -> None:
"""批量执行删除操作"""
async with get_db_session() as session:
for op in operations:
try:
results = []
try:
# 🔧 修复收集所有操作后一次性commit而不是循环中多次commit
for op in operations:
# 构建删除语句
stmt = delete(op.model_class)
for key, value in op.conditions.items():
attr = getattr(op.model_class, key)
stmt = stmt.where(attr == value)
# 执行删除
# 执行删除但不commit
result = await session.execute(stmt)
await session.commit()
# 设置结果
results.append((op, result.rowcount))
# 所有操作成功后一次性commit
await session.commit()
# 设置所有操作的结果
for op, rowcount in results:
if op.future and not op.future.done():
op.future.set_result(result.rowcount)
op.future.set_result(rowcount)
if op.callback:
try:
op.callback(result.rowcount)
op.callback(rowcount)
except Exception as e:
logger.warning(f"回调执行失败: {e}")
except Exception as e:
logger.error(f"删除失败: {e}", exc_info=True)
await session.rollback()
except Exception as e:
logger.error(f"批量删除失败: {e}", exc_info=True)
await session.rollback()
# 所有操作都失败
for op in operations:
if op.future and not op.future.done():
op.future.set_exception(e)