import sqlite3
import json
from collections import namedtuple

Column = namedtuple('Column', ['name', 'type', 'notnull', 'dflt_value'])

# Collected schema/data problems, written out as JSON at the end of the run.
ErrorReport = {
    'schema_errors': [],
    'data_errors': []
}


def get_tables(conn):
    """Return user table names, skipping SQLite's internal tables."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name NOT LIKE 'sqlite_%';"
    )
    return [row[0] for row in cursor.fetchall()]


def get_columns(conn, table_name):
    """Return Column tuples for a table via PRAGMA table_info."""
    # PRAGMA statements do not accept bound parameters, so the table name is
    # interpolated directly; it comes from sqlite_master, not user input.
    cursor = conn.execute(f"PRAGMA table_info('{table_name}')")
    columns = []
    for row in cursor:
        # row layout: (cid, name, type, notnull, dflt_value, pk)
        columns.append(Column(row[1], row[2], bool(row[3]), row[4]))
    return columns


def compare_and_alter_schema(source_conn, target_conn):
    source_tables = get_tables(source_conn)
    target_tables = get_tables(target_conn)
    for table in source_tables:
        # Missing table: recreate it from the source's stored CREATE statement.
        if table not in target_tables:
            try:
                create_stmt = source_conn.execute(
                    "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
                    (table,)
                ).fetchone()[0]
                target_conn.execute(create_stmt)
                target_conn.commit()
            except Exception as e:
                ErrorReport['schema_errors'].append(f"Failed to create table {table}: {e}")
            # A freshly created table already matches the source, and a failed
            # creation has nothing to compare, so skip the column checks.
            continue

        source_cols = get_columns(source_conn, table)
        target_cols = get_columns(target_conn, table)
        target_col_names = [col.name for col in target_cols]

        # Add source columns that are missing from the target.
        for col in source_cols:
            if col.name not in target_col_names:
                try:
                    alter_sql = f'ALTER TABLE "{table}" ADD COLUMN "{col.name}" {col.type}'
                    if col.notnull:
                        alter_sql += ' NOT NULL'
                    if col.dflt_value is not None:
                        # dflt_value is stored as SQL text (string defaults are
                        # already quoted), so it can be appended verbatim.
                        alter_sql += f' DEFAULT {col.dflt_value}'
                    target_conn.execute(alter_sql)
                    target_conn.commit()
                except Exception as e:
                    ErrorReport['schema_errors'].append(
                        f"Table {table}: Failed to add column {col.name}: {e}"
                    )

        # Flag target-only NOT NULL columns without defaults: inserts that copy
        # only the source columns will fail on these.
        source_col_names = [col.name for col in source_cols]
        for tcol in target_cols:
            if tcol.name not in source_col_names:
                if tcol.notnull and tcol.dflt_value is None:
                    ErrorReport['schema_errors'].append(
                        f"Table {table}: Target column '{tcol.name}' is NOT NULL "
                        f"with no default but missing in source"
                    )


def generate_data_migration_script(source_conn, filename='migration_script.sql'):
    """Write INSERT ... SELECT statements that assume the source database is
    attached to the target connection as 'source_db' (see main())."""
    with open(filename, 'w') as f:
        for table in get_tables(source_conn):
            cols = get_columns(source_conn, table)
            cols_str = ', '.join(f'"{col.name}"' for col in cols)
            f.write(f'INSERT INTO "{table}" ({cols_str})\n')
            f.write(f'SELECT {cols_str} FROM source_db."{table}";\n\n')


def migrate_data(source_conn, target_conn):
    for table in get_tables(source_conn):
        try:
            source_cols = get_columns(source_conn, table)
            col_names = [col.name for col in source_cols]
            # Quote identifiers, consistent with the DDL statements above.
            quoted_cols = ', '.join(f'"{name}"' for name in col_names)
            placeholders = ', '.join(['?'] * len(col_names))
            insert_sql = f'INSERT INTO "{table}" ({quoted_cols}) VALUES ({placeholders})'

            source_cur = source_conn.cursor()
            # SELECT * returns columns in PRAGMA table_info order, matching
            # the column list built above.
            source_cur.execute(f'SELECT * FROM "{table}"')
            target_cur = target_conn.cursor()

            while True:
                rows = source_cur.fetchmany(100)
                if not rows:
                    break
                for row in rows:
                    # Per-row commit is slow but isolates failures to single
                    # rows, so one bad row does not abort the whole table.
                    try:
                        target_cur.execute(insert_sql, row)
                        target_conn.commit()
                    except sqlite3.Error as e:
                        ErrorReport['data_errors'].append({
                            'table': table,
                            'row_data': row,
                            'error': str(e)
                        })
                        target_conn.rollback()
        except sqlite3.Error as e:
            ErrorReport['data_errors'].append({
                'table': table,
                'error': f"General migration error: {e}"
            })


def save_error_report(filename='migration_errors.json'):
    with open(filename, 'w') as f:
        # default=str handles values JSON cannot encode natively, such as
        # BLOB columns surfacing as bytes in row_data.
        json.dump(ErrorReport, f, indent=2, default=str)


def main(source_db, target_db):
    source_conn = sqlite3.connect(source_db)
    target_conn = sqlite3.connect(target_db)
    # Attach the source DB to the target connection so the generated script's
    # source_db."table" references resolve. Parameter binding avoids quoting
    # problems in the file path.
    target_conn.execute("ATTACH DATABASE ? AS source_db", (source_db,))

    compare_and_alter_schema(source_conn, target_conn)
    generate_data_migration_script(source_conn)
    migrate_data(source_conn, target_conn)
    save_error_report()

    source_conn.close()
    target_conn.close()


if __name__ == '__main__':
    import sys
    if len(sys.argv) != 3:
        print("Usage: python migrate_db.py <source_db> <target_db>")
        sys.exit(1)
    main(sys.argv[1], sys.argv[2])
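
# --- Usage sketch (illustrative; the file names below are assumptions) ---
# A minimal smoke test: create a throwaway source database, then migrate it
# into a fresh target. Run from a Python shell after importing this module:
#
#   import sqlite3
#   src = sqlite3.connect('demo_source.db')
#   src.execute('CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)')
#   src.execute("INSERT INTO users (name) VALUES ('alice')")
#   src.commit()
#   src.close()
#   main('demo_source.db', 'demo_target.db')
#
# Afterwards, migration_script.sql contains the ATTACH-based INSERT ... SELECT
# statements and migration_errors.json lists any schema or row-level failures.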