in og/shared/src/main/java/com/google/idea/blaze/common/artifact/BuildArtifactCacheDirectory.java [273:335]
/**
 * Requests that the given artifacts be present in the cache, starting a fetch for those that are
 * missing and refreshing the metadata of those that are already cached.
 *
 * <p>Artifacts are de-duplicated by digest. Artifacts whose digest already has an entry in
 * {@code activeFetches} are skipped entirely: no new fetch is started for them and the returned
 * future does not track them.
 *
 * <p>The whole operation runs while holding the read lock (so that it does not overlap with a
 * clean) and the {@code activeFetches} monitor.
 *
 * @param artifacts the artifacts to add; entries sharing a digest are treated as one
 * @param context receives progress output and is given cancellation handlers that cancel the
 *     fetch and the metadata update
 * @return the future produced by {@code startFetch} for the artifacts that were neither cached
 *     nor already being fetched
 */
public ListenableFuture<?> addAll(
    ImmutableCollection<OutputArtifact> artifacts, Context<?> context) {
  // acquire the read lock to ensure that no clean is ongoing:
  long stamp = lock.readLock();
  try {
    synchronized (activeFetches) {
      // A single timestamp is shared by the fetch and by every metadata update below.
      Instant accessTime = Instant.now();
      // filter out any duplicate artifacts...
      ImmutableList<OutputArtifact> allDistinctArtifacts =
          artifacts.stream()
              .filter(distinctBy(OutputArtifact::getDigest))
              .collect(toImmutableList());
      // ...and those for which there is already a fetch pending:
      ImmutableList<OutputArtifact> allDistinctNotBeingFetchedArtifacts =
          allDistinctArtifacts.stream()
              .filter(a -> !activeFetches.containsKey(a.getDigest()))
              .collect(toImmutableList());
      // group them based on whether the artifact is already cached
      ImmutableListMultimap<Boolean, OutputArtifact> artifactsByPresence =
          Multimaps.index(allDistinctNotBeingFetchedArtifacts, this::contains);

      // Fetch absent artifacts
      ImmutableList<OutputArtifact> missingArtifactsToFetch = artifactsByPresence.get(false);
      long totalSize =
          missingArtifactsToFetch.stream().mapToLong(BlazeArtifact::getLength).sum();
      context.output(
          PrintOutput.output(
              "Fetching %d new artifacts (%,.2f MB) out of %d requested...",
              missingArtifactsToFetch.size(),
              // Divide in double precision: float cannot represent large byte counts exactly.
              totalSize / 1_000_000.0,
              allDistinctArtifacts.size()));
      ListenableFuture<?> fetch = startFetch(missingArtifactsToFetch, accessTime, context);
      context.addCancellationHandler(() -> fetch.cancel(false));
      // mark the artifacts as being actively fetched. If they are requested in the meantime,
      // the future will be used to wait until the fetch is complete.
      // They are unmarked by the future listener registered below.
      markAsActive(missingArtifactsToFetch, fetch);
      // The listener runs once the fetch future is done; with directExecutor() it runs on
      // whichever thread completes the future (or right here if it is already done).
      fetch.addListener(
          () -> {
            context.output(PrintOutput.output("Downloading done."));
            unmarkAsActive(missingArtifactsToFetch);
          },
          directExecutor());

      // Update metadata for present artifacts
      ListenableFuture<?> metadataUpdate =
          Futures.allAsList(
              artifactsByPresence.get(true).stream()
                  .map(OutputArtifact::getDigest)
                  .map(digest -> executor.submit(() -> updateMetadata(digest, accessTime)))
                  .collect(toImmutableList()));
      context.addCancellationHandler(() -> metadataUpdate.cancel(false));

      needClean = true;
      // Only the fetch is returned; the metadata update is not part of the returned future.
      return fetch;
    }
  } finally {
    lock.unlockRead(stamp);
  }
}