private void tryDrainIterator()

in servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java [174:255]


        private void tryDrainIterator(ErrorHandlingStrategyInDrain errorHandlingStrategyInDrain) {
            boolean hasNext = false;
            boolean thrown = false;
            boolean terminated = false;
            boolean releasedLock = false;
            do {
                if (!tryAcquireLock(emittingUpdater, this)) {
                    break;
                }
                long currRequestN = this.requestN;
                final long initialRequestN = currRequestN;
                try {
                    try {
                        while ((hasNext = currentIterator.hasNext()) && currRequestN > 0) {
                            --currRequestN;
                            target.onNext(currentIterator.next());
                        }
                    } catch (Throwable cause) {
                        switch (errorHandlingStrategyInDrain) {
                            case PropagateAndCancel:
                                terminated = true;
                                safeOnError(target, cause);
                                doCancel();
                                return; // hard return to avoid potential for duplicate terminal events
                            case Propagate:
                                terminated = true;
                                safeOnError(target, cause);
                                tryClose(currentIterator);
                                return; // hard return to avoid potential for duplicate terminal events
                            case Throw:
                                // since we only request 1 at a time we maybe holding requestN demand, in this case we
                                // discard the current iterator and request 1 more from upstream (if there is demand).
                                hasNext = false;
                                thrown = true;
                                final Iterator<? extends U> currentIterator = this.currentIterator;
                                this.currentIterator = EmptyIterator.instance();
                                tryClose(currentIterator);
                                // let the exception propagate so the upstream source can do the cleanup.
                                throw cause;
                            default:
                                throw new IllegalArgumentException("Unknown error handling strategy: " +
                                        errorHandlingStrategyInDrain);
                        }
                    }
                    if (terminalNotification != null && !hasNext) {
                        terminated = true;
                        terminalNotification.terminate(target);
                    }
                } finally {
                    // If we terminated we don't want to unlock, otherwise we may propagate duplicate terminal signals.
                    if (!terminated) {
                        currRequestN = requestNUpdater.accumulateAndGet(this, currRequestN - initialRequestN,
                                FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                        try {
                            if (currRequestN == CANCEL_PENDING) {
                                terminated = true;
                                // If we throw we expect an error to be propagated, so we are effectively cancelled.
                                requestN = CANCELLED;
                                if (!thrown) {
                                    // We have been cancelled while we held the lock, do the cancel operation.
                                    doCancel();
                                }
                            } else if (terminalNotification == null && !hasNext && currRequestN > 0 &&
                                    (currentIterator != EmptyIterator.instance() || thrown)) {
                                // We only request 1 at a time, and therefore we don't have any outstanding demand, so
                                // we will not be getting an onNext call, so we write to the currentIterator variable
                                // here before we unlock emitting so visibility to other threads should be taken care of
                                // by the write to emitting below (and later read).
                                currentIterator = EmptyIterator.instance();
                                if (sourceSubscription != null) {
                                    sourceSubscription.request(1);
                                }
                            }
                        } finally {
                            // The lock must be released after we interact with the subscription for thread safety
                            // reasons.
                            releasedLock = releaseLock(emittingUpdater, this);
                        }
                    }
                }
            } while (!terminated && !releasedLock);
        }