diff --git a/src/chat/message_receive/storage.py b/src/chat/message_receive/storage.py index 8e3e617dd..23ba8ecb4 100644 --- a/src/chat/message_receive/storage.py +++ b/src/chat/message_receive/storage.py @@ -99,37 +99,55 @@ class MessageStorageBatcher: success_count = 0 try: - # 准备所有消息对象 - messages_objects = [] + # 🔧 优化:准备字典数据而不是ORM对象,使用批量INSERT + messages_dicts = [] for msg_data in messages_to_store: try: - message_obj = await self._prepare_message_object( + message_dict = await self._prepare_message_dict( msg_data['message'], msg_data['chat_stream'] ) - if message_obj: - messages_objects.append(message_obj) + if message_dict: + messages_dicts.append(message_dict) except Exception as e: - logger.error(f"准备消息对象失败: {e}") + logger.error(f"准备消息数据失败: {e}") continue - # 批量写入数据库 - if messages_objects: + # 批量写入数据库 - 使用高效的批量INSERT + if messages_dicts: + from sqlalchemy import insert async with get_db_session() as session: - session.add_all(messages_objects) + stmt = insert(Messages).values(messages_dicts) + await session.execute(stmt) await session.commit() - success_count = len(messages_objects) + success_count = len(messages_dicts) elapsed = time.time() - start_time logger.info( f"批量存储了 {success_count}/{len(messages_to_store)} 条消息 " - f"(耗时: {elapsed:.3f}秒)" + f"(耗时: {elapsed:.3f}秒, 平均 {elapsed/max(success_count,1)*1000:.2f}ms/条)" ) except Exception as e: logger.error(f"批量存储消息失败: {e}", exc_info=True) + async def _prepare_message_dict(self, message, chat_stream): + """准备消息字典数据(用于批量INSERT) + + 这个方法准备字典而不是ORM对象,性能更高 + """ + message_obj = await self._prepare_message_object(message, chat_stream) + if message_obj is None: + return None + + # 将ORM对象转换为字典(只包含列字段) + message_dict = {} + for column in Messages.__table__.columns: + message_dict[column.name] = getattr(message_obj, column.name) + + return message_dict + async def _prepare_message_object(self, message, chat_stream): """准备消息对象(从原 store_message 逻辑提取)""" try: