in airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py [0:0]
def downgrade():
    """Unapply Rename dataset as asset.

    First renames every table listed in ``table_name_mappings`` from its new
    name back to its original name, then goes through the affected tables one
    by one and restores the ``dataset``-era names of their columns, indexes,
    primary keys and foreign keys.

    The statement order matters: foreign keys are dropped before the index or
    primary key they sit next to is renamed, and are recreated afterwards
    under their old names.
    """
    # Rename tables
    # ``op.rename_table(old, new)``: the table currently called ``new_name`` goes
    # back to ``original_name``. Every table name used below is therefore the
    # name that exists *after* this loop has run.
    for original_name, new_name in table_name_mappings:
        op.rename_table(new_name, original_name)

    # --- dataset / asset_active -------------------------------------------
    # ``asset_active`` is still addressed by that name after the loop above.
    # Its FK on (name, uri) is dropped first, in a batch of its own ...
    with op.batch_alter_table("asset_active", schema=None) as batch_op:
        batch_op.drop_constraint("asset_active_asset_name_uri_fkey", type_="foreignkey")

    # ... then the unique (name, uri) index on ``dataset`` gets its old name back ...
    # NOTE(review): ``_rename_index`` is defined elsewhere in this file; it
    # presumably drops and recreates the index, which would explain why the
    # dependent FK has to be removed first — confirm.
    with op.batch_alter_table("dataset", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_name_uri_unique",
            new_name="idx_dataset_name_uri_unique",
            columns=["name", "uri"],
            unique=True,
        )

    # ... and finally the FK is recreated, keeping its name but now pointing at
    # the ``dataset`` table.
    with op.batch_alter_table("asset_active", schema=None) as batch_op:
        batch_op.create_foreign_key(
            constraint_name="asset_active_asset_name_uri_fkey",
            referent_table="dataset",
            local_cols=["name", "uri"],
            remote_cols=["name", "uri"],
            ondelete="CASCADE",
        )

    # --- dataset_alias_dataset ----------------------------------------------
    # Column rename asset_id -> dataset_id runs in a separate batch, before the
    # batch below that refers to ``dataset_id`` in an index and a foreign key.
    with op.batch_alter_table("dataset_alias_dataset", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

    with op.batch_alter_table("dataset_alias_dataset", schema=None) as batch_op:
        # alias_id: drop FK, rename index, recreate FK under the old name.
        batch_op.drop_constraint(op.f("asset_alias_asset_alias_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_alias_id",
            new_name="idx_dataset_alias_dataset_alias_id",
            columns=["alias_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_alias_id_fkey"),
            referent_table="dataset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        # dataset_id (formerly asset_id): same drop / rename / recreate sequence.
        batch_op.drop_constraint(op.f("asset_alias_asset_asset_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_asset_id",
            new_name="idx_dataset_alias_dataset_alias_dataset_id",
            columns=["dataset_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_dataset_id_fkey"),
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

    # --- dataset_alias_dataset_event ----------------------------------------
    # NOTE(review): ``_drop_fkey_if_exists`` is defined elsewhere in this file;
    # judging by its name it removes a FK that already carries the old
    # ``dataset_...`` name so that the ``create_foreign_key`` with the same
    # name further down cannot clash — confirm against the helper.
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dataset_alias_dataset_event_event_id_fkey")

    with op.batch_alter_table("dataset_alias_dataset_event", schema=None) as batch_op:
        # alias_id: drop FK, rename index, recreate FK under the old name.
        batch_op.drop_constraint(op.f("asset_alias_asset_event_alias_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_event_alias_id",
            new_name="idx_dataset_alias_dataset_event_alias_id",
            columns=["alias_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_event_alias_id_fkey"),
            referent_table="dataset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        # event_id: same sequence, FK targets ``dataset_event``.
        batch_op.drop_constraint(op.f("asset_alias_asset_event_event_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_event_event_id",
            new_name="idx_dataset_alias_dataset_event_event_id",
            columns=["event_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_event_event_id_fkey"),
            referent_table="dataset_event",
            local_cols=["event_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

    # --- dag_schedule_dataset_alias_reference -------------------------------
    # Constraint prefix goes back from "dsaar" to "dsdar".
    with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
        # Both FKs are removed before the primary key and index are renamed.
        batch_op.drop_constraint("dsaar_asset_alias_fkey", type_="foreignkey")
        batch_op.drop_constraint("dsaar_dag_id_fkey", type_="foreignkey")

        # NOTE(review): ``_rename_pk_constraint`` is defined elsewhere in this
        # file; it is handed the full column list, so it presumably recreates
        # the primary key under the new name — confirm.
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dsaar_pkey",
            new_name="dsdar_pkey",
            columns=["alias_id", "dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dag_schedule_asset_alias_reference_dag_id",
            new_name="idx_dag_schedule_dataset_alias_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )

        # Recreate both FKs under their "dsdar" names.
        batch_op.create_foreign_key(
            constraint_name="dsdar_dataset_alias_fkey",
            referent_table="dataset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="dsdar_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

    # --- dag_schedule_dataset_reference -------------------------------------
    # Constraint prefix goes back from "dsar" to "dsdr".
    with op.batch_alter_table("dag_schedule_dataset_reference", schema=None) as batch_op:
        # Here the column rename shares the batch with the constraint work.
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

        batch_op.drop_constraint("dsar_dag_id_fkey", type_="foreignkey")
        batch_op.drop_constraint("dsar_asset_fkey", type_="foreignkey")

        _rename_index(
            batch_op=batch_op,
            original_name="idx_dag_schedule_asset_reference_dag_id",
            new_name="idx_dag_schedule_dataset_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
        # The primary key is declared on the already-renamed ``dataset_id`` column.
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dsar_pkey",
            new_name="dsdr_pkey",
            columns=["dataset_id", "dag_id"],
        )

        batch_op.create_foreign_key(
            constraint_name="dsdr_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="dsdr_dataset_fkey",
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

    # --- task_outlet_dataset_reference --------------------------------------
    # Constraint prefix goes back from "toar" to "todr".
    with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

        batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey")
        batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey")

        _rename_index(
            batch_op=batch_op,
            original_name="idx_task_outlet_asset_reference_dag_id",
            new_name="idx_task_outlet_dataset_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
        # Three-column primary key: (dataset_id, dag_id, task_id).
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="toar_pkey",
            new_name="todr_pkey",
            columns=["dataset_id", "dag_id", "task_id"],
        )

        batch_op.create_foreign_key(
            constraint_name="todr_dataset_fkey",
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="todr_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

    # --- dataset_dag_run_queue ----------------------------------------------
    # Constraint prefix goes back from "adrq" to "ddrq".
    with op.batch_alter_table("dataset_dag_run_queue", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

        batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey")
        batch_op.drop_constraint("adrq_dag_fkey", type_="foreignkey")

        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="assetdagrunqueue_pkey",
            new_name="datasetdagrunqueue_pkey",
            columns=["dataset_id", "target_dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_dag_run_queue_target_dag_id",
            new_name="idx_dataset_dag_run_queue_target_dag_id",
            columns=["target_dag_id"],
            unique=False,
        )

        batch_op.create_foreign_key(
            constraint_name="ddrq_dataset_fkey",
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        # Local column is ``target_dag_id``; it references ``dag.dag_id``.
        batch_op.create_foreign_key(
            constraint_name="ddrq_dag_fkey",
            referent_table="dag",
            local_cols=["target_dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

    # --- dagrun_dataset_event -----------------------------------------------
    with op.batch_alter_table("dagrun_dataset_event", schema=None) as batch_op:
        # event_id: drop FK, rename index, recreate FK under the old name.
        # The dropped names go through ``op.f``; the recreated ones are passed
        # as plain strings.
        batch_op.drop_constraint(op.f("dagrun_asset_event_event_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dagrun_asset_events_event_id",
            new_name="idx_dagrun_dataset_events_event_id",
            columns=["event_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dagrun_dataset_event_event_id_fkey",
            referent_table="dataset_event",
            local_cols=["event_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        # dag_run_id: same sequence, FK targets ``dag_run.id``.
        batch_op.drop_constraint(op.f("dagrun_asset_event_dag_run_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dagrun_asset_events_dag_run_id",
            new_name="idx_dagrun_dataset_events_dag_run_id",
            columns=["dag_run_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dagrun_dataset_event_dag_run_id_fkey",
            referent_table="dag_run",
            local_cols=["dag_run_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        # In this table the primary key is renamed last, after both FKs have
        # been recreated.
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dagrun_asset_event_pkey",
            new_name="dagrun_dataset_event_pkey",
            columns=["event_id", "dag_run_id"],
        )

    # --- dataset_event ------------------------------------------------------
    # Column rename in its own batch, then the index that covers the renamed
    # column in a second batch.
    with op.batch_alter_table("dataset_event", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

    with op.batch_alter_table("dataset_event", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_id_timestamp",
            new_name="idx_dataset_id_timestamp",
            columns=["dataset_id", "timestamp"],
            unique=False,
        )

    # --- dataset_alias ------------------------------------------------------
    # Only the unique index on ``name`` needs its old name back.
    with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_name_unique",
            new_name="idx_dataset_alias_name_unique",
            columns=["name"],
            unique=True,
        )

    # --- dag ----------------------------------------------------------------
    # Rename the JSON column asset_expression -> dataset_expression; the column
    # type is restated alongside the rename.
    with op.batch_alter_table("dag", schema=None) as batch_op:
        batch_op.alter_column(
            "asset_expression",
            new_column_name="dataset_expression",
            type_=sqlalchemy_jsonfield.JSONField(json=json),
        )