public static InputStream newInput()

in src/main/java/build/buildfarm/common/grpc/ByteStreamHelper.java [47:153]


  /**
   * Opens a blocking {@link InputStream} over a ByteStream read of {@code resourceName}, starting
   * at {@code offset}.
   *
   * <p>This call behaves like an "open": it blocks until the first response arrives, the read
   * completes, or the read fails terminally, so that an early failure is thrown here rather than
   * on the first read of the returned stream.
   *
   * <p>Failed reads are retried from the last offset that was delivered to the stream:
   *
   * <ul>
   *   <li>{@code DEADLINE_EXCEEDED} after progress was made on the current request is retried
   *       immediately with a fresh backoff;
   *   <li>otherwise, if {@code retryService} is non-null, the backoff yields a non-negative delay
   *       and {@code isRetriable} accepts the status, the retry is scheduled on {@code
   *       retryService} after that delay;
   *   <li>any other failure is terminal and is reported to both the waiting caller and the
   *       stream.
   * </ul>
   *
   * @param resourceName resource name placed in the read request
   * @param offset initial read offset
   * @param bsStubSupplier supplies the stub used for the initial request and for each retry
   * @param backoffSupplier supplies the backoff used to compute retry delays
   * @param isRetriable decides whether a failed status may be retried
   * @param retryService executor on which delayed retries are scheduled; {@code null} disables
   *     delayed retries
   * @return a stream fed by the read responses
   * @throws NoSuchFileException if the read fails with {@code NOT_FOUND}
   * @throws ClosedByInterruptException if the calling thread is interrupted while waiting; the
   *     stream is closed and the thread's interrupt status is restored
   * @throws IOException for any other terminal failure before the stream became ready
   */
  public static InputStream newInput(
      String resourceName,
      long offset,
      Supplier<ByteStreamStub> bsStubSupplier,
      Supplier<Backoff> backoffSupplier,
      Predicate<Status> isRetriable,
      @Nullable ListeningScheduledExecutorService retryService)
      throws IOException {
    ReadRequest request =
        ReadRequest.newBuilder().setResourceName(resourceName).setReadOffset(offset).build();
    // capacity of 1: the observer blocks in onNext until the reader has taken the previous chunk
    BlockingQueue<ByteString> queue = new ArrayBlockingQueue<>(1);
    ByteStringQueueInputStream inputStream = new ByteStringQueueInputStream(queue);
    // this interface needs to operate similar to open, where it
    // throws an exception on creation. We will need to wait around
    // for the response to come back in order to supply the stream or
    // throw the exception it receives
    SettableFuture<InputStream> streamReadyFuture = SettableFuture.create();
    StreamObserver<ReadResponse> responseObserver =
        new StreamObserver<ReadResponse>() {
          // offset at which the in-flight request was issued
          long requestOffset = offset;
          // offset of the next byte not yet handed to the stream
          long currentOffset = offset;
          Backoff backoff = backoffSupplier.get();

          @Override
          public void onNext(ReadResponse response) {
            // completing an already completed future is a no-op, so this is safe on every chunk
            streamReadyFuture.set(inputStream);
            ByteString data = response.getData();
            try {
              queue.put(data);
              currentOffset += data.size();
            } catch (InterruptedException e) {
              // cancel context?
              inputStream.setException(e);
              // keep the interruption visible to the owner of this thread
              Thread.currentThread().interrupt();
            }
          }

          /** Re-issues the read starting from the last offset delivered to the stream. */
          private void retryRequest() {
            requestOffset = currentOffset;
            bsStubSupplier
                .get()
                .read(request.toBuilder().setReadOffset(requestOffset).build(), this);
          }

          /**
           * Reports a terminal failure to both the caller that may still be waiting for the
           * stream to become ready and to any reader of the stream.
           */
          private void fail(Throwable t) {
            // no-op when the stream was already handed out
            streamReadyFuture.setException(t);
            inputStream.setException(t);
          }

          @Override
          public void onError(Throwable t) {
            Status status = Status.fromThrowable(t);
            long nextDelayMillis = backoff.nextDelayMillis();
            if (status.getCode() == Status.Code.DEADLINE_EXCEEDED
                && currentOffset != requestOffset) {
              // progress was made before the deadline expired: start over with a fresh backoff
              backoff = backoffSupplier.get();
              retryRequest();
            } else if (retryService == null || nextDelayMillis < 0 || !isRetriable.test(status)) {
              fail(t);
            } else {
              try {
                ListenableFuture<?> schedulingResult =
                    retryService.schedule(
                        this::retryRequest, nextDelayMillis, TimeUnit.MILLISECONDS);
                // surface a failure of the scheduled retry itself; otherwise it would be lost
                // and the caller/reader would wait forever
                schedulingResult.addListener(
                    () -> {
                      try {
                        schedulingResult.get();
                      } catch (ExecutionException e) {
                        fail(e.getCause());
                      } catch (InterruptedException e) {
                        fail(e);
                        Thread.currentThread().interrupt();
                      }
                    },
                    MoreExecutors.directExecutor());
              } catch (RejectedExecutionException e) {
                fail(e);
              }
            }
          }

          @Override
          public void onCompleted() {
            inputStream.setCompleted();
            // a read that completes without delivering any response must still release the
            // caller waiting for the stream; no-op if onNext already did so
            streamReadyFuture.set(inputStream);
          }
        };
    bsStubSupplier.get().read(request, responseObserver);
    // the interface is technically blocking (not aio) and is
    // perfectly reasonable to be used as a wait point
    try {
      return streamReadyFuture.get();
    } catch (InterruptedException e) {
      try {
        inputStream.close();
      } catch (RuntimeException closeEx) {
        // record the close failure on the interrupt; suppressing e on itself would throw
        e.addSuppressed(closeEx);
      } finally {
        // ClosedByInterruptException is expected to be thrown with the interrupt status set
        Thread.currentThread().interrupt();
      }
      IOException ioEx = new ClosedByInterruptException();
      ioEx.addSuppressed(e);
      throw ioEx;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Status status = Status.fromThrowable(cause);
      if (status.getCode() == Status.Code.NOT_FOUND) {
        IOException ioEx = new NoSuchFileException(resourceName);
        ioEx.addSuppressed(cause);
        throw ioEx;
      }
      Throwables.throwIfInstanceOf(cause, IOException.class);
      throw new IOException(cause);
    }
  }