in src/main/java/build/buildfarm/server/ByteStreamService.java [94:158]
void readFrom(InputStream in, long limit, CallStreamObserver<ReadResponse> target) {
final class ReadFromOnReadyHandler implements Runnable {
private final byte[] buf = new byte[CHUNK_SIZE];
private final boolean unlimited = limit == 0;
private long remaining = limit;
private boolean complete = false;
ReadResponse next() throws IOException {
int readBytes = in.read(buf, 0, (int) Math.min(remaining, buf.length));
if (readBytes <= 0) {
if (readBytes == -1) {
if (!unlimited) {
throw new UnexpectedEndOfStreamException(remaining, limit);
}
complete = true;
}
return ReadResponse.getDefaultInstance();
}
if (readBytes > remaining) {
logger.log(Level.WARNING, format("read %d bytes, expected %d", readBytes, remaining));
readBytes = (int) remaining;
}
remaining -= readBytes;
complete = remaining == 0;
return ReadResponse.newBuilder().setData(ByteString.copyFrom(buf, 0, readBytes)).build();
}
@Override
public void run() {
if (!complete) {
copy();
}
}
void copy() {
try {
while (target.isReady() && !complete) {
ReadResponse response = next();
if (response.getData().size() != 0) {
target.onNext(response);
}
}
if (complete) {
in.close();
target.onCompleted();
}
} catch (Exception e) {
complete = true;
try {
in.close();
} catch (IOException closeEx) {
e.addSuppressed(closeEx);
}
if (e instanceof UnexpectedEndOfStreamException) {
target.onError(Status.UNAVAILABLE.withCause(e).asException());
} else {
target.onError(e);
}
}
}
}
target.setOnReadyHandler(new ReadFromOnReadyHandler());
}