public ListenableFuture addAll()

in og/shared/src/main/java/com/google/idea/blaze/common/artifact/BuildArtifactCacheDirectory.java [273:335]


  /**
   * Starts a fetch for every requested artifact that is neither reported present by {@code
   * contains} nor already registered in {@code activeFetches}, and submits a metadata update for
   * the requested artifacts that are present.
   *
   * <p>Requested artifacts are de-duplicated by digest first. Artifacts whose digest already has
   * an entry in {@code activeFetches} are left out of both the new fetch and the metadata update;
   * the returned future does not track them.
   *
   * @param artifacts the artifacts being requested
   * @param context receives the progress messages and is given cancellation handlers that cancel
   *     the new fetch and the metadata update
   * @return the future produced by {@code startFetch} for the newly started fetch; the metadata
   *     update future is not part of it
   */
  public ListenableFuture<?> addAll(
      ImmutableCollection<OutputArtifact> artifacts, Context<?> context) {
    // Holding the read lock ensures that no clean is running while work is scheduled.
    long readStamp = lock.readLock();
    try {
      synchronized (activeFetches) {
        Instant now = Instant.now();

        // Collapse requests sharing a digest; the size of this list is the "requested" count in
        // the progress message below.
        ImmutableList<OutputArtifact> uniqueArtifacts =
            artifacts.stream()
                .filter(distinctBy(OutputArtifact::getDigest))
                .collect(toImmutableList());

        // Drop everything that already has a fetch pending.
        ImmutableList<OutputArtifact> notInFlight =
            uniqueArtifacts.stream()
                .filter(candidate -> !activeFetches.containsKey(candidate.getDigest()))
                .collect(toImmutableList());

        // Key true: contains() reported the artifact as present. Key false: it has to be fetched.
        ImmutableListMultimap<Boolean, OutputArtifact> byPresence =
            Multimaps.index(notInFlight, this::contains);
        ImmutableList<OutputArtifact> toFetch = byPresence.get(false);
        ImmutableList<OutputArtifact> alreadyPresent = byPresence.get(true);

        // Report what is about to be downloaded (size in decimal megabytes).
        long totalBytes = toFetch.stream().mapToLong(BlazeArtifact::getLength).sum();
        float totalMegabytes = totalBytes / (1000f * 1000);
        context.output(
            PrintOutput.output(
                "Fetching %d new artifacts (%,.2f MB) out of %d requested...",
                toFetch.size(), totalMegabytes, uniqueArtifacts.size()));

        ListenableFuture<?> fetchFuture = startFetch(toFetch, now, context);
        context.addCancellationHandler(() -> fetchFuture.cancel(false));

        // Register the artifacts as actively fetched so that a request arriving in the meantime
        // can wait on the same future. The listener attached right below unmarks them again; it
        // is attached only after marking so the unmark can never precede the mark.
        markAsActive(toFetch, fetchFuture);
        Runnable onFetchFinished =
            () -> {
              context.output(PrintOutput.output("Downloading done."));
              unmarkAsActive(toFetch);
            };
        fetchFuture.addListener(onFetchFinished, directExecutor());

        // Refresh the metadata of the artifacts that are already present, using the same
        // timestamp that was handed to the fetch.
        ListenableFuture<?> metadataFuture =
            Futures.allAsList(
                alreadyPresent.stream()
                    .map(OutputArtifact::getDigest)
                    .map(digest -> executor.submit(() -> updateMetadata(digest, now)))
                    .collect(toImmutableList()));
        context.addCancellationHandler(() -> metadataFuture.cancel(false));

        // NOTE(review): presumably signals the cleaner that the cache changed — confirm where
        // needClean is read.
        needClean = true;
        return fetchFuture;
      }
    } finally {
      lock.unlockRead(readStamp);
    }
  }