From 6bd4170c90372d1dfc535ea17f3e0cde54ae479b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9B=85=E8=AF=BA=E7=8B=90?= <212194964+foxcyber907@users.noreply.github.com> Date: Sat, 16 Aug 2025 15:54:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=BF=81=E7=A7=BB=E8=84=9A?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- migration_script.py | 120 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 migration_script.py diff --git a/migration_script.py b/migration_script.py new file mode 100644 index 000000000..293d3e08b --- /dev/null +++ b/migration_script.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +安全地向 llm_usage 表添加列 model_assign_name 及其索引 +支持 MySQL / MariaDB(PyMySQL) 和 SQLite(sqlite3) +""" + +from __future__ import annotations +import re +import sqlite3 +import pymysql +from contextlib import closing + +# ===== 按需修改 ===== +# 1) MySQL 示例 +# CONNECTION = dict( +# host="127.0.0.1", +# port=3306, +# user="root", +# password="123456", +# database="test_db", +# charset="utf8mb4", +# driver="mysql" # 关键标识 +# ) + +# 2) SQLite 示例 +CONNECTION = dict( + database="test.db", # 数据库文件路径 + driver="sqlite" +) +# ===================== + +COLUMN_NAME = "model_assign_name" +COLUMN_TYPE = "VARCHAR(100)" +INDEX_NAME = f"idx_llmusage_{COLUMN_NAME}" +TABLE_NAME = "llm_usage" + + +# -------------------------------------------------- +# MySQL 实现 +# -------------------------------------------------- +def _mysql_ensure(cfg: dict) -> None: + with closing(pymysql.connect(**{k: v for k, v in cfg.items() if k != "driver"})) as conn, closing(conn.cursor()) as cur: + # 1. 列是否存在 + cur.execute(""" + SELECT COUNT(*) FROM information_schema.COLUMNS + WHERE table_schema = %s + AND table_name = %s + AND column_name = %s + """, (cfg["database"], TABLE_NAME, COLUMN_NAME)) + col_exists = cur.fetchone()[0] > 0 + + # 2. 索引是否存在 + cur.execute(""" + SELECT COUNT(*) FROM information_schema.STATISTICS + WHERE table_schema = %s + AND table_name = %s + AND index_name = %s + """, (cfg["database"], TABLE_NAME, INDEX_NAME)) + idx_exists = cur.fetchone()[0] > 0 + + if not col_exists: + sql = f"ALTER TABLE {TABLE_NAME} ADD COLUMN {COLUMN_NAME} {COLUMN_TYPE}" + print(f"[MySQL DDL] {sql}") + cur.execute(sql) + + if not idx_exists: + sql = f"CREATE INDEX {INDEX_NAME} ON {TABLE_NAME} ({COLUMN_NAME})" + print(f"[MySQL DDL] {sql}") + cur.execute(sql) + + conn.commit() + + +# -------------------------------------------------- +# SQLite 实现 +# -------------------------------------------------- +def _sqlite_ensure(cfg: dict) -> None: + db = cfg["database"] + with closing(sqlite3.connect(db)) as conn, closing(conn.cursor()) as cur: + # 1. 列是否存在 + cur.execute("PRAGMA table_info({})".format(TABLE_NAME)) + cols = {row[1] for row in cur.fetchall()} + col_exists = COLUMN_NAME in cols + + # 2. 索引是否存在 + cur.execute("PRAGMA index_list({})".format(TABLE_NAME)) + idxs = {row[1] for row in cur.fetchall()} + idx_exists = INDEX_NAME in idxs + + # SQLite ≥3.35 支持 ALTER TABLE ADD COLUMN IF NOT EXISTS + if not col_exists: + sql = f"ALTER TABLE {TABLE_NAME} ADD COLUMN {COLUMN_NAME} {COLUMN_TYPE}" + print(f"[SQLite DDL] {sql}") + cur.execute(sql) + + if not idx_exists: + sql = f"CREATE INDEX {INDEX_NAME} ON {TABLE_NAME} ({COLUMN_NAME})" + print(f"[SQLite DDL] {sql}") + cur.execute(sql) + + conn.commit() + + +# -------------------------------------------------- +# 调度器 +# -------------------------------------------------- +def ensure_column_and_index(cfg: dict) -> None: + driver = cfg.get("driver", "").lower() + if driver == "mysql": + _mysql_ensure(cfg) + elif driver == "sqlite": + _sqlite_ensure(cfg) + else: + raise ValueError("connection.driver 必须是 'mysql' 或 'sqlite'") + print("✅ 完成。") + + +if __name__ == "__main__": + ensure_column_and_index(CONNECTION) \ No newline at end of file