void readFrom()

in src/main/java/build/buildfarm/server/ByteStreamService.java [94:158]


  /**
   * Streams the content of {@code in} to {@code target} in chunks of at most {@code CHUNK_SIZE}
   * bytes, sending data only while {@code target} reports that it is ready.
   *
   * <p>The work is performed by an on-ready handler registered on {@code target}; this method
   * itself only registers the handler and returns. The handler closes {@code in} and calls
   * {@code target.onCompleted()} once the transfer is finished, or closes {@code in} and calls
   * {@code target.onError(...)} if anything fails.
   *
   * @param in the stream to read from; closed by the handler on completion or failure
   * @param limit the exact number of bytes to deliver, or {@code 0} to deliver everything up to
   *     the end of the stream. With a non-zero limit, reaching the end of the stream early is
   *     reported to {@code target} as {@code Status.UNAVAILABLE}.
   * @param target the observer that receives the {@code ReadResponse} chunks
   */
  void readFrom(InputStream in, long limit, CallStreamObserver<ReadResponse> target) {
    final class ReadFromOnReadyHandler implements Runnable {
      private final byte[] buf = new byte[CHUNK_SIZE];
      // A limit of zero means "no limit": read until the stream reports end of input.
      private final boolean unlimited = limit == 0;
      // Bytes still owed to the target; only meaningful when a limit is in force.
      private long remaining = limit;
      // Set once the transfer has finished, successfully or not; later invocations are no-ops.
      // NOTE(review): not synchronized — presumably on-ready callbacks are serialized; confirm.
      private boolean complete = false;

      /**
       * Reads the next chunk from the input.
       *
       * @return a response carrying the bytes read, or the default (empty) instance when the
       *     read produced no data
       * @throws IOException if the read fails, or (as UnexpectedEndOfStreamException) if a
       *     limited stream ends before {@code limit} bytes were read
       */
      ReadResponse next() throws IOException {
        // In unlimited mode `remaining` is 0, so it must not size the request: a zero-length
        // read returns 0 rather than -1, which would never let the transfer make progress
        // or observe the end of the stream.
        int requested = unlimited ? buf.length : (int) Math.min(remaining, buf.length);
        int readBytes = in.read(buf, 0, requested);
        if (readBytes <= 0) {
          if (readBytes == -1) {
            if (!unlimited) {
              // The caller asked for a fixed number of bytes and the stream fell short.
              throw new UnexpectedEndOfStreamException(remaining, limit);
            }
            complete = true;
          }
          return ReadResponse.getDefaultInstance();
        }

        if (!unlimited) {
          // Defensive: never hand out more than was asked for, even if the stream over-delivers.
          if (readBytes > remaining) {
            logger.log(Level.WARNING, format("read %d bytes, expected %d", readBytes, remaining));
            readBytes = (int) remaining;
          }
          remaining -= readBytes;
          complete = remaining == 0;
        }
        return ReadResponse.newBuilder().setData(ByteString.copyFrom(buf, 0, readBytes)).build();
      }

      /** Invoked as the on-ready handler; resumes copying unless the transfer already ended. */
      @Override
      public void run() {
        if (!complete) {
          copy();
        }
      }

      /**
       * Pushes chunks to the target for as long as it is ready, then finishes the call if the
       * transfer is complete. Any failure closes the input and is reported through onError.
       */
      void copy() {
        try {
          while (target.isReady() && !complete) {
            ReadResponse response = next();
            // Empty responses (zero-byte reads, end of stream) are not forwarded.
            if (response.getData().size() != 0) {
              target.onNext(response);
            }
          }

          if (complete) {
            in.close();
            target.onCompleted();
          }
        } catch (Exception e) {
          // Mark complete first so a later on-ready callback does not touch the closed stream.
          complete = true;
          try {
            in.close();
          } catch (IOException closeEx) {
            e.addSuppressed(closeEx);
          }
          if (e instanceof UnexpectedEndOfStreamException) {
            target.onError(Status.UNAVAILABLE.withCause(e).asException());
          } else {
            target.onError(e);
          }
        }
      }
    }
    target.setOnReadyHandler(new ReadFromOnReadyHandler());
  }