in airflow-core/src/airflow/models/dagbag.py [0:0]
def get_dag(self, dag_id, session: Session = None):
    """
    Get the DAG out of the dictionary, refreshing it first if it is stale.

    When ``self.read_dags_from_db`` is set, DAGs come from the serialized DAG table:

    * A DAG not yet in the bag is loaded from the DB.
    * A cached DAG is re-checked against the DB only once more than
      ``settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL`` seconds have passed since it
      was last fetched. It is reloaded when the DB's last-updated time is newer
      than that fetch time or its hash differs from the cached hash.
    * If the serialized DAG no longer exists, the DAG is dropped from the bag and
      None is returned.

    Otherwise the DAG's source file is re-parsed when the DAG is missing from the
    bag, or when its ``DagModel.last_expired`` is newer than the cached DAG's
    ``last_loaded``.

    :param dag_id: DAG ID
    :param session: database session passed through to every DB lookup made here
    :return: the DAG, or None if it is not available
    """
    # Avoid circular import
    from airflow.models.dag import DagModel

    if self.read_dags_from_db:
        # Import here so that serialized dag is only imported when serialization is enabled
        from airflow.models.serialized_dag import SerializedDagModel

        if dag_id not in self.dags:
            # Load from DB if not (yet) in the bag
            self._add_dag_from_db(dag_id=dag_id, session=session)
            return self.dags.get(dag_id)

        # If DAG is in the DagBag, check the following
        # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
        # 2. check the last_updated and hash columns in SerializedDag table to see if
        #    Serialized DAG is updated
        # 3. if (2) is yes, fetch the Serialized DAG.
        # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
        #    if it exists and return None.
        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
        if (
            dag_id in self.dags_last_fetched
            and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
        ):
            sd_latest_version_and_updated_datetime = (
                SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                    dag_id=dag_id, session=session
                )
            )
            if not sd_latest_version_and_updated_datetime:
                self.log.warning("Serialized DAG %s no longer exists", dag_id)
                # pop() rather than del: a missing entry in any one of these caches
                # must not turn this cleanup into a KeyError.
                self.dags.pop(dag_id, None)
                self.dags_last_fetched.pop(dag_id, None)
                self.dags_hash.pop(dag_id, None)
                return None

            sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime
            # A missing cached hash compares unequal, so it forces a reload
            # instead of raising KeyError.
            if (
                sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                or sd_latest_version != self.dags_hash.get(dag_id)
            ):
                self._add_dag_from_db(dag_id=dag_id, session=session)

        return self.dags.get(dag_id)

    # Parse-from-file mode. Keep the currently cached DAG (if any) so its
    # last_loaded time can be compared with DagModel.last_expired below.
    dag = self.dags.get(dag_id)

    # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
    orm_dag = DagModel.get_current(dag_id, session=session)
    if not orm_dag:
        return self.dags.get(dag_id)

    is_missing = dag_id not in self.dags
    is_expired = (
        orm_dag.last_expired and dag and dag.last_loaded and dag.last_loaded < orm_dag.last_expired
    )
    if is_expired:
        # Drop the stale entry before the source file is re-parsed below.
        self.dags.pop(dag_id, None)
    if is_missing or is_expired:
        # Reprocess source file.
        found_dags = self.process_file(
            filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
        )
        if found_dags and any(found_dag.dag_id == dag_id for found_dag in found_dags):
            return self.dags[dag_id]
        # The source file no longer exports `dag_id`: make sure it is not left in self.dags.
        self.dags.pop(dag_id, None)
    return self.dags.get(dag_id)