int getBlock()

in src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java [143:186]


    /**
     * Tries to satisfy a read from the read-ahead buffers instead of going to the caller's own read path.
     *
     * <p>The method itself is not declared {@code synchronized}; it takes the monitor on {@code this}
     * in two separate, short critical sections and deliberately waits on the in-progress buffer's latch
     * while holding no lock.
     *
     * @param file     stream on whose behalf the read is being made
     * @param position offset that the caller wants to read from
     * @param length   number of bytes requested; passed through to {@code getBlockFromCompletedQueue}
     * @param buffer   destination array; passed through to {@code getBlockFromCompletedQueue}
     * @return the positive byte count reported by {@code getBlockFromCompletedQueue}, or {@code 0} when
     *         that count is not positive, in which case the calling thread can do its own read
     */
    int getBlock(ADLFileInputStream file, long position, int length, byte[] buffer) {
        // No method-level lock here, so every touch of shared state below is wrapped explicitly.
        if (log.isTraceEnabled()) {
            log.trace("getBlock for file " + file.getFilename() + " position " + position + " thread " + Thread.currentThread().getName());
        }

        {
            // Phase 1: if a matching read is still in flight, wait for it to finish.
            // The in-flight buffer reference is confined to this scope on purpose: nothing is carried over
            // from this critical section into the next one, which rules out races between the two.
            final ReadBuffer pending;
            synchronized (this) {
                clearFromReadAheadQueue(file, position);
                pending = getFromList(inProgressList, file, position);
            }

            if (pending != null) {
                try {
                    if (log.isTraceEnabled()) {
                        log.trace("got a relevant read buffer for file " + file.getFilename() + " offset " + pending.offset + " buffer idx " + pending.bufferindex);
                    }
                    // Blocks the caller stream's thread until the in-flight read is finished.
                    //
                    // Why this is safe without the lock (per the original author's note): a buffer leaves
                    // inProgressList in exactly one place - doneReading, once the worker thread has finished
                    // with it - and the latch is released only after that removal.
                    //
                    // Why it must NOT be inside the synchronized block: this thread would then hold the
                    // monitor for the whole wait, nobody else could change any state, and we would deadlock.
                    // If this logic grows more complex, the latch can be replaced by wait/notify issued
                    // whenever inProgressList is touched.
                    pending.latch.await();
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag for the caller and fall through to the completed-queue lookup.
                    Thread.currentThread().interrupt();
                }
                if (log.isTraceEnabled()) {
                    log.trace("latch done for file " + file.getFilename() + " buffer idx " + pending.bufferindex + " length " + pending.length);
                }
            }
        }

        // Phase 2: look for the data among the completed buffers.
        final int copied;
        synchronized (this) {
            copied = getBlockFromCompletedQueue(file, position, length, buffer);
        }

        if (copied <= 0) {
            // Nothing usable was found - report zero so the calling thread can do its own read.
            return 0;
        }

        if (log.isTraceEnabled()) {
            log.trace("Done read from Cache for " + file.getFilename() + " position " + position + " length " + copied);
        }
        return copied;
    }