|
|
@@ -0,0 +1,138 @@
|
|
|
+import sqlite3
|
|
|
+import json
|
|
|
+from collections import namedtuple
|
|
|
+
|
|
|
+Column = namedtuple('Column', ['name', 'type', 'notnull', 'dflt_value'])
|
|
|
+ErrorReport = {
|
|
|
+ 'schema_errors': [],
|
|
|
+ 'data_errors': []
|
|
|
+}
|
|
|
+
|
|
|
+def get_tables(conn):
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
|
|
|
+ return [row[0] for row in cursor.fetchall()]
|
|
|
+
|
|
|
+def get_columns(conn, table_name):
|
|
|
+ cursor = conn.execute(f"PRAGMA table_info('{table_name}')")
|
|
|
+ columns = []
|
|
|
+ for row in cursor:
|
|
|
+ columns.append(Column(row[1], row[2], bool(row[3]), row[4]))
|
|
|
+ return columns
|
|
|
+
|
|
|
+def compare_and_alter_schema(source_conn, target_conn):
|
|
|
+ source_tables = get_tables(source_conn)
|
|
|
+ target_tables = get_tables(target_conn)
|
|
|
+
|
|
|
+ for table in source_tables:
|
|
|
+ if table not in target_tables:
|
|
|
+ try:
|
|
|
+ create_stmt = source_conn.execute(
|
|
|
+ "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
|
|
|
+ (table,)
|
|
|
+ ).fetchone()[0]
|
|
|
+ target_conn.execute(create_stmt)
|
|
|
+ target_conn.commit()
|
|
|
+ except Exception as e:
|
|
|
+ ErrorReport['schema_errors'].append(f"Failed to create table {table}: {str(e)}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ source_cols = get_columns(source_conn, table)
|
|
|
+ target_cols = get_columns(target_conn, table)
|
|
|
+ target_col_names = [col.name for col in target_cols]
|
|
|
+
|
|
|
+ for col in source_cols:
|
|
|
+ if col.name not in target_col_names:
|
|
|
+ try:
|
|
|
+ alter_sql = f'ALTER TABLE "{table}" ADD COLUMN "{col.name}" {col.type}'
|
|
|
+ if col.notnull:
|
|
|
+ alter_sql += ' NOT NULL'
|
|
|
+ if col.dflt_value is not None:
|
|
|
+ alter_sql += f' DEFAULT {col.dflt_value}'
|
|
|
+ target_conn.execute(alter_sql)
|
|
|
+ target_conn.commit()
|
|
|
+ except Exception as e:
|
|
|
+ ErrorReport['schema_errors'].append(
|
|
|
+ f"Table {table}: Failed to add column {col.name}: {str(e)}"
|
|
|
+ )
|
|
|
+
|
|
|
+ for tcol in target_cols:
|
|
|
+ if tcol.name not in [col.name for col in source_cols]:
|
|
|
+ if tcol.notnull and tcol.dflt_value is None:
|
|
|
+ ErrorReport['schema_errors'].append(
|
|
|
+ f"Table {table}: Target column '{tcol.name}' is NOT NULL with no default but missing in source"
|
|
|
+ )
|
|
|
+
|
|
|
+def generate_data_migration_script(source_conn, filename='migration_script.sql'):
|
|
|
+ with open(filename, 'w') as f:
|
|
|
+ tables = get_tables(source_conn)
|
|
|
+ for table in tables:
|
|
|
+ cols = get_columns(source_conn, table)
|
|
|
+ col_names = [f'"{col.name}"' for col in cols]
|
|
|
+ cols_str = ', '.join(col_names)
|
|
|
+ f.write(f'INSERT INTO "{table}" ({cols_str})\n')
|
|
|
+ f.write(f'SELECT {cols_str} FROM source_db."{table}";\n\n')
|
|
|
+
|
|
|
+def migrate_data(source_conn, target_conn):
|
|
|
+ source_tables = get_tables(source_conn)
|
|
|
+
|
|
|
+ for table in source_tables:
|
|
|
+ try:
|
|
|
+ source_cols = get_columns(source_conn, table)
|
|
|
+ col_names = [col.name for col in source_cols]
|
|
|
+ placeholders = ', '.join(['?'] * len(col_names))
|
|
|
+ insert_sql = f'INSERT INTO "{table}" ({", ".join(col_names)}) VALUES ({placeholders})'
|
|
|
+
|
|
|
+ source_cur = source_conn.cursor()
|
|
|
+ source_cur.execute(f'SELECT * FROM "{table}"')
|
|
|
+
|
|
|
+ target_cur = target_conn.cursor()
|
|
|
+
|
|
|
+ while True:
|
|
|
+ rows = source_cur.fetchmany(100)
|
|
|
+ if not rows:
|
|
|
+ break
|
|
|
+
|
|
|
+ for row in rows:
|
|
|
+ try:
|
|
|
+ target_cur.execute(insert_sql, row)
|
|
|
+ target_conn.commit()
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ ErrorReport['data_errors'].append({
|
|
|
+ 'table': table,
|
|
|
+ 'row_data': row,
|
|
|
+ 'error': str(e)
|
|
|
+ })
|
|
|
+ target_conn.rollback()
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ ErrorReport['data_errors'].append({
|
|
|
+ 'table': table,
|
|
|
+ 'error': f"General migration error: {str(e)}"
|
|
|
+ })
|
|
|
+
|
|
|
+def save_error_report(filename='migration_errors.json'):
|
|
|
+ with open(filename, 'w') as f:
|
|
|
+ json.dump(ErrorReport, f, indent=2)
|
|
|
+
|
|
|
+def main(source_db, target_db):
|
|
|
+ source_conn = sqlite3.connect(source_db)
|
|
|
+ target_conn = sqlite3.connect(target_db)
|
|
|
+
|
|
|
+ # Attach source DB to target connection for SQL script generation
|
|
|
+ target_conn.execute(f"ATTACH DATABASE '{source_db}' AS source_db")
|
|
|
+
|
|
|
+ compare_and_alter_schema(source_conn, target_conn)
|
|
|
+ generate_data_migration_script(source_conn)
|
|
|
+ migrate_data(source_conn, target_conn)
|
|
|
+ save_error_report()
|
|
|
+
|
|
|
+ source_conn.close()
|
|
|
+ target_conn.close()
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ import sys
|
|
|
+ if len(sys.argv) != 3:
|
|
|
+ print("Usage: python migrate_db.py <source.db> <target.db>")
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+ main(sys.argv[1], sys.argv[2])
|