in airflow-core/src/airflow/dag_processing/manager.py [0:0]
def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]) -> None:
    """
    Refresh DAG bundles, if required.

    The whole pass is throttled. Unless a forced refresh is pending in
    ``self._force_refresh_bundles``, the method returns immediately until
    ``self.bundle_refresh_check_interval`` seconds (monotonic clock) have passed
    since the previous check.

    For each bundle in ``self._dag_bundles`` it then:

    * initializes the bundle if it is not initialized yet, skipping the bundle when
      ``initialize()`` raises ``AirflowException``;
    * reads the bundle's ``DagBundleModel`` row and refreshes the bundle when any of
      these holds:
      - its ``refresh_interval`` has elapsed since the row's ``last_refreshed``;
      - (versioned bundles only) the version known to this processor differs from
        the row's ``version``;
      - the bundle has no entry in ``self._bundle_versions`` yet;
      - a forced refresh was requested for it;
    * after a successful refresh, stores ``last_refreshed`` on the row. For
      versioned bundles it also stores the new ``version``, unless the bundle was
      already seen and its version did not change; in that case the bundle is not
      rescanned;
    * otherwise rescans the bundle's files, replaces ``known_files[bundle.name]``,
      and cleans up removed files, deleted DAGs and orphaned import errors.

    Exceptions raised by ``bundle.refresh()`` are logged, and that bundle is skipped
    for this pass.

    :param known_files: mapping of bundle name to the set of ``DagFileInfo`` found in
        that bundle. The entry for every rescanned bundle is replaced in place.
    """
    # Wall-clock time: compared against, and stored in, the DB ``last_refreshed`` column.
    now = timezone.utcnow()

    # we don't need to check if it's time to refresh every loop - that is way too often
    # The throttle itself uses the monotonic clock rather than ``now``.
    next_check = self._bundles_last_refreshed + self.bundle_refresh_check_interval
    now_seconds = time.monotonic()
    # A pending forced refresh bypasses the throttle.
    if now_seconds < next_check and not self._force_refresh_bundles:
        self.log.debug(
            "Not time to check if DAG Bundles need refreshed yet - skipping. Next check in %.2f seconds",
            next_check - now_seconds,
        )
        return

    self._bundles_last_refreshed = now_seconds

    for bundle in self._dag_bundles:
        # TODO: AIP-66 handle errors in the case of incomplete cloning? And test this.
        #  What if the cloning/refreshing took too long (longer than the dag processor timeout)?
        if not bundle.is_initialized:
            try:
                bundle.initialize()
            except AirflowException as e:
                # Only AirflowException is caught here. Any other exception raised by
                # initialize() propagates out of this method, unlike refresh() below,
                # which catches Exception.
                self.log.exception("Error initializing bundle %s: %s", bundle.name, e)
                continue
        # TODO: AIP-66 test to make sure we get a fresh record from the db and it's not cached
        # The mutations of ``bundle_model`` below (last_refreshed, version) happen inside
        # this session. NOTE(review): assumes create_session() commits on normal exit,
        # including the ``continue`` exits from inside the with-block; confirm.
        with create_session() as session:
            # NOTE(review): session.get() returns None when no row exists for this bundle
            # name, and the attribute access below would then raise AttributeError.
            # Presumably bundles are synced to the DB before this runs; confirm.
            bundle_model: DagBundleModel = session.get(DagBundleModel, bundle.name)
            # A bundle with no recorded refresh falls back to timezone.utc_epoch()
            # (presumably the Unix epoch), which effectively treats it as overdue.
            elapsed_time_since_refresh = (
                now - (bundle_model.last_refreshed or timezone.utc_epoch())
            ).total_seconds()
            if bundle.supports_versioning:
                # we will also check the version of the bundle to see if another DAG processor has seen
                # a new version
                # Prefer the version this processor last recorded. Fall back to the bundle's
                # current version when there is none, or when the stored value is falsy.
                pre_refresh_version = (
                    self._bundle_versions.get(bundle.name) or bundle.get_current_version()
                )
                current_version_matches_db = pre_refresh_version == bundle_model.version
            else:
                # With no versioning, it always "matches"
                current_version_matches_db = True

            # Within this method, an entry is added to _bundle_versions only right before
            # the bundle's files are scanned, so this means "scanned at least once here".
            previously_seen = bundle.name in self._bundle_versions
            # Skip the bundle only when none of the refresh triggers applies.
            if (
                elapsed_time_since_refresh < bundle.refresh_interval
                and current_version_matches_db
                and previously_seen
                and bundle.name not in self._force_refresh_bundles
            ):
                self.log.info("Not time to refresh bundle %s", bundle.name)
                continue

            self.log.info("Refreshing bundle %s", bundle.name)

            try:
                bundle.refresh()
            except Exception:
                # If a forced refresh was requested, the bundle stays in
                # _force_refresh_bundles, because it is discarded only after a successful
                # refresh. The throttle above therefore keeps being bypassed until it succeeds.
                self.log.exception("Error refreshing bundle %s", bundle.name)
                continue

            bundle_model.last_refreshed = now
            self._force_refresh_bundles.discard(bundle.name)

            if bundle.supports_versioning:
                # We can short-circuit the rest of this if (1) bundle was seen before by
                # this dag processor and (2) the version of the bundle did not change
                # after refreshing it
                version_after_refresh = bundle.get_current_version()
                if previously_seen and pre_refresh_version == version_after_refresh:
                    self.log.debug(
                        "Bundle %s version not changed after refresh: %s",
                        bundle.name,
                        version_after_refresh,
                    )
                    continue

                # Also reached on this processor's first scan of the bundle, even when the
                # version itself did not change.
                bundle_model.version = version_after_refresh

                self.log.info(
                    "Version changed for %s, new version: %s", bundle.name, version_after_refresh
                )
            else:
                version_after_refresh = None

        # Outside the session: record the version being scanned, then rescan the bundle.
        self._bundle_versions[bundle.name] = version_after_refresh

        found_files = {
            DagFileInfo(rel_path=p, bundle_name=bundle.name, bundle_path=bundle.path)
            for p in self._find_files_in_bundle(bundle)
        }

        known_files[bundle.name] = found_files
        self.handle_removed_files(known_files=known_files)

        self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
        self.clear_orphaned_import_errors(
            bundle_name=bundle.name,
            observed_filelocs={str(x.absolute_path) for x in found_files},  # todo: make relative
        )