in src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java [228:297]
/**
 * Reads up to {@code length} bytes of the file, starting at file offset {@code position},
 * from the server into {@code b} beginning at index {@code offset}.
 *
 * <p>The HTTP call itself is retried by the request's retry policy. This method additionally
 * retries once when the HTTP call succeeded but reading the response stream failed before any
 * byte was received. If some bytes were already received when the stream fails, the partial
 * count is returned instead of retrying.
 *
 * @param position    offset in the file to read from; must not be negative
 * @param b           destination buffer; must not be null
 * @param offset      index in {@code b} at which to start storing bytes; must be within {@code b}
 * @param length      maximum number of bytes to read; must fit in {@code b} after {@code offset}
 * @param speculative passed through to {@code Core.open}; when true the request is made with a
 *                    {@code NoRetryPolicy}, and an HTTP 400 response whose remote exception is
 *                    {@code SpeculativeReadNotSupported} disables read-aheads on the client and
 *                    yields 0
 * @return the number of bytes stored in {@code b} (possibly fewer than {@code length});
 *         0 if the server returned an empty body or rejected a speculative read;
 *         -1 if {@code position} is at or beyond the length recorded in {@code directoryEntry}
 * @throws IllegalArgumentException if any argument fails the checks described above
 * @throws IOException if the server call is unsuccessful, if closing the response stream fails,
 *                     or if the response stream could not be read on any attempt before a
 *                     single byte was received
 */
int readRemote(long position, byte[] b, int offset, int length, boolean speculative) throws IOException {
    if (position < 0) throw new IllegalArgumentException("attempting to read from negative offset");
    if (position >= directoryEntry.length) return -1;  // Hadoop prefers -1 to EOFException
    if (b == null) throw new IllegalArgumentException("null byte array passed in to read() method");
    // reject a negative offset here, before any request is sent, rather than failing inside InputStream.read()
    if (offset < 0) throw new IllegalArgumentException("offset is less than zero");
    if (offset >= b.length) throw new IllegalArgumentException("offset greater than length of array");
    if (length < 0) throw new IllegalArgumentException("requested read length is less than zero");
    if (length > (b.length - offset))
        throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");

    int totalBytesRead = 0;
    int retriesRemaining = 1;
    // retry is for the HTTP call succeeding, but the InputStream subsequently having an error.
    // If the http call fails, the retry policy takes care of it
    while (retriesRemaining >= 0) {
        RequestOptions opts = new RequestOptions();
        opts.retryPolicy = speculative ? new NoRetryPolicy() : new ExponentialBackoffPolicy();
        opts.timeout = client.timeout + (1000 * (length / 1000 / 1000)); // 1 second grace per MB to be downloaded
        OperationResponse resp = new OperationResponse();
        InputStream inStream = Core.open(filename, position, length, sessionId, speculative, client, opts, resp);

        // constant-first equals: a failed response without a remote exception name must fall
        // through to the regular error handling below instead of throwing NullPointerException
        if (speculative && !resp.successful && resp.httpResponseCode == 400
                && "SpeculativeReadNotSupported".equals(resp.remoteExceptionName)) {
            client.disableReadAheads = true;
            return 0;
        }
        if (!resp.successful) throw client.getExceptionFromResponse(resp, "Error reading from file " + filename);
        if (resp.responseContentLength == 0 && !resp.responseChunked) {
            // Got nothing - but the response stream must still be closed so it is not leaked
            if (inStream != null) inStream.close();
            return 0;
        }

        int bytesRead;
        long start = System.nanoTime();
        try {
            do {
                bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead);
                if (bytesRead > 0) { // if not EOF of the Core.open's stream
                    totalBytesRead += bytesRead;
                }
            } while (bytesRead >= 0 && totalBytesRead < length);

            if (bytesRead >= 0) {
                // EOF not seen yet: read and discard whatever remains so the connection can be reused.
                // The scratch buffer is only allocated when there is actually something to drain.
                byte[] junkbuffer = new byte[16 * 1024];
                while (inStream.read(junkbuffer, 0, junkbuffer.length) >= 0) {
                    // discard
                }
            }
        } catch (IOException ex) {
            // The stream is already broken. Close it quietly so that a failure in close() cannot
            // mask the partial result or prevent the retry, and clear the reference so the
            // finally block does not close it a second time.
            try {
                inStream.close();
            } catch (IOException ignored) {
                // nothing useful can be done; the original read failure is what matters
            }
            inStream = null;

            if (totalBytesRead > 0) {
                return totalBytesRead;
            }
            // we could also just return zero bytes read and not do the retry - that would satisfy the InputStream contract
            // But that is almost never the intent, so just doing the retry internally if zero bytes have been read
            if (retriesRemaining == 0) {
                throw new ADLException("Error reading data from response stream in positioned read() for file " + filename, ex);
            }
            retriesRemaining--;
            continue; // retry in the while loop
        } finally {
            if (inStream != null) inStream.close();
            long timeTaken = (System.nanoTime() - start) / 1000000;
            if (log.isDebugEnabled()) {
                String logline = "HTTPRequestRead," + (resp.successful ? "Succeeded" : "Failed") +
                        ",cReqId:" + opts.requestid +
                        ",lat:" + Long.toString(resp.lastCallLatency + timeTaken) +
                        ",Reqlen:" + totalBytesRead +
                        ",sReqId:" + resp.requestId +
                        ",path:" + filename +
                        ",offset:" + position;
                log.debug(logline);
            }
        }
        return totalBytesRead; // this breaks out of the retry loop
    }
    // Not reached in practice: the last attempt either returns or throws above.
    // Present only because the compiler requires a return after the loop.
    return totalBytesRead;
}