protected boolean fillData()

in sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java [261:329]


    protected boolean fillData() throws IOException {
        SftpAckData ack = pendingReads.pollFirst();
        boolean traceEnabled = log.isTraceEnabled();
        if (ack == null) {
            if (traceEnabled) {
                log.trace("fillData({}) no pending ack", this);
            }
            return false;
        }

        if (traceEnabled) {
            log.trace("fillData({}) process ack={}", this, ack);
        }
        boolean alreadyEof = eofIndicator;
        pollBuffer(ack);

        if (!alreadyEof && clientOffset < ack.offset) {
            shortReads++;
            // we are actually missing some data
            // so request is synchronously
            byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())];
            int nb = (int) (ack.offset - clientOffset);
            if (traceEnabled) {
                log.trace("fillData({}) reading {} bytes", this, nb);
            }

            AtomicReference<Boolean> eof = new AtomicReference<>();
            SftpClient client = getClient();
            int cur = 0;
            while (cur < nb) {
                int dlen = client.read(handle, clientOffset + cur, data, cur, nb - cur, eof);
                if (dlen > 0) {
                    cur += dlen;
                }
                Boolean eofSignal = eof.getAndSet(null);
                if ((dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue())) {
                    eofIndicator = true;
                    break;
                }
            }

            if (traceEnabled) {
                log.trace("fillData({}) read {} bytes - EOF={}", this, cur, eofIndicator);
            }

            if (cur > 0) {
                buffer.getRawBytes(data, cur, buffer.available());
                buffer = new ByteArrayBuffer(data);
            } else {
                buffer.rpos(buffer.wpos());
            }
            if (!eofIndicator && !bufferAdjusted) {
                int newBufferSize = adjustBufferIfNeeded(bufferSize, shortReads, maxReceived, ack.offset - clientOffset);
                if (newBufferSize > 0 && newBufferSize < bufferSize) {
                    int originalSize = bufferSize;
                    bufferSize = newBufferSize;
                    bufferAdjusted = true;
                    if (log.isDebugEnabled()) {
                        log.debug("adjustBufferIfNeeded({}) changing SFTP buffer size: {} -> {}", this, originalSize,
                                bufferSize);
                    }
                } else if (newBufferSize > bufferSize) {
                    throw new IllegalStateException("New buffer size " + newBufferSize + " > existing size " + bufferSize);
                }
            }
            return !pendingReads.isEmpty();
        }
        return false;
    }