migratedb.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import sqlite3
  2. import json
  3. from collections import namedtuple
  4. Column = namedtuple('Column', ['name', 'type', 'notnull', 'dflt_value'])
  5. ErrorReport = {
  6. 'schema_errors': [],
  7. 'data_errors': []
  8. }
  9. def get_tables(conn):
  10. cursor = conn.cursor()
  11. cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%';")
  12. return [row[0] for row in cursor.fetchall()]
  13. def get_columns(conn, table_name):
  14. cursor = conn.execute(f"PRAGMA table_info('{table_name}')")
  15. columns = []
  16. for row in cursor:
  17. columns.append(Column(row[1], row[2], bool(row[3]), row[4]))
  18. return columns
  19. def compare_and_alter_schema(source_conn, target_conn):
  20. source_tables = get_tables(source_conn)
  21. target_tables = get_tables(target_conn)
  22. for table in source_tables:
  23. if table not in target_tables:
  24. try:
  25. create_stmt = source_conn.execute(
  26. "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
  27. (table,)
  28. ).fetchone()[0]
  29. target_conn.execute(create_stmt)
  30. target_conn.commit()
  31. except Exception as e:
  32. ErrorReport['schema_errors'].append(f"Failed to create table {table}: {str(e)}")
  33. continue
  34. source_cols = get_columns(source_conn, table)
  35. target_cols = get_columns(target_conn, table)
  36. target_col_names = [col.name for col in target_cols]
  37. for col in source_cols:
  38. if col.name not in target_col_names:
  39. try:
  40. alter_sql = f'ALTER TABLE "{table}" ADD COLUMN "{col.name}" {col.type}'
  41. if col.notnull:
  42. alter_sql += ' NOT NULL'
  43. if col.dflt_value is not None:
  44. alter_sql += f' DEFAULT {col.dflt_value}'
  45. target_conn.execute(alter_sql)
  46. target_conn.commit()
  47. except Exception as e:
  48. ErrorReport['schema_errors'].append(
  49. f"Table {table}: Failed to add column {col.name}: {str(e)}"
  50. )
  51. for tcol in target_cols:
  52. if tcol.name not in [col.name for col in source_cols]:
  53. if tcol.notnull and tcol.dflt_value is None:
  54. ErrorReport['schema_errors'].append(
  55. f"Table {table}: Target column '{tcol.name}' is NOT NULL with no default but missing in source"
  56. )
  57. def generate_data_migration_script(source_conn, filename='migration_script.sql'):
  58. with open(filename, 'w') as f:
  59. tables = get_tables(source_conn)
  60. for table in tables:
  61. cols = get_columns(source_conn, table)
  62. col_names = [f'"{col.name}"' for col in cols]
  63. cols_str = ', '.join(col_names)
  64. f.write(f'INSERT INTO "{table}" ({cols_str})\n')
  65. f.write(f'SELECT {cols_str} FROM source_db."{table}";\n\n')
  66. def migrate_data(source_conn, target_conn):
  67. source_tables = get_tables(source_conn)
  68. for table in source_tables:
  69. try:
  70. source_cols = get_columns(source_conn, table)
  71. col_names = [col.name for col in source_cols]
  72. placeholders = ', '.join(['?'] * len(col_names))
  73. insert_sql = f'INSERT INTO "{table}" ({", ".join(col_names)}) VALUES ({placeholders})'
  74. source_cur = source_conn.cursor()
  75. source_cur.execute(f'SELECT * FROM "{table}"')
  76. target_cur = target_conn.cursor()
  77. while True:
  78. rows = source_cur.fetchmany(100)
  79. if not rows:
  80. break
  81. for row in rows:
  82. try:
  83. target_cur.execute(insert_sql, row)
  84. target_conn.commit()
  85. except sqlite3.Error as e:
  86. ErrorReport['data_errors'].append({
  87. 'table': table,
  88. 'row_data': row,
  89. 'error': str(e)
  90. })
  91. target_conn.rollback()
  92. except sqlite3.Error as e:
  93. ErrorReport['data_errors'].append({
  94. 'table': table,
  95. 'error': f"General migration error: {str(e)}"
  96. })
  97. def save_error_report(filename='migration_errors.json'):
  98. with open(filename, 'w') as f:
  99. json.dump(ErrorReport, f, indent=2)
  100. def main(source_db, target_db):
  101. source_conn = sqlite3.connect(source_db)
  102. target_conn = sqlite3.connect(target_db)
  103. # Attach source DB to target connection for SQL script generation
  104. target_conn.execute(f"ATTACH DATABASE '{source_db}' AS source_db")
  105. compare_and_alter_schema(source_conn, target_conn)
  106. generate_data_migration_script(source_conn)
  107. migrate_data(source_conn, target_conn)
  108. save_error_report()
  109. source_conn.close()
  110. target_conn.close()
  111. if __name__ == '__main__':
  112. import sys
  113. if len(sys.argv) != 3:
  114. print("Usage: python migrate_db.py <source.db> <target.db>")
  115. sys.exit(1)
  116. main(sys.argv[1], sys.argv[2])