in hadoop-api-shim/src/main/java/org/apache/hadoop/fs/shim/impl/FSDataInputStreamShimImpl.java [241:290]
/**
 * Fallback implementation of a positioned, fully-filling read into a
 * {@link ByteBuffer}, for streams whose inner implementation does not
 * offer the operation natively.
 * <p>
 * Strategies, tried in order:
 * <ol>
 *   <li>Heap buffer: read directly into the backing array.</li>
 *   <li>Direct buffer on a stream believed to implement
 *       {@code ByteBufferReadable}: seek to {@code position}, loop on
 *       {@code read(ByteBuffer)}, then restore the original seek position
 *       via {@link SeekToThenBack}. If the stream rejects the API with
 *       {@link UnsupportedOperationException}, remember that and fall
 *       through.</li>
 *   <li>Copy through a temporary byte array using positioned
 *       {@code readFully()} calls.</li>
 * </ol>
 * Synchronized because the strategies mutate the stream position and the
 * shared {@code isByteBufferReadableAvailable} flag.
 *
 * @param position file offset to start reading at
 * @param buf buffer to fill completely, from its current position to its limit
 * @throws IOException IO failure
 * @throws EOFException if the stream ends before the buffer has been filled
 */
private synchronized void fallbackByteBufferReadFully(long position, ByteBuffer buf)
    throws IOException {
  FSDataInputStream in = getInstance();
  int len = buf.remaining();
  LOG.debug("read @{} {} bytes", position, len);

  // Strategy 1: heap buffer -- read straight into its backing array.
  if (buf.hasArray()) {
    readIntoArrayByteBufferThroughReadFully(position, buf, len);
    return;
  }

  // Strategy 2: no array, but the inner stream is believed to support
  // ByteBufferReadable: seek, read(ByteBuffer), then seek back.
  if (isByteBufferReadableAvailable.get()) {
    LOG.debug("reading bytebuffer through seek and read(ByteBuffer)");
    // SeekToThenBack.close() presumably restores the pre-read seek
    // position, including on the exception path -- TODO confirm.
    try (SeekToThenBack back = new SeekToThenBack(position)) {
      while (buf.remaining() > 0) {
        int bytesRead = in.read(buf);
        if (bytesRead < 0) {
          throw new EOFException("No more data in stream; needed "
              + buf.remaining() + " to complete the read");
        }
      }
      return;
    } catch (UnsupportedOperationException ex) {
      LOG.debug("stream does not support ByteBufferReadable", ex);
      // don't try using this again
      isByteBufferReadableAvailable.set(false);
      /* and fall through into the final strategy */
    }
  }

  // Final strategy: the buffer has no array, so copy through a smaller
  // temporary array with a series of positioned readFully() calls.
  LOG.debug("Reading the byte buffer by reading into an array and copying");
  int bufferSize = Math.min(len, TEMPORARY_BUFFER);
  byte[] byteArray = new byte[bufferSize];
  // Strategy 2 may have partially filled the buffer before the
  // UnsupportedOperationException surfaced; resume after those bytes
  // instead of re-reading (and duplicating) them from the original offset.
  long nextReadPosition = position + (len - buf.remaining());
  while (buf.remaining() > 0) {
    int bytesToRead = Math.min(bufferSize, buf.remaining());
    LOG.debug("Reading {} bytes from {}", bytesToRead, nextReadPosition);
    in.readFully(nextReadPosition, byteArray, 0, bytesToRead);
    buf.put(byteArray, 0, bytesToRead);
    // move forward in the file
    nextReadPosition += bytesToRead;
  }
}