新增迁移脚本

This commit is contained in:
雅诺狐
2025-08-16 15:54:20 +08:00
parent 7dfaf54c9c
commit 0f0619762b

120
migration_script.py Normal file
View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
安全地向 llm_usage 表添加列 model_assign_name 及其索引
支持 MySQL / MariaDBPyMySQL 和 SQLitesqlite3
"""
from __future__ import annotations
import re
import sqlite3
import pymysql
from contextlib import closing
# ===== 按需修改 =====
# 1) MySQL 示例
# CONNECTION = dict(
# host="127.0.0.1",
# port=3306,
# user="root",
# password="123456",
# database="test_db",
# charset="utf8mb4",
# driver="mysql" # 关键标识
# )
# 2) SQLite 示例
CONNECTION = dict(
database="test.db", # 数据库文件路径
driver="sqlite"
)
# =====================
COLUMN_NAME = "model_assign_name"
COLUMN_TYPE = "VARCHAR(100)"
INDEX_NAME = f"idx_llmusage_{COLUMN_NAME}"
TABLE_NAME = "llm_usage"
# --------------------------------------------------
# MySQL 实现
# --------------------------------------------------
def _mysql_ensure(cfg: dict) -> None:
with closing(pymysql.connect(**{k: v for k, v in cfg.items() if k != "driver"})) as conn, closing(conn.cursor()) as cur:
# 1. 列是否存在
cur.execute("""
SELECT COUNT(*) FROM information_schema.COLUMNS
WHERE table_schema = %s
AND table_name = %s
AND column_name = %s
""", (cfg["database"], TABLE_NAME, COLUMN_NAME))
col_exists = cur.fetchone()[0] > 0
# 2. 索引是否存在
cur.execute("""
SELECT COUNT(*) FROM information_schema.STATISTICS
WHERE table_schema = %s
AND table_name = %s
AND index_name = %s
""", (cfg["database"], TABLE_NAME, INDEX_NAME))
idx_exists = cur.fetchone()[0] > 0
if not col_exists:
sql = f"ALTER TABLE {TABLE_NAME} ADD COLUMN {COLUMN_NAME} {COLUMN_TYPE}"
print(f"[MySQL DDL] {sql}")
cur.execute(sql)
if not idx_exists:
sql = f"CREATE INDEX {INDEX_NAME} ON {TABLE_NAME} ({COLUMN_NAME})"
print(f"[MySQL DDL] {sql}")
cur.execute(sql)
conn.commit()
# --------------------------------------------------
# SQLite 实现
# --------------------------------------------------
def _sqlite_ensure(cfg: dict) -> None:
db = cfg["database"]
with closing(sqlite3.connect(db)) as conn, closing(conn.cursor()) as cur:
# 1. 列是否存在
cur.execute("PRAGMA table_info({})".format(TABLE_NAME))
cols = {row[1] for row in cur.fetchall()}
col_exists = COLUMN_NAME in cols
# 2. 索引是否存在
cur.execute("PRAGMA index_list({})".format(TABLE_NAME))
idxs = {row[1] for row in cur.fetchall()}
idx_exists = INDEX_NAME in idxs
# SQLite ≥3.35 支持 ALTER TABLE ADD COLUMN IF NOT EXISTS
if not col_exists:
sql = f"ALTER TABLE {TABLE_NAME} ADD COLUMN {COLUMN_NAME} {COLUMN_TYPE}"
print(f"[SQLite DDL] {sql}")
cur.execute(sql)
if not idx_exists:
sql = f"CREATE INDEX {INDEX_NAME} ON {TABLE_NAME} ({COLUMN_NAME})"
print(f"[SQLite DDL] {sql}")
cur.execute(sql)
conn.commit()
# --------------------------------------------------
# 调度器
# --------------------------------------------------
def ensure_column_and_index(cfg: dict) -> None:
driver = cfg.get("driver", "").lower()
if driver == "mysql":
_mysql_ensure(cfg)
elif driver == "sqlite":
_sqlite_ensure(cfg)
else:
raise ValueError("connection.driver 必须是 'mysql''sqlite'")
print("✅ 完成。")
if __name__ == "__main__":
ensure_column_and_index(CONNECTION)