dags/search_forecasting.py (55 lines of code) (raw):

""" See [kpi-forecasting in the docker-etl repository](https://github.com/mozilla/docker-etl/blob/main/jobs/kpi-forecasting). This DAG runs the search forecasts for the DAU, search count and ad clicks metrics . This DAG is high priority for week 1 of the month and low priority otherwise. """ import os from datetime import datetime, timedelta from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag default_args = { "owner": "jsnyder@mozilla.com", "email": [ "jsnyder@mozilla.com", "mbowerman@mozilla.com", "telemetry-alerts@mozilla.com", ], "depends_on_past": False, "start_date": datetime(2024, 7, 6), "email_on_failure": True, "email_on_retry": False, "retries": 2, "retry_delay": timedelta(minutes=30), } TAGS = [Tag.ImpactTier.tier_1] IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/kpi-forecasting_docker_etl:latest" FORECAST_METRICS_LIST = [ "search_forecasting_daily_active_users", "search_forecasting_search_count", "search_forecasting_ad_clicks", ] # schedule to run after bqetl_search_dashboard completes with DAG( "search_forecasting", default_args=default_args, schedule_interval="30 5 7 * *", doc_md=__doc__, tags=TAGS, ) as dag: # all the search forecasting metrics come from the search_revenue_levers_daily # table which is run in the bqetl_search_dashboard dag # as the search_derived__search_revenue_levers_daily__v1 task # see: https://workflow.telemetry.mozilla.org/dags/bqetl_search_dashboard/grid wait_task_sensor = ExternalTaskSensor( task_id="wait_for_search_dashboard", external_dag_id="bqetl_search_dashboard", external_task_id="search_derived__search_revenue_levers_daily__v1", check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", ) for metric in FORECAST_METRICS_LIST: # pass the search_forecasting configs to the KPI forecasting script config_filename = f"{metric}.yaml" script_path = os.path.join(".", "kpi_forecasting.py") config_path = os.path.join("kpi_forecasting", "configs", config_filename) forecast_task = GKEPodOperator( task_id=f"search_forecasting_{metric}", arguments=["python", script_path, "-c", config_path], image=IMAGE, ) wait_task_sensor >> forecast_task