def _refresh_dag_bundles()

in airflow-core/src/airflow/dag_processing/manager.py [0:0]


    def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]) -> None:
        """
        Refresh DAG bundles, if required.

        The check is throttled: the method returns immediately unless
        ``bundle_refresh_check_interval`` seconds (measured on the monotonic clock)
        have passed since the previous check, or at least one bundle is queued in
        ``self._force_refresh_bundles``.

        For every bundle in ``self._dag_bundles`` the method then:

        1. Initializes the bundle if it is not initialized yet. An ``AirflowException``
           raised here is logged and the bundle is skipped for this pass.
        2. Loads the bundle's ``DagBundleModel`` row and skips the bundle unless at
           least one of these holds: ``refresh_interval`` has elapsed since the row's
           ``last_refreshed``; the bundle supports versioning and the version known
           to this processor differs from the row's version; this processor has not
           seen the bundle before; a refresh was forced for this bundle.
        3. Calls ``bundle.refresh()``. Any exception is logged and the bundle is
           skipped without touching ``last_refreshed`` or the forced-refresh flag.
        4. Stamps ``last_refreshed`` on the row and clears the forced-refresh flag.
           For a versioned bundle that was already seen and whose version did not
           change, processing of that bundle stops here.
        5. Otherwise records the new version, rescans the bundle's files, stores them
           in ``known_files`` and runs the removed-file, deleted-DAG and orphaned
           import-error clean-ups for the bundle.

        :param known_files: mapping of bundle name to the set of files found in that
            bundle; updated in place for every bundle that gets rescanned.
        """
        # Wall-clock timestamp, compared against and written to ``last_refreshed`` on the DB row.
        now = timezone.utcnow()

        # we don't need to check if it's time to refresh every loop - that is way too often
        next_check = self._bundles_last_refreshed + self.bundle_refresh_check_interval
        # The throttle uses the monotonic clock; it is independent of ``now`` above.
        now_seconds = time.monotonic()
        # A non-empty forced-refresh set bypasses the throttle.
        if now_seconds < next_check and not self._force_refresh_bundles:
            self.log.debug(
                "Not time to check if DAG Bundles need refreshed yet - skipping. Next check in %.2f seconds",
                next_check - now_seconds,
            )
            return

        self._bundles_last_refreshed = now_seconds

        for bundle in self._dag_bundles:
            # TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
            #  What if the cloning/refreshing took too long (longer than the dag processor timeout)
            if not bundle.is_initialized:
                try:
                    bundle.initialize()
                except AirflowException as e:
                    # Only AirflowException is handled here; the bundle is retried on a later pass
                    # because it is still uninitialized.
                    self.log.exception("Error initializing bundle %s: %s", bundle.name, e)
                    continue
            # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
            # Every ``continue`` inside this ``with`` block leaves the session context manager.
            # Presumably pending changes to ``bundle_model`` are committed on exit - confirm
            # against ``create_session``.
            with create_session() as session:
                # NOTE(review): the code below assumes a row exists for this bundle; if
                # ``session.get`` can return None here the attribute access would fail - confirm
                # that bundles are always synced to the DB before this runs.
                bundle_model: DagBundleModel = session.get(DagBundleModel, bundle.name)
                # A row that has never been refreshed is treated as refreshed at the epoch.
                elapsed_time_since_refresh = (
                    now - (bundle_model.last_refreshed or timezone.utc_epoch())
                ).total_seconds()
                if bundle.supports_versioning:
                    # we will also check the version of the bundle to see if another DAG processor has seen
                    # a new version
                    # Prefer the version this processor recorded; fall back to asking the bundle
                    # when nothing (or a falsy value) is recorded.
                    pre_refresh_version = (
                        self._bundle_versions.get(bundle.name) or bundle.get_current_version()
                    )
                    current_version_matches_db = pre_refresh_version == bundle_model.version
                else:
                    # With no versioning, it always "matches"
                    current_version_matches_db = True

                # Membership test, so a bundle recorded with version ``None`` still counts as seen.
                previously_seen = bundle.name in self._bundle_versions
                # Skip only when all four conditions hold; any single failing one triggers a refresh.
                if (
                    elapsed_time_since_refresh < bundle.refresh_interval
                    and current_version_matches_db
                    and previously_seen
                    and bundle.name not in self._force_refresh_bundles
                ):
                    self.log.info("Not time to refresh bundle %s", bundle.name)
                    continue

                self.log.info("Refreshing bundle %s", bundle.name)

                try:
                    bundle.refresh()
                except Exception:
                    # Broad catch: a failure in one bundle must not stop the loop over the others.
                    # NOTE(review): ``last_refreshed`` and the forced-refresh flag are left
                    # untouched, so a forced bundle that keeps failing keeps bypassing the throttle.
                    self.log.exception("Error refreshing bundle %s", bundle.name)
                    continue

                # Uses the timestamp taken at the start of this call, not the time the refresh finished.
                bundle_model.last_refreshed = now
                self._force_refresh_bundles.discard(bundle.name)

                if bundle.supports_versioning:
                    # We can short-circuit the rest of this if (1) bundle was seen before by
                    # this dag processor and (2) the version of the bundle did not change
                    # after refreshing it
                    version_after_refresh = bundle.get_current_version()
                    if previously_seen and pre_refresh_version == version_after_refresh:
                        self.log.debug(
                            "Bundle %s version not changed after refresh: %s",
                            bundle.name,
                            version_after_refresh,
                        )
                        continue

                    bundle_model.version = version_after_refresh

                    self.log.info(
                        "Version changed for %s, new version: %s", bundle.name, version_after_refresh
                    )
                else:
                    # Unversioned bundles are tracked with ``None`` and are always rescanned
                    # after a refresh.
                    version_after_refresh = None

            # Reached only when the bundle was refreshed and needs a rescan; recording the version
            # here is also what marks the bundle as "previously seen" on later passes.
            self._bundle_versions[bundle.name] = version_after_refresh

            found_files = {
                DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
                for p in self._find_files_in_bundle(bundle)
            }

            # Replace this bundle's entry in the caller's mapping with the fresh scan result.
            known_files[bundle.name] = found_files
            self.handle_removed_files(known_files=known_files)

            self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
            self.clear_orphaned_import_errors(
                bundle_name=bundle.name,
                observed_filelocs={str(x.absolute_path) for x in found_files},  # todo: make relative
            )