in sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java [422:534]
/**
 * Asynchronously drains the queue of packets that were enqueued while a key exchange was in
 * progress, writing them out now that the KEX has finished.
 * <p>
 * The actual work runs as a task submitted to {@code flushRunner}. The task loops, each iteration
 * taking the write lock and writing up to {@code take} queued packets. Because writers in
 * {@code writePacket()} use the read lock, holding the write lock here gives the flusher priority
 * over application threads trying to enqueue more data. If the queue is observed to grow (or not
 * shrink) between iterations, {@code take} is doubled, up to a maximum chunk of 64 packets per
 * iteration, so the flush can make progress against concurrent writers.
 * <p>
 * The task terminates in one of four ways:
 * <ul>
 * <li>the queue is drained: {@code kexFlushed} is set and {@code flushDone} is fulfilled with
 * {@code TRUE};</li>
 * <li>another KEX started meanwhile ({@code kexFlushedFuture} no longer is {@code flushDone}):
 * flushing stops and {@code flushDone} is chained to the new future's outcome;</li>
 * <li>the session closed: {@code flushDone} is fulfilled with {@code FALSE};</li>
 * <li>a write failed: the failed packet's future and {@code flushDone} get the exception, and it
 * is propagated via {@code session.exceptionCaught()}.</li>
 * </ul>
 * Futures of successfully written packets are connected to their pending futures only at the very
 * end, outside the lock, to reduce the chance that threads woken by those futures immediately
 * enqueue more packets while the flush is still running.
 *
 * @param flushDone the future to fulfill once flushing terminates (with {@code TRUE},
 *                  {@code FALSE}, or a {@link Throwable} as described above)
 */
protected void flushQueue(DefaultKeyExchangeFuture flushDone) {
flushRunner.submit(() -> {
// Collects (pendingFuture, actualWriteFuture) pairs; listeners are attached in the finally
// block below, not as each packet is written — see the comment there for why.
List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingFutures = new ArrayList<>();
boolean allFlushed = false;
// Set when another KEX superseded this flush; flushDone is then chained to it.
DefaultKeyExchangeFuture newFuture = null;
try {
boolean warnedAboutChunkLimit = false;
// lastSize < 0 marks the first iteration (queue size not yet observed).
int lastSize = -1;
// Number of packets to write per write-lock acquisition; doubled (max 64) when the
// queue does not shrink between iterations.
int take = 2;
while (!allFlushed) {
if (!session.isOpen()) {
log.info("flushQueue({}): Session closed while flushing pending packets at end of KEX", session);
flushDone.setValue(Boolean.FALSE);
return;
}
// Using the writeLock this thread gets priority over the readLock used by writePacket(). Note that
// the outer loop essentially is just a loop around the critical region, so typically only one
// reader (i.e., writePacket() call) gets the lock before we get it again, and thus the flush really
// should rarely need to increase the chunk size. Data pumping threads in the application remain
// blocked until flushing is done.
lock.writeLock().lock();
try {
if (pendingPackets.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): All packets at end of KEX flushed", session);
}
// kexFlushed must be set while still holding the write lock so that a concurrent
// writePacket() cannot observe an empty queue with the flag still false.
kexFlushed = true;
allFlushed = true;
break;
}
if (kexFlushedFuture != flushDone) {
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): Stopping flushing pending packets", session);
}
// Another KEX was started. Exit and hook up the flushDone future with the new future.
newFuture = kexFlushedFuture;
break;
}
int newSize = pendingPackets.size();
if (lastSize < 0) {
log.info("flushQueue({}): {} pending packets to flush", session, newSize);
} else if (newSize >= lastSize) {
log.info("flushQueue({}): queue size before={} now={}", session, lastSize, newSize);
// More new enqueues while we had written. Try writing more in one go to make progress.
if (take < 64) {
take *= 2;
} else if (!warnedAboutChunkLimit) {
warnedAboutChunkLimit = true;
log.warn("flushQueue({}): maximum queue flush chunk of 64 reached", session);
}
}
lastSize = newSize;
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): flushing {} packets", session, Math.min(lastSize, take));
}
for (int i = 0; i < take; i++) {
PendingWriteFuture pending = pendingPackets.poll();
if (pending == null) {
// Queue ran dry mid-chunk; the isEmpty() check below finishes the flush.
break;
}
IoWriteFuture written;
try {
if (log.isTraceEnabled()) {
log.trace("flushQueue({}): Flushing a packet at end of KEX for {}", session,
pending.getId());
}
written = session.doWritePacket(pending.getBuffer());
pendingFutures.add(new SimpleImmutableEntry<>(pending, written));
if (log.isTraceEnabled()) {
log.trace("flushQueue({}): Flushed a packet at end of KEX for {}", session,
pending.getId());
}
session.resetIdleTimeout();
} catch (Throwable e) {
// Fail the individual packet's future and the overall flush, then let the
// session handle the error; futures already collected are still connected
// in the finally block.
log.error("flushQueue({}): Exception while flushing packet at end of KEX for {}", session,
pending.getId(), e);
pending.setException(e);
flushDone.setValue(e);
session.exceptionCaught(e);
return;
}
}
if (pendingPackets.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("flushQueue({}): All packets at end of KEX flushed", session);
}
kexFlushed = true;
allFlushed = true;
break;
}
} finally {
lock.writeLock().unlock();
}
}
} finally {
if (allFlushed) {
// Fulfilled outside the lock: listeners on flushDone may enqueue new packets.
flushDone.setValue(Boolean.TRUE);
} else if (newFuture != null) {
// Superseded by a new KEX: mirror the new flush's outcome onto flushDone.
newFuture.addListener(f -> {
Throwable error = f.getException();
if (error != null) {
flushDone.setValue(error);
} else {
flushDone.setValue(Boolean.TRUE);
}
});
}
// Connect all futures of packets that we wrote. We do this at the end instead of one-by-one inside the
// loop to minimize the risk that woken up threads waiting on these futures queue up additional packets.
pendingFutures.forEach(e -> e.getValue().addListener(e.getKey()));
}
});
}