protected void flushQueue()

in sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java [422:534]


    /**
     * Submits a task to {@code flushRunner} that writes out the packets queued in {@code pendingPackets}. The queue
     * is drained in chunks; each chunk is written while holding the write lock of {@code lock}, and the lock is
     * released between chunks.
     * <p>
     * The submitted task ends in one of the following ways:
     * </p>
     * <ul>
     * <li>The queue is found empty: {@code kexFlushed} is set to {@code true} and {@code flushDone} is fulfilled with
     * {@link Boolean#TRUE}.</li>
     * <li>{@code kexFlushedFuture} is no longer the same object as {@code flushDone} (per the inline comment below,
     * this means another KEX was started): flushing stops, and a listener on that other future forwards its
     * exception, or {@link Boolean#TRUE} if it has none, to {@code flushDone}.</li>
     * <li>The session is not open: {@code flushDone} is set to {@link Boolean#FALSE}.</li>
     * <li>Writing a packet throws: the exception is set on that packet's future and on {@code flushDone}, and is
     * passed to {@code session.exceptionCaught()}.</li>
     * </ul>
     * <p>
     * In the last two cases packets still in the queue are not touched by this method, and {@code kexFlushed} is not
     * modified. NOTE(review): whether something else cleans those up is not visible here -- confirm against the
     * session close handling.
     * </p>
     * <p>
     * In every case, the futures of the packets that were actually written are connected to their
     * {@link IoWriteFuture}s only when the task exits.
     * </p>
     *
     * @param flushDone the future to fulfill when flushing has ended; it is also compared by identity against
     *                  {@code kexFlushedFuture} to detect that this flush has been superseded
     */
    protected void flushQueue(DefaultKeyExchangeFuture flushDone) {
        flushRunner.submit(() -> {
            // Packets written so far, paired with the future returned by the write. They are linked up in the
            // outermost finally block, not as they are written.
            List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingFutures = new ArrayList<>();
            boolean allFlushed = false;
            // Set only if we stop because kexFlushedFuture was replaced while we were flushing.
            DefaultKeyExchangeFuture newFuture = null;
            try {
                boolean warnedAboutChunkLimit = false;
                // Queue size seen in the previous iteration; -1 marks the first iteration.
                int lastSize = -1;
                // Maximum number of packets written per lock acquisition. Doubled (up to 64) whenever the queue did
                // not shrink between two iterations.
                int take = 2;
                while (!allFlushed) {
                    if (!session.isOpen()) {
                        log.info("flushQueue({}): Session closed while flushing pending packets at end of KEX", session);
                        flushDone.setValue(Boolean.FALSE);
                        // The outer finally still runs and connects the futures of packets already written.
                        return;
                    }
                    // Using the writeLock this thread gets priority over the readLock used by writePacket(). Note that
                    // the outer loop essentially is just a loop around the critical region, so typically only one
                    // reader (i.e., writePacket() call) gets the lock before we get it again, and thus the flush really
                    // should rarely need to increase the chunk size. Data pumping threads in the application remain
                    // blocked until flushing is done.
                    lock.writeLock().lock();
                    try {
                        if (pendingPackets.isEmpty()) {
                            if (log.isDebugEnabled()) {
                                log.debug("flushQueue({}): All packets at end of KEX flushed", session);
                            }
                            // Both flags are set while still holding the write lock.
                            kexFlushed = true;
                            allFlushed = true;
                            break;
                        }
                        // Identity comparison: a different future object means this flush has been superseded.
                        if (kexFlushedFuture != flushDone) {
                            if (log.isDebugEnabled()) {
                                log.debug("flushQueue({}): Stopping flushing pending packets", session);
                            }
                            // Another KEX was started. Exit and hook up the flushDone future with the new future.
                            newFuture = kexFlushedFuture;
                            break;
                        }
                        int newSize = pendingPackets.size();
                        if (lastSize < 0) {
                            // First iteration: just report the initial backlog.
                            log.info("flushQueue({}): {} pending packets to flush", session, newSize);
                        } else if (newSize >= lastSize) {
                            log.info("flushQueue({}): queue size before={} now={}", session, lastSize, newSize);
                            // More new enqueues while we had written. Try writing more in one go to make progress.
                            if (take < 64) {
                                take *= 2;
                            } else if (!warnedAboutChunkLimit) {
                                // Warn only once per flush when the chunk size cannot grow any further.
                                warnedAboutChunkLimit = true;
                                log.warn("flushQueue({}): maximum queue flush chunk of 64 reached", session);
                            }
                        }
                        lastSize = newSize;
                        if (log.isDebugEnabled()) {
                            log.debug("flushQueue({}): flushing {} packets", session, Math.min(lastSize, take));
                        }
                        // Write at most 'take' packets; stop early if the queue runs dry.
                        for (int i = 0; i < take; i++) {
                            PendingWriteFuture pending = pendingPackets.poll();
                            if (pending == null) {
                                break;
                            }
                            IoWriteFuture written;
                            try {
                                if (log.isTraceEnabled()) {
                                    log.trace("flushQueue({}): Flushing a packet at end of KEX for {}", session,
                                            pending.getId());
                                }
                                written = session.doWritePacket(pending.getBuffer());
                                // Only remember the pair here; the listener is attached when the task exits.
                                pendingFutures.add(new SimpleImmutableEntry<>(pending, written));
                                if (log.isTraceEnabled()) {
                                    log.trace("flushQueue({}): Flushed a packet at end of KEX for {}", session,
                                            pending.getId());
                                }
                                session.resetIdleTimeout();
                            } catch (Throwable e) {
                                log.error("flushQueue({}): Exception while flushing packet at end of KEX for {}", session,
                                        pending.getId(), e);
                                // Fail the packet that could not be written, then the overall flush, then notify
                                // the session. NOTE(review): exceptionCaught() is invoked while the write lock is
                                // still held (it is released by the finally below) -- confirm this is intended.
                                pending.setException(e);
                                flushDone.setValue(e);
                                session.exceptionCaught(e);
                                // Neither allFlushed nor newFuture is set, so the outer finally only connects the
                                // futures of the packets written before the failure.
                                return;
                            }
                        }
                        // Check again before releasing the lock, so that an emptied queue is recorded in the same
                        // critical region in which the last packet was written.
                        if (pendingPackets.isEmpty()) {
                            if (log.isDebugEnabled()) {
                                log.debug("flushQueue({}): All packets at end of KEX flushed", session);
                            }
                            kexFlushed = true;
                            allFlushed = true;
                            break;
                        }
                    } finally {
                        // Runs on every exit from the critical region: normal completion, break, and return.
                        lock.writeLock().unlock();
                    }
                }
            } finally {
                // From here on the write lock is no longer held by this task.
                if (allFlushed) {
                    flushDone.setValue(Boolean.TRUE);
                } else if (newFuture != null) {
                    // Superseded: let the outcome of the newer future decide the outcome of flushDone.
                    newFuture.addListener(f -> {
                        Throwable error = f.getException();
                        if (error != null) {
                            flushDone.setValue(error);
                        } else {
                            flushDone.setValue(Boolean.TRUE);
                        }
                    });
                }
                // Connect all futures of packets that we wrote. We do this at the end instead of one-by-one inside the
                // loop to minimize the risk that woken up threads waiting on these futures queue up additional packets.
                pendingFutures.forEach(e -> e.getValue().addListener(e.getKey()));
            }
        });
    }