dags/kpi_forecasting.py (72 lines of code) (raw):

""" See [kpi-forecasting in the docker-etl repository](https://github.com/mozilla/docker-etl/blob/main/jobs/kpi-forecasting). This DAG runs the forecast Desktop DAU and Mobile DAU. The output powers KPI dashboards and monthly revenue forecasts. This DAG is high priority for week 1 of the month and low priority otherwise. """ import os from collections import namedtuple from datetime import datetime, timedelta from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor from operators.gcp_container_operator import GKEPodOperator from utils.constants import ALLOWED_STATES, FAILED_STATES from utils.tags import Tag default_args = { "owner": "bochocki@mozilla.com", "email": ["bochocki@mozilla.com", "jsilverman@mozilla.com"], "depends_on_past": False, "start_date": datetime(2022, 3, 28), "email_on_failure": True, "email_on_retry": True, "retries": 2, "retry_delay": timedelta(minutes=30), } TAGS = [Tag.ImpactTier.tier_1] IMAGE = "gcr.io/moz-fx-data-airflow-prod-88e0/kpi-forecasting_docker_etl:latest" Config = namedtuple("Config", ["filename", "wait_dag", "wait_tasks"]) CONFIGS = { "dau_desktop": Config( "dau_desktop.yaml", "bqetl_analytics_aggregations", [ "firefox_desktop_active_users_aggregates_v4", ], ), "dau_mobile": Config( "dau_mobile.yaml", "bqetl_analytics_aggregations", [ "firefox_ios_active_users_aggregates_v3", "fenix_active_users_aggregates_v3", "focus_android_active_users_aggregates_v3", "focus_ios_active_users_aggregates_v3", ], ), } with DAG( "kpi_forecasting", default_args=default_args, schedule_interval="0 5 * * *", doc_md=__doc__, tags=TAGS, ) as dag: for id, config in CONFIGS.items(): script_path = os.path.join(".", "kpi_forecasting.py") config_path = os.path.join("kpi_forecasting", "configs", config.filename) wait_tasks = config.wait_tasks if not isinstance(config.wait_tasks, list): wait_tasks = [wait_tasks] forecast_task = GKEPodOperator( task_id=f"kpi_forecasting_{id}", arguments=["python", script_path, "-c", config_path], image=IMAGE, dag=dag, ) for wait_task in wait_tasks: wait_task_sensor = ExternalTaskSensor( task_id=f"wait_for_{wait_task}", external_dag_id=config.wait_dag, external_task_id=wait_task, execution_delta=timedelta(minutes=45), check_existence=True, mode="reschedule", allowed_states=ALLOWED_STATES, failed_states=FAILED_STATES, pool="DATA_ENG_EXTERNALTASKSENSOR", ) wait_task_sensor >> forecast_task