in src/databao_context_engine/storage/migrate.py [0:0]
def migrate(self) -> None:
applied_migrations: list[MigrationDTO] = self.init_db_and_load_applied_migrations()
applied_checksums = [m.checksum for m in applied_migrations]
applied_versions = [m.version for m in applied_migrations]
migrations_to_apply = [m for m in self._requested_migrations if m.checksum not in applied_checksums]
duplicated_versions = [
migration.version for migration in migrations_to_apply if migration.version in applied_versions
]
if any(duplicated_versions):
raise MigrationError(f"Migrations with versions {duplicated_versions} already exist")
with duckdb.connect(self._db_path) as conn:
for migration in migrations_to_apply:
logger.debug("Applying migration %s", migration.name)
with conn.cursor() as cur:
cur.execute("START TRANSACTION;")
try:
cur.execute(migration.query)
cur.execute(self._insert_migration_sql, [migration.name, migration.version, migration.checksum])
cur.commit()
except Exception:
cur.rollback()
raise MigrationError(f"Failed to apply migration {migration.name}. Aborting migration process.")