def bqetl_backfill_dag()

in dags/bqetl_backfill.py [0:0]


def bqetl_backfill_dag():
    """Define the bqetl backfill DAG body.

    Creates two tasks:

    * ``generate_backfill_command`` -- a TaskFlow task that turns the DAG run's
      ``params`` into the argument list for ``bqetl query backfill``.
    * ``bqetl_backfill`` -- a ``GKEPodOperator`` that runs those arguments in
      the ``bigquery-etl`` image.

    The following params are read by key, so the DAG must declare all of them:
    ``table_name``, ``sql_dir``, ``project_id``, ``start_date``, ``end_date``,
    ``max_rows``, ``parallelism``, ``destination_table``, ``exclude``,
    ``scheduling_overrides``, ``dry_run``, ``run_checks``,
    ``override_retention_range_limit`` and ``billing_project``.
    """

    @task
    def generate_backfill_command(**context):
        """Generate the ``bqetl query backfill`` argument list from DAG params.

        Returns:
            list[str]: the arguments handed to the GKE pod as ``arguments``.

        Raises:
            TypeError: if any argument is not a ``str`` (for example a param
                supplied with an unexpected type); GKE arguments must be
                strings.
        """
        import shlex  # stdlib; used to render a copy-pasteable command below

        params = context["params"]

        cmd = [
            "bqetl",
            "query",
            "backfill",
            params["table_name"],
            "--sql_dir",
            params["sql_dir"],
            "--project_id",
            params["project_id"],
            "--start_date",
            params["start_date"],
            "--end_date",
            params["end_date"],
            # Numeric params are stringified explicitly; all other values are
            # expected to already be strings and are validated below.
            "--max_rows",
            str(params["max_rows"]),
            "--parallelism",
            str(params["parallelism"]),
        ]

        # Optional flags below are only emitted when the param is truthy.
        if destination_table := params["destination_table"]:
            cmd.append(f"--destination_table={destination_table}")

        # --exclude is repeated once per excluded value.
        for exclude in params["exclude"] or ():
            cmd.extend(["--exclude", exclude])

        if scheduling_overrides := params["scheduling_overrides"]:
            # Passed to the CLI as a single JSON-encoded argument.
            cmd.extend(["--scheduling_overrides", json.dumps(scheduling_overrides)])

        if params["dry_run"]:
            cmd.append("--dry_run")

        # Checks are always set explicitly, one way or the other.
        cmd.append("--checks" if params["run_checks"] else "--no-checks")

        if params["override_retention_range_limit"]:
            cmd.append("--override-retention-range-limit")

        if billing_project := params["billing_project"]:
            cmd.append(f"--billing-project={billing_project}")

        non_strings = [arg for arg in cmd if not isinstance(arg, str)]
        if non_strings:
            raise TypeError(
                "All GKE arguments must be strings! Did you do something surprising "
                f"to the DAG params?\nNon-string args: {non_strings!r}\nArgs: {cmd}"
            )

        # shlex.join quotes each argument so the logged command can be pasted
        # into a shell as-is (the --scheduling_overrides JSON contains spaces
        # and quotes). "\n" rather than "\r" keeps log viewers from
        # overwriting the message line.
        print("To run the command locally, execute the following:\n" + shlex.join(cmd))

        return cmd

    GKEPodOperator(
        # If the task restarts while the pod is still running, reattach to the
        # existing pod rather than launching a second concurrent backfill.
        reattach_on_restart=True,
        task_id="bqetl_backfill",
        # Calling the @task function yields an XComArg: Airflow resolves it to
        # the returned list at runtime and adds the upstream dependency.
        arguments=generate_backfill_command(),
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
        gcp_conn_id="google_cloud_airflow_gke",
    )