def downgrade()

in airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py


def downgrade():
    """Unapply Rename dataset as asset."""
    # Rename tables
    for original_name, new_name in table_name_mappings:
        op.rename_table(new_name, original_name)

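    # asset_active keeps a composite FK to (name, uri) on the table just renamed
    # back to "dataset"; drop it so the unique index backing that reference can be
    # renamed, then recreate the FK once the index carries its 2.x name again.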
    with op.batch_alter_table("asset_active", schema=None) as batch_op:
        batch_op.drop_constraint("asset_active_asset_name_uri_fkey", type_="foreignkey")

    with op.batch_alter_table("dataset", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_name_uri_unique",
            new_name="idx_dataset_name_uri_unique",
            columns=["name", "uri"],
            unique=True,
        )

    with op.batch_alter_table("asset_active", schema=None) as batch_op:
        batch_op.create_foreign_key(
            constraint_name="asset_active_asset_name_uri_fkey",
            referent_table="dataset",
            local_cols=["name", "uri"],
            remote_cols=["name", "uri"],
            ondelete="CASCADE",
        )

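    # dataset_alias_dataset: rename asset_id back to dataset_id in its own batch
    # so the index and FK recreation below can reference the restored column name.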
    with op.batch_alter_table("dataset_alias_dataset", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

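    # Restore the 2.x index names and the FKs pointing at dataset_alias and dataset.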
    with op.batch_alter_table("dataset_alias_dataset", schema=None) as batch_op:
        batch_op.drop_constraint(op.f("asset_alias_asset_alias_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_alias_id",
            new_name="idx_dataset_alias_dataset_alias_id",
            columns=["alias_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_alias_id_fkey"),
            referent_table="dataset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        batch_op.drop_constraint(op.f("asset_alias_asset_asset_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_asset_id",
            new_name="idx_dataset_alias_dataset_alias_dataset_id",
            columns=["dataset_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_dataset_id_fkey"),
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

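    # dataset_alias_dataset_event: defensively drop the 2.x-named event FK if it is
    # already present, then restore the 2.x indexes and the FKs to dataset_alias
    # and dataset_event.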
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dataset_alias_dataset_event_event_id_fkey")
    with op.batch_alter_table("dataset_alias_dataset_event", schema=None) as batch_op:
        batch_op.drop_constraint(op.f("asset_alias_asset_event_alias_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_event_alias_id",
            new_name="idx_dataset_alias_dataset_event_alias_id",
            columns=["alias_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_event_alias_id_fkey"),
            referent_table="dataset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        batch_op.drop_constraint(op.f("asset_alias_asset_event_event_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_asset_event_event_id",
            new_name="idx_dataset_alias_dataset_event_event_id",
            columns=["event_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("dataset_alias_dataset_event_event_id_fkey"),
            referent_table="dataset_event",
            local_cols=["event_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

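    # dag_schedule_dataset_alias_reference: restore the dsdar_* primary key and FK
    # names and the 2.x index on dag_id, pointing the FKs back at dataset_alias and dag.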
    with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op:
        batch_op.drop_constraint("dsaar_asset_alias_fkey", type_="foreignkey")
        batch_op.drop_constraint("dsaar_dag_id_fkey", type_="foreignkey")

        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dsaar_pkey",
            new_name="dsdar_pkey",
            columns=["alias_id", "dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dag_schedule_asset_alias_reference_dag_id",
            new_name="idx_dag_schedule_dataset_alias_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )

        batch_op.create_foreign_key(
            constraint_name="dsdar_dataset_alias_fkey",
            referent_table="dataset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="dsdar_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

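    # dag_schedule_dataset_reference: rename asset_id back to dataset_id, restore
    # the dsdr_* primary key and FK names, and rename the dag_id index.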
    with op.batch_alter_table("dag_schedule_dataset_reference", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

        batch_op.drop_constraint("dsar_dag_id_fkey", type_="foreignkey")
        batch_op.drop_constraint("dsar_asset_fkey", type_="foreignkey")

        _rename_index(
            batch_op=batch_op,
            original_name="idx_dag_schedule_asset_reference_dag_id",
            new_name="idx_dag_schedule_dataset_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dsar_pkey",
            new_name="dsdr_pkey",
            columns=["dataset_id", "dag_id"],
        )

        batch_op.create_foreign_key(
            constraint_name="dsdr_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="dsdr_dataset_fkey",
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

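    # task_outlet_dataset_reference: same pattern, restoring the todr_* primary key
    # and FK names along with the 2.x dag_id index.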
    with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)
        batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey")
        batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey")

        _rename_index(
            batch_op=batch_op,
            original_name="idx_task_outlet_asset_reference_dag_id",
            new_name="idx_task_outlet_dataset_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="toar_pkey",
            new_name="todr_pkey",
            columns=["dataset_id", "dag_id", "task_id"],
        )

        batch_op.create_foreign_key(
            constraint_name="todr_dataset_fkey",
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="todr_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

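    # dataset_dag_run_queue: rename asset_id back to dataset_id and restore the
    # datasetdagrunqueue_pkey plus the ddrq_* FKs to dataset and dag.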
    with op.batch_alter_table("dataset_dag_run_queue", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

        batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey")
        batch_op.drop_constraint("adrq_dag_fkey", type_="foreignkey")

        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="assetdagrunqueue_pkey",
            new_name="datasetdagrunqueue_pkey",
            columns=["dataset_id", "target_dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_dag_run_queue_target_dag_id",
            new_name="idx_dataset_dag_run_queue_target_dag_id",
            columns=["target_dag_id"],
            unique=False,
        )

        batch_op.create_foreign_key(
            constraint_name="ddrq_dataset_fkey",
            referent_table="dataset",
            local_cols=["dataset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="ddrq_dag_fkey",
            referent_table="dag",
            local_cols=["target_dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

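    # dagrun_dataset_event: restore the dagrun_dataset_event_* FK names, the 2.x
    # event_id and dag_run_id indexes, and the composite primary key name.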
    with op.batch_alter_table("dagrun_dataset_event", schema=None) as batch_op:
        batch_op.drop_constraint(op.f("dagrun_asset_event_event_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dagrun_asset_events_event_id",
            new_name="idx_dagrun_dataset_events_event_id",
            columns=["event_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dagrun_dataset_event_event_id_fkey",
            referent_table="dataset_event",
            local_cols=["event_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

        batch_op.drop_constraint(op.f("dagrun_asset_event_dag_run_id_fkey"), type_="foreignkey")
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dagrun_asset_events_dag_run_id",
            new_name="idx_dagrun_dataset_events_dag_run_id",
            columns=["dag_run_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dagrun_dataset_event_dag_run_id_fkey",
            referent_table="dag_run",
            local_cols=["dag_run_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dagrun_asset_event_pkey",
            new_name="dagrun_dataset_event_pkey",
            columns=["event_id", "dag_run_id"],
        )

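    # dataset_event: rename asset_id back to dataset_id first, then rename the
    # (dataset_id, timestamp) index in a separate batch.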
    with op.batch_alter_table("dataset_event", schema=None) as batch_op:
        batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False)

    with op.batch_alter_table("dataset_event", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_id_timestamp",
            new_name="idx_dataset_id_timestamp",
            columns=["dataset_id", "timestamp"],
            unique=False,
        )

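    # dataset_alias: restore the 2.x name of the unique index on name.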
    with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_asset_alias_name_unique",
            new_name="idx_dataset_alias_name_unique",
            columns=["name"],
            unique=True,
        )

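    # dag: rename the JSON asset_expression column back to dataset_expression.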
    with op.batch_alter_table("dag", schema=None) as batch_op:
        batch_op.alter_column(
            "asset_expression",
            new_column_name="dataset_expression",
            type_=sqlalchemy_jsonfield.JSONField(json=json),
        )
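

The helpers _rename_index, _rename_pk_constraint, and _drop_fkey_if_exists are defined elsewhere in the migration module and are not part of this excerpt. A minimal sketch of what they are assumed to do follows; the real implementations may differ, for example in dialect-specific handling.

from alembic import op
from sqlalchemy import inspect


def _rename_index(*, batch_op, original_name, new_name, columns, unique):
    # Alembic has no portable "rename index", so drop the old index and
    # recreate it under the new name with the same columns and uniqueness.
    batch_op.drop_index(original_name)
    batch_op.create_index(new_name, columns, unique=unique)


def _rename_pk_constraint(*, batch_op, original_name, new_name, columns):
    # Primary keys cannot be renamed portably either; drop and recreate.
    # The real helper may skip the explicit drop on SQLite, where batch mode
    # rebuilds the table anyway.
    batch_op.drop_constraint(original_name, type_="primary")
    batch_op.create_primary_key(new_name, columns)


def _drop_fkey_if_exists(table_name, constraint_name):
    # Drop the foreign key only if the current schema actually defines it,
    # so the downgrade does not fail on backends where it is already gone.
    inspector = inspect(op.get_bind())
    fk_names = {fk["name"] for fk in inspector.get_foreign_keys(table_name)}
    if constraint_name in fk_names:
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            batch_op.drop_constraint(constraint_name, type_="foreignkey")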