protected void flushQueue()

in sshd-core/src/main/java/org/apache/sshd/common/session/helpers/KeyExchangeMessageHandler.java [423:555]


    /**
     * Submits a task to {@code flushRunner} that writes out the packets queued in {@code pendingPackets} and then
     * fulfills the given future.
     * <p>
     * The task loops over a critical region guarded by the write lock. In each iteration it polls up to {@code take}
     * packets from the queue and writes them via {@code session.doWritePacket()}. {@code take} starts at 2 and is
     * doubled (up to 64) whenever the queue did not shrink between two iterations. The loop ends when one of the
     * following happens:
     * </p>
     * <ul>
     * <li>the queue is empty: {@code kexFlushed} is set to {@code true} and {@code flushDone} is fulfilled with
     * {@link Boolean#TRUE};</li>
     * <li>the session is no longer open: the remaining queue is drained via {@code drainQueueTo()} with a future
     * carrying an {@code SshException}, {@code kexFlushed} is set to {@code true}, and {@code flushDone} is fulfilled
     * with {@link Boolean#FALSE};</li>
     * <li>{@code kexFlushedFuture} no longer holds {@code flushDone}: flushing stops without touching
     * {@code kexFlushed}, and a listener on the future found there forwards its exception, or {@link Boolean#TRUE},
     * to {@code flushDone};</li>
     * <li>{@code session.doWritePacket()} throws: the failing packet and the remaining queue are associated with a
     * future carrying the exception, {@code kexFlushed} is set to {@code true}, {@code flushDone} is fulfilled with
     * the exception, and the exception is passed to {@code session.exceptionCaught()}.</li>
     * </ul>
     * <p>
     * The futures of all packets handled by the task are connected to their write futures only at the very end, after
     * the lock has been released.
     * </p>
     * <p>
     * NOTE(review): if {@code kexFlushedFuture} holds {@code null} when the task checks it, the task exits without
     * fulfilling {@code flushDone} -- presumably whoever reset the reference takes care of that future; confirm against
     * the callers.
     * </p>
     *
     * @param flushDone the future to fulfill once flushing has ended; also used to detect (by identity comparison
     *                  with {@code kexFlushedFuture}) that this flush has been superseded
     */
    protected void flushQueue(DefaultKeyExchangeFuture flushDone) {
        // kexFlushed must be set to true in all cases when this thread exits, **except** if a new KEX has started while
        // flushing.
        flushRunner.submit(() -> {
            // Pairs of (queued packet future, actual write future); hooked up with each other in the outer finally.
            List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingFutures = new ArrayList<>();
            boolean allFlushed = false;
            // Set only when kexFlushedFuture was found to hold something else than flushDone.
            DefaultKeyExchangeFuture newFuture = null;
            // A Throwable when doWritePacket fails, or Boolean.FALSE if the session closes while flushing.
            Object error = null;
            try {
                boolean warnedAboutChunkLimit = false;
                // Queue size seen in the previous iteration; negative means "first iteration".
                int lastSize = -1;
                // Maximum number of packets written per critical region; doubled up to 64 if we don't make progress.
                int take = 2;
                while (!allFlushed) {
                    // Using the writeLock this thread gets priority over the readLock used by writePacket(). Note that
                    // the outer loop essentially is just a loop around the critical region, so typically only one
                    // reader (i.e., writePacket() call) gets the lock before we get it again, and thus the flush really
                    // should rarely need to increase the chunk size. Data pumping threads in the application remain
                    // blocked until flushing is done.
                    lock.writeLock().lock();
                    try {
                        // Exit 1: nothing (left) to flush.
                        if (pendingPackets.isEmpty()) {
                            if (log.isDebugEnabled()) {
                                log.debug("flushQueue({}): All packets at end of KEX flushed", session);
                            }
                            kexFlushed.set(true);
                            allFlushed = true;
                            break;
                        }

                        // Exit 2: the session is not open anymore. Hand all still queued packets to drainQueueTo()
                        // together with a future that carries an SshException.
                        if (!session.isOpen()) {
                            log.info("flushQueue({}): Session closed while flushing pending packets at end of KEX", session);
                            AbstractIoWriteFuture aborted = new AbstractIoWriteFuture(session, null) {
                                // Nothing extra
                            };
                            aborted.setValue(new SshException("Session closed while flushing pending packets at end of KEX"));
                            drainQueueTo(pendingFutures, aborted);
                            kexFlushed.set(true);
                            error = Boolean.FALSE;
                            break;
                        }

                        // Exit 3: this flush has been superseded; kexFlushed is intentionally left untouched here.
                        DefaultKeyExchangeFuture currentFuture = kexFlushedFuture.get();
                        if (currentFuture != flushDone) {
                            if (log.isDebugEnabled()) {
                                log.debug("flushQueue({}): Stopping flushing pending packets", session);
                            }
                            // Another KEX was started. Exit and hook up the flushDone future with the new future.
                            newFuture = currentFuture;
                            break;
                        }
                        int newSize = pendingPackets.size();
                        if (lastSize < 0) {
                            log.info("flushQueue({}): {} pending packets to flush", session, newSize);
                        } else if (newSize >= lastSize) {
                            // The queue did not shrink although we wrote packets in the previous iteration.
                            log.info("flushQueue({}): queue size before={} now={}", session, lastSize, newSize);
                            // More new enqueues while we had written. Try writing more in one go to make progress.
                            if (take < 64) {
                                take *= 2;
                            } else if (!warnedAboutChunkLimit) {
                                // Warn only once per flush.
                                warnedAboutChunkLimit = true;
                                log.warn("flushQueue({}): maximum queue flush chunk of 64 reached", session);
                            }
                        }
                        lastSize = newSize;
                        if (log.isDebugEnabled()) {
                            log.debug("flushQueue({}): flushing {} packets", session, Math.min(lastSize, take));
                        }
                        // Write at most 'take' packets in this critical region.
                        for (int i = 0; i < take; i++) {
                            PendingWriteFuture pending = pendingPackets.poll();
                            if (pending == null) {
                                // Queue ran empty before the chunk was full.
                                break;
                            }
                            IoWriteFuture written;
                            try {
                                if (log.isTraceEnabled()) {
                                    log.trace("flushQueue({}): Flushing a packet at end of KEX for {}", session,
                                            pending.getId());
                                }
                                written = session.doWritePacket(pending.getBuffer());
                            } catch (Throwable e) {
                                // Exit 4: writing failed. The failed packet and everything still queued get a future
                                // that carries the exception.
                                log.error("flushQueue({}): Exception while flushing packet at end of KEX for {}", session,
                                        pending.getId(), e);
                                AbstractIoWriteFuture aborted = new AbstractIoWriteFuture(pending.getId(), null) {
                                    // Nothing extra
                                };
                                aborted.setValue(e);
                                pendingFutures.add(new SimpleImmutableEntry<>(pending, aborted));
                                drainQueueTo(pendingFutures, aborted);
                                kexFlushed.set(true);
                                // Remember the error, but close the session outside of the lock critical region.
                                error = e;
                                // Leaves the task; both enclosing finally blocks still run, i.e., the lock is released
                                // and flushDone is fulfilled with the error.
                                return;
                            }
                            pendingFutures.add(new SimpleImmutableEntry<>(pending, written));
                            if (log.isTraceEnabled()) {
                                log.trace("flushQueue({}): Flushed a packet at end of KEX for {}", session, pending.getId());
                            }
                            session.resetIdleTimeout();
                        }
                        // Re-check while still holding the lock, so that kexFlushed is set before any other thread
                        // can acquire the lock again.
                        if (pendingPackets.isEmpty()) {
                            if (log.isDebugEnabled()) {
                                log.debug("flushQueue({}): All packets at end of KEX flushed", session);
                            }
                            kexFlushed.set(true);
                            allFlushed = true;
                            break;
                        }
                    } finally {
                        lock.writeLock().unlock();
                    }
                }
            } finally {
                // The lock is not held anymore here. Fulfill flushDone according to how the loop was left.
                if (allFlushed) {
                    flushDone.setValue(Boolean.TRUE);
                } else if (error != null) {
                    // flushDone receives the error marker: the Throwable from doWritePacket(), or Boolean.FALSE if the
                    // session was found closed. A Throwable is additionally reported to the session; presumably that
                    // closes the session (in the Boolean.FALSE case it is closing already).
                    flushDone.setValue(error);
                    if (error instanceof Throwable) {
                        session.exceptionCaught((Throwable) error);
                    }
                } else if (newFuture != null) {
                    // Superseded: forward the outcome of the newer future to flushDone.
                    newFuture.addListener(f -> {
                        Throwable failed = f.getException();
                        flushDone.setValue(failed != null ? failed : Boolean.TRUE);
                    });
                }
                // Connect all futures of packets that we wrote. We do this at the end instead of one-by-one inside the
                // loop to minimize the risk that woken up threads waiting on these futures queue up additional packets.
                pendingFutures.forEach(e -> e.getValue().addListener(e.getKey()));
            }
        });
    }