in stream/src/main/scala/org/apache/pekko/stream/impl/StreamLayout.scala [325:383]
/**
 * Receives one element from upstream and reacts according to the current state,
 * which is read with `get()` and updated with `compareAndSet` / `getAndSet` / `set`.
 *
 * `null` element: the failure is reported to whatever the state allows (see the
 * individual cases below) and the exception obtained from
 * `elementMustNotBeNullException` is then always thrown to the caller.
 *
 * Non-null element: it is forwarded to the subscriber exposed by a
 * `HasActualSubscriber` state. An `Inert` or `Publisher` state drops the element
 * silently; every other state is handled as an element arriving without demand
 * and ends in an `IllegalStateException`.
 *
 * @param t the element signalled by upstream; must not be `null`
 * @throws IllegalStateException if the downstream `onNext` throws a non-fatal
 *                               exception, or if the element arrives in a state
 *                               that is reported with the `noDemand` message
 */
override def onNext(t: T): Unit =
if (t == null) {
// Created once so that the very same instance is reported downstream and thrown below.
val ex = elementMustNotBeNullException
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.onNext(null)")
// Retry loop: only the compareAndSet case can fail and re-read the state.
@tailrec def rec(): Unit =
get() match {
// State is still null or a Subscription: swap in an ErrorPublisher carrying `ex`
// (presumably so that a subscriber arriving later is failed with it -- TODO confirm).
case x @ (null | _: Subscription) =>
if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec()
// State is a bare Subscriber: signal the error to it. Non-fatal exceptions thrown
// by onError are deliberately swallowed, and the state becomes Inert either way.
case s: Subscriber[_] =>
try s.onError(ex)
catch { case NonFatal(_) => }
finally set(Inert)
// Same handling for the subscriber wrapped in Both.
case Both(s) =>
try s.onError(ex)
catch { case NonFatal(_) => }
finally set(Inert)
case _ => // spec violation or cancellation race, but nothing we can do
}
rec()
// Reached for every state above, including the "nothing we can do" case.
throw ex // must throw NPE, rule 2:13
} else {
// Retry loop: only the final catch-all case can fail its compareAndSet and re-read the state.
@tailrec def rec(): Unit = {
get() match {
// Normal path: the state exposes the actual subscriber, so hand the element over.
case h: HasActualSubscriber =>
val s = h.subscriber
try {
if (VirtualProcessor.Debug)
println(s"VirtualPublisher#$hashCode(${h.getClass.getName}($s)).onNext($t).rec()")
s.onNext(t)
} catch {
// A non-fatal exception from the subscriber makes this instance Inert and is
// rethrown to the caller wrapped in an IllegalStateException (original as cause).
// NOTE(review): the debug text below always prints "Both" although this case
// matches any HasActualSubscriber; the message above uses h.getClass.getName.
case NonFatal(e) =>
if (VirtualProcessor.Debug)
println(s"VirtualPublisher#$hashCode(Both($s)).onNext($t) threw, spec violation -> Inert")
set(Inert)
throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e)
}
// State is a bare Subscriber (not matched as HasActualSubscriber above): the element
// is rejected with the `noDemand` message.
case s: Subscriber[_] => // spec violation
if (VirtualProcessor.Debug)
println(s"VirtualPublisher#$hashCode($s).onNext($t).rec(): spec violation -> Inert")
val ex = new IllegalStateException(noDemand)
// Atomically switch to Inert. If the state had already become Inert in the meantime
// nothing is signalled; otherwise `s` (the subscriber read by get() above) is
// subscribed to an ErrorPublisher carrying `ex`.
getAndSet(Inert) match {
case Inert => // nothing to be done
case _ => ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
// Thrown to the caller in both of the cases above.
throw ex
// Already Inert, or the state holds a Publisher: the element is dropped without error.
case Inert | _: Publisher[_] =>
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Inert|Publisher).onNext($t).rec(): nop")
// nothing to be done
// Any remaining state (a variable pattern, so this also matches null): try to replace
// it with an ErrorPublisher built from the `noDemand` message. A lost CAS race
// re-evaluates the new state; a successful CAS throws `pub.t` (presumably the
// IllegalStateException passed in just above -- TODO confirm against ErrorPublisher).
// NOTE(review): the label here is "failed-VirtualPublisher" whereas the other
// ErrorPublisher instances in this method use "failed-VirtualProcessor".
case other =>
if (VirtualProcessor.Debug)
println(s"VirtualPublisher#$hashCode($other).onNext($t).rec() -> ErrorPublisher")
val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher")
if (!compareAndSet(other, pub)) rec()
else throw pub.t
}
}
rec()
}