feat(database): 为 SQLite 添加 WAL 模式支持和性能优化配置

增加 SQLite 数据库性能优化功能,包括:
- 启用 WAL 模式提高并发性能
- 设置合理的超时时间和同步级别
- 添加数据库定期维护功能
- 在会话初始化时自动应用优化配置
- 增加连接超时时间从30秒到60秒

这些优化显著提升了 SQLite 数据库在高并发场景下的性能和稳定性。
This commit is contained in:
Windpicker-owo
2025-09-28 22:04:44 +08:00
parent 28bce19d27
commit 8b034e21c6
2 changed files with 81 additions and 7 deletions

View File

@@ -194,7 +194,7 @@ async def db_query(
# 首先获取要更新的记录 # 首先获取要更新的记录
result = await session.execute(query) result = await session.execute(query)
records_to_update = result.scalars().all() records_to_update = result.scalars().all()
# 更新每个记录 # 更新每个记录
affected_rows = 0 affected_rows = 0
for record in records_to_update: for record in records_to_update:
@@ -202,7 +202,7 @@ async def db_query(
if hasattr(record, field): if hasattr(record, field):
setattr(record, field, value) setattr(record, field, value)
affected_rows += 1 affected_rows += 1
return affected_rows return affected_rows
elif query_type == "delete": elif query_type == "delete":
@@ -217,13 +217,13 @@ async def db_query(
# 首先获取要删除的记录 # 首先获取要删除的记录
result = await session.execute(query) result = await session.execute(query)
records_to_delete = result.scalars().all() records_to_delete = result.scalars().all()
# 删除记录 # 删除记录
affected_rows = 0 affected_rows = 0
for record in records_to_delete: for record in records_to_delete:
session.delete(record) session.delete(record)
affected_rows += 1 affected_rows += 1
return affected_rows return affected_rows
elif query_type == "count": elif query_type == "count":
@@ -416,4 +416,4 @@ async def store_action_info(
except Exception as e: except Exception as e:
logger.error(f"[SQLAlchemy] 存储动作信息时发生错误: {e}") logger.error(f"[SQLAlchemy] 存储动作信息时发生错误: {e}")
traceback.print_exc() traceback.print_exc()
return None return None

View File

@@ -9,7 +9,7 @@ import time
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional, Any, Dict, AsyncGenerator from typing import Optional, Any, Dict, AsyncGenerator
from sqlalchemy import Column, String, Float, Integer, Boolean, Text, Index, DateTime from sqlalchemy import Column, String, Float, Integer, Boolean, Text, Index, DateTime, text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@@ -22,6 +22,66 @@ logger = get_logger("sqlalchemy_models")
Base = declarative_base() Base = declarative_base()
async def enable_sqlite_wal_mode(engine):
"""为 SQLite 启用 WAL 模式以提高并发性能"""
try:
async with engine.begin() as conn:
# 启用 WAL 模式
await conn.execute(text("PRAGMA journal_mode = WAL"))
# 设置适中的同步级别,平衡性能和安全性
await conn.execute(text("PRAGMA synchronous = NORMAL"))
# 启用外键约束
await conn.execute(text("PRAGMA foreign_keys = ON"))
# 设置 busy_timeout避免锁定错误
await conn.execute(text("PRAGMA busy_timeout = 60000")) # 60秒
logger.info("[SQLite] WAL 模式已启用,并发性能已优化")
except Exception as e:
logger.warning(f"[SQLite] 启用 WAL 模式失败: {e},将使用默认配置")
async def maintain_sqlite_database():
"""定期维护 SQLite 数据库性能"""
try:
engine, SessionLocal = await initialize_database()
if not engine:
return
async with engine.begin() as conn:
# 检查并确保 WAL 模式仍然启用
result = await conn.execute(text("PRAGMA journal_mode"))
journal_mode = result.scalar()
if journal_mode != "wal":
await conn.execute(text("PRAGMA journal_mode = WAL"))
logger.info("[SQLite] WAL 模式已重新启用")
# 优化数据库性能
await conn.execute(text("PRAGMA synchronous = NORMAL"))
await conn.execute(text("PRAGMA busy_timeout = 60000"))
await conn.execute(text("PRAGMA foreign_keys = ON"))
# 定期清理(可选,根据需要启用)
# await conn.execute(text("PRAGMA optimize"))
logger.info("[SQLite] 数据库维护完成")
except Exception as e:
logger.warning(f"[SQLite] 数据库维护失败: {e}")
def get_sqlite_performance_config():
"""获取 SQLite 性能优化配置"""
return {
"journal_mode": "WAL", # 提高并发性能
"synchronous": "NORMAL", # 平衡性能和安全性
"busy_timeout": 60000, # 60秒超时
"foreign_keys": "ON", # 启用外键约束
"cache_size": -10000, # 10MB 缓存
"temp_store": "MEMORY", # 临时存储使用内存
"mmap_size": 268435456, # 256MB 内存映射
}
# MySQL兼容的字段类型辅助函数 # MySQL兼容的字段类型辅助函数
def get_string_field(max_length=255, **kwargs): def get_string_field(max_length=255, **kwargs):
""" """
@@ -679,7 +739,7 @@ async def initialize_database():
{ {
"connect_args": { "connect_args": {
"check_same_thread": False, "check_same_thread": False,
"timeout": 30, "timeout": 60, # 增加超时时间
}, },
} }
) )
@@ -692,6 +752,10 @@ async def initialize_database():
await check_and_migrate_database() await check_and_migrate_database()
# 如果是 SQLite启用 WAL 模式以提高并发性能
if config.database_type == "sqlite":
await enable_sqlite_wal_mode(_engine)
logger.info(f"SQLAlchemy异步数据库初始化成功: {config.database_type}") logger.info(f"SQLAlchemy异步数据库初始化成功: {config.database_type}")
return _engine, _SessionLocal return _engine, _SessionLocal
@@ -705,6 +769,16 @@ async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
if not SessionLocal: if not SessionLocal:
raise RuntimeError("Database session not initialized") raise RuntimeError("Database session not initialized")
session = SessionLocal() session = SessionLocal()
# 对于 SQLite在会话开始时设置 PRAGMA
from src.config.config import global_config
if global_config.database.database_type == "sqlite":
try:
await session.execute(text("PRAGMA busy_timeout = 60000"))
await session.execute(text("PRAGMA foreign_keys = ON"))
except Exception as e:
logger.warning(f"[SQLite] 设置会话 PRAGMA 失败: {e}")
yield session yield session
except Exception as e: except Exception as e:
logger.error(f"数据库会话错误: {e}") logger.error(f"数据库会话错误: {e}")