From c4a9a3fdb881fbcf7c65dc3df00a2987a6bc1852 Mon Sep 17 00:00:00 2001 From: minecraft1024a Date: Sun, 21 Sep 2025 13:05:13 +0800 Subject: [PATCH] =?UTF-8?q?feat(db):=20=E5=A2=9E=E5=BC=BA=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E8=87=AA=E5=8A=A8=E8=BF=81=E7=A7=BB=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E4=BB=A5=E6=94=AF=E6=8C=81=E7=B4=A2=E5=BC=95=E5=88=9B?= =?UTF-8?q?=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构并增强了数据库自动迁移逻辑,以提供更健壮和全面的模式管理。 主要更新包括: - **支持索引创建**: 迁移脚本现在会自动检测并创建模型中定义但数据库中缺失的索引。 - **重构迁移流程**: 1. 首先一次性创建所有缺失的表,提高初始设置效率。 2. 然后,逐表检查并添加缺失的列和索引,使逻辑更清晰。 - **改进 SQLAlchemy 用法**: - 使用 `AddColumn` 和 `CreateIndex` DDL 结构代替原始 SQL 字符串,提高了代码的可读性和数据库方言的兼容性。 - 优化了 `inspector` 的使用方式,减少了重复调用。 - **增强日志记录**: 提供了更详细的日志输出,清晰地展示了正在执行的操作(如创建表、添加列、创建索引),并改进了错误报告。 --- src/common/database/db_migration.py | 155 +++++++++++++++++----------- 1 file changed, 92 insertions(+), 63 deletions(-) diff --git a/src/common/database/db_migration.py b/src/common/database/db_migration.py index 9d2be9e5b..a1633d76c 100644 --- a/src/common/database/db_migration.py +++ b/src/common/database/db_migration.py @@ -1,6 +1,8 @@ # mmc/src/common/database/db_migration.py -from sqlalchemy import inspect, text +from sqlalchemy import inspect +from sqlalchemy.schema import AddColumn, CreateIndex + from src.common.database.sqlalchemy_models import Base, get_engine from src.common.logger import get_logger @@ -9,79 +11,106 @@ logger = get_logger("db_migration") async def check_and_migrate_database(): """ - 异步检查数据库结构并自动迁移(添加缺失的表和列)。 + 异步检查数据库结构并自动迁移。 + - 自动创建不存在的表。 + - 自动为现有表添加缺失的列。 + - 自动为现有表创建缺失的索引。 """ logger.info("正在检查数据库结构并执行自动迁移...") engine = await get_engine() - - # 使用异步引擎获取inspector + async with engine.connect() as connection: # 在同步上下文中运行inspector操作 - inspector = await connection.run_sync(lambda sync_conn: inspect(sync_conn)) - - # 1. 获取数据库中所有已存在的表名 - db_table_names = await connection.run_sync(lambda sync_conn: set(inspect(sync_conn).get_table_names())) + def get_inspector(sync_conn): + return inspect(sync_conn) - # 2. 遍历所有在代码中定义的模型 + inspector = await connection.run_sync(get_inspector) + + # 在同步lambda中传递inspector + db_table_names = await connection.run_sync(lambda conn: set(inspector.get_table_names(conn))) + + # 1. 首先处理表的创建 + tables_to_create = [] for table_name, table in Base.metadata.tables.items(): - logger.debug(f"正在检查表: {table_name}") - - # 3. 如果表不存在,则创建它 if table_name not in db_table_names: - logger.info(f"表 '{table_name}' 不存在,正在创建...") - try: - await connection.run_sync(lambda sync_conn: table.create(sync_conn)) - logger.info(f"表 '{table_name}' 创建成功。") - except Exception as e: - logger.error(f"创建表 '{table_name}' 失败: {e}") + tables_to_create.append(table) + + if tables_to_create: + logger.info(f"发现 {len(tables_to_create)} 个不存在的表,正在创建...") + try: + # 一次性创建所有缺失的表 + await connection.run_sync( + lambda sync_conn: Base.metadata.create_all(sync_conn, tables=tables_to_create) + ) + for table in tables_to_create: + logger.info(f"表 '{table.name}' 创建成功。") + db_table_names.add(table.name) # 将新创建的表添加到集合中 + except Exception as e: + logger.error(f"创建表时失败: {e}", exc_info=True) + + # 2. 然后处理现有表的列和索引的添加 + for table_name, table in Base.metadata.tables.items(): + if table_name not in db_table_names: + logger.warning(f"跳过检查表 '{table_name}',因为它在创建步骤中可能已失败。") continue - # 4. 如果表已存在,则检查并添加缺失的列 - db_columns = await connection.run_sync( - lambda sync_conn: {col["name"] for col in inspect(sync_conn).get_columns(table_name)} - ) - model_columns = {col.name for col in table.c} + logger.debug(f"正在检查表 '{table_name}' 的列和索引...") - missing_columns = model_columns - db_columns - if not missing_columns: - logger.debug(f"表 '{table_name}' 结构一致,无需修改。") + try: + # 检查并添加缺失的列 + db_columns = await connection.run_sync( + lambda conn: {col["name"] for col in inspector.get_columns(table_name, conn)} + ) + model_columns = {col.name for col in table.c} + missing_columns = model_columns - db_columns + + if missing_columns: + logger.info(f"在表 '{table_name}' 中发现缺失的列: {', '.join(missing_columns)}") + async with connection.begin() as trans: + for column_name in missing_columns: + try: + column = table.c[column_name] + add_column_ddl = AddColumn(table_name, column) + await connection.execute(add_column_ddl) + logger.info(f"成功向表 '{table_name}' 添加列 '{column_name}'。") + except Exception as e: + logger.error( + f"向表 '{table_name}' 添加列 '{column_name}' 失败: {e}", + exc_info=True, + ) + await trans.rollback() + break # 如果一列失败,则停止处理此表的其他列 + else: + logger.info(f"表 '{table_name}' 的列结构一致。") + + # 检查并创建缺失的索引 + db_indexes = await connection.run_sync( + lambda conn: {idx["name"] for idx in inspector.get_indexes(table_name, conn)} + ) + model_indexes = {idx.name for idx in table.indexes} + missing_indexes = model_indexes - db_indexes + + if missing_indexes: + logger.info(f"在表 '{table_name}' 中发现缺失的索引: {', '.join(missing_indexes)}") + async with connection.begin() as trans: + for index_name in missing_indexes: + try: + index_obj = next((idx for idx in table.indexes if idx.name == index_name), None) + if index_obj is not None: + await connection.execute(CreateIndex(index_obj)) + logger.info(f"成功为表 '{table_name}' 创建索引 '{index_name}'。") + except Exception as e: + logger.error( + f"为表 '{table_name}' 创建索引 '{index_name}' 失败: {e}", + exc_info=True, + ) + await trans.rollback() + break # 如果一个索引失败,则停止处理此表的其他索引 + else: + logger.debug(f"表 '{table_name}' 的索引一致。") + + except Exception as e: + logger.error(f"在处理表 '{table_name}' 时发生意外错误: {e}", exc_info=True) continue - logger.info(f"在表 '{table_name}' 中发现缺失的列: {', '.join(missing_columns)}") - - # 开始事务来添加缺失的列 - async with connection.begin() as trans: - try: - for column_name in missing_columns: - column = table.c[column_name] - - # 构造并执行 ALTER TABLE 语句 - try: - # 在同步上下文中编译列类型 - column_type = await connection.run_sync( - lambda sync_conn: column.type.compile(sync_conn.dialect) - ) - sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}" - - # 添加默认值和非空约束的处理 - if column.default is not None: - default_value = column.default.arg - if isinstance(default_value, str): - sql += f" DEFAULT '{default_value}'" - else: - sql += f" DEFAULT {default_value}" - - if not column.nullable: - sql += " NOT NULL" - - await connection.execute(text(sql)) - logger.info(f"成功向表 '{table_name}' 添加列 '{column_name}'。") - except Exception as e: - logger.error(f"向表 '{table_name}' 添加列 '{column_name}' 失败: {e}") - - except Exception as e: - logger.error(f"在表 '{table_name}' 添加列时发生错误,事务已回滚: {e}") - await trans.rollback() - raise - logger.info("数据库结构检查与自动迁移完成。")