def get_batch_primary_keys_job_config()

in projects/database-archival/src/database_archival/dag/tasks/data_pruning_preparation.py [0:0]


    def get_batch_primary_keys_job_config():
        """Creates the job config to batch primary keys to prune.

        Builds a BigQuery query that selects the primary key columns of the
        rows snapshotted by the current DAG run (matched on snapshot date and
        run id). Each row is tagged with a ``FALSE`` prune status and a
        1-based batch number, with at most ``database_prune_batch_size`` rows
        per batch. The result is appended to the snapshot's prune-progress
        table.

        As a side effect, pushes the prune-progress table name to XCom under
        the key ``VAR_NAME_PRUNE_PROGRESS_TABLE``.

        Relies on ``database_table_name``, ``database_prune_batch_size`` and
        ``table_primary_key_columns`` from the enclosing scope.

        Returns:
            A job configuration dictionary with a single ``'query'`` entry
            holding the SQL and destination settings. Presumably this is
            consumed by a BigQuery insert-job operator; confirm against the
            caller.

        Raises:
            ValueError: If no snapshot table name was found in XCom for the
                data archiving config task.
        """
        context = airflow_operators.get_current_context()
        dag_run_id = context['run_id']
        dag_date = context['ds_nodash']
        snapshot_table_name = context['ti'].xcom_pull(
            task_ids=task_namer.get_task_name(
                database_table_name=database_table_name,
                task_group_name=data_archiving.TASK_GROUP_NAME,
                task_name=data_archiving.TASK_NAME_GET_CONFIG,
            ),
            key=data_archiving.VAR_NAME_SNAPSHOT_TABLE,
        )
        if not snapshot_table_name:
            # Fail with a descriptive message instead of an opaque TypeError
            # from the string concatenation below.
            raise ValueError(
                'Could not find the snapshot table name in XCom for table '
                f'{database_table_name}.'
            )

        snapshot_progress_table_name = (
            snapshot_table_name + database.PRUNE_PROGRESS_TABLE_NAME_SUFFIX
        )
        parsed_dag_date = f'PARSE_DATE("%Y%m%d", "{dag_date}")'
        # Escape backslashes and double quotes so the run id always forms a
        # valid BigQuery double-quoted string literal. Ordinary run ids are
        # emitted unchanged.
        quoted_dag_run_id = (
            '"' + dag_run_id.replace('\\', '\\\\').replace('"', '\\"') + '"'
        )
        # ROW_NUMBER() starts at 1, and BigQuery's `/` on integers yields
        # FLOAT64. CEIL therefore maps rows 1..N to batch 1, N+1..2N to
        # batch 2, and so on. OVER () has no ORDER BY, so which rows land in
        # which batch is unspecified.
        calculated_batch_number = (
            'CAST(CEIL('
            f'ROW_NUMBER() OVER () / {database_prune_batch_size}'
            ') AS INT64)'
        )
        snapshot_fields = [
            database.FIELD_NAME_SNAPSHOT_RUN_ID,
            database.FIELD_NAME_SNAPSHOT_DATE,
            f'FALSE AS {database.FIELD_NAME_PRUNE_STATUS}',
            f'{calculated_batch_number} AS {database.FIELD_NAME_PRUNE_BATCH}',
        ]
        select_fields = table_primary_key_columns + snapshot_fields
        where_conditions = [
            f'{database.FIELD_NAME_SNAPSHOT_DATE} = {parsed_dag_date}',
            f'{database.FIELD_NAME_SNAPSHOT_RUN_ID} = {quoted_dag_run_id}',
        ]
        query = (
            f"SELECT {', '.join(select_fields)} "
            f'FROM `{snapshot_table_name}` '
            f"WHERE {' AND '.join(where_conditions)}"
        )

        context['ti'].xcom_push(
            key=VAR_NAME_PRUNE_PROGRESS_TABLE,
            value=snapshot_progress_table_name,
        )
        return {
            'query': {
                'query': query,
                'use_legacy_sql': False,
                'destination_table': bigquery_utils.split_table_name(
                    snapshot_progress_table_name
                ),
                'create_disposition': 'CREATE_IF_NEEDED',
                # NOTE(review): with WRITE_APPEND, re-running this job for the
                # same DAG run appends the same keys again. Confirm that
                # downstream pruning tolerates duplicate rows.
                'write_disposition': 'WRITE_APPEND',
            },
        }