override def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala [501:663]


  override def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler { parent =>
      import Split._

      private val SubscriptionTimer = "SubstreamSubscriptionTimer"

      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

      private val timeout: FiniteDuration =
        inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
      private var substreamSource: SubSourceOutlet[T] = null
      private var substreamWaitingToBePushed = false
      private var substreamCancelled = false

      def propagateSubstreamCancel(ex: Throwable): Boolean =
        decider(ex) match {
          case Supervision.Stop    => true
          case Supervision.Resume  => false
          case Supervision.Restart => false
        }

      override def onPull(): Unit = {
        if (substreamSource eq null) {
          // can be already pulled from substream in case split after
          if (!hasBeenPulled(in)) pull(in)
        } else if (substreamWaitingToBePushed) pushSubstreamSource()
      }

      override def onDownstreamFinish(cause: Throwable): Unit = {
        // If the substream is already cancelled or it has not been handed out, we can go away
        if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) cancelStage(cause)
      }

      override def onPush(): Unit = {
        val handler = new SubstreamHandler
        val elem = grab(in)

        decision match {
          case SplitAfter if p(elem) =>
            push(out, Source.single(elem))
          // Next pull will come from the next substream that we will open
          case _ =>
            handler.firstElem = elem
        }

        handOver(handler)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      // initial input handler
      setHandlers(in, out, this)

      private def handOver(handler: SubstreamHandler): Unit = {
        if (isClosed(out)) completeStage()
        else {
          substreamSource = new SubSourceOutlet[T]("SplitSource")
          substreamSource.setHandler(handler)
          substreamCancelled = false
          setHandler(in, handler)
          setKeepGoing(enabled = handler.hasInitialElement)

          if (isAvailable(out)) {
            if (decision == SplitBefore || handler.hasInitialElement) pushSubstreamSource() else pull(in)
          } else substreamWaitingToBePushed = true
        }
      }

      private def pushSubstreamSource(): Unit = {
        push(out, Source.fromGraph(substreamSource.source))
        scheduleOnce(SubscriptionTimer, timeout)
        substreamWaitingToBePushed = false
      }

      override protected def onTimer(timerKey: Any): Unit = substreamSource.timeout(timeout)

      private class SubstreamHandler extends InHandler with OutHandler {

        var firstElem: T = null.asInstanceOf[T]

        def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null
        private var willCompleteAfterInitialElement = false

        // Substreams are always assumed to be pushable position when we enter this method
        private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = {
          decision match {
            case SplitAfter =>
              if (!substreamCancelled) {
                substreamSource.push(currentElem)
                substreamSource.complete()
              }
            case SplitBefore =>
              handler.firstElem = currentElem
              if (!substreamCancelled) substreamSource.complete()
          }
        }

        override def onPull(): Unit = {
          cancelTimer(SubscriptionTimer)
          if (hasInitialElement) {
            substreamSource.push(firstElem)
            firstElem = null.asInstanceOf[T]
            setKeepGoing(false)
            if (willCompleteAfterInitialElement) {
              substreamSource.complete()
              completeStage()
            }
          } else pull(in)
        }

        override def onDownstreamFinish(cause: Throwable): Unit = {
          substreamCancelled = true
          if (isClosed(in) || propagateSubstreamCancel(cause)) {
            cancelStage(cause)
          } else {
            // Start draining
            if (!hasBeenPulled(in)) pull(in)
          }
        }

        override def onPush(): Unit = {
          val elem = grab(in)
          try {
            if (p(elem)) {
              val handler = new SubstreamHandler
              closeThis(handler, elem)
              if (decision == SplitBefore) handOver(handler)
              else {
                substreamSource = null
                setHandler(in, parent)
                pull(in)
              }
            } else {
              // Drain into the void
              if (substreamCancelled) pull(in)
              else substreamSource.push(elem)
            }
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Supervision.Resume  => pull(in)
                case Supervision.Stop    => onUpstreamFailure(ex)
                case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
              }
          }
        }

        override def onUpstreamFinish(): Unit =
          if (hasInitialElement) willCompleteAfterInitialElement = true
          else {
            substreamSource.complete()
            completeStage()
          }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          substreamSource.fail(ex)
          failStage(ex)
        }

      }
    }