in servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java [174:255]
/**
 * Tries to take the {@code emitting} lock and, while holding it, delivers items from
 * {@code currentIterator} to {@code target} for as long as the iterator has items and the locally
 * tracked demand (a snapshot of {@code requestN}) is positive.
 * <p>
 * If the lock cannot be acquired the method gives up immediately. After a drain pass, and unless a
 * terminal signal was delivered, the number of emitted items is subtracted from {@code requestN}; then
 * either a pending cancel is executed or, when the iterator is exhausted and demand remains, one more
 * item is requested from {@code sourceSubscription}. The lock is released last. The whole sequence is
 * repeated while no terminal signal was delivered and {@code releaseLock} returns {@code false}
 * (presumably meaning another thread tried to acquire the lock in the meantime - confirm against the
 * lock helper).
 * <p>
 * Once a terminal signal has been delivered to {@code target} the lock is intentionally never
 * released, so no other caller can emit further signals.
 *
 * @param errorHandlingStrategyInDrain selects what happens when iterating or emitting throws:
 * {@code PropagateAndCancel} sends the error to {@code target} and calls {@code doCancel()};
 * {@code Propagate} sends the error to {@code target} and closes the current iterator;
 * {@code Throw} discards and closes the current iterator and rethrows the original error to the caller.
 * @throws IllegalArgumentException if {@code errorHandlingStrategyInDrain} is not one of the handled
 * values; the error that triggered the handling is attached as the cause.
 */
private void tryDrainIterator(ErrorHandlingStrategyInDrain errorHandlingStrategyInDrain) {
    boolean hasNext = false;
    boolean thrown = false;
    boolean terminated = false;
    boolean releasedLock = false;
    do {
        if (!tryAcquireLock(emittingUpdater, this)) {
            break;
        }
        // Work on a local copy of the demand; the shared counter is reconciled in the finally block.
        long currRequestN = this.requestN;
        final long initialRequestN = currRequestN;
        try {
            try {
                while ((hasNext = currentIterator.hasNext()) && currRequestN > 0) {
                    --currRequestN;
                    target.onNext(currentIterator.next());
                }
            } catch (Throwable cause) {
                switch (errorHandlingStrategyInDrain) {
                    case PropagateAndCancel:
                        // terminated = true makes the outer finally skip the unlock.
                        terminated = true;
                        safeOnError(target, cause);
                        doCancel();
                        return; // hard return to avoid potential for duplicate terminal events
                    case Propagate:
                        // terminated = true makes the outer finally skip the unlock.
                        terminated = true;
                        safeOnError(target, cause);
                        tryClose(currentIterator);
                        return; // hard return to avoid potential for duplicate terminal events
                    case Throw:
                        // since we only request 1 at a time we maybe holding requestN demand, in this case we
                        // discard the current iterator and request 1 more from upstream (if there is demand).
                        hasNext = false;
                        thrown = true;
                        // Swap in the empty iterator before closing the failed one so the field never
                        // refers to a closed iterator.
                        final Iterator<? extends U> failedIterator = this.currentIterator;
                        this.currentIterator = EmptyIterator.instance();
                        tryClose(failedIterator);
                        // let the exception propagate so the upstream source can do the cleanup.
                        throw cause;
                    default:
                        // Keep the triggering error as the cause so it is not lost.
                        throw new IllegalArgumentException("Unknown error handling strategy: " +
                                errorHandlingStrategyInDrain, cause);
                }
            }
            // Upstream already terminated and the last iterator is exhausted: deliver the stored terminal.
            if (terminalNotification != null && !hasNext) {
                terminated = true;
                terminalNotification.terminate(target);
            }
        } finally {
            // If we terminated we don't want to unlock, otherwise we may propagate duplicate terminal signals.
            if (!terminated) {
                // The delta is <= 0: minus the number of items emitted during this pass.
                currRequestN = requestNUpdater.accumulateAndGet(this, currRequestN - initialRequestN,
                        FlowControlUtils::addWithOverflowProtectionIfNotNegative);
                try {
                    if (currRequestN == CANCEL_PENDING) {
                        terminated = true;
                        // If we throw we expect an error to be propagated, so we are effectively cancelled.
                        requestN = CANCELLED;
                        if (!thrown) {
                            // We have been cancelled while we held the lock, do the cancel operation.
                            doCancel();
                        }
                    } else if (terminalNotification == null && !hasNext && currRequestN > 0 &&
                            (currentIterator != EmptyIterator.instance() || thrown)) {
                        // We only request 1 at a time, and therefore we don't have any outstanding demand, so
                        // we will not be getting an onNext call, so we write to the currentIterator variable
                        // here before we unlock emitting so visibility to other threads should be taken care of
                        // by the write to emitting below (and later read).
                        currentIterator = EmptyIterator.instance();
                        if (sourceSubscription != null) {
                            sourceSubscription.request(1);
                        }
                    }
                } finally {
                    // The lock must be released after we interact with the subscription for thread safety
                    // reasons.
                    releasedLock = releaseLock(emittingUpdater, this);
                }
            }
        }
    } while (!terminated && !releasedLock);
}