in sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java [243:344]
public void flush() throws IOException {
    // Sends the bytes accumulated in the stream buffer to the peer. The data goes out in chunks, each one
    // limited by the space the remote window reports, by the remote window's packet size and by the number
    // of bytes still buffered. Fails immediately with SshChannelClosedException if the stream is CLOSED.
    //
    // Concurrent flushes are tolerated in the simplest possible way: only one caller flushes at a time and
    // a flush() arriving while another one is running is a no-op.
    //
    // The framework may flush concurrently when it closes the stream if there is an exception at an
    // inopportune moment, for instance during KEX.
    Channel channel = getChannel();
    if (OpenState.CLOSED.equals(openState.get())) {
        throw new SshChannelClosedException(channel.getChannelId(),
                "flush(" + this + ") length=" + bufferLength + " - stream is already closed");
    }
    Session session = channel.getSession();
    boolean traceEnabled = log.isTraceEnabled();

    Buffer outgoing;
    int pending;
    synchronized (bufferLock) {
        if (isFlushing) {
            // Somebody else is already flushing.
            return;
        }
        pending = bufferLength;
        if (pending == 0) {
            // Nothing buffered: wake up any thread waiting on bufferLock and leave.
            bufferLock.notifyAll();
            return;
        }
        // Claim the flush; the claim is released in the finally block below.
        isFlushing = true;
        outgoing = buffer;
    }

    try {
        while (pending > 0) {
            session.resetIdleTimeout();
            long wanted = pending;
            long windowSpace;
            try {
                windowSpace = remoteWindow.waitForSpace(maxWaitTimeout);
                if (traceEnabled) {
                    log.trace("flush({}) len={}, available={}", this, wanted, windowSpace);
                }
            } catch (IOException e) {
                LoggingUtils.debug(log, "flush({}) failed ({}) to wait for space of len={}: {}",
                        this, e.getClass().getSimpleName(), wanted, e.getMessage(), e);
                throw e;
            }

            // Never send more than the window offers, than is buffered, or than the packet size allows.
            long chunk = Math.min(Math.min(windowSpace, wanted), remoteWindow.getPacketSize());
            if (chunk > Integer.MAX_VALUE) {
                throw new StreamCorruptedException("Accumulated " + SshConstants.getCommandMessageName(cmd)
                                                   + " command bytes size (" + chunk + ") exceeds int boundaries");
            }

            // Remember where the buffered data ends, then rewind to the position of the length field.
            // NOTE(review): offsets 14 / 10 presumably skip the packet header and recipient channel id
            // (plus the data type code for extended data) - confirm against newBuffer().
            int dataEnd = outgoing.wpos();
            int lengthFieldPos;
            if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
                lengthFieldPos = 14;
            } else {
                lengthFieldPos = 10;
            }
            outgoing.wpos(lengthFieldPos);
            outgoing.putUInt(chunk);
            // Advance the write position by the chunk size so only this chunk is part of the packet.
            outgoing.wpos(outgoing.wpos() + (int) chunk);

            Buffer replacement;
            if (wanted == chunk) {
                // Everything goes out in this packet; nothing stays pending.
                replacement = newBuffer((int) chunk);
                pending = 0;
            } else {
                // Carry the unsent tail (the last bytes before the old write position) over to the next buffer.
                long carry = wanted - chunk;
                replacement = newBuffer((int) Math.max(carry, chunk));
                replacement.putRawBytes(outgoing.array(), dataEnd - (int) carry, (int) carry);
                pending = (int) carry;
            }

            // Publish the next buffer and its fill level under the lock before the packet is sent.
            synchronized (bufferLock) {
                buffer = replacement;
                bufferLength = pending;
                lastSize = (int) chunk;
            }

            session.resetIdleTimeout();
            remoteWindow.waitAndConsume(chunk, maxWaitTimeout);
            if (traceEnabled) {
                log.trace("flush({}) send {} len={}",
                        channel, SshConstants.getCommandMessageName(cmd), chunk);
            }
            packetWriter.writeData(outgoing);
            outgoing = replacement;
        }
    } catch (WindowClosedException e) {
        // Mark the stream CLOSED in any case; log only on an actual OPEN -> CLOSED transition.
        boolean wasOpen = OpenState.OPEN == openState.getAndSet(OpenState.CLOSED);
        if (wasOpen && log.isDebugEnabled()) {
            log.debug("flush({}) closing due to window closed", this);
        }
        throw e;
    } catch (InterruptedException e) {
        // Report the interruption as an I/O failure while keeping the original exception as the cause.
        InterruptedIOException interrupted = new InterruptedIOException(
                "Interrupted while waiting for remote space flush len=" + bufferLength + " to " + this);
        interrupted.initCause(e);
        throw interrupted;
    } catch (IOException e) {
        // I/O failures pass through unchanged; this clause keeps them out of the generic wrapper below.
        throw e;
    } catch (Exception e) {
        throw new SshException(e);
    } finally {
        // Always release the flush claim and wake up threads waiting on bufferLock.
        synchronized (bufferLock) {
            isFlushing = false;
            bufferLock.notifyAll();
        }
    }
}