def initiate_backfill()

in dags/bqetl_backfill_initiate.py [0:0]


    def initiate_backfill(backfill):
        @task
        def prepare_slack_initiate_message(entry):
            watcher_text = " ".join(
                f"<@{watcher.split('@')[0]}>" for watcher in entry["watchers"]
            )
            return f"{watcher_text} :hourglass_flowing_sand: Initiating backfill scheduled for `{entry['qualified_table_name']}`.  You will receive another notification once the backfill is done."

        notify_initiate = SlackAPIPostOperator(
            task_id="slack_notify_initate",
            username="Backfill",
            slack_conn_id=SLACK_CONNECTION_ID,
            text=prepare_slack_initiate_message(backfill),
            channel=AUTOMATION_SLACK_CHANNEL,
        )

        @task
        def prepare_pod_parameters(entry):
            return [f"script/bqetl backfill initiate { entry['qualified_table_name'] }"]

        process_backfill = GKEPodOperator(
            task_id="process_backfill",
            name="process_backfill",
            cmds=["sh", "-cx"],
            arguments=prepare_pod_parameters(backfill),
            image=DOCKER_IMAGE,
            reattach_on_restart=True,
        )

        @task
        def prepare_slack_processing_complete_parameters(entry):
            project, dataset, table = entry["qualified_table_name"].split(".")
            backfill_table_id = (
                f"{dataset}__{table}_{entry['entry_date'].replace('-', '_')}"
            )
            staging_location = (
                f"{project}.backfills_staging_derived.{backfill_table_id}"
            )
            watcher_text = " ".join(
                f"<@{watcher.split('@')[0]}>" for watcher in entry["watchers"]
            )

            return (
                f"{watcher_text} :white_check_mark: Backfill processing is done. Staging location: `{staging_location}`. "
                "Please validate that your data has changed as you expect and complete your backfill by updating the Backfill entry's status to Complete in the bigquery-etl repository. "
                "Note that the staging table will expire in 30 days, so the backfill must be completed within 30 days."
            )

        notify_processing_complete = SlackAPIPostOperator(
            task_id="slack_notify_processing_complete",
            username="Backfill",
            slack_conn_id=SLACK_CONNECTION_ID,
            text=prepare_slack_processing_complete_parameters(backfill),
            channel=AUTOMATION_SLACK_CHANNEL,
        )

        notify_initiate >> process_backfill >> notify_processing_complete