dags/dbt_daily.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils.constants import ALLOWED_STATES, FAILED_STATES
from utils.tags import Tag

DOCS = """\
# DBT Daily
This triggers jobs configured in dbt Cloud to run daily scheduled models that depend
on other Airflow jobs.
*Triage notes*
DBT accounts are limited at the moment, so it might not be possible to get more visibility
into failing jobs at the moment.
"""

default_args = {
    "owner": "ascholtz@mozilla.com",
    "depends_on_past": False,
    "start_date": datetime(2024, 7, 31),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
    "dbt_cloud_conn_id": "dbt_cloud",
    # templated at runtime from the Airflow Variable `dbt_account_id`
    "account_id": "{{ var.value.dbt_account_id }}",
}

tags = [
    Tag.ImpactTier.tier_3,
    Tag.Triage.no_triage,
]

with DAG(
    "dbt_daily",
    doc_md=DOCS,
    max_active_runs=1,
    default_args=default_args,
    schedule_interval="0 4 * * *",  # daily at 04:00 UTC
    tags=tags,
) as dag:
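    # Wait for the copy_deduplicate_all task in the copy_deduplicate DAG; the
    # execution_delta points the sensor at the external DAG run whose logical
    # date is 3 hours before this DAG's (i.e. the 01:00 UTC run).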
    wait_for_copy_deduplicate = ExternalTaskSensor(
        task_id="wait_for_copy_deduplicate",
        external_dag_id="copy_deduplicate",
        external_task_id="copy_deduplicate_all",
        execution_delta=timedelta(hours=3),
        mode="reschedule",
        allowed_states=ALLOWED_STATES,
        failed_states=FAILED_STATES,
        pool="DATA_ENG_EXTERNALTASKSENSOR",
        email_on_retry=False,
        dag=dag,
    )

    # runs dbt jobs tagged with "refresh_daily" and "scheduled_in_airflow"
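    # check_interval/timeout: poll dbt Cloud every 10 seconds and fail the task
    # if the job run has not reached a terminal state within 300 seconds.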
    trigger_dbt_daily_cloud_run_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_daily_cloud_run_job",
        job_id=684764,
        check_interval=10,
        timeout=300,
    )
wait_for_copy_deduplicate >> trigger_dbt_daily_cloud_run_job