dags/search_forecasting.py (55 lines of code) (raw):
"""
See [kpi-forecasting in the docker-etl repository](https://github.com/mozilla/docker-etl/blob/main/jobs/kpi-forecasting).
This DAG runs the search forecasts for the DAU, search count, and ad clicks metrics.
This DAG is high priority for week 1 of the month and low priority otherwise.
"""
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from operators.gcp_container_operator import GKEPodOperator
from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag
# Baseline arguments handed to the DAG below (via `default_args=`), so every
# task it creates inherits these unless the task overrides them.
default_args = dict(
    owner="jsnyder@mozilla.com",
    email=[
        "jsnyder@mozilla.com",
        "mbowerman@mozilla.com",
        "telemetry-alerts@mozilla.com",
    ],
    depends_on_past=False,
    start_date=datetime(2024, 7, 6),
    # Notify the addresses above when a task finally fails, but not on each retry.
    email_on_failure=True,
    email_on_retry=False,
    # Give transient failures two more attempts, half an hour apart.
    retries=2,
    retry_delay=timedelta(minutes=30),
)
# Container image that runs the kpi-forecasting job referenced in the module
# docstring. NOTE(review): pinned to `:latest`, so each run uses whatever image
# was pushed most recently.
IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/kpi-forecasting_docker_etl:latest"

# One forecast task is created per entry. Each name is also the stem of the
# YAML config (kpi_forecasting/configs/<name>.yaml) handed to the script.
FORECAST_METRICS_LIST = [
    "search_forecasting_daily_active_users",
    "search_forecasting_search_count",
    "search_forecasting_ad_clicks",
]

# Tags attached to the DAG in the Airflow UI.
TAGS = [Tag.ImpactTier.tier_1]
# Monthly run (05:30 on the 7th), gated on bqetl_search_dashboard having
# produced the data the forecasts read.
with DAG(
    "search_forecasting",
    default_args=default_args,
    schedule_interval="30 5 7 * *",
    doc_md=__doc__,
    tags=TAGS,
) as dag:
    # Every search forecasting metric is read from the
    # search_revenue_levers_daily table. That table is built by the
    # search_derived__search_revenue_levers_daily__v1 task of the
    # bqetl_search_dashboard DAG.
    # See: https://workflow.telemetry.mozilla.org/dags/bqetl_search_dashboard/grid
    #
    # NOTE(review): neither execution_delta nor execution_date_fn is supplied.
    # The sensor therefore waits for the bqetl_search_dashboard run whose
    # logical date equals this DAG's logical date. Confirm that the two
    # schedules line up.
    wait_task_sensor = ExternalTaskSensor(
        task_id="wait_for_search_dashboard",
        external_dag_id="bqetl_search_dashboard",
        external_task_id="search_derived__search_revenue_levers_daily__v1",
        check_existence=True,
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
    )

    # Every metric runs the same entry-point script; only the config differs.
    script_path = os.path.join(".", "kpi_forecasting.py")

    for metric in FORECAST_METRICS_LIST:
        # Pass this metric's search_forecasting config to the KPI forecasting script.
        config_path = os.path.join("kpi_forecasting", "configs", f"{metric}.yaml")
        forecast_task = GKEPodOperator(
            task_id=f"search_forecasting_{metric}",
            arguments=["python", script_path, "-c", config_path],
            image=IMAGE,
        )
        # Each forecast starts only after the upstream table is ready.
        wait_task_sensor >> forecast_task