override protected def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala [500:652]


  // Initial attributes of this stage: `DefaultAttributes.split` combined with whatever
  // `SourceLocation.forLambda(p)` returns for the splitting predicate `p`.
  override protected def initialAttributes: Attributes =
    DefaultAttributes.split.and(SourceLocation.forLambda(p))

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
    import Split._

    // Timer key used with scheduleOnce/cancelTimer for the substream subscription timeout.
    private val SubscriptionTimer = "SubstreamSubscriptionTimer"

    // Timeout read from the mandatory StreamSubscriptionTimeout attribute. It is used both as
    // the delay of the subscription timer and as the argument handed to
    // substreamSource.timeout when that timer fires.
    private val timeout: FiniteDuration =
      inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
    // Outlet of the currently open substream, or null while no substream is open.
    private var substreamSource: SubSourceOutlet[T] = null
    // True while a substream has been created but `out` was not available to emit it;
    // cleared again once the substream has been pushed.
    private var substreamWaitingToBePushed = false
    // True once the consumer of the current substream has cancelled it; reset to false
    // whenever a new substream is handed over.
    private var substreamCancelled = false

    // Handler for the outer outlet, i.e. the stream that emits the substreams.
    setHandler(
      out,
      new OutHandler {
        override def onPull(): Unit =
          if (substreamSource ne null) {
            // A substream already exists: emit it now if it was parked waiting for demand
            if (substreamWaitingToBePushed) pushSubstreamSource()
          } else if (!hasBeenPulled(in)) {
            // `in` can already have been pulled from a substream in the split-after case
            pull(in)
          }

        override def onDownstreamFinish(cause: Throwable): Unit = {
          // The stage only stays alive while a substream has been handed out and is not yet
          // cancelled; in every other situation we can go away immediately.
          val substreamInFlight =
            (substreamSource ne null) && !substreamWaitingToBePushed && !substreamCancelled
          if (!substreamInFlight) cancelStage(cause)
        }
      })

    // Input handler that is active while no substream is open: the next element that arrives
    // opens a new substream.
    val initInHandler = new InHandler {
      override def onPush(): Unit = {
        val nextHandler = new SubstreamHandler
        val first = grab(in)

        if (decision == SplitAfter && p(first)) {
          // The element closes a substream by itself, so it is emitted as a one-element source.
          // Next pull will come from the next substream that we will open
          push(out, Source.single(first))
        } else {
          // Otherwise the element becomes the first element of the substream being opened
          nextHandler.firstElem = first
        }

        handOver(nextHandler)
      }

      override def onUpstreamFinish(): Unit = completeStage()
    }

    // Install it as the initial input handler
    setHandler(in, initInHandler)

    // Opens a new substream served by `handler` and either emits it on `out` right away or
    // marks it as waiting until `out` is pulled. Completes the stage instead when `out` is
    // already closed.
    private def handOver(handler: SubstreamHandler): Unit =
      if (!isClosed(out)) {
        val freshOutlet = new SubSourceOutlet[T]("SplitSource")
        freshOutlet.setHandler(handler)
        substreamSource = freshOutlet
        substreamCancelled = false
        setHandler(in, handler)
        setKeepGoing(enabled = handler.hasInitialElement)

        if (!isAvailable(out)) {
          // No demand on the outer stream yet; the outer onPull will emit the substream later
          substreamWaitingToBePushed = true
        } else if (decision == SplitBefore || handler.hasInitialElement) {
          pushSubstreamSource()
        } else {
          pull(in)
        }
      } else {
        completeStage()
      }

    // Emits the current substream on `out`, starts the subscription timer for it and clears
    // the "waiting to be pushed" flag.
    private def pushSubstreamSource(): Unit = {
      val substream = Source.fromGraph(substreamSource.source)
      push(out, substream)
      scheduleOnce(SubscriptionTimer, timeout)
      substreamWaitingToBePushed = false
    }

    // Timer callback: forwards the configured timeout to the current substream outlet.
    // The only timer scheduled in this logic is SubscriptionTimer, so the key is not inspected.
    override protected def onTimer(timerKey: Any): Unit = {
      substreamSource.timeout(timeout)
    }

    // Handler for a single open substream. One instance is installed both as the handler of
    // `in` and as the handler of the substream outlet (see handOver), so it reacts to upstream
    // elements as well as to demand/cancellation coming from the substream's consumer.
    private class SubstreamHandler extends InHandler with OutHandler {

      // Element that must be delivered first into this substream, or null if there is none.
      // It is assigned from outside this class before the handler is handed over.
      var firstElem: T = null.asInstanceOf[T]

      // True while `firstElem` holds an element that has not been pushed to the substream yet.
      def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null
      // Set when upstream finishes while the initial element is still pending; onPull then
      // completes the substream and the stage right after delivering that element.
      private var willCompleteAfterInitialElement = false

      // Closes the current substream at a split point. `handler` is the handler of the next
      // substream and `currentElem` is the element for which the predicate matched.
      // The substream is assumed to be in a pushable state whenever this method is entered.
      private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = {
        decision match {
          case SplitAfter =>
            // The matching element is the last element of the current substream
            if (!substreamCancelled) {
              substreamSource.push(currentElem)
              substreamSource.complete()
            }
          case SplitBefore =>
            // The matching element becomes the first element of the next substream
            handler.firstElem = currentElem
            if (!substreamCancelled) substreamSource.complete()
        }
      }

      // Demand from the substream's consumer.
      override def onPull(): Unit = {
        // The substream is being consumed, so the subscription timer is cancelled
        cancelTimer(SubscriptionTimer)
        if (hasInitialElement) {
          // Deliver the stashed first element and clear it
          substreamSource.push(firstElem)
          firstElem = null.asInstanceOf[T]
          // Keep-going was enabled in handOver only while the initial element was pending
          setKeepGoing(false)
          if (willCompleteAfterInitialElement) {
            // Upstream had already finished; the stashed element was the last one
            substreamSource.complete()
            completeStage()
          }
        } else pull(in)
      }

      // The substream's consumer cancelled.
      override def onDownstreamFinish(cause: Throwable): Unit = {
        substreamCancelled = true
        // `propagateSubstreamCancel` is defined outside this block
        if (isClosed(in) || propagateSubstreamCancel) {
          cancelStage(cause)
        } else {
          // Start draining
          if (!hasBeenPulled(in)) pull(in)
        }
      }

      // Element from upstream while this substream is open.
      override def onPush(): Unit = {
        val elem = grab(in)
        try {
          if (p(elem)) {
            // Split point: close this substream and prepare the next one
            val handler = new SubstreamHandler
            closeThis(handler, elem)
            if (decision == SplitBefore) handOver(handler)
            else {
              // Split-after: no substream is open until the next element arrives, so fall
              // back to the initial input handler and request that element
              substreamSource = null
              setHandler(in, initInHandler)
              pull(in)
            }
          } else {
            // Drain into the void
            if (substreamCancelled) pull(in)
            else substreamSource.push(elem)
          }
        } catch {
          // Non-fatal exceptions thrown inside the try block are handled like an upstream failure
          case NonFatal(ex) => onUpstreamFailure(ex)
        }
      }

      // Upstream finished: complete immediately unless the initial element is still pending,
      // in which case completion is deferred to onPull.
      override def onUpstreamFinish(): Unit =
        if (hasInitialElement) willCompleteAfterInitialElement = true
        else {
          substreamSource.complete()
          completeStage()
        }

      // Upstream failed: fail the open substream first, then the stage itself.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        substreamSource.fail(ex)
        failStage(ex)
      }

    }
  }