diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py
index 0fcfce989..e82715e7b 100644
--- a/src/chat/message_receive/storage.py
+++ b/src/chat/message_receive/storage.py
@@ -99,37 +99,55 @@ class MessageStorageBatcher:
         success_count = 0
 
         try:
-            # Prepare all message objects
-            messages_objects = []
+            # 🔧 Optimization: prepare dict data instead of ORM objects and use a bulk INSERT
+            messages_dicts = []
             for msg_data in messages_to_store:
                 try:
-                    message_obj = await self._prepare_message_object(
+                    message_dict = await self._prepare_message_dict(
                         msg_data['message'],
                         msg_data['chat_stream']
                     )
-                    if message_obj:
-                        messages_objects.append(message_obj)
+                    if message_dict:
+                        messages_dicts.append(message_dict)
                 except Exception as e:
-                    logger.error(f"Failed to prepare message object: {e}")
+                    logger.error(f"Failed to prepare message data: {e}")
                     continue
 
-            # Bulk write to the database
-            if messages_objects:
+            # Bulk write to the database using an efficient multi-row INSERT
+            if messages_dicts:
+                from sqlalchemy import insert
                 async with get_db_session() as session:
-                    session.add_all(messages_objects)
+                    stmt = insert(Messages).values(messages_dicts)
+                    await session.execute(stmt)
                     await session.commit()
-                    success_count = len(messages_objects)
+                    success_count = len(messages_dicts)
 
             elapsed = time.time() - start_time
             logger.info(
                 f"Batch-stored {success_count}/{len(messages_to_store)} messages "
-                f"(elapsed: {elapsed:.3f}s)"
+                f"(elapsed: {elapsed:.3f}s, avg {elapsed/max(success_count,1)*1000:.2f}ms per message)"
             )
 
         except Exception as e:
             logger.error(f"Batch message storage failed: {e}", exc_info=True)
 
+    async def _prepare_message_dict(self, message, chat_stream):
+        """Prepare message data as a dict (for bulk INSERT).
+
+        Building a dict instead of an ORM object is cheaper for bulk writes.
+        """
+        message_obj = await self._prepare_message_object(message, chat_stream)
+        if message_obj is None:
+            return None
+
+        # Convert the ORM object to a dict (column fields only)
+        message_dict = {}
+        for column in Messages.__table__.columns:
+            message_dict[column.name] = getattr(message_obj, column.name)
+
+        return message_dict
+
     async def _prepare_message_object(self, message, chat_stream):
         """Prepare a message object (extracted from the original store_message logic)."""
         try:
diff --git a/src/common/database/api/crud.py b/src/common/database/api/crud.py
index a82b2a3a5..8a9a75de6 100644
--- a/src/common/database/api/crud.py
+++ b/src/common/database/api/crud.py
@@ -266,7 +266,14 @@ class CRUDBase:
             await session.refresh(instance)
             # Note: the commit runs automatically when the get_db_session context manager exits,
            # so no explicit commit is needed here
-            return instance
+
+            # Note: create does not clear the cache, because:
+            # 1. A new record does not affect existing single-row query caches (get/get_by)
+            # 2. get_multi caches expire naturally via their TTL
+            # 3. Clearing the whole cache is too expensive and hurts performance
+            # If strong consistency is needed, pass use_cache=False when querying
+
+            return instance
 
     async def update(
         self,
@@ -459,8 +466,15 @@ class CRUDBase:
 
             for instance in instances:
                 await session.refresh(instance)
-
-            return instances
+
+            # Cache strategy for bulk creation:
+            # bulk_create is typically used for bulk imports, where clearing the cache is reasonable
+            # because many records may be created and cached list queries would be visibly stale
+            cache = await get_cache()
+            await cache.clear()
+            logger.info(f"Cleared cache after bulk-creating {len(instances)} {self.model_name} records")
+
+            return instances
 
     async def bulk_update(
         self,
diff --git a/src/common/database/optimization/batch_scheduler.py b/src/common/database/optimization/batch_scheduler.py
index e5d6bd23a..7498a7b16 100644
--- a/src/common/database/optimization/batch_scheduler.py
+++ b/src/common/database/optimization/batch_scheduler.py
@@ -393,8 +393,10 @@ class AdaptiveBatchScheduler:
     ) -> None:
         """Execute update operations in a batch."""
         async with get_db_session() as session:
-            for op in operations:
-                try:
+            results = []
+            try:
+                # 🔧 Fix: collect all operations and commit once, instead of committing repeatedly inside the loop
+                for op in operations:
                     # Build the update statement
                     stmt = update(op.model_class)
                     for key, value in op.conditions.items():
@@ -404,23 +406,29 @@
                     if op.data:
                         stmt = stmt.values(**op.data)
 
-                    # Execute the update
+                    # Execute the update (without committing yet)
                     result = await session.execute(stmt)
-                    await session.commit()
-
-                    # Set the result
+                    results.append((op, result.rowcount))
+
+                # After all operations succeed, commit once
+                await session.commit()
+
+                # Set the result for every operation
+                for op, rowcount in results:
                     if op.future and not op.future.done():
-                        op.future.set_result(result.rowcount)
+                        op.future.set_result(rowcount)
 
                     if op.callback:
                         try:
-                            op.callback(result.rowcount)
+                            op.callback(rowcount)
                         except Exception as e:
                             logger.warning(f"Callback execution failed: {e}")
 
-                except Exception as e:
-                    logger.error(f"Update failed: {e}", exc_info=True)
-                    await session.rollback()
+            except Exception as e:
+                logger.error(f"Batch update failed: {e}", exc_info=True)
+                await session.rollback()
+                # All operations fail together
+                for op in operations:
                     if op.future and not op.future.done():
                         op.future.set_exception(e)
 
@@ -430,31 +438,39 @@
     ) -> None:
         """Execute delete operations in a batch."""
        async with get_db_session() as session:
-            for op in operations:
-                try:
+            results = []
+            try:
+                # 🔧 Fix: collect all operations and commit once, instead of committing repeatedly inside the loop
+                for op in operations:
                     # Build the delete statement
                     stmt = delete(op.model_class)
                     for key, value in op.conditions.items():
                         attr = getattr(op.model_class, key)
                         stmt = stmt.where(attr == value)
 
-                    # Execute the delete
+                    # Execute the delete (without committing yet)
                     result = await session.execute(stmt)
-                    await session.commit()
-
-                    # Set the result
+                    results.append((op, result.rowcount))
+
+                # After all operations succeed, commit once
+                await session.commit()
+
+                # Set the result for every operation
+                for op, rowcount in results:
                     if op.future and not op.future.done():
-                        op.future.set_result(result.rowcount)
+                        op.future.set_result(rowcount)
 
                     if op.callback:
                         try:
-                            op.callback(result.rowcount)
+                            op.callback(rowcount)
                         except Exception as e:
                             logger.warning(f"Callback execution failed: {e}")
 
-                except Exception as e:
-                    logger.error(f"Delete failed: {e}", exc_info=True)
-                    await session.rollback()
+            except Exception as e:
+                logger.error(f"Batch delete failed: {e}", exc_info=True)
+                await session.rollback()
+                # All operations fail together
+                for op in operations:
                     if op.future and not op.future.done():
                         op.future.set_exception(e)
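A note on the storage.py change above: the batcher now feeds plain dicts into a single multi-row INSERT instead of handing ORM instances to `session.add_all()`, which avoids per-object identity-map and refresh overhead. The following is a minimal, self-contained sketch of that pattern, assuming SQLAlchemy 2.x async with the aiosqlite driver; `Message` and `bulk_store` are illustrative stand-ins, not the project's actual classes.

```python
import asyncio

from sqlalchemy import Integer, String, insert
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Message(Base):
    # Hypothetical stand-in for the project's Messages model.
    __tablename__ = "messages"
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    chat_id: Mapped[str] = mapped_column(String(64))
    text: Mapped[str] = mapped_column(String(1024))


async def bulk_store(rows: list[dict]) -> int:
    """Write all rows with one multi-row INSERT instead of session.add_all()."""
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    async with session_factory() as session:
        # Single executemany-style statement; no per-object ORM bookkeeping.
        await session.execute(insert(Message).values(rows))
        await session.commit()

    await engine.dispose()
    return len(rows)


if __name__ == "__main__":
    demo = [{"chat_id": "c1", "text": f"msg {i}"} for i in range(100)]
    print(asyncio.run(bulk_store(demo)))
```

Because a plain INSERT never refreshes Python-side objects with server-generated defaults, the patch keeps `_prepare_message_object` and only converts its result into a dict of column values.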
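The batch_scheduler.py change moves the commit out of the per-operation loop: every statement in the batch is executed first, the session commits once, and only then are the callers' futures resolved; on any failure the whole batch is rolled back and every future receives the exception. Below is a stripped-down sketch of that control flow; `UpdateOp` and `flush_updates` are illustrative names rather than the scheduler's real API, and `session` is assumed to be an SQLAlchemy `AsyncSession` with `model_class` a mapped ORM class.

```python
import asyncio
from dataclasses import dataclass
from typing import Any

from sqlalchemy import update


@dataclass
class UpdateOp:
    """One queued update; the caller awaits `future` for the affected row count."""
    conditions: dict[str, Any]
    data: dict[str, Any]
    future: asyncio.Future


async def flush_updates(session, model_class, operations: list[UpdateOp]) -> None:
    """Run every queued UPDATE, commit once, then resolve all futures."""
    results: list[tuple[UpdateOp, int]] = []
    try:
        for op in operations:
            stmt = update(model_class)
            for key, value in op.conditions.items():
                stmt = stmt.where(getattr(model_class, key) == value)
            stmt = stmt.values(**op.data)
            result = await session.execute(stmt)   # executed, not yet committed
            results.append((op, result.rowcount))

        await session.commit()                     # one commit for the whole batch

        for op, rowcount in results:
            if not op.future.done():
                op.future.set_result(rowcount)
    except Exception as exc:
        await session.rollback()                   # nothing from this batch persists,
        for op in operations:                      # so every caller sees the failure
            if not op.future.done():
                op.future.set_exception(exc)
```

The trade-off is all-or-nothing semantics: a single failing operation now fails the whole batch instead of just itself, which is exactly what the rollback-and-fail-all branch in the patch implements.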