private void findMissingBlobsOnWorker()

in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [648:738]


  /**
   * Asks the worker at the head of {@code workers} which of {@code blobDigests} it is missing and,
   * depending on the answer, either completes {@code missingDigestsFuture} or repeats the query
   * against the next worker in the deque.
   *
   * <p>On a successful reply, the future is completed with the reply when nothing is reported
   * missing or no workers remain; otherwise the reply is recorded in {@code responses} and the
   * next worker is asked only about the digests still reported missing.
   *
   * <p>On a failed reply, the failure is recorded in {@code responses} and then handled by status
   * code:
   *
   * <ul>
   *   <li>{@code UNAVAILABLE} / {@code UNIMPLEMENTED}: the worker is handed to {@code
   *       removeMalfunctioningWorker}.
   *   <li>{@code DEADLINE_EXCEEDED}: every recorded response is logged and the future is failed.
   *   <li>{@code CANCELLED}, a cancelled current {@code Context}, or a status rejected by {@code
   *       SHARD_IS_RETRIABLE}: the future is failed.
   *   <li>anything else: the worker is appended to the back of {@code workers} to be retried.
   * </ul>
   *
   * If the future is still not done afterwards, it is completed with {@code blobDigests} when no
   * workers remain; otherwise the same digests are sent to the next worker.
   *
   * @param requestId identifier included in log messages and in the malfunction report
   * @param blobDigests digests to ask the current worker about
   * @param workers remaining workers to consult; the first element is removed by this call
   * @param responses accumulator of per-worker outcomes, logged on {@code DEADLINE_EXCEEDED}
   * @param originalSize size reported in the deadline log lines as the "of" total
   * @param executor executor on which the reply callback runs
   * @param missingDigestsFuture future completed with the final missing digests or a failure
   * @param requestMetadata metadata forwarded to the worker call
   */
  private void findMissingBlobsOnWorker(
      String requestId,
      Iterable<Digest> blobDigests,
      Deque<String> workers,
      ImmutableList.Builder<FindMissingResponseEntry> responses,
      int originalSize,
      Executor executor,
      SettableFuture<Iterable<Digest>> missingDigestsFuture,
      RequestMetadata requestMetadata) {
    String currentWorker = workers.removeFirst();
    ListenableFuture<Iterable<Digest>> pendingReply =
        workerStub(currentWorker).findMissingBlobs(blobDigests, requestMetadata);

    // started after the request is issued; measures time until the callback observes the reply
    Stopwatch timer = Stopwatch.createStarted();

    FutureCallback<Iterable<Digest>> replyHandler =
        new FutureCallback<Iterable<Digest>>() {
          /** Repeats the query for {@code remaining} against the next worker in the deque. */
          private void queryNextWorker(Iterable<Digest> remaining) {
            findMissingBlobsOnWorker(
                requestId,
                remaining,
                workers,
                responses,
                originalSize,
                executor,
                missingDigestsFuture,
                requestMetadata);
          }

          /** Emits one log line per response recorded so far for this request. */
          private void logRecordedResponses() {
            for (FindMissingResponseEntry entry : responses.build()) {
              Level level;
              String failureSuffix;
              if (entry.exception == null) {
                level = Level.WARNING;
                failureSuffix = "";
              } else {
                level = Level.SEVERE;
                failureSuffix = ": " + entry.exception.toString();
              }
              logger.log(
                  level,
                  format(
                      "DEADLINE_EXCEEDED: findMissingBlobs(%s) %s: %d remaining of %d %dus%s",
                      requestId,
                      entry.worker,
                      entry.stillMissingAfter,
                      originalSize,
                      entry.elapsedMicros,
                      failureSuffix));
            }
          }

          @Override
          public void onSuccess(Iterable<Digest> missingDigests) {
            // finished: either everything was found, or there is nobody left to ask
            if (Iterables.isEmpty(missingDigests) || workers.isEmpty()) {
              missingDigestsFuture.set(missingDigests);
              return;
            }

            FindMissingResponseEntry outcome =
                new FindMissingResponseEntry(
                    currentWorker,
                    timer.elapsed(MICROSECONDS),
                    null,
                    Iterables.size(missingDigests));
            responses.add(outcome);
            queryNextWorker(missingDigests);
          }

          @Override
          public void onFailure(Throwable t) {
            FindMissingResponseEntry outcome =
                new FindMissingResponseEntry(
                    currentWorker, timer.elapsed(MICROSECONDS), t, Iterables.size(blobDigests));
            responses.add(outcome);

            Status status = Status.fromThrowable(t);
            switch (status.getCode()) {
              case UNAVAILABLE:
              case UNIMPLEMENTED:
                removeMalfunctioningWorker(
                    currentWorker, t, "findMissingBlobs(" + requestId + ")");
                break;
              case DEADLINE_EXCEEDED:
                logRecordedResponses();
                missingDigestsFuture.setException(status.asException());
                break;
              default:
                boolean abandon =
                    status.getCode() == Code.CANCELLED
                        || Context.current().isCancelled()
                        || !SHARD_IS_RETRIABLE.test(status);
                if (abandon) {
                  // do nothing further if we're cancelled
                  missingDigestsFuture.setException(status.asException());
                } else {
                  // retriable failure: put the worker back at the end of the line
                  workers.addLast(currentWorker);
                }
                break;
            }

            // checked on every path, since the future may have been completed above or elsewhere
            if (missingDigestsFuture.isDone()) {
              return;
            }
            if (workers.isEmpty()) {
              missingDigestsFuture.set(blobDigests);
              return;
            }
            queryNextWorker(blobDigests);
          }
        };

    addCallback(pendingReply, replyHandler, executor);
  }