in src/buildstream/_stream.py
def query_cache(self, elements, *, sources_of_cached_elements=False, only_sources=False, need_state=False):
    # It doesn't make sense to combine these flags
    assert not sources_of_cached_elements or not only_sources

    with self._context.messenger.simple_task("Query cache", silent_nested=True) as task:
        if need_state:
            # Enqueue complete build plan as this is required to determine `buildable` status.
            plan = list(_pipeline.dependencies(elements, _Scope.ALL))
        else:
            plan = elements

        if self._context.remote_cache_spec:
            # Parallelize cache queries if a remote cache is configured
            self._reset()
            self._add_queue(
                CacheQueryQueue(
                    self._scheduler, sources=only_sources, sources_if_cached=sources_of_cached_elements
                ),
                track=True,
            )
            self._enqueue_plan(plan)
            self._run()
        else:
            task.set_maximum_progress(len(plan))
            for element in plan:
                if element._can_query_cache():
                    # Cache status already available.
                    # This is the case for artifact elements, which load the
                    # artifact early on.
                    pass
                elif not only_sources and element._get_cache_key(strength=_KeyStrength.WEAK):
                    # A weak cache key is available, query the artifact cache
                    # without pulling from remotes.
                    element._load_artifact(pull=False)
                    if (
                        sources_of_cached_elements
                        or not element._can_query_cache()
                        or not element._cached_success()
                    ):
                        # Query the source cache if sources of cached elements were
                        # requested, or if the artifact is not known to be cached
                        # successfully.
                        element._query_source_cache()
                    if not element._pull_pending():
                        # No remote pull is pending, the artifact query is complete.
                        element._load_artifact_done()
                elif element._has_all_sources_resolved():
                    # No cache key is available, but the sources are resolved
                    # and can be looked up in the source cache.
                    element._query_source_cache()

                task.add_current_progress()
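
# Hypothetical usage sketch (not part of the original file), assuming `stream`
# is an initialized Stream instance and `elements` is a list of loaded Element
# objects; both names are illustrative assumptions:
#
#   stream.query_cache(elements, need_state=True)    # artifact + source state, with `buildable` status
#   stream.query_cache(elements, only_sources=True)  # query the source cache only
#
# Note that `sources_of_cached_elements` and `only_sources` cannot be combined,
# as enforced by the assertion at the top of the method.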