dags/dbt_daily.py:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag

DOCS = """\
# DBT Daily

This DAG triggers jobs configured in dbt Cloud to run daily scheduled models
that depend on other Airflow jobs.

*Triage notes*

dbt Cloud account access is currently limited, so it may not be possible to
get more visibility into failing jobs.
"""

default_args = {
    "owner": "ascholtz@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2024, 7, 31),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
    "dbt_cloud_conn_id": "dbt_cloud",
    # Resolved at runtime from the Airflow Variable `dbt_account_id`.
    "account_id": "{{ var.value.dbt_account_id }}",
}

tags = [
    Tag.ImpactTier.tier_3,
    Tag.Triage.no_triage,
]

with DAG(
    "dbt_daily",
    doc_md=DOCS,
    max_active_runs=1,
    default_args=default_args,
    schedule_interval="0 4 * * 0",  # 04:00 UTC on Sundays
    tags=tags,
) as dag:
    # Wait for the copy_deduplicate run scheduled 3 hours before this DAG's
    # logical date (i.e. the 01:00 UTC run) to finish before triggering dbt.
    wait_for_copy_deduplicate = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate",
        external_dag_id="copy_deduplicate",
        external_task_id="copy_deduplicate_all",
        execution_delta=timedelta(hours=3),
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
        dag=dag,
    )

    # Runs dbt jobs tagged with "refresh_daily" and "scheduled_in_airflow".
    # Polls the dbt Cloud run every 10 seconds and fails if it has not
    # reached a terminal state within 300 seconds.
    trigger_dbt_daily_cloud_run_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_daily_cloud_run_job",
        job_id=684764,
        check_interval=10,
        timeout=300,
    )

    wait_for_copy_deduplicate >> trigger_dbt_daily_cloud_run_job
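
utils/constants.py (hypothetical sketch): the sensor's ALLOWED_STATES and FAILED_STATES are imported from this module, which is not shown here; a plausible definition, assuming the standard Airflow task states, would look like the following (the actual module in the repo may differ):

from airflow.utils.state import TaskInstanceState

# States the ExternalTaskSensor treats as success vs. failure when
# checking the upstream copy_deduplicate run (assumed values).
ALLOWED_STATES = [TaskInstanceState.SUCCESS]
FAILED_STATES = [
    TaskInstanceState.FAILED,
    TaskInstanceState.SKIPPED,
    TaskInstanceState.UPSTREAM_FAILED,
]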