public InputStream newInput()

in src/main/java/build/buildfarm/instance/shard/RemoteInputStreamFactory.java [150:276]


  /**
   * Opens an input stream for a blob by fetching it from a remote worker, blocking until a stream
   * is obtained or the search fails.
   *
   * <p>Candidate workers are the backplane's recorded locations for {@code blobDigest} that are
   * also members of the backplane's current worker set. When {@code publicName} is set and appears
   * among those locations, it is dropped from the candidates and its location entry for the blob
   * is removed from the backplane (presumably {@code publicName} identifies this instance itself
   * - confirm against the enclosing class).
   *
   * <p>{@code correctMissingBlob} is consulted at most once per call: up front when there are no
   * candidates, otherwise after every queued candidate has failed with an {@link IOException}.
   *
   * @param blobDigest digest of the blob to read
   * @param offset forwarded unchanged to {@code fetchBlobFromRemoteWorker}
   * @param deadlineAfter forwarded unchanged to {@code fetchBlobFromRemoteWorker}
   * @param deadlineAfterUnits unit of {@code deadlineAfter}
   * @param requestMetadata forwarded to {@code correctMissingBlob} and {@code
   *     fetchBlobFromRemoteWorker}
   * @return the stream set by the first successful {@code fetchBlobFromRemoteWorker} call
   * @throws IOException when the fetch fails with an {@code IOException}; a failure whose status
   *     code is {@code NOT_FOUND} is reported as a {@code NoSuchFileException} naming the digest
   * @throws InterruptedException when a fetch is interrupted or the wait for the result is
   *     interrupted
   * @throws RuntimeException when reading the worker set or blob locations from the backplane
   *     fails with an {@code IOException} (rethrown via {@code Status.fromThrowable}), or when an
   *     unchecked exception is raised during the fetch
   */
  public InputStream newInput(
      Digest blobDigest,
      long offset,
      long deadlineAfter,
      TimeUnit deadlineAfterUnits,
      RequestMetadata requestMetadata)
      throws IOException, InterruptedException {
    Set<String> remoteWorkers;
    Set<String> locationSet;
    try {
      Set<String> workers = backplane.getWorkers();
      if (publicName == null) {
        remoteWorkers = workers;
      } else {
        // Sets.difference is a live view; hold the set's monitor while the copy iterates it.
        synchronized (workers) {
          remoteWorkers = Sets.difference(workers, ImmutableSet.of(publicName)).immutableCopy();
        }
      }
      locationSet =
          Sets.newHashSet(Sets.intersection(backplane.getBlobLocationSet(blobDigest), workers));
    } catch (IOException e) {
      throw Status.fromThrowable(e).asRuntimeException();
    }

    if (publicName != null && locationSet.remove(publicName)) {
      backplane.removeBlobLocation(blobDigest, publicName);
    }
    List<String> workersList = new ArrayList<>(locationSet);
    boolean emptyWorkerList = workersList.isEmpty();
    final ListenableFuture<List<String>> populatedWorkerListFuture;
    if (emptyWorkerList) {
      // No usable recorded location: look for the blob on the remote workers before fetching.
      populatedWorkerListFuture =
          collectCorrectedBlobWorkers(
              remoteWorkers, locationSet, blobDigest, requestMetadata, workersList);
    } else {
      populatedWorkerListFuture = immediateFuture(workersList);
    }
    SettableFuture<InputStream> inputStreamFuture = SettableFuture.create();
    addCallback(
        populatedWorkerListFuture,
        new WorkersCallback(rand) {
          // True once correctMissingBlob has been used for this request, so it runs at most once.
          boolean triedCheck = emptyWorkerList;

          @Override
          public void onQueue(Deque<String> workers) {
            // Snapshot of the queued workers, taken before the fetch attempts below run.
            Set<String> queuedLocations = Sets.newHashSet(workers);
            boolean complete = false;
            try {
              while (!complete && !workers.isEmpty()) {
                try {
                  inputStreamFuture.set(
                      fetchBlobFromRemoteWorker(
                          blobDigest,
                          workers,
                          offset,
                          deadlineAfter,
                          deadlineAfterUnits,
                          requestMetadata));
                  complete = true;
                } catch (IOException e) {
                  // While workers remain queued, fall through and let the loop try again.
                  if (workers.isEmpty()) {
                    if (triedCheck) {
                      onFailure(e);
                      return;
                    }
                    triedCheck = true;

                    // Queue exhausted: rediscover holders once and re-enter this callback.
                    workersList.clear();
                    ListenableFuture<List<String>> checkedWorkerListFuture =
                        collectCorrectedBlobWorkers(
                            remoteWorkers,
                            queuedLocations,
                            blobDigest,
                            requestMetadata,
                            workersList);
                    addCallback(checkedWorkerListFuture, this, directExecutor());
                    complete = true;
                  }
                } catch (InterruptedException e) {
                  complete = true;
                  onFailure(e);
                }
              }
            } catch (RuntimeException e) {
              // Guava catches and logs unchecked exceptions thrown by listeners run on a direct
              // executor; without this the result future would never complete and the get()
              // below would block forever.
              onFailure(e);
            }
          }

          @SuppressWarnings("NullableProblems")
          @Override
          public void onFailure(Throwable t) {
            Status status = Status.fromThrowable(t);
            if (status.getCode() == Code.NOT_FOUND) {
              inputStreamFuture.setException(
                  new NoSuchFileException(DigestUtil.toString(blobDigest)));
            } else {
              inputStreamFuture.setException(t);
            }
          }
        },
        directExecutor());
    try {
      return inputStreamFuture.get();
    } catch (ExecutionException e) {
      // Unwrap so callers see the original failure type where this method's signature allows it.
      Throwable cause = e.getCause();
      Throwables.throwIfUnchecked(cause);
      Throwables.throwIfInstanceOf(cause, IOException.class);
      Throwables.throwIfInstanceOf(cause, InterruptedException.class);
      throw new UncheckedExecutionException(cause);
    }
  }

  /**
   * Runs {@code correctMissingBlob} for the blob and appends the workers it reports to {@code
   * workersList}.
   *
   * @param remoteWorkers workers handed to {@code correctMissingBlob} as the search set
   * @param locationSet locations handed to {@code correctMissingBlob} as the known set
   * @param blobDigest digest of the blob being located
   * @param requestMetadata metadata forwarded to {@code correctMissingBlob}
   * @param workersList mutable list that receives the reported workers; the returned future
   *     completes with this same list instance
   * @return a future yielding {@code workersList} after the reported workers have been added
   */
  private ListenableFuture<List<String>> collectCorrectedBlobWorkers(
      Set<String> remoteWorkers,
      Set<String> locationSet,
      Digest blobDigest,
      RequestMetadata requestMetadata,
      List<String> workersList) {
    return transform(
        correctMissingBlob(
            backplane,
            remoteWorkers,
            locationSet,
            this::workerStub,
            blobDigest,
            newDirectExecutorService(),
            requestMetadata),
        (foundOnWorkers) -> {
          Iterables.addAll(workersList, foundOnWorkers);
          return workersList;
        },
        directExecutor());
  }