in src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java [102:126]
/**
 * Queues a read-ahead request for a region of {@code file}, unless one is already queued
 * or no buffer can be obtained.
 *
 * <p>The request is dropped without error when {@code isAlreadyQueued(file, requestedOffset)}
 * is true, or when the free list is empty and {@code tryEvict()} returns false. Otherwise a
 * new {@link ReadBuffer} backed by a slot popped from the free list is appended to the
 * read-ahead queue, and all threads waiting on this object's monitor are notified.
 *
 * @param file the stream the read-ahead is issued for
 * @param requestedOffset offset recorded on the queued request
 * @param requestedLength length recorded on the queued request
 */
void queueReadAhead(ADLFileInputStream file, long requestedOffset, int requestedLength) {
    if (log.isTraceEnabled()) {
        log.trace("Start Queueing readAhead for " + file.getFilename() + " offset " + requestedOffset + " length " + requestedLength);
    }

    ReadBuffer pending;
    synchronized (this) {
        // Already queued: do not queue the same request twice.
        if (isAlreadyQueued(file, requestedOffset)) {
            return;
        }

        // No free buffer and nothing could be evicted: nothing can be queued.
        final boolean freeListEmpty = freeList.size() == 0;
        if (freeListEmpty && !tryEvict()) {
            return;
        }

        pending = new ReadBuffer();

        // Expected to yield a value: either the list was non-empty, or tryEvict() succeeded
        // (presumably returning a slot to the free list -- confirm against tryEvict()).
        Integer freeSlot = freeList.pop();

        // What was asked for.
        pending.file = file;
        pending.offset = requestedOffset;
        pending.requestedLength = requestedLength;

        // Nothing has been read yet; the latch starts with a count of one.
        pending.length = 0;
        pending.status = ReadBufferStatus.NOT_AVAILABLE;
        pending.latch = new CountDownLatch(1);

        // Backing storage taken from the shared buffer pool.
        pending.bufferindex = freeSlot;
        pending.buffer = buffers[freeSlot];

        readAheadQueue.add(pending);
        notifyAll(); // wake any threads waiting on this monitor
    }

    if (log.isTraceEnabled()) {
        log.trace("Done q-ing readAhead for file " + file.getFilename() + " offset " + requestedOffset + " buffer idx " + pending.bufferindex);
    }
}