def check_raw_if_deployed()

in src/cdc_dag_generator/templates/airflow_dag_raw_to_cdc.py [0:0]


def check_raw_if_deployed(session=None, **kwargs):
    del kwargs
    now = Pendulum.now(UTC)

    active_runs = DagRun.find(dag_id=_RAW_DAG_ID, state=State.RUNNING)
    if active_runs and len(active_runs) > 0:
        logging.info("Rescheduling to wait for an active run of the Raw DAG.")
        raise AirflowRescheduleException(now + timedelta(
            minutes=_RAW_WAITING_TIMEOUT_MINUTES))

    complete_runs: list[DagRun] = DagRun.find(dag_id=_RAW_DAG_ID,
                                              state=State.SUCCESS)
    run_raw_now = True
    if complete_runs and len(complete_runs) > 0:
        if (now - complete_runs[-1].execution_date
           ).total_hours() < _RAW_AGE_HOURS_MAX:
            run_raw_now = False
            logging.info("Found a recent run of the Raw DAG.")

    if run_raw_now:
        bag = DagBag()
        raw_dag: DAG = bag.get_dag(_RAW_DAG_ID)
        if not raw_dag:
            logging.info("No Raw DAG %s found.", _RAW_DAG_ID)
            return
        logging.info("Starting a new run of the Raw DAG")
        trigger_dag(
                dag_id=_RAW_DAG_ID,
                run_id=f"forced__{now.isoformat()}",
                conf=None,
                execution_date=timezone.utcnow(),
                replace_microseconds=False,
            )
        logging.info("Rescheduling to wait for a new run of the Raw DAG.")
        raise AirflowRescheduleException(now + timedelta(
            minutes=_RAW_WAITING_TIMEOUT_MINUTES))