in src/main/java/org/apache/commons/io/input/ReadAheadInputStream.java [378:456]
/**
 * Schedules an asynchronous fill of the read-ahead buffer on the executor service.
 * <p>
 * Does nothing if the end of the stream has already been reached or a read is already in progress.
 * Otherwise the read-ahead buffer is reset to empty (position and limit both 0), {@code readInProgress}
 * is set, and a task is submitted that reads from the underlying stream until the backing array is full,
 * the stream stops returning data, or a reader is flagged as waiting.
 * </p>
 * <p>
 * Whenever {@code readInProgress} is cleared again - on normal completion, when the task finds the stream
 * already closed, or when the executor refuses the task - the completion signal is raised so that a
 * waiting reader is not left blocked.
 * </p>
 *
 * @throws IOException declared for {@code checkReadException()}, which is called before scheduling
 *         (presumably rethrows a failure recorded by an earlier asynchronous read - confirm against
 *         that method).
 * @throws java.util.concurrent.RejectedExecutionException if the executor service refuses the task;
 *         {@code readInProgress} is cleared before the exception propagates.
 */
private void readAsync() throws IOException {
    final byte[] arr;
    stateChangeLock.lock();
    try {
        arr = readAheadBuffer.array();
        if (endOfStream || readInProgress) {
            return;
        }
        checkReadException();
        // position(0) followed by flip() leaves position == limit == 0, i.e. an empty buffer.
        readAheadBuffer.position(0);
        readAheadBuffer.flip();
        readInProgress = true;
    } finally {
        stateChangeLock.unlock();
    }
    final Runnable readTask = () -> {
        stateChangeLock.lock();
        try {
            if (isClosed) {
                readInProgress = false;
                // Signal here as well: the normal completion path below signals after clearing the
                // flag, and a reader that started waiting before the stream was closed would
                // otherwise never be notified that no read is pending any more.
                signalAsyncReadComplete();
                return;
            }
            // Flip this so that the close method will not close the underlying input stream when we
            // are reading.
            isReading = true;
        } finally {
            stateChangeLock.unlock();
        }
        // Please note that it is safe to release the lock and read into the read ahead buffer
        // because either of following two conditions will hold:
        //
        // 1. The active buffer has data available to read so the reader will not read from the read ahead buffer.
        //
        // 2. This is the first time read is called or the active buffer is exhausted, in that case the reader waits
        // for this async read to complete.
        //
        // So there is no race condition in both the situations.
        int read = 0;
        int off = 0;
        int len = arr.length;
        Throwable exception = null;
        try {
            // try to fill the read ahead buffer.
            // if a reader is waiting, possibly return early.
            do {
                read = in.read(arr, off, len);
                if (read <= 0) {
                    break;
                }
                off += read;
                len -= read;
            } while (len > 0 && !isWaiting.get());
        } catch (final Throwable ex) {
            exception = ex;
            if (ex instanceof Error) {
                // `readException` may not be reported to the user. Rethrow Error to make sure at least
                // the user can see Error in UncaughtExceptionHandler.
                throw (Error) ex;
            }
        } finally {
            stateChangeLock.lock();
            try {
                // Expose exactly the bytes that were read, even if the read ended with an exception.
                readAheadBuffer.limit(off);
                if (read < 0 || exception instanceof EOFException) {
                    endOfStream = true;
                } else if (exception != null) {
                    readAborted = true;
                    readException = exception;
                }
                readInProgress = false;
                signalAsyncReadComplete();
            } finally {
                stateChangeLock.unlock();
            }
            closeUnderlyingInputStreamIfNecessary();
        }
    };
    try {
        executorService.execute(readTask);
    } catch (final java.util.concurrent.RejectedExecutionException e) {
        // The task was never accepted, so nothing will ever clear readInProgress on our behalf.
        // Undo the flag (and signal, mirroring the completion path) before propagating the
        // rejection; otherwise every later call would return early at the readInProgress check.
        stateChangeLock.lock();
        try {
            readInProgress = false;
            signalAsyncReadComplete();
        } finally {
            stateChangeLock.unlock();
        }
        throw e;
    }
}