merge: merge critical bug fixes and performance optimizations from feature/database-refactoring

Windpicker-owo
2025-11-01 17:47:28 +08:00
3 changed files with 84 additions and 36 deletions

View File

@@ -99,37 +99,55 @@ class MessageStorageBatcher:
         success_count = 0
         try:
-            # Prepare all message objects
-            messages_objects = []
+            # 🔧 Optimization: prepare plain dicts instead of ORM objects so a bulk INSERT can be used
+            messages_dicts = []
             for msg_data in messages_to_store:
                 try:
-                    message_obj = await self._prepare_message_object(
+                    message_dict = await self._prepare_message_dict(
                         msg_data['message'],
                         msg_data['chat_stream']
                     )
-                    if message_obj:
-                        messages_objects.append(message_obj)
+                    if message_dict:
+                        messages_dicts.append(message_dict)
                 except Exception as e:
-                    logger.error(f"Failed to prepare message object: {e}")
+                    logger.error(f"Failed to prepare message data: {e}")
                     continue
 
-            # Bulk-write to the database
-            if messages_objects:
+            # Bulk-write to the database using an efficient bulk INSERT
+            if messages_dicts:
+                from sqlalchemy import insert
                 async with get_db_session() as session:
-                    session.add_all(messages_objects)
+                    stmt = insert(Messages).values(messages_dicts)
+                    await session.execute(stmt)
                     await session.commit()
-                    success_count = len(messages_objects)
+                    success_count = len(messages_dicts)
 
             elapsed = time.time() - start_time
             logger.info(
                 f"Batch-stored {success_count}/{len(messages_to_store)} messages "
-                f"(took {elapsed:.3f}s)"
+                f"(took {elapsed:.3f}s, avg {elapsed/max(success_count,1)*1000:.2f}ms per message)"
             )
         except Exception as e:
             logger.error(f"Batch message storage failed: {e}", exc_info=True)
 
+    async def _prepare_message_dict(self, message, chat_stream):
+        """Prepare message data as a dict (for bulk INSERT).
+
+        This method builds a dict instead of an ORM object, which performs better.
+        """
+        message_obj = await self._prepare_message_object(message, chat_stream)
+        if message_obj is None:
+            return None
+        # Convert the ORM object to a dict containing only column fields
+        message_dict = {}
+        for column in Messages.__table__.columns:
+            message_dict[column.name] = getattr(message_obj, column.name)
+        return message_dict
+
     async def _prepare_message_object(self, message, chat_stream):
         """Prepare a message object (extracted from the original store_message logic)"""
         try:
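Why the new path is faster: session.add_all() runs every row through the ORM unit of work (identity map, per-object state tracking), while a single Core insert().values([...]) statement renders one multi-row INSERT with no per-object bookkeeping. A minimal, self-contained sketch of the same pattern, assuming SQLAlchemy 2.x async with the aiosqlite driver; the Msg model below is a stand-in for illustration, not the project's Messages:

import asyncio

from sqlalchemy import String, insert
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.pool import StaticPool


class Base(DeclarativeBase):
    pass


class Msg(Base):
    # Stand-in for the project's Messages model
    __tablename__ = "messages"
    id: Mapped[int] = mapped_column(primary_key=True)
    text: Mapped[str] = mapped_column(String(255))


async def main() -> None:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:", poolclass=StaticPool)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Plain dicts, one per row: no ORM instances, no identity-map tracking
    rows = [{"text": f"message {i}"} for i in range(1000)]

    async with AsyncSession(engine) as session:
        # One multi-row INSERT instead of 1000 tracked objects
        await session.execute(insert(Msg).values(rows))
        await session.commit()


asyncio.run(main())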

View File

@@ -266,7 +266,14 @@ class CRUDBase:
             await session.refresh(instance)
             # Note: the commit runs automatically when the get_db_session context manager exits,
             # so no explicit commit is needed here
+
+            # Note: create() does not invalidate the cache, because:
+            # 1. A new record does not affect existing single-row query caches (get/get_by)
+            # 2. get_multi caches expire naturally via the TTL mechanism
+            # 3. Clearing the whole cache would cost too much and hurt performance
+            # Callers that need strong consistency should query with use_cache=False
             return instance
 
     async def update(
         self,
@@ -459,8 +466,15 @@ class CRUDBase:
             for instance in instances:
                 await session.refresh(instance)
-            return instances
+
+            # Cache strategy for bulk creation:
+            # bulk_create is typically used for bulk-import scenarios, where clearing the
+            # cache is reasonable: a large number of records may have been created, so
+            # cached list queries would be noticeably stale
+            cache = await get_cache()
+            await cache.clear()
+            logger.info(f"Cleared cache after bulk-creating {len(instances)} {self.model_name} records")
+            return instances
 
     async def bulk_update(
         self,
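In miniature, the policy these two hunks describe: a single create() leaves the cache alone, because single-row keys stay correct and cached list results are only briefly stale, while a bulk import stales cached lists badly enough to justify a full clear(). A toy sketch of that tradeoff; TTLCache and the create/bulk_create helpers are illustrative stand-ins for the project's get_cache() machinery, not its actual API:

import time
from typing import Any, Optional


class TTLCache:
    # Toy stand-in for the project's cache: entries expire after ttl seconds
    def __init__(self, ttl: float = 30.0) -> None:
        self._ttl = ttl
        self._store: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # expired: drop the entry and report a miss
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (time.monotonic() + self._ttl, value)

    def clear(self) -> None:
        self._store.clear()


cache = TTLCache(ttl=30.0)
db: dict[int, str] = {1: "alice"}  # toy table


def create(pk: int, name: str) -> None:
    # Single insert: leave the cache alone. Keys like "get:1" stay correct, and a
    # cached list query serves slightly stale data for at most one TTL period.
    db[pk] = name


def bulk_create(rows: dict[int, str]) -> None:
    # Bulk import: cached list queries would now be badly stale, so wipe everything.
    db.update(rows)
    cache.clear()


list_key = "get_multi:limit=50"
cache.set(list_key, list(db.values()))    # a cached list query
create(2, "bob")                          # cache kept: stale for at most 30s
bulk_create({i: f"user{i}" for i in range(3, 1000)})
assert cache.get(list_key) is None        # the bulk import wiped the cache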

View File

@@ -393,8 +393,10 @@ class AdaptiveBatchScheduler:
     ) -> None:
         """Execute update operations as a batch"""
         async with get_db_session() as session:
-            for op in operations:
-                try:
+            results = []
+            try:
+                # 🔧 Fix: collect all operations and commit once, instead of committing repeatedly inside the loop
+                for op in operations:
                     # Build the update statement
                     stmt = update(op.model_class)
                     for key, value in op.conditions.items():
@@ -404,23 +406,29 @@ class AdaptiveBatchScheduler:
                     if op.data:
                         stmt = stmt.values(**op.data)
 
-                    # Execute the update
+                    # Execute the update (but do not commit yet)
                     result = await session.execute(stmt)
-                    await session.commit()
+                    results.append((op, result.rowcount))
 
-                    # Set the result
+                # Commit once after every operation has succeeded
+                await session.commit()
+
+                # Set the result for each operation
+                for op, rowcount in results:
                     if op.future and not op.future.done():
-                        op.future.set_result(result.rowcount)
+                        op.future.set_result(rowcount)
                     if op.callback:
                         try:
-                            op.callback(result.rowcount)
+                            op.callback(rowcount)
                         except Exception as e:
                             logger.warning(f"Callback execution failed: {e}")
-                except Exception as e:
-                    logger.error(f"Update failed: {e}", exc_info=True)
-                    await session.rollback()
+            except Exception as e:
+                logger.error(f"Batch update failed: {e}", exc_info=True)
+                await session.rollback()
+                # Every operation in the batch failed
+                for op in operations:
                     if op.future and not op.future.done():
                         op.future.set_exception(e)
@@ -430,31 +438,39 @@ class AdaptiveBatchScheduler:
     ) -> None:
         """Execute delete operations as a batch"""
         async with get_db_session() as session:
-            for op in operations:
-                try:
+            results = []
+            try:
+                # 🔧 Fix: collect all operations and commit once, instead of committing repeatedly inside the loop
+                for op in operations:
                     # Build the delete statement
                     stmt = delete(op.model_class)
                     for key, value in op.conditions.items():
                         attr = getattr(op.model_class, key)
                         stmt = stmt.where(attr == value)
 
-                    # Execute the delete
+                    # Execute the delete (but do not commit yet)
                     result = await session.execute(stmt)
-                    await session.commit()
+                    results.append((op, result.rowcount))
 
-                    # Set the result
+                # Commit once after every operation has succeeded
+                await session.commit()
+
+                # Set the result for each operation
+                for op, rowcount in results:
                     if op.future and not op.future.done():
-                        op.future.set_result(result.rowcount)
+                        op.future.set_result(rowcount)
                     if op.callback:
                         try:
-                            op.callback(result.rowcount)
+                            op.callback(rowcount)
                         except Exception as e:
                             logger.warning(f"Callback execution failed: {e}")
-                except Exception as e:
-                    logger.error(f"Delete failed: {e}", exc_info=True)
-                    await session.rollback()
+            except Exception as e:
+                logger.error(f"Batch delete failed: {e}", exc_info=True)
+                await session.rollback()
+                # Every operation in the batch failed
+                for op in operations:
                     if op.future and not op.future.done():
                         op.future.set_exception(e)
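Both hunks make the same change: execute every statement, commit exactly once, and only then resolve the waiting futures. One commit makes the batch atomic and collapses N write barriers into one; the cost is that a single bad operation now fails the whole batch, which is why the except path rejects every future. A runnable sketch of the collect-then-commit pattern, assuming SQLAlchemy 2.x async with aiosqlite; the User model and flush_batch helper are illustrative, not the project's scheduler code:

import asyncio

from sqlalchemy import String, update
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.pool import StaticPool


class Base(DeclarativeBase):
    pass


class User(Base):
    # Stand-in model for the sketch
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))


async def flush_batch(session: AsyncSession, ops: list[tuple[int, str]]) -> list[int]:
    """Execute every UPDATE, collect rowcounts, then commit exactly once."""
    results = []
    try:
        for pk, new_name in ops:
            stmt = update(User).where(User.id == pk).values(name=new_name)
            result = await session.execute(stmt)  # no commit inside the loop
            results.append(result.rowcount)
        await session.commit()  # one commit: the whole batch is atomic
        return results
    except Exception:
        await session.rollback()  # nothing from this batch persists
        raise


async def main() -> None:
    engine = create_async_engine("sqlite+aiosqlite:///:memory:", poolclass=StaticPool)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        session.add_all([User(name="a"), User(name="b")])
        await session.commit()
        print(await flush_batch(session, [(1, "alice"), (2, "bob")]))  # [1, 1]


asyncio.run(main())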