override def onNext()

in stream/src/main/scala/org/apache/pekko/stream/impl/StreamLayout.scala [325:383]


  /**
   * Receives an element from upstream and dispatches on the state currently returned by `get()`.
   *
   * For a `null` element: a subscriber found in the current state is signalled `onError`, the state
   * is replaced by a terminal value (`Inert` or an `ErrorPublisher`), and the exception obtained from
   * `elementMustNotBeNullException` is thrown to the caller.
   *
   * For a non-null element: when the state is a `HasActualSubscriber`, the element is passed to its
   * `subscriber`; when the state is `Inert` or a `Publisher`, the element is dropped; every other
   * state is treated as an element arriving without demand and ends in an `IllegalStateException`.
   *
   * NOTE(review): `get`, `set`, `compareAndSet` and `getAndSet` are presumably the atomic-reference
   * operations of the enclosing class, which is not visible in this excerpt -- confirm.
   *
   * @param t the element to deliver; must not be `null`
   * @throws IllegalStateException if the subscriber throws a non-fatal exception from `onNext`, or if
   *                               the element arrives in a state that has no actual subscriber
   */
  override def onNext(t: T): Unit =
    if (t == null) {
      // Null element: build the exception once so the same instance is signalled and thrown.
      val ex = elementMustNotBeNullException
      if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.onNext(null)")
      // Only the first case recurses, and only when its compareAndSet did not succeed.
      @tailrec def rec(): Unit =
        get() match {
          // No subscriber to notify yet: store an ErrorPublisher carrying `ex` as the new state.
          case x @ (null | _: Subscription) =>
            if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec()
          // State is a subscriber: signal the error, ignore non-fatal exceptions thrown by it,
          // and always finish by setting the state to Inert.
          case s: Subscriber[_] =>
            try s.onError(ex)
            catch { case NonFatal(_) => }
            finally set(Inert)
          // Same handling as above, for the subscriber wrapped in Both.
          case Both(s) =>
            try s.onError(ex)
            catch { case NonFatal(_) => }
            finally set(Inert)
          case _ => // spec violation or cancellation race, but nothing we can do
        }
      rec()
      // Thrown regardless of which case above was taken.
      throw ex // must throw NPE, rule 2:13
    } else {
      // Non-null element. Only the last case recurses, and only when its compareAndSet did not succeed.
      @tailrec def rec(): Unit = {
        get() match {
          // The state exposes the actual subscriber: hand the element over.
          case h: HasActualSubscriber =>
            val s = h.subscriber
            try {
              if (VirtualProcessor.Debug)
                println(s"VirtualPublisher#$hashCode(${h.getClass.getName}($s)).onNext($t).rec()")
              s.onNext(t)
            } catch {
              // A non-fatal exception from the subscriber: set the state to Inert and rethrow
              // wrapped in an IllegalStateException, keeping the original as the cause.
              case NonFatal(e) =>
                if (VirtualProcessor.Debug)
                  println(s"VirtualPublisher#$hashCode(Both($s)).onNext($t) threw, spec violation -> Inert")
                set(Inert)
                throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e)
            }

          // Reached only when the state is a Subscriber that did not match HasActualSubscriber above.
          case s: Subscriber[_] => // spec violation
            if (VirtualProcessor.Debug)
              println(s"VirtualPublisher#$hashCode($s).onNext($t).rec(): spec violation -> Inert")
            val ex = new IllegalStateException(noDemand)
            // Swap in Inert unconditionally; unless the previous value was already Inert,
            // subscribe `s` to an ErrorPublisher carrying `ex`.
            getAndSet(Inert) match {
              case Inert => // nothing to be done
              case _     => ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
            }
            // The caller gets the exception in both sub-cases.
            throw ex
          // Inert or Publisher state: the element is dropped, with only the optional debug output.
          case Inert | _: Publisher[_] =>
            if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Inert|Publisher).onNext($t).rec(): nop")
          // nothing to be done
          // Any remaining state (including null): try to replace it with an ErrorPublisher.
          // If the compareAndSet fails, re-read the state and dispatch again; if it succeeds,
          // throw `pub.t` (presumably the IllegalStateException passed to ErrorPublisher above -- confirm).
          case other =>
            if (VirtualProcessor.Debug)
              println(s"VirtualPublisher#$hashCode($other).onNext($t).rec() -> ErrorPublisher")
            val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher")
            if (!compareAndSet(other, pub)) rec()
            else throw pub.t
        }
      }
      rec()
    }