# dags/catalyst.py
"""
DAG to schedule generation of performance reports for recently completed nimbus experiments.
See the [catalyst repository](https://github.com/mozilla/catalyst).
*Triage notes*
This app will perform some bigquery queries, and generate statistical reports based on that data which are
then published to https://protosaur.dev/perf-reports/index.html.
Generally, there should be minimal triage necessary for failures unless it's related to infrastructure issues.
Any failures related to the app execution itself will be taken care of directly by the performance team.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag
# Default task arguments applied to every task in this DAG.
# Failures/retries notify the owning engineer; one retry after 30 minutes.
default_args = dict(
    owner="dpalmeiro@mozilla.com",
    email=["dpalmeiro@mozilla.com"],
    depends_on_past=False,
    start_date=datetime(2025, 5, 5),
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(minutes=30),
)
# DAG tags for triage/impact classification (tier 2 per utils.tags.Tag).
tags = [Tag.ImpactTier.tier_2]
with DAG(
    "catalyst",
    default_args=default_args,
    schedule_interval="0 4 * * *",
    doc_md=__doc__,
    tags=tags,
) as dag:
    # Image built from the repo https://github.com/mozilla/catalyst
    catalyst_image = "gcr.io/moz-fx-data-experiments/catalyst:latest"

    # Runs the catalyst report generation inside a GKE pod.
    # (Declared inside the DAG context manager, so no explicit dag= needed.)
    catalyst_run = GKEPodOperator(
        task_id="catalyst_run",
        name="catalyst_run",
        image=catalyst_image,
        email=default_args["email"],
    )

    # Keyword arguments shared by every upstream-dataset sensor below:
    # reschedule mode frees the worker slot while waiting, and all sensors
    # share the dedicated external-task-sensor pool.
    sensor_defaults = dict(
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
    )

    # Wait for clients_daily (upstream DAG runs 2h before this one).
    wait_for_clients_daily_export = ExternalTaskSensor(
        task_id="wait_for_clients_daily",
        external_dag_id="bqetl_main_summary",
        external_task_id="telemetry_derived__clients_daily__v6",
        execution_delta=timedelta(hours=2),
        **sensor_defaults,
    )

    # Wait for search_clients_daily (upstream DAG runs 1h before this one).
    wait_for_search_clients_daily = ExternalTaskSensor(
        task_id="wait_for_search_clients_daily",
        external_dag_id="bqetl_search",
        external_task_id="search_derived__search_clients_daily__v8",
        execution_delta=timedelta(hours=1),
        **sensor_defaults,
    )

    # Wait for deduplicated main-ping events (upstream runs 3h earlier).
    wait_for_bq_events = ExternalTaskSensor(
        task_id="wait_for_bq_main_events",
        external_dag_id="copy_deduplicate",
        external_task_id="bq_main_events",
        execution_delta=timedelta(hours=3),
        **sensor_defaults,
    )

    # Wait for deduplicated event-ping events (upstream runs 3h earlier).
    wait_for_copy_deduplicate_events = ExternalTaskSensor(
        task_id="wait_for_event_events",
        external_dag_id="copy_deduplicate",
        external_task_id="event_events",
        execution_delta=timedelta(hours=3),
        **sensor_defaults,
    )

    # Report generation only starts once every upstream dataset has landed.
    [
        wait_for_clients_daily_export,
        wait_for_search_clients_daily,
        wait_for_bq_events,
        wait_for_copy_deduplicate_events,
    ] >> catalyst_run