in src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java [143:186]
/**
 * Attempts to serve a read of {@code file} at {@code position} from buffers produced by the
 * read-ahead worker threads, copying any available bytes into {@code buffer}.
 *
 * <p>If a buffer for this file/position is found in {@code inProgressList}, the calling thread
 * blocks on that buffer's latch until the worker finishes. After that, the completed queue is
 * consulted via {@code getBlockFromCompletedQueue}.
 *
 * <p>Locking: this method is intentionally not {@code synchronized}. It takes the manager's
 * monitor in two separate {@code synchronized (this)} sections and waits on the latch without
 * holding the lock. The two sections share no local data.
 *
 * <p>If the wait is interrupted, the thread's interrupt status is restored and the method still
 * falls through to check the completed queue.
 *
 * @param file     the stream requesting data
 * @param position file offset the caller wants to read from
 * @param length   requested number of bytes (passed through to {@code getBlockFromCompletedQueue})
 * @param buffer   destination array for the copied bytes
 * @return the number of bytes obtained from the completed queue if positive; otherwise 0, meaning
 *     nothing was served from the cache and the calling thread should do its own read
 */
int getBlock(ADLFileInputStream file, long position, int length, byte[] buffer) {
    // Not synchronized, so be careful with locking (see the method documentation).
    if (log.isTraceEnabled()) {
        log.trace("getBlock for file " + file.getFilename() + " position " + position + " thread " + Thread.currentThread().getName());
    }

    { // Block scope, to confine readBuf to this part of the method. The two synchronized sections
      // must not share any data, to ensure there are no race conditions between them.
        ReadBuffer readBuf;
        synchronized (this) {
            clearFromReadAheadQueue(file, position);
            readBuf = getFromList(inProgressList, file, position);
        }
        if (readBuf != null) { // a worker is processing this buffer: block until it is done
            try {
                if (log.isTraceEnabled()) {
                    log.trace("got a relevant read buffer for file " + file.getFilename() + " offset " + readBuf.offset + " buffer idx " + readBuf.bufferindex);
                }
                readBuf.latch.await(); // blocking wait on the caller stream's thread
                // Note on correctness: readBuf leaves inProgressList in only one place: after the
                // worker thread is done processing it (in doneReading). There, the latch is released
                // after removing the buffer from inProgressList, so waiting on the latch outside the
                // synchronized section is safe.
                // Waiting inside a synchronized section would deadlock: this thread would hold the
                // lock while waiting, so no other thread could change any state. If this becomes more
                // complex in the future, the latch can be replaced with wait/notify whenever
                // inProgressList is touched.
            } catch (InterruptedException ex) {
                // Restore the interrupt status for callers; fall through and check the completed
                // queue anyway (best effort).
                Thread.currentThread().interrupt();
            }
            if (log.isTraceEnabled()) {
                log.trace("latch done for file " + file.getFilename() + " buffer idx " + readBuf.bufferindex + " length " + readBuf.length);
            }
        }
    }

    int bytesRead; // always assigned inside the synchronized section below
    synchronized (this) {
        bytesRead = getBlockFromCompletedQueue(file, position, length, buffer);
    }
    if (bytesRead > 0) {
        if (log.isTraceEnabled()) {
            log.trace("Done read from Cache for " + file.getFilename() + " position " + position + " length " + bytesRead);
        }
        return bytesRead;
    }
    // Otherwise, report that nothing was read - the calling thread can do its own read.
    return 0;
}