def get_dag()

in airflow-core/src/airflow/models/dagbag.py


    @provide_session
    def get_dag(self, dag_id, session: Session = NEW_SESSION):
        """
        Get the DAG out of the dictionary and refresh it if expired.

        :param dag_id: DAG ID
        """
        # Avoid circular import
        from airflow.models.dag import DagModel

        if self.read_dags_from_db:
            # Import here so SerializedDagModel is only imported when DAGs are read from the DB
            from airflow.models.serialized_dag import SerializedDagModel

            if dag_id not in self.dags:
                # Load from DB if not (yet) in the bag
                self._add_dag_from_db(dag_id=dag_id, session=session)
                return self.dags.get(dag_id)

            # If the DAG is already in the DagBag, then:
            # 1. Check whether it is time to re-check for updates
            #    (controlled by min_serialized_dag_fetch_secs).
            # 2. Compare the last_updated and hash columns in the SerializedDag table
            #    to see whether the serialized DAG has changed.
            # 3. If (2) shows a change, fetch the serialized DAG again.
            # 4. If (2) returns None (i.e. the serialized DAG was deleted), remove the
            #    DAG from the DagBag, if present, and return None.
            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
            if (
                dag_id in self.dags_last_fetched
                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
            ):
                sd_latest_version_and_updated_datetime = (
                    SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                        dag_id=dag_id, session=session
                    )
                )
                if not sd_latest_version_and_updated_datetime:
                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
                    del self.dags[dag_id]
                    del self.dags_last_fetched[dag_id]
                    del self.dags_hash[dag_id]
                    return None

                sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime

                if (
                    sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                    or sd_latest_version != self.dags_hash[dag_id]
                ):
                    self._add_dag_from_db(dag_id=dag_id, session=session)

            return self.dags.get(dag_id)

        # DAGs are read from files here: grab the in-memory copy, if any, so we
        # can check below whether it has expired.
        dag = self.dags.get(dag_id)

        # If the DagModel row is absent, we can't check the last_expired property;
        # the DAG may not have been synchronized to the DB yet.
        orm_dag = DagModel.get_current(dag_id, session=session)
        if not orm_dag:
            return self.dags.get(dag_id)

        is_missing = dag_id not in self.dags
        is_expired = (
            orm_dag.last_expired and dag and dag.last_loaded and dag.last_loaded < orm_dag.last_expired
        )
        if is_expired:
            # Remove the stale DAG so it can be re-added after reprocessing.
            self.dags.pop(dag_id, None)
        if is_missing or is_expired:
            # Reprocess source file.
            found_dags = self.process_file(
                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
            )

            # Return the reparsed DAG if the file still defines `dag_id`;
            # otherwise drop the stale entry from self.dags.
            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                return self.dags[dag_id]
            if dag_id in self.dags:
                del self.dags[dag_id]
        return self.dags.get(dag_id)
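
For orientation, here is a minimal usage sketch, not taken from dagbag.py: it assumes a serialized DAG with the placeholder ID "example_dag_id" already exists in the metadata database, and the dag_folder path is illustrative. Only the public DagBag constructor arguments and get_dag() shown above are used.

    # Hedged sketch: exercising get_dag() via the DB-backed path described above.
    from airflow.models.dagbag import DagBag

    # With read_dags_from_db=True, get_dag() loads the serialized DAG on first
    # access and afterwards re-checks the hash / last_updated columns no more
    # often than every MIN_SERIALIZED_DAG_FETCH_INTERVAL seconds.
    db_bag = DagBag(read_dags_from_db=True)
    dag = db_bag.get_dag("example_dag_id")  # placeholder DAG ID
    if dag is None:
        # Never serialized, or its SerializedDag row has since been deleted.
        print("DAG not found in the serialized-DAG table")
    else:
        print(f"Loaded {dag.dag_id}")

    # With a file-backed DagBag (read_dags_from_db=False), get_dag() re-parses
    # the source file when the DAG is missing from the bag or when
    # DagModel.last_expired indicates the cached copy is stale.
    file_bag = DagBag(dag_folder="/path/to/dags", read_dags_from_db=False)  # illustrative path
    dag = file_bag.get_dag("example_dag_id")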