in sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/SftpInputStreamAsync.java [261:329]
protected boolean fillData() throws IOException {
SftpAckData ack = pendingReads.pollFirst();
boolean traceEnabled = log.isTraceEnabled();
if (ack == null) {
if (traceEnabled) {
log.trace("fillData({}) no pending ack", this);
}
return false;
}
if (traceEnabled) {
log.trace("fillData({}) process ack={}", this, ack);
}
boolean alreadyEof = eofIndicator;
pollBuffer(ack);
if (!alreadyEof && clientOffset < ack.offset) {
shortReads++;
// we are actually missing some data
// so request is synchronously
byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())];
int nb = (int) (ack.offset - clientOffset);
if (traceEnabled) {
log.trace("fillData({}) reading {} bytes", this, nb);
}
AtomicReference<Boolean> eof = new AtomicReference<>();
SftpClient client = getClient();
int cur = 0;
while (cur < nb) {
int dlen = client.read(handle, clientOffset + cur, data, cur, nb - cur, eof);
if (dlen > 0) {
cur += dlen;
}
Boolean eofSignal = eof.getAndSet(null);
if ((dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue())) {
eofIndicator = true;
break;
}
}
if (traceEnabled) {
log.trace("fillData({}) read {} bytes - EOF={}", this, cur, eofIndicator);
}
if (cur > 0) {
buffer.getRawBytes(data, cur, buffer.available());
buffer = new ByteArrayBuffer(data);
} else {
buffer.rpos(buffer.wpos());
}
if (!eofIndicator && !bufferAdjusted) {
int newBufferSize = adjustBufferIfNeeded(bufferSize, shortReads, maxReceived, ack.offset - clientOffset);
if (newBufferSize > 0 && newBufferSize < bufferSize) {
int originalSize = bufferSize;
bufferSize = newBufferSize;
bufferAdjusted = true;
if (log.isDebugEnabled()) {
log.debug("adjustBufferIfNeeded({}) changing SFTP buffer size: {} -> {}", this, originalSize,
bufferSize);
}
} else if (newBufferSize > bufferSize) {
throw new IllegalStateException("New buffer size " + newBufferSize + " > existing size " + bufferSize);
}
}
return !pendingReads.isEmpty();
}
return false;
}