in libs/libcommon/src/libcommon/orchestrator.py [0:0]
def _get_cache_status(self) -> CacheStatus:
    """Classify the artifacts of the selected processing steps by the state of their cache entry.

    The steps are taken from ``get_first_processing_steps()`` when
    ``self.only_first_processing_steps`` is truthy, otherwise from
    ``get_topologically_ordered_processing_steps()``.

    Every artifact state is stored, keyed by its ``id``, in exactly one mapping of the
    returned ``CacheStatus``. The checks are evaluated in this order and the first match wins:

    1. ``cache_is_outdated_by_parent``: ``is_older_than`` is truthy against the cache state of
       at least one artifact of a parent step (looked up with the same config and split).
    2. ``cache_is_empty``: ``is_empty()`` is truthy.
    3. ``cache_is_error_to_retry``: ``is_error_to_retry()`` is truthy.
    4. ``cache_is_job_runner_obsolete``: ``is_job_runner_obsolete()`` is truthy.
    5. ``cache_has_different_git_revision``: ``is_git_revision_different_from(self.revision)``
       is truthy.
    6. ``up_to_date``: none of the above matched.

    Returns:
        The populated ``CacheStatus``.
    """

    def has_more_recent_parent(step_name, artifact) -> bool:
        # Deliberately lazy: parent artifacts are only fetched until the first one that is more
        # recent than `artifact` is found.
        for parent_step in self.processing_graph.get_parents(step_name):
            parent_artifacts = self._get_artifact_states_for_step(
                processing_step=parent_step,
                config=artifact.config,
                split=artifact.split,
            )
            for parent_artifact in parent_artifacts:
                if artifact.cache_state.is_older_than(parent_artifact.cache_state):
                    return True
        return False

    status = CacheStatus()

    if self.only_first_processing_steps:
        steps = self.processing_graph.get_first_processing_steps()
    else:
        steps = self.processing_graph.get_topologically_ordered_processing_steps()

    for step in steps:
        # A step may own several artifacts (e.g. one per config for a config-level step).
        for artifact in self._get_artifact_states_for_step(step):
            # Pick the single category this artifact belongs to; the order of the tests matters.
            if has_more_recent_parent(step.name, artifact):
                bucket = status.cache_is_outdated_by_parent
            elif artifact.cache_state.is_empty():
                bucket = status.cache_is_empty
            elif artifact.cache_state.is_error_to_retry():
                bucket = status.cache_is_error_to_retry
            elif artifact.cache_state.is_job_runner_obsolete():
                # the entry was created with an obsolete version of the job runner
                bucket = status.cache_is_job_runner_obsolete
            elif artifact.cache_state.is_git_revision_different_from(self.revision):
                # the entry's git revision differs from the dataset's current revision
                bucket = status.cache_has_different_git_revision
            else:
                bucket = status.up_to_date
            bucket[artifact.id] = artifact

    return status