in airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py [0:0]
def upgrade():
    """Rename dataset as asset.

    The migration runs in a fixed order:

    1. Drop the foreign keys of the dataset-related tables via
       ``_drop_fkey_if_exists``. Several constraint names are listed per
       table; presumably the helper skips names that do not exist, so that
       databases with differently named constraints are all handled
       (verify against the helper).
    2. Rename every table listed in ``table_name_mappings``.
    3. On the renamed tables, rename ``dataset_id`` columns to ``asset_id``,
       rename indexes and primary-key constraints to their asset-based
       names, and create the foreign keys under asset-based names.
    4. Rename ``dag.dataset_expression`` to ``dag.asset_expression``.

    The order of the statements matters: foreign keys are dropped before the
    tables, columns and indexes they involve are renamed, and are only
    created again afterwards.
    """
    # --- Phase 1: drop foreign keys while the tables still carry their
    # dataset-based names. Each table is listed with more than one candidate
    # constraint name.
    _drop_fkey_if_exists("dataset_alias_dataset", "dataset_alias_dataset_dataset_id_fkey")
    _drop_fkey_if_exists("dataset_alias_dataset", "dataset_alias_dataset_alias_id_fkey")
    _drop_fkey_if_exists("dataset_alias_dataset", "ds_dsa_alias_id")
    _drop_fkey_if_exists("dataset_alias_dataset", "ds_dsa_dataset_id")
    # NOTE(review): the constraint name on the next line follows the naming
    # pattern of the dataset_alias_dataset table (it is also used above), not
    # of dataset_alias_dataset_event -- confirm this is intentional.
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dataset_alias_dataset_dataset_id_fkey")
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dataset_alias_dataset_event_alias_id_fkey")
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dataset_alias_dataset_event_event_id_fkey")
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dss_de_alias_id")
    _drop_fkey_if_exists("dataset_alias_dataset_event", "dss_de_event_id")
    _drop_fkey_if_exists("dag_schedule_dataset_alias_reference", "dsdar_dag_id_fkey")
    _drop_fkey_if_exists("dag_schedule_dataset_alias_reference", "dsdar_dataset_alias_fkey")
    _drop_fkey_if_exists("dag_schedule_dataset_reference", "dsdr_dag_id_fkey")
    _drop_fkey_if_exists("dag_schedule_dataset_reference", "dsdr_dataset_fkey")
    _drop_fkey_if_exists("task_outlet_dataset_reference", "todr_dataset_fkey")
    _drop_fkey_if_exists("dataset_dag_run_queue", "ddrq_dag_fkey")
    _drop_fkey_if_exists("dataset_dag_run_queue", "ddrq_dataset_fkey")
    # Both the "events" and "event" spellings of each constraint are listed.
    _drop_fkey_if_exists("dagrun_dataset_event", "dagrun_dataset_events_event_id_fkey")
    _drop_fkey_if_exists("dagrun_dataset_event", "dagrun_dataset_event_event_id_fkey")
    _drop_fkey_if_exists("dagrun_dataset_event", "dagrun_dataset_events_dag_run_id_fkey")
    _drop_fkey_if_exists("dagrun_dataset_event", "dagrun_dataset_event_dag_run_id_fkey")

    # --- Phase 2: rename tables. Everything below refers to the new names.
    # Rename tables
    for original_name, new_name in table_name_mappings:
        op.rename_table(original_name, new_name)

    # --- Phase 3: per-table renames and foreign-key creation.

    # asset_active / asset: the FK from asset_active to asset(name, uri) is
    # dropped before the unique (name, uri) index on asset is renamed and is
    # created again afterwards (presumably because the FK relies on that
    # index -- TODO confirm).
    with op.batch_alter_table("asset_active", schema=None) as batch_op:
        batch_op.drop_constraint("asset_active_asset_name_uri_fkey", type_="foreignkey")
    with op.batch_alter_table("asset", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_name_uri_unique",
            new_name="idx_asset_name_uri_unique",
            columns=["name", "uri"],
            unique=True,
        )
    with op.batch_alter_table("asset_active", schema=None) as batch_op:
        batch_op.create_foreign_key(
            constraint_name="asset_active_asset_name_uri_fkey",
            referent_table="asset",
            local_cols=["name", "uri"],
            remote_cols=["name", "uri"],
            ondelete="CASCADE",
        )

    # asset_alias_asset: rename the column and primary key, then rename the
    # indexes and create the foreign keys to asset_alias and asset.
    with op.batch_alter_table("asset_alias_asset", schema=None) as batch_op:
        batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)
        # Here alternative_name equals new_name; presumably the helper then
        # tolerates a primary key that already has the target name -- verify
        # against _rename_pk_constraint_unkown.
        _rename_pk_constraint_unkown(
            batch_op=batch_op,
            table_name="asset_alias_asset",
            original_name="dataset_alias_dataset_pkey",
            alternative_name="asset_alias_asset_pkey",
            new_name="asset_alias_asset_pkey",
            columns=["alias_id", "asset_id"],
        )
    with op.batch_alter_table("asset_alias_asset", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_alias_dataset_alias_id",
            new_name="idx_asset_alias_asset_alias_id",
            columns=["alias_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="asset_alias_asset_alias_id_fkey",
            referent_table="asset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_alias_dataset_alias_dataset_id",
            new_name="idx_asset_alias_asset_asset_id",
            columns=["asset_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="asset_alias_asset_asset_id_fkey",
            referent_table="asset",
            local_cols=["asset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

    # asset_alias_asset_event: rename both indexes and create the foreign
    # keys to asset_alias and asset_event. The constraint names are wrapped
    # in op.f() here, unlike the other create_foreign_key calls.
    with op.batch_alter_table("asset_alias_asset_event", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_alias_dataset_event_alias_id",
            new_name="idx_asset_alias_asset_event_alias_id",
            columns=["alias_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("asset_alias_asset_event_alias_id_fkey"),
            referent_table="asset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_alias_dataset_event_event_id",
            new_name="idx_asset_alias_asset_event_event_id",
            columns=["event_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name=op.f("asset_alias_asset_event_event_id_fkey"),
            referent_table="asset_event",
            local_cols=["event_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

    # dag_schedule_asset_alias_reference: rename the primary key and index
    # in one batch, then create the foreign keys in a second batch.
    with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op:
        _rename_pk_constraint(
            batch_op=batch_op,
            original_name="dsdar_pkey",
            new_name="dsaar_pkey",
            columns=["alias_id", "dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dag_schedule_dataset_alias_reference_dag_id",
            new_name="idx_dag_schedule_asset_alias_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
    with op.batch_alter_table("dag_schedule_asset_alias_reference", schema=None) as batch_op:
        batch_op.create_foreign_key(
            constraint_name="dsaar_asset_alias_fkey",
            referent_table="asset_alias",
            local_cols=["alias_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="dsaar_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

    # dag_schedule_asset_reference: the column rename runs in its own batch
    # before the primary key (which includes asset_id) is renamed.
    with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op:
        batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)
    with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op:
        _rename_pk_constraint_unkown(
            batch_op=batch_op,
            table_name="dag_schedule_asset_reference",
            original_name="dag_schedule_dataset_reference_pkey",
            alternative_name="dsdr_pkey",
            new_name="dsar_pkey",
            columns=["asset_id", "dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dag_schedule_dataset_reference_dag_id",
            new_name="idx_dag_schedule_asset_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dsar_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )
        batch_op.create_foreign_key(
            constraint_name="dsar_asset_fkey",
            referent_table="asset",
            local_cols=["asset_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )

    # task_outlet_asset_reference: column rename, then PK/index rename, then
    # foreign-key creation, each in a separate batch.
    with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op:
        batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)
        # NOTE(review): todr_dag_id_fkey is dropped unconditionally here,
        # unlike the foreign keys handled by _drop_fkey_if_exists above; this
        # assumes the constraint exists under exactly this name -- confirm.
        batch_op.drop_constraint("todr_dag_id_fkey", type_="foreignkey")
    with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op:
        _rename_pk_constraint_unkown(
            batch_op=batch_op,
            table_name="task_outlet_asset_reference",
            original_name="task_outlet_dataset_reference_pkey",
            alternative_name="todr_pkey",
            new_name="toar_pkey",
            columns=["asset_id", "dag_id", "task_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_task_outlet_dataset_reference_dag_id",
            new_name="idx_task_outlet_asset_reference_dag_id",
            columns=["dag_id"],
            unique=False,
        )
    with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op:
        batch_op.create_foreign_key("toar_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
        batch_op.create_foreign_key(
            constraint_name="toar_dag_id_fkey",
            referent_table="dag",
            local_cols=["dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

    # asset_dag_run_queue: same three-step pattern (column, PK/index, FKs).
    with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op:
        batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)
    with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op:
        _rename_pk_constraint_unkown(
            batch_op=batch_op,
            table_name="asset_dag_run_queue",
            original_name="dataset_dag_run_queue_pkey",
            alternative_name="datasetdagrunqueue_pkey",
            new_name="assetdagrunqueue_pkey",
            columns=["asset_id", "target_dag_id"],
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_dag_run_queue_target_dag_id",
            new_name="idx_asset_dag_run_queue_target_dag_id",
            columns=["target_dag_id"],
            unique=False,
        )
    with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op:
        batch_op.create_foreign_key("adrq_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE")
        batch_op.create_foreign_key(
            constraint_name="adrq_dag_fkey",
            referent_table="dag",
            local_cols=["target_dag_id"],
            remote_cols=["dag_id"],
            ondelete="CASCADE",
        )

    # dagrun_asset_event: rename indexes, create the foreign keys to dag_run
    # and asset_event, and finally rename the primary key. Note that the new
    # index names keep the plural "events" while the constraint names use
    # the singular "event".
    with op.batch_alter_table("dagrun_asset_event", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dagrun_dataset_events_dag_run_id",
            new_name="idx_dagrun_asset_events_dag_run_id",
            columns=["dag_run_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dagrun_asset_event_dag_run_id_fkey",
            referent_table="dag_run",
            local_cols=["dag_run_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dagrun_dataset_events_event_id",
            new_name="idx_dagrun_asset_events_event_id",
            columns=["event_id"],
            unique=False,
        )
        batch_op.create_foreign_key(
            constraint_name="dagrun_asset_event_event_id_fkey",
            referent_table="asset_event",
            local_cols=["event_id"],
            remote_cols=["id"],
            ondelete="CASCADE",
        )
        _rename_pk_constraint_unkown(
            batch_op=batch_op,
            table_name="dagrun_asset_event",
            original_name="dagrun_dataset_events_pkey",
            alternative_name="dagrun_dataset_event_pkey",
            new_name="dagrun_asset_event_pkey",
            columns=["event_id", "dag_run_id"],
        )

    # asset_event: rename the column first, then the index that covers it.
    # No foreign key is created for this table in this function.
    with op.batch_alter_table("asset_event", schema=None) as batch_op:
        batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)
    with op.batch_alter_table("asset_event", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_id_timestamp",
            new_name="idx_asset_id_timestamp",
            columns=["asset_id", "timestamp"],
            unique=False,
        )

    # asset_alias: only the unique index on name is renamed.
    with op.batch_alter_table("asset_alias", schema=None) as batch_op:
        _rename_index(
            batch_op=batch_op,
            original_name="idx_dataset_alias_name_unique",
            new_name="idx_asset_alias_name_unique",
            columns=["name"],
            unique=True,
        )

    # --- Phase 4: rename the JSON expression column on the dag table.
    with op.batch_alter_table("dag", schema=None) as batch_op:
        batch_op.alter_column(
            "dataset_expression",
            new_column_name="asset_expression",
            type_=sqlalchemy_jsonfield.JSONField(json=json),
        )