refactor(db): 使用迁移函数替代 create_all 初始化数据库

将数据库初始化过程从直接调用 `Base.metadata.create_all` 修改为调用新的 `check_and_migrate_database` 函数。

这一更改旨在实现更灵活的数据库模式管理,允许在不丢失现有数据的情况下,自动检查并添加新的列或表,从而增强了数据库迁移的健壮性。
This commit is contained in:
minecraft1024a
2025-08-17 14:31:58 +08:00
committed by Windpicker-owo
parent 011096068e
commit 0144321254
3 changed files with 220 additions and 3 deletions

View File

@@ -0,0 +1,78 @@
# mmc/src/common/database/db_migration.py
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine
from src.common.database.sqlalchemy_models import Base, get_engine
from src.common.logger import get_logger
logger = get_logger("db_migration")
def check_and_migrate_database():
"""
检查数据库结构并自动迁移(添加缺失的表和列)。
"""
logger.info("正在检查数据库结构并执行自动迁移...")
engine = get_engine()
inspector = inspect(engine)
# 1. 获取数据库中所有已存在的表名
db_table_names = set(inspector.get_table_names())
# 2. 遍历所有在代码中定义的模型
for table_name, table in Base.metadata.tables.items():
logger.debug(f"正在检查表: {table_name}")
# 3. 如果表不存在,则创建它
if table_name not in db_table_names:
logger.info(f"'{table_name}' 不存在,正在创建...")
try:
table.create(engine)
logger.info(f"'{table_name}' 创建成功。")
except Exception as e:
logger.error(f"创建表 '{table_name}' 失败: {e}")
continue
# 4. 如果表已存在,则检查并添加缺失的列
db_columns = {col["name"] for col in inspector.get_columns(table_name)}
model_columns = {col.name for col in table.c}
missing_columns = model_columns - db_columns
if not missing_columns:
logger.debug(f"'{table_name}' 结构一致,无需修改。")
continue
logger.info(f"在表 '{table_name}' 中发现缺失的列: {', '.join(missing_columns)}")
with engine.connect() as connection:
trans = connection.begin()
try:
for column_name in missing_columns:
column = table.c[column_name]
# 构造并执行 ALTER TABLE 语句
try:
column_type = column.type.compile(engine.dialect)
sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}"
# 添加默认值和非空约束的处理
if column.default is not None:
default_value = column.default.arg
if isinstance(default_value, str):
sql += f" DEFAULT '{default_value}'"
else:
sql += f" DEFAULT {default_value}"
if not column.nullable:
sql += " NOT NULL"
connection.execute(text(sql))
logger.info(f"成功向表 '{table_name}' 添加列 '{column_name}'")
except Exception as e:
logger.error(f"向表 '{table_name}' 添加列 '{column_name}' 失败: {e}")
trans.commit()
except Exception as e:
logger.error(f"在表 '{table_name}' 添加列时发生错误,事务已回滚: {e}")
trans.rollback()
logger.info("数据库结构检查与自动迁移完成。")