in src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py [0:0]
def check_raw_if_deployed(session=None, **kwargs):
    """Hold this task until the Raw DAG has a recent successful run.

    The decision is made in this order:

    1. If a Raw DAG run is currently in the RUNNING state, reschedule and
       check again later.
    2. If the last successful Raw DAG run returned by ``DagRun.find`` is
       younger than ``_RAW_AGE_HOURS_MAX`` hours, return and let the caller
       proceed.
    3. Otherwise look the Raw DAG up in a fresh ``DagBag``. If it is not
       there, return without waiting. If it is there, trigger a new run
       and reschedule to wait for it.

    Args:
        session: Not used in the body.
            NOTE(review): presumably injected by a session-providing
            decorator that is not visible here - confirm.
        **kwargs: Accepted and discarded.

    Returns:
        None when no waiting is required.

    Raises:
        AirflowRescheduleException: To re-run this check after
            ``_RAW_WAITING_TIMEOUT_MINUTES`` minutes.
    """
    del kwargs  # Accepted only so callers may pass extra arguments.
    now = Pendulum.now(UTC)

    def _retry_later():
        # Built lazily so the timeout constant is read only when raising.
        wait = timedelta(minutes=_RAW_WAITING_TIMEOUT_MINUTES)
        return AirflowRescheduleException(now + wait)

    # NOTE(review): only RUNNING runs are checked here; a run that was
    # triggered but has not started yet would not be seen - confirm
    # whether that can cause a second trigger.
    in_progress = DagRun.find(dag_id=_RAW_DAG_ID, state=State.RUNNING)
    if in_progress:
        logging.info("Rescheduling to wait for an active run of the Raw DAG.")
        raise _retry_later()

    succeeded: list[DagRun] = DagRun.find(dag_id=_RAW_DAG_ID,
                                          state=State.SUCCESS)
    if succeeded:
        # Assumes DagRun.find returns runs oldest-first, so the last
        # element is the most recent one - TODO confirm.
        age = now - succeeded[-1].execution_date
        if age.total_hours() < _RAW_AGE_HOURS_MAX:
            logging.info("Found a recent run of the Raw DAG.")
            return

    # No recent successful run: start one, but only if the Raw DAG exists.
    raw_dag: DAG = DagBag().get_dag(_RAW_DAG_ID)
    if not raw_dag:
        logging.info("No Raw DAG %s found.", _RAW_DAG_ID)
        return

    logging.info("Starting a new run of the Raw DAG")
    trigger_dag(
        dag_id=_RAW_DAG_ID,
        run_id=f"forced__{now.isoformat()}",
        conf=None,
        execution_date=timezone.utcnow(),
        replace_microseconds=False,
    )
    logging.info("Rescheduling to wait for a new run of the Raw DAG.")
    raise _retry_later()