in dags/bqetl_backfill.py [0:0]
def bqetl_backfill_dag():
    """Run a manual ``bqetl query backfill`` inside a GKE pod.

    Builds the backfill command line from the DAG run's params (via the
    TaskFlow ``generate_backfill_command`` task) and hands it to a
    ``GKEPodOperator`` for execution.
    """

    @task
    def generate_backfill_command(**context):
        """Generate backfill command with arguments.

        Returns:
            list[str]: the full ``bqetl`` argv to pass to the pod.

        Raises:
            TypeError: if any assembled argument is not a string, which
                indicates a malformed DAG param.
        """
        # All arguments come from the DAG run's params; hoist the lookup once.
        params = context["params"]

        # Required positional/valued arguments.
        cmd = [
            "bqetl",
            "query",
            "backfill",
            params["table_name"],
            "--sql_dir",
            params["sql_dir"],
            "--project_id",
            params["project_id"],
            "--start_date",
            params["start_date"],
            "--end_date",
            params["end_date"],
            "--max_rows",
            str(params["max_rows"]),
            "--parallelism",
            str(params["parallelism"]),
        ]

        # Optional arguments: appended only when the param is truthy.
        if destination_table := params["destination_table"]:
            cmd.append(f"--destination_table={destination_table}")
        if excludes := params["exclude"]:
            for exclude in excludes:
                cmd.extend(["--exclude", exclude])
        if scheduling_overrides := params["scheduling_overrides"]:
            # The CLI expects the overrides as a JSON-encoded string.
            cmd.extend(["--scheduling_overrides", json.dumps(scheduling_overrides)])
        if params["dry_run"]:
            cmd.append("--dry_run")
        # run_checks always emits exactly one of the two flags.
        cmd.append("--checks" if params["run_checks"] else "--no-checks")
        if params["override_retention_range_limit"]:
            cmd.append("--override-retention-range-limit")
        if billing_project := params["billing_project"]:
            cmd.append(f"--billing-project={billing_project}")

        # Pod arguments must all be strings; fail fast with a TypeError
        # (was: bare Exception) so the failure mode is explicit.
        if not all(isinstance(c, str) for c in cmd):
            raise TypeError(
                f"All GKE arguments must be strings! Did you do something surprising to the DAG params?\nArgs: {cmd}"
            )

        # Echo the command so an operator can reproduce the run locally.
        print("To run the command locally, execute the following:\r" + " ".join(cmd))
        return cmd

    GKEPodOperator(
        reattach_on_restart=True,
        task_id="bqetl_backfill",
        arguments=generate_backfill_command(),
        image="gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest",
        gcp_conn_id="google_cloud_airflow_gke",
    )