dags/operational_monitoring_backfill.py (68 lines of code) (raw):
import datetime
from airflow.decorators import dag, task
from airflow.models.param import Param
from operators.gcp_container_operator import GKEPodOperator
from utils.tags import Tag
docs = """
### operational_monitoring_backfill
Build from telemetry-airflow repo, [dags/operational_monitoring_backfill.py](https://github.com/mozilla/telemetry-airflow/blob/main/dags/operational_monitoring_backfill.py)
Triggers backfills for specifc operational monitoring projects.
#### Owner
ascholtz@mozilla.com
lschiestl@mozilla.com
"""
tags = [Tag.ImpactTier.tier_3, Tag.Triage.no_triage]
@dag(
dag_id="operational_monitoring_backfill",
start_date=datetime.datetime(2021, 1, 1, 0, 0),
schedule_interval=None,
catchup=False,
doc_md=docs,
dagrun_timeout=datetime.timedelta(days=4),
tags=tags,
render_template_as_native_obj=True,
params={
"slug": Param(
"slug",
title="Slug",
type="string",
description="[Required] Experimenter slug or slug of OpMon project to (re)run the analysis for",
),
"start_date": Param(
f"{datetime.date.today()}",
title="Start Date",
type="string",
format="date",
description="[Required] First date to be backfilled, inclusive",
),
"end_date": Param(
f"{datetime.date.today()}",
title="End Date",
type="string",
format="date",
description="[Required] Last date to be backfilled, inclusive",
),
"args": Param(
None,
title="Additional Arguments",
type=["null", "string"],
description="[Optional] Additional command line arguments",
),
},
)
def operational_monitoring_backfill_dag():
@task
def generate_backfill_arguments(**context):
cmd = [
"backfill",
"--slug",
context["params"]["slug"],
"--start-date",
context["params"]["start_date"],
"--end_date",
context["params"]["end_date"],
]
if args := context["params"]["args"]:
cmd.append(args)
return cmd
# Built from repo https://github.com/mozilla/opmon
opmon_image = "gcr.io/moz-fx-data-experiments/opmon:latest"
GKEPodOperator(
task_id="opmon_backfill",
name="opmon_backfill",
image=opmon_image,
arguments=generate_backfill_arguments(),
)
dag = operational_monitoring_backfill_dag()