int readRemote()

in src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java [228:297]


    /**
     * Reads up to {@code length} bytes of the remote file, starting at file position
     * {@code position}, into {@code b} beginning at index {@code offset}.
     *
     * <p>The HTTP call itself is retried by the retry policy placed on the request options
     * (no retries for speculative reads, exponential backoff otherwise). In addition, if the
     * call succeeds but reading the response stream fails before any byte was delivered,
     * the whole request is re-issued once.
     *
     * @param position    zero-based position in the remote file to start reading from
     * @param b           destination buffer
     * @param offset      index in {@code b} at which the first byte is stored
     * @param length      maximum number of bytes to read
     * @param speculative whether this is a speculative (read-ahead) request; passed through to
     *                    {@code Core.open} and used to select the retry policy
     * @return the number of bytes placed in {@code b}; {@code -1} if {@code position} is at or
     *         beyond {@code directoryEntry.length}; {@code 0} if the response is empty or if the
     *         server rejected a speculative read with {@code SpeculativeReadNotSupported}
     *         (in which case {@code client.disableReadAheads} is also set)
     * @throws IllegalArgumentException if {@code position}, {@code b}, {@code offset} or
     *                                  {@code length} are invalid
     * @throws IOException              if the request is unsuccessful, or if the response stream
     *                                  fails before any data was read on both attempts
     */
    int readRemote(long position, byte[] b, int offset, int length, boolean speculative) throws IOException {
        if (position < 0) throw new IllegalArgumentException("attempting to read from negative offset");
        if (position >= directoryEntry.length) return -1;  // Hadoop prefers -1 to EOFException
        if (b == null) throw new IllegalArgumentException("null byte array passed in to read() method");
        // Reject a negative offset up front instead of issuing a remote call that can only end
        // in an IndexOutOfBoundsException from the stream read.
        if (offset < 0) throw new IllegalArgumentException("offset is less than zero");
        if (offset >= b.length) throw new IllegalArgumentException("offset greater than length of array");
        if (length < 0) throw new IllegalArgumentException("requested read length is less than zero");
        if (length > (b.length - offset))
            throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");

        int totalBytesRead = 0;
        int retriesRemaining = 1;
        // retry is for the HTTP call succeeding, but the InputStream subsequently having an error.
        // If the http call fails, the retry policy takes care of it
        while (retriesRemaining >= 0) {
            RequestOptions opts = new RequestOptions();
            opts.retryPolicy = speculative ? new NoRetryPolicy() : new ExponentialBackoffPolicy();
            opts.timeout = client.timeout + (1000 * (length / 1000 /1000)); // 1 second grace per MB to be downloaded
            OperationResponse resp = new OperationResponse();
            InputStream inStream = Core.open(filename, position, length, sessionId, speculative, client, opts, resp);
            // Constant-first equals: a failed response without a remote exception name must fall
            // through to the regular error path below rather than fail with a NullPointerException.
            if (speculative && !resp.successful && resp.httpResponseCode == 400
                    && "SpeculativeReadNotSupported".equals(resp.remoteExceptionName)) {
                client.disableReadAheads = true;
                return 0;
            }
            if (!resp.successful) throw client.getExceptionFromResponse(resp, "Error reading from file " + filename);
            if (resp.responseContentLength == 0 && !resp.responseChunked) {
                // Got nothing; still release the stream (if one was handed back) before returning
                if (inStream != null) inStream.close();
                return 0;
            }
            int bytesRead;
            boolean readFailed = false;
            long start = System.nanoTime();
            try {
                do {
                    bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead);
                    if (bytesRead > 0) { // if not EOF of the Core.open's stream
                        totalBytesRead += bytesRead;
                    }
                } while (bytesRead >= 0 && totalBytesRead < length);
                if (bytesRead >= 0) {  // read to EOF on the stream, so connection can be reused
                    // scratch buffer is only needed here, so it is not allocated on every attempt
                    byte[] junkbuffer = new byte[16*1024];
                    while (inStream.read(junkbuffer, 0, junkbuffer.length)>=0); // read and consume rest of stream, if any remains
                }
            } catch (IOException ex) {
                // The stream is closed exactly once, in the finally block below.
                readFailed = true;
                if (totalBytesRead > 0) {
                    return totalBytesRead;
                }
                // we could also just return zero bytes read and not do the retry - that would satisfy the InputStream contract
                // But that is almost never the intent, so just doing the try internally if zero bytes have been read
                if (retriesRemaining == 0) {
                    throw new ADLException("Error reading data from response stream in positioned read() for file " + filename, ex);
                }
                retriesRemaining--;
                continue;  // retry in the while loop
            } finally {
                try {
                    if (inStream != null) inStream.close();
                } catch (IOException closeEx) {
                    // After a read failure, a close failure must not replace the partial result,
                    // the retry, or the original error; after a clean read it is still reported.
                    if (!readFailed) throw closeEx;
                }
                long timeTaken=(System.nanoTime() - start)/1000000;
                if (log.isDebugEnabled()) {
                    String logline ="HTTPRequestRead," + (resp.successful?"Succeeded":"Failed") +
                            ",cReqId:" + opts.requestid +
                            ",lat:" + Long.toString(resp.lastCallLatency+timeTaken) +
                            ",Reqlen:" + totalBytesRead +
                            ",sReqId:" + resp.requestId +
                            ",path:" + filename +
                            ",offset:" + position;
                    log.debug(logline);
                }
            }
            return totalBytesRead;  // this breaks out of the retry loop
        }
        // Not expected to be reached: every loop iteration returns, throws, or retries (at most
        // once). Present only because the compiler requires a return after the loop.
        return totalBytesRead;
    }