in src/main/java/build/buildfarm/cas/cfc/CASFileCache.java [541:615]
public void get(
Digest digest,
long offset,
long count,
ServerCallStreamObserver<ByteString> blobObserver,
RequestMetadata requestMetadata) {
InputStream in;
try {
in = newInput(digest, offset);
} catch (IOException e) {
blobObserver.onError(e);
return;
}
blobObserver.setOnCancelHandler(
() -> {
try {
in.close();
} catch (IOException e) {
logger.log(Level.SEVERE, "error closing input stream on cancel", e);
}
});
byte[] buffer = new byte[CHUNK_SIZE];
int initialLength;
try {
initialLength = in.read(buffer);
} catch (IOException e) {
try {
in.close();
} catch (IOException ioEx) {
logger.log(Level.SEVERE, "error closing input stream on error", ioEx);
}
blobObserver.onError(e);
return;
}
final class ReadOnReadyHandler implements Runnable {
private boolean wasReady = false;
private int len = initialLength;
@Override
public void run() {
if (blobObserver.isReady() && !wasReady) {
wasReady = true;
try {
sendBuffer();
} catch (IOException e) {
logger.log(Level.SEVERE, "error reading from input stream", e);
try {
in.close();
} catch (IOException ioEx) {
logger.log(Level.SEVERE, "error closing input stream on error", ioEx);
}
blobObserver.onError(e);
}
}
}
void sendBuffer() throws IOException {
while (len >= 0 && wasReady) {
if (len != 0) {
blobObserver.onNext(ByteString.copyFrom(buffer, 0, len));
}
len = in.read(buffer);
if (!blobObserver.isReady()) {
wasReady = false;
}
}
if (len < 0) {
in.close();
blobObserver.onCompleted();
}
}
}
blobObserver.setOnReadyHandler(new ReadOnReadyHandler());
}