# projects/database-archival/src/database_archival/dag/tasks/data_pruning_preparation.py
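# Note: this excerpt shows only the function; names such as airflow_operators,
# task_namer, data_archiving, database, bigquery_utils, database_table_name,
# database_prune_batch_size, table_primary_key_columns and
# VAR_NAME_PRUNE_PROGRESS_TABLE are assumed to be imported or bound in an
# enclosing scope elsewhere in the module.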
def get_batch_primary_keys_job_config():
    """Creates the BigQuery job config that batches the primary keys to prune.

    Returns:
        Query job configuration which copies the primary keys snapshotted by
        this DAG run into a prune progress table, annotating each row with a
        batch number and an initial prune status.
    """
    context = airflow_operators.get_current_context()
    dag_run_id = context['run_id']
    dag_date = context['ds_nodash']
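    # The snapshot table name is published over XCom by the data archiving
    # task group's config task; look it up by its generated task name.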
    snapshot_table_name = context['ti'].xcom_pull(
        task_ids=task_namer.get_task_name(
            database_table_name=database_table_name,
            task_group_name=data_archiving.TASK_GROUP_NAME,
            task_name=data_archiving.TASK_NAME_GET_CONFIG,
        ),
        key=data_archiving.VAR_NAME_SNAPSHOT_TABLE,
    )
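    # Prune progress is tracked in a sibling table derived from the snapshot
    # table name plus a fixed suffix.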
    snapshot_progress_table_name = (
        snapshot_table_name + database.PRUNE_PROGRESS_TABLE_NAME_SUFFIX
    )
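    # Build a BigQuery date literal from the DAG's YYYYMMDD date string.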
    parsed_dag_date = f'PARSE_DATE("%Y%m%d", "{dag_date}")'
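    # Assign each row a 1-based batch number: CEIL(ROW_NUMBER() / batch_size)
    # groups the rows into fixed-size batches for incremental pruning.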
    calculated_batch_number = (
        'CAST(CEIL('
        f'ROW_NUMBER() OVER () / {database_prune_batch_size}'
        ') AS INT64)'
    )
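    # Snapshot metadata columns appended to every primary key row: the run id
    # and date identify the snapshot, the prune status starts as FALSE, and
    # the batch number drives batched deletion.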
    snapshot_fields = [
        database.FIELD_NAME_SNAPSHOT_RUN_ID,
        database.FIELD_NAME_SNAPSHOT_DATE,
        f'FALSE AS {database.FIELD_NAME_PRUNE_STATUS}',
        f'{calculated_batch_number} AS {database.FIELD_NAME_PRUNE_BATCH}',
    ]
    select_fields = table_primary_key_columns + snapshot_fields
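    # Only batch the rows that this DAG run snapshotted on this date.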
    where_conditions = [
        f'{database.FIELD_NAME_SNAPSHOT_DATE} = {parsed_dag_date}',
        f'{database.FIELD_NAME_SNAPSHOT_RUN_ID} = "{dag_run_id}"',
    ]
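    # Publish the progress table name so downstream pruning tasks can read it.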
    context['ti'].xcom_push(
        key=VAR_NAME_PRUNE_PROGRESS_TABLE,
        value=snapshot_progress_table_name,
    )
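    # BigQuery query job configuration: append the batched primary keys to the
    # prune progress table, creating the table on first use.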
    return {
        'query': {
            'query': (
                f"SELECT {', '.join(select_fields)} "
                f'FROM `{snapshot_table_name}` '
                f"WHERE {' AND '.join(where_conditions)}"
            ),
            'use_legacy_sql': False,
            'destination_table': bigquery_utils.split_table_name(
                snapshot_progress_table_name
            ),
            'create_disposition': 'CREATE_IF_NEEDED',
            'write_disposition': 'WRITE_APPEND',
        },
    }
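
# Usage sketch (an assumption, not wiring taken from this project): because
# the function calls get_current_context(), it must execute inside a running
# task. One common pattern is to return the config from a TaskFlow task and
# pass the resulting XComArg to BigQueryInsertJobOperator, whose
# `configuration` field is templated:
#
#     from airflow.decorators import task
#     from airflow.providers.google.cloud.operators import bigquery
#
#     @task
#     def _get_prune_batch_config():
#         return get_batch_primary_keys_job_config()
#
#     batch_primary_keys = bigquery.BigQueryInsertJobOperator(
#         task_id='batch_primary_keys_to_prune',
#         configuration=_get_prune_batch_config(),
#     )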