perf: 优化批量消息存储,使用insert().values()替代add_all()

This commit is contained in:
Windpicker-owo
2025-11-01 17:43:47 +08:00
parent f872a1ff82
commit 56df2d2252

View File

@@ -99,37 +99,55 @@ class MessageStorageBatcher:
success_count = 0
try:
# 准备所有消息对象
messages_objects = []
# 🔧 优化准备字典数据而不是ORM对象使用批量INSERT
messages_dicts = []
for msg_data in messages_to_store:
try:
message_obj = await self._prepare_message_object(
message_dict = await self._prepare_message_dict(
msg_data['message'],
msg_data['chat_stream']
)
if message_obj:
messages_objects.append(message_obj)
if message_dict:
messages_dicts.append(message_dict)
except Exception as e:
logger.error(f"准备消息对象失败: {e}")
logger.error(f"准备消息数据失败: {e}")
continue
# 批量写入数据库
if messages_objects:
# 批量写入数据库 - 使用高效的批量INSERT
if messages_dicts:
from sqlalchemy import insert
async with get_db_session() as session:
session.add_all(messages_objects)
stmt = insert(Messages).values(messages_dicts)
await session.execute(stmt)
await session.commit()
success_count = len(messages_objects)
success_count = len(messages_dicts)
elapsed = time.time() - start_time
logger.info(
f"批量存储了 {success_count}/{len(messages_to_store)} 条消息 "
f"(耗时: {elapsed:.3f}秒)"
f"(耗时: {elapsed:.3f}, 平均 {elapsed/max(success_count,1)*1000:.2f}ms/条)"
)
except Exception as e:
logger.error(f"批量存储消息失败: {e}", exc_info=True)
async def _prepare_message_dict(self, message, chat_stream):
"""准备消息字典数据用于批量INSERT
这个方法准备字典而不是ORM对象性能更高
"""
message_obj = await self._prepare_message_object(message, chat_stream)
if message_obj is None:
return None
# 将ORM对象转换为字典只包含列字段
message_dict = {}
for column in Messages.__table__.columns:
message_dict[column.name] = getattr(message_obj, column.name)
return message_dict
async def _prepare_message_object(self, message, chat_stream):
"""准备消息对象(从原 store_message 逻辑提取)"""
try: