| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import sqlite3
- import json
- from collections import namedtuple
# Lightweight record for one table column: its name, declared type,
# NOT NULL flag and default value.
Column = namedtuple('Column', 'name type notnull dflt_value')

# Module-wide accumulator for problems found during a run; schema-level
# messages and data-level failures are collected in separate lists.
ErrorReport = dict(schema_errors=[], data_errors=[])
def get_tables(conn):
    """Return the names of all user tables in the database behind *conn*.

    Tables whose name starts with the literal prefix ``sqlite_`` (SQLite's
    internal tables) are skipped.

    Args:
        conn: An open ``sqlite3.Connection``.

    Returns:
        A list of table names, in the order ``sqlite_master`` yields them.
    """
    # "_" is a single-character wildcard in LIKE, so the bare pattern
    # 'sqlite_%' would also hide user tables such as "sqliteXdata".  Escape
    # it so that only the literal "sqlite_" prefix is filtered out.
    query = (
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name NOT LIKE 'sqlite!_%' ESCAPE '!';"
    )
    return [name for (name,) in conn.execute(query)]
def get_columns(conn, table_name):
    """Return the columns of *table_name* as a list of ``Column`` tuples.

    Args:
        conn: An open ``sqlite3.Connection``.
        table_name: Name of the table to inspect.

    Returns:
        One ``Column(name, type, notnull, dflt_value)`` per row reported by
        ``PRAGMA table_info``; ``notnull`` is converted to ``bool``.  The
        list is empty when the pragma reports no rows.
    """
    # The name is embedded as a SQL string literal, so any single quote in
    # it must be doubled; otherwise a table called e.g. it's would turn the
    # pragma into a syntax error.
    escaped_name = table_name.replace("'", "''")
    rows = conn.execute(f"PRAGMA table_info('{escaped_name}')")
    # Row layout used here: index 1 = name, 2 = type, 3 = notnull flag,
    # 4 = default value.
    return [Column(row[1], row[2], bool(row[3]), row[4]) for row in rows]
def compare_and_alter_schema(source_conn, target_conn):
    """Bring the target schema up to date with the source schema.

    For every table found in the source database:

    * if the table is missing in the target, it is created there by
      replaying the source's ``CREATE TABLE`` statement;
    * every source column missing in the target is added with
      ``ALTER TABLE ... ADD COLUMN`` (carrying over NOT NULL and DEFAULT);
    * every target-only column that is NOT NULL and has no default is
      reported.

    Nothing is raised for these failures; each one is appended as a message
    to ``ErrorReport['schema_errors']``.

    Args:
        source_conn: Open ``sqlite3.Connection`` to the source database.
        target_conn: Open ``sqlite3.Connection`` to the target database.
    """
    def quote_ident(name):
        # Double embedded double quotes so the name is a valid quoted
        # SQL identifier.
        return '"' + name.replace('"', '""') + '"'

    source_tables = get_tables(source_conn)
    # A set gives constant-time membership tests in the loop below.
    target_tables = set(get_tables(target_conn))

    for table in source_tables:
        if table not in target_tables:
            try:
                create_stmt = source_conn.execute(
                    "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
                    (table,)
                ).fetchone()[0]
                target_conn.execute(create_stmt)
                target_conn.commit()
            except Exception as e:
                # Best effort: record the failure and move on to the next
                # table instead of aborting the whole run.
                ErrorReport['schema_errors'].append(
                    f"Failed to create table {table}: {str(e)}"
                )
                continue

        source_cols = get_columns(source_conn, table)
        target_cols = get_columns(target_conn, table)
        # Build both name sets once per table rather than once per column.
        source_col_names = {col.name for col in source_cols}
        target_col_names = {col.name for col in target_cols}

        for col in source_cols:
            if col.name in target_col_names:
                continue

            alter_sql = (
                f'ALTER TABLE {quote_ident(table)} '
                f'ADD COLUMN {quote_ident(col.name)} {col.type}'
            )
            if col.notnull:
                alter_sql += ' NOT NULL'
            if col.dflt_value is not None:
                alter_sql += f' DEFAULT {col.dflt_value}'

            try:
                target_conn.execute(alter_sql)
                target_conn.commit()
            except Exception as e:
                ErrorReport['schema_errors'].append(
                    f"Table {table}: Failed to add column {col.name}: {str(e)}"
                )

        for tcol in target_cols:
            if tcol.name in source_col_names:
                continue
            # The source has no such column, so nothing copied from it can
            # supply a value for a required column without a default.
            if tcol.notnull and tcol.dflt_value is None:
                ErrorReport['schema_errors'].append(
                    f"Table {table}: Target column '{tcol.name}' is NOT NULL with no default but missing in source"
                )
def generate_data_migration_script(source_conn, filename='migration_script.sql'):
    """Write an SQL script that copies every source table's rows.

    For each user table in the source database one statement of the form
    ``INSERT INTO "<table>" (<cols>) SELECT <cols> FROM source_db."<table>";``
    is emitted, so the script refers to the source database under the
    schema name ``source_db``.

    Args:
        source_conn: Open ``sqlite3.Connection`` to the source database.
        filename: Path of the script file to (over)write.
    """
    def quote_ident(name):
        # Double embedded double quotes so the name is a valid quoted
        # SQL identifier.
        return '"' + name.replace('"', '""') + '"'

    # Build every statement before touching the file, so a failing schema
    # query cannot leave an empty or truncated script behind.
    statements = []
    for table in get_tables(source_conn):
        cols_str = ', '.join(
            quote_ident(col.name) for col in get_columns(source_conn, table)
        )
        table_ident = quote_ident(table)
        statements.append(
            f'INSERT INTO {table_ident} ({cols_str})\n'
            f'SELECT {cols_str} FROM source_db.{table_ident};\n\n'
        )

    # Explicit encoding: table/column names may contain non-ASCII characters
    # and the platform default encoding is not guaranteed to handle them.
    with open(filename, 'w', encoding='utf-8') as f:
        f.writelines(statements)
def migrate_data(source_conn, target_conn):
    """Copy the rows of every source table into the same-named target table.

    Rows are fetched from the source in batches of 100 and inserted into the
    target one at a time, committing after each successful insert.  A row
    that fails to insert is rolled back and recorded (table, row data and
    error text) in ``ErrorReport['data_errors']``; a failure that affects a
    whole table is recorded there as a "General migration error".  No
    ``sqlite3.Error`` is propagated to the caller.

    Args:
        source_conn: Open ``sqlite3.Connection`` to the source database.
        target_conn: Open ``sqlite3.Connection`` to the target database.
    """
    def quote_ident(name):
        # Double embedded double quotes so the name is a valid quoted
        # SQL identifier.
        return '"' + name.replace('"', '""') + '"'

    batch_size = 100

    for table in get_tables(source_conn):
        try:
            columns = get_columns(source_conn, table)
            # Quote column names as well as the table name, so names that
            # are SQL keywords or contain spaces do not break the statement.
            cols_str = ', '.join(quote_ident(col.name) for col in columns)
            placeholders = ', '.join(['?'] * len(columns))
            table_ident = quote_ident(table)
            insert_sql = (
                f'INSERT INTO {table_ident} ({cols_str}) VALUES ({placeholders})'
            )

            source_cur = source_conn.cursor()
            # Select the same explicit column list used for the INSERT so
            # the values always line up with the placeholders, instead of
            # relying on SELECT * yielding matching columns.
            source_cur.execute(f'SELECT {cols_str} FROM {table_ident}')

            target_cur = target_conn.cursor()

            while True:
                rows = source_cur.fetchmany(batch_size)
                if not rows:
                    break

                for row in rows:
                    try:
                        target_cur.execute(insert_sql, row)
                        target_conn.commit()
                    except sqlite3.Error as e:
                        ErrorReport['data_errors'].append({
                            'table': table,
                            'row_data': row,
                            'error': str(e)
                        })
                        target_conn.rollback()
        except sqlite3.Error as e:
            ErrorReport['data_errors'].append({
                'table': table,
                'error': f"General migration error: {str(e)}"
            })
def save_error_report(filename='migration_errors.json'):
    """Write the accumulated ``ErrorReport`` to *filename* as indented JSON.

    ``bytes``-like values inside the report are written as hexadecimal
    strings.

    Args:
        filename: Path of the JSON file to (over)write.

    Raises:
        TypeError: If the report contains a value that is neither JSON
            serializable nor bytes-like.
    """
    def encode_extra(value):
        # Row data recorded for failed inserts can hold BLOB values, which
        # arrive as bytes and are not JSON serializable on their own.
        if isinstance(value, (bytes, bytearray, memoryview)):
            return bytes(value).hex()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    # Serialize fully before opening the file so that a serialization error
    # cannot leave a truncated report on disk.
    payload = json.dumps(ErrorReport, indent=2, default=encode_extra)
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(payload)
def main(source_db, target_db):
    """Run the whole migration from *source_db* into *target_db*.

    Steps, in order: align the target schema with the source, write the SQL
    data-migration script, copy the rows, and save the error report.  Both
    connections are closed even when one of the steps raises.

    Args:
        source_db: Path of the source SQLite database file.
        target_db: Path of the target SQLite database file.
    """
    source_conn = sqlite3.connect(source_db)
    try:
        target_conn = sqlite3.connect(target_db)
        try:
            # Attach source DB to target connection for SQL script
            # generation.  The path is bound as a parameter so that a path
            # containing a single quote cannot break the statement.
            target_conn.execute(
                "ATTACH DATABASE ? AS source_db", (str(source_db),)
            )

            compare_and_alter_schema(source_conn, target_conn)
            generate_data_migration_script(source_conn)
            migrate_data(source_conn, target_conn)
            save_error_report()
        finally:
            target_conn.close()
    finally:
        source_conn.close()
if __name__ == '__main__':
    import sys

    # Exactly two positional arguments are expected: the source database
    # path followed by the target database path.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("Usage: python migrate_db.py <source.db> <target.db>")
        sys.exit(1)

    source_path, target_path = cli_args
    main(source_path, target_path)
|