in sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java [423:555]
/**
 * Submits a task to {@code flushRunner} that writes out the packets queued in {@code pendingPackets}.
 * <p>
 * The task loops around a critical region guarded by {@code lock.writeLock()}. In each iteration it polls up to
 * {@code take} packets from the queue and writes them with {@code session.doWritePacket()}. The chunk size
 * {@code take} starts at 2 and is doubled (up to 64) whenever the queue did not shrink between two iterations.
 * </p>
 * <p>
 * The loop ends in one of four ways:
 * </p>
 * <ul>
 * <li>The queue is empty: {@code kexFlushed} is set to {@code true} and {@code flushDone} is completed with
 * {@link Boolean#TRUE}.</li>
 * <li>{@code session.isOpen()} returns {@code false}: the queue is handed to {@code drainQueueTo()} together with a
 * future carrying an {@code SshException}, {@code kexFlushed} is set to {@code true}, and {@code flushDone} is
 * completed with {@link Boolean#FALSE}.</li>
 * <li>{@code kexFlushedFuture} no longer holds {@code flushDone}: the task stops <em>without</em> setting
 * {@code kexFlushed}, and {@code flushDone} is completed later by a listener on the future found there, with that
 * future's exception or with {@link Boolean#TRUE}.</li>
 * <li>{@code session.doWritePacket()} throws: the failing packet and the rest of the queue are associated with a
 * future carrying the {@link Throwable}, {@code kexFlushed} is set to {@code true}, {@code flushDone} is completed
 * with the {@link Throwable}, and the {@link Throwable} is passed to {@code session.exceptionCaught()}.</li>
 * </ul>
 * <p>
 * In all cases the futures collected in the task are connected to their pending write futures only after the loop
 * has ended and the write lock has been released.
 * </p>
 *
 * @param flushDone the future to complete once this flush run has ended; it is also compared against the current
 *                  value of {@code kexFlushedFuture} to detect that it has been replaced
 */
protected void flushQueue(DefaultKeyExchangeFuture flushDone) {
// kexFlushed must be set to true in all cases when this thread exits, **except** if a new KEX has started while
// flushing.
flushRunner.submit(() -> {
// Pairs of (pending write future, future of the actual write or of the abort); connected in the outer finally.
List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingFutures = new ArrayList<>();
boolean allFlushed = false;
// Set when kexFlushedFuture no longer holds flushDone; flushDone is then chained to this future.
DefaultKeyExchangeFuture newFuture = null;
// A Throwable when doWritePacket fails, or Boolean.FALSE if the session closes while flushing.
Object error = null;
try {
// Ensures the "maximum chunk" warning is logged only once per flush run.
boolean warnedAboutChunkLimit = false;
// Queue size seen in the previous iteration; negative means "first iteration".
int lastSize = -1;
// Maximum number of packets written per lock acquisition; doubled below when the queue does not shrink.
int take = 2;
while (!allFlushed) {
// Using the writeLock this thread gets priority over the readLock used by writePacket(). Note that
// the outer loop essentially is just a loop around the critical region, so typically only one
// reader (i.e., writePacket() call) gets the lock before we get it again, and thus the flush really
// should rarely need to increase the chunk size. Data pumping threads in the application remain
// blocked until flushing is done.
lock.writeLock().lock();
try {
if (pendingPackets.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): All packets at end of KEX flushed", session);
}
kexFlushed.set(true);
allFlushed = true;
break;
}
if (!session.isOpen()) {
log.info("flushQueue({}): Session closed while flushing pending packets at end of KEX", session);
AbstractIoWriteFuture aborted = new AbstractIoWriteFuture(session, null) {
// Nothing extra
};
aborted.setValue(new SshException("Session closed while flushing pending packets at end of KEX"));
// NOTE(review): drainQueueTo() is defined elsewhere; presumably it empties pendingPackets and records
// each remaining packet in pendingFutures together with the aborted future -- confirm.
drainQueueTo(pendingFutures, aborted);
kexFlushed.set(true);
error = Boolean.FALSE;
break;
}
// Identity check: a different future in kexFlushedFuture means this flush run has been superseded.
DefaultKeyExchangeFuture currentFuture = kexFlushedFuture.get();
if (currentFuture != flushDone) {
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): Stopping flushing pending packets", session);
}
// Another KEX was started. Exit and hook up the flushDone future with the new future.
newFuture = currentFuture;
break;
}
int newSize = pendingPackets.size();
if (lastSize < 0) {
log.info("flushQueue({}): {} pending packets to flush", session, newSize);
} else if (newSize >= lastSize) {
log.info("flushQueue({}): queue size before={} now={}", session, lastSize, newSize);
// More new enqueues while we had written. Try writing more in one go to make progress.
if (take < 64) {
take *= 2;
} else if (!warnedAboutChunkLimit) {
warnedAboutChunkLimit = true;
log.warn("flushQueue({}): maximum queue flush chunk of 64 reached", session);
}
}
lastSize = newSize;
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): flushing {} packets", session, Math.min(lastSize, take));
}
// Write at most 'take' packets while holding the write lock; stop early if the queue runs empty.
for (int i = 0; i < take; i++) {
PendingWriteFuture pending = pendingPackets.poll();
if (pending == null) {
break;
}
IoWriteFuture written;
try {
if (log.isTraceEnabled()) {
log.trace("flushQueue({}): Flushing a packet at end of KEX for {}", session,
pending.getId());
}
written = session.doWritePacket(pending.getBuffer());
} catch (Throwable e) {
log.error("flushQueue({}): Exception while flushing packet at end of KEX for {}", session,
pending.getId(), e);
AbstractIoWriteFuture aborted = new AbstractIoWriteFuture(pending.getId(), null) {
// Nothing extra
};
aborted.setValue(e);
// The packet that failed was already polled from the queue, so it is recorded explicitly here;
// the packets still queued are handed to drainQueueTo() with the same failed future.
pendingFutures.add(new SimpleImmutableEntry<>(pending, aborted));
drainQueueTo(pendingFutures, aborted);
kexFlushed.set(true);
// Remember the error, but close the session outside of the lock critical region.
error = e;
// Leaves the task; the inner finally still releases the write lock and the outer finally still
// completes flushDone and reports the error.
return;
}
pendingFutures.add(new SimpleImmutableEntry<>(pending, written));
if (log.isTraceEnabled()) {
log.trace("flushQueue({}): Flushed a packet at end of KEX for {}", session, pending.getId());
}
session.resetIdleTimeout();
}
// Re-check while still holding the write lock: if this chunk emptied the queue, finish right away
// instead of releasing and re-acquiring the lock for another iteration.
if (pendingPackets.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): All packets at end of KEX flushed", session);
}
kexFlushed.set(true);
allFlushed = true;
break;
}
} finally {
lock.writeLock().unlock();
}
}
} finally {
// Runs after the write lock has been released, for every way of leaving the loop above.
if (allFlushed) {
flushDone.setValue(Boolean.TRUE);
} else if (error != null) {
// We'll close the session (or it is closing already). Pretend we had written everything.
flushDone.setValue(error);
if (error instanceof Throwable) {
session.exceptionCaught((Throwable) error);
}
} else if (newFuture != null) {
// Superseded by another future: complete flushDone with that future's outcome once it is done.
newFuture.addListener(f -> {
Throwable failed = f.getException();
flushDone.setValue(failed != null ? failed : Boolean.TRUE);
});
}
// Connect all futures of packets that we wrote. We do this at the end instead of one-by-one inside the
// loop to minimize the risk that woken up threads waiting on these futures queue up additional packets.
pendingFutures.forEach(e -> e.getValue().addListener(e.getKey()));
}
});
}