in sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java [142:240]
public synchronized void write(byte[] buf, int s, int l) throws IOException {
    // Buffers l bytes of buf, starting at offset s, for sending on the channel. Data is admitted in
    // slices bounded by the remote window and packet size; whenever no more can be admitted the
    // method either flushes what is buffered or blocks until the remote window has space again.
    //
    // buf - source array; s - offset of the first byte to write; l - number of bytes to write.
    // Throws SshChannelClosedException if the stream is not open on entry, InterruptedIOException
    // if the thread is interrupted while blocked (the interrupt status is restored before throwing),
    // and propagates any IOException raised by flush() or by waiting for window space.
    //
    // This is the only use of this instance's monitor; it's used exclusively to synchronize concurrent writes.
    Channel channel = getChannel();
    if (!isOpen()) {
        throw new SshChannelClosedException(channel.getChannelId(),
                "write(" + this + ") len=" + l + " - channel already closed");
    }
    Session session = channel.getSession();
    boolean debugEnabled = log.isDebugEnabled();
    boolean traceEnabled = log.isTraceEnabled();
    WriteState state;

    // Split maxWaitTimeout into the (millis, nanos) pair expected by Object.wait(long, int):
    // whole milliseconds go into 'millis', the sub-millisecond remainder stays in 'nanos'.
    int nanos = maxWaitTimeout.getNano();
    long millisInSecond = TimeUnit.NANOSECONDS.toMillis(nanos);
    long millis = TimeUnit.SECONDS.toMillis(maxWaitTimeout.getSeconds()) + millisInSecond;
    nanos -= TimeUnit.MILLISECONDS.toNanos(millisInSecond);

    while (l > 0) {
        state = WriteState.BUFFERED;
        synchronized (bufferLock) {
            // Do not touch the buffer while a flush is in progress.
            // NOTE(review): the timed wait is simply re-entered while isFlushing stays set, so
            // maxWaitTimeout bounds each individual wait, not the total time spent here.
            while (isFlushing) {
                try {
                    bufferLock.wait(millis, nanos);
                } catch (InterruptedException e) {
                    InterruptedIOException interrupted = new InterruptedIOException(
                            channel.getChannelId() + ": write interrupted waiting for flush()");
                    interrupted.initCause(e);
                    Thread.currentThread().interrupt();
                    throw interrupted;
                }
            }
            while (l > 0) {
                // The maximum amount we should admit without flushing again
                // is enough to make up one full packet within our allowed
                // window size. We give ourselves a credit equal to the last
                // packet we sent to allow the producer to race ahead and fill
                // out the next packet before we block and wait for space to
                // become available again.
                long minReqLen = Math.min(remoteWindow.getSize() + lastSize, remoteWindow.getPacketSize());
                long l2 = Math.min(l, minReqLen - bufferLength);
                if (l2 <= 0) {
                    // Nothing more can be admitted right now: flush if something is buffered,
                    // otherwise wait for window space. Both actions happen outside bufferLock.
                    if (bufferLength > 0) {
                        state = WriteState.NEED_FLUSH;
                    } else {
                        state = WriteState.NEED_SPACE;
                    }
                    session.resetIdleTimeout();
                    break;
                }
                ValidateUtils.checkTrue(l2 <= Integer.MAX_VALUE, "Accumulated bytes length exceeds int boundary: %d", l2);
                buffer.putRawBytes(buf, s, (int) l2);
                bufferLength += l2;
                s += l2;
                l -= l2;
            }
        }
        switch (state) {
            case NEED_FLUSH:
                flush();
                session.resetIdleTimeout();
                break;
            case NEED_SPACE:
                try {
                    long available = remoteWindow.waitForSpace(maxWaitTimeout);
                    if (traceEnabled) {
                        log.trace("write({}) len={} - available={}", this, l, available);
                    }
                } catch (IOException e) {
                    LoggingUtils.debug(log, "write({}) failed ({}) to wait for space of len={}: {}",
                            this, e.getClass().getSimpleName(), l, e.getMessage(), e);
                    // A closed window also marks this stream closed; log only on the actual transition.
                    if ((e instanceof WindowClosedException) && (OpenState.OPEN == openState.getAndSet(OpenState.CLOSED))) {
                        if (debugEnabled) {
                            log.debug("write({})[len={}] closing due to window closed", this, l);
                        }
                    }
                    throw e;
                } catch (InterruptedException e) {
                    InterruptedIOException interrupted = new InterruptedIOException(
                            "Interrupted while waiting for remote space on write len=" + l + " to " + this);
                    interrupted.initCause(e);
                    // Restore the interrupt status so callers can still observe the interruption,
                    // matching the handling of the bufferLock wait above.
                    Thread.currentThread().interrupt();
                    throw interrupted;
                }
                session.resetIdleTimeout();
                break;
            default:
                // BUFFERED implies l == 0; outer loop will terminate
                break;
        }
    }

    // The outer loop only ends on an iteration that buffered data without flushing (NEED_FLUSH and
    // NEED_SPACE are only reached with l > 0, so another iteration always follows them). Hence no
    // "already flushed" tracking is needed: in no-delay mode there is always something to flush.
    if (isNoDelay()) {
        flush();
    } else {
        session.resetIdleTimeout();
    }
}