override def createLogic()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala [282:457]


  /**
   * Builds the stage logic that routes every upstream element to a per-key substream.
   *
   * For each element a key is computed with `keyFor`. If a substream for that key is already
   * registered the element is pushed to it (or buffered as the single "next element" when that
   * substream has no demand). Otherwise a new `SubstreamSource` is created and emitted on `out`.
   * A timer is scheduled for every emitted substream using the `StreamSubscriptionTimeout`
   * attribute; when it fires before the substream has pulled, the substream is unregistered.
   *
   * @param inheritedAttributes attributes from which the supervision strategy and the
   *                            subscription timeout are read (both are mandatory)
   * @return the timer-enabled logic that also acts as the handler for `in` and `out`
   */
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler with InHandler {
      parent =>
      // Supervision decider, resolved lazily; consulted when onPush throws a non-fatal exception.
      lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
      // Substreams that are currently registered, indexed by their key.
      private val activeSubstreamsMap = new java.util.HashMap[Any, SubstreamSource]()
      // Keys whose substream has ended. When recreation of closed substreams is allowed this is an
      // unmodifiable empty set; every `add` below is therefore guarded by !allowClosedSubstreamRecreation.
      private val closedSubstreams =
        if (allowClosedSubstreamRecreation) Collections.unmodifiableSet(Collections.emptySet[Any])
        else new java.util.HashSet[Any]()
      // Delay used for the per-substream timer scheduled when a substream is emitted on `out`.
      private val timeout: FiniteDuration =
        inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
      // A freshly created substream that could not be emitted yet because `out` had no demand.
      private var substreamWaitingToBePushed: Option[SubstreamSource] = None
      // Single-slot buffer for an element whose target substream had no demand; null key means "empty".
      private var nextElementKey: K = null.asInstanceOf[K]
      private var nextElementValue: T = null.asInstanceOf[T]
      // Counter backing nextId(); kept as Long so it matches nextId()'s declared result type.
      private var _nextId = 0L
      // Substreams that had to wait for demand on `out` and have not pushed their first element yet.
      private val substreamsJustStarted = new java.util.HashSet[Any]()
      // Number of substreams that still hold their first element (incremented in runSubstream).
      private var firstPushCounter: Int = 0

      // Allocated once up front and thrown from onPush when the substream limit is reached.
      private val tooManySubstreamsOpenException = new TooManySubstreamsOpenException

      /** Returns the next sequence number used in substream names. */
      private def nextId(): Long = { _nextId += 1; _nextId }

      /** True while an element is parked in the single-slot buffer. */
      private def hasNextElement = nextElementKey != null

      /** Empties the single-slot buffer. */
      private def clearNextElement(): Unit = {
        nextElementKey = null.asInstanceOf[K]
        nextElementValue = null.asInstanceOf[T]
      }

      /**
       * Completes all registered substreams and the stage when nothing is left to deliver, i.e. there
       * are no registered substreams, or there is neither a buffered element nor an undelivered
       * first element.
       *
       * @return true if the stage was completed
       */
      private def tryCompleteAll(): Boolean =
        if (activeSubstreamsMap.isEmpty || (!hasNextElement && firstPushCounter == 0)) {
          for (value <- activeSubstreamsMap.values().asScala) value.complete()
          completeStage()
          true
        } else false

      /**
       * Cancels the stage if no registered substream still needs upstream.
       *
       * @return true if the stage was cancelled
       */
      private def tryCancel(cause: Throwable): Boolean =
        // if there's no active substreams or there's only one but it's not been pushed yet
        if (activeSubstreamsMap.isEmpty || (activeSubstreamsMap.size == 1 && substreamWaitingToBePushed.isDefined)) {
          cancelStage(cause)
          true
        } else false

      /** Fails every registered substream and then the stage itself. */
      private def fail(ex: Throwable): Unit = {
        for (value <- activeSubstreamsMap.values().asScala) value.fail(ex)
        failStage(ex)
      }

      /**
       * True when `in` may be pulled: it is neither pulled nor closed, and nothing (buffered element
       * or waiting substream) is pending that must be delivered first.
       */
      private def needToPull: Boolean =
        !(hasBeenPulled(in) || isClosed(in) || hasNextElement || substreamWaitingToBePushed.nonEmpty)

      /**
       * Demand on the outer outlet: emits the waiting substream if there is one (starting its
       * timer), otherwise tries to deliver the buffered element, otherwise pulls upstream.
       */
      override def onPull(): Unit = {
        substreamWaitingToBePushed match {
          case Some(substreamSource) =>
            push(out, Source.fromGraph(substreamSource.source))
            scheduleOnce(substreamSource.key, timeout)
            substreamWaitingToBePushed = None
          case None =>
            if (hasNextElement) {
              val subSubstreamSource = activeSubstreamsMap.get(nextElementKey)
              if (subSubstreamSource.isAvailable) {
                subSubstreamSource.push(nextElementValue)
                clearNextElement()
              }
            } else if (!hasBeenPulled(in)) tryPull(in)
        }
      }

      override def onUpstreamFailure(ex: Throwable): Unit = fail(ex)

      // If completion is not possible yet, keep the stage alive so pending elements can be delivered.
      override def onUpstreamFinish(): Unit = if (!tryCompleteAll()) setKeepGoing(true)

      // If cancellation is not possible yet, keep the stage alive for the registered substreams.
      override def onDownstreamFinish(cause: Throwable): Unit = if (!tryCancel(cause)) setKeepGoing(true)

      /**
       * Routes the grabbed element by key. Any non-fatal exception (from `keyFor`, the null-key
       * check, or the substream limit) is handed to the supervision decider: Stop fails everything,
       * Resume/Restart drops the element and pulls again.
       */
      override def onPush(): Unit =
        try {
          val elem = grab(in)
          val key = keyFor(elem)
          require(key != null, "Key cannot be null")
          val substreamSource = activeSubstreamsMap.get(key)
          if (substreamSource != null) {
            if (substreamSource.isAvailable) substreamSource.push(elem)
            else {
              // Target substream has no demand: park the element in the single-slot buffer.
              nextElementKey = key
              nextElementValue = elem
            }
          } else {
            if (closedSubstreams.contains(key)) {
              // If the sub stream is already closed, we just skip the current element and pull the next element.
              pull(in)
            } else if (activeSubstreamsMap.size + closedSubstreams.size == maxSubstreams) {
              throw tooManySubstreamsOpenException
            } else runSubstream(key, elem)
          }
        } catch {
          case NonFatal(ex) =>
            decider(ex) match {
              case Supervision.Stop                         => fail(ex)
              case Supervision.Resume | Supervision.Restart => if (!hasBeenPulled(in)) pull(in)
            }
        }

      /**
       * Registers a new substream for `key` holding `value` as its first element, and emits it on
       * `out` right away if there is demand; otherwise it is kept as the waiting substream.
       */
      private def runSubstream(key: K, value: T): Unit = {
        val substreamSource = new SubstreamSource("GroupBySource " + nextId(), key, value)
        activeSubstreamsMap.put(key, substreamSource)
        firstPushCounter += 1
        if (isAvailable(out)) {
          push(out, Source.fromGraph(substreamSource.source))
          scheduleOnce(key, timeout)
          substreamWaitingToBePushed = None
        } else {
          setKeepGoing(true)
          substreamsJustStarted.add(substreamSource)
          substreamWaitingToBePushed = Some(substreamSource)
        }
      }

      /**
       * Fired when an emitted substream has not pulled within `timeout` (the timer is cancelled in
       * the substream's onPull). Unregisters the substream and, unless recreation is allowed,
       * remembers its key as closed.
       */
      override protected def onTimer(timerKey: Any): Unit = {
        val substreamSource = activeSubstreamsMap.get(timerKey)
        if (substreamSource != null) {
          if (!allowClosedSubstreamRecreation) {
            closedSubstreams.add(timerKey)
          }
          activeSubstreamsMap.remove(timerKey)
          // An element may have been buffered for this substream while it had no demand. With the
          // substream unregistered nobody would ever consume it: needToPull would stay false and the
          // outer onPull would look up a key that is no longer in the map. Drop it, as the substream's
          // own onDownstreamFinish does.
          val droppedBufferedElement = hasNextElement && nextElementKey == timerKey
          if (droppedBufferedElement) clearNextElement()
          // NOTE(review): firstPushCounter and substreamsJustStarted are not adjusted here although the
          // timed-out substream presumably still holds its first element - confirm whether a timed-out
          // substream can still be subscribed later before changing that.
          if (isClosed(in)) tryCompleteAll()
          // The dropped element consumed a pull; request a replacement like the "closed substream" skip in onPush.
          else if (droppedBufferedElement && needToPull) pull(in)
        }
      }

      setHandlers(in, out, this)

      /**
       * Outlet of a single substream; it is its own out-handler.
       *
       * @param name         name passed to the underlying sub-source outlet
       * @param key          key shared by all elements routed to this substream
       * @param firstElement element that created the substream; reset to null once pushed
       */
      private class SubstreamSource(name: String, val key: K, var firstElement: T)
          extends SubSourceOutlet[T](name)
          with OutHandler {
        /** True until the first element has been pushed (or the substream ended before that). */
        def firstPush(): Boolean = firstElement != null
        /** True when the buffered element belongs to this substream. */
        def hasNextForSubSource = hasNextElement && nextElementKey == key
        /** Completes this outlet and unregisters it, recording the key as closed unless recreation is allowed. */
        private def completeSubStream(): Unit = {
          complete()
          activeSubstreamsMap.remove(key)
          if (!allowClosedSubstreamRecreation) {
            closedSubstreams.add(key)
          }
        }

        /** Once upstream is closed and nothing is buffered for this substream, finish it and try to finish the stage. */
        private def tryCompleteHandler(): Unit = {
          if (parent.isClosed(in) && !hasNextForSubSource) {
            completeSubStream()
            tryCompleteAll()
          }
        }

        /**
         * Demand from the substream's consumer: cancels the subscription timer, then delivers the
         * first element, or the buffered element for this key, or pulls upstream if allowed.
         */
        override def onPull(): Unit = {
          cancelTimer(key)
          if (firstPush()) {
            firstPushCounter -= 1
            push(firstElement)
            firstElement = null.asInstanceOf[T]
            substreamsJustStarted.remove(this)
            if (substreamsJustStarted.isEmpty) setKeepGoing(false)
          } else if (hasNextForSubSource) {
            push(nextElementValue)
            clearNextElement()
          } else if (needToPull) pull(in)

          tryCompleteHandler()
        }

        /**
         * The substream's consumer cancelled: discard state owned by this substream (buffered
         * element, undelivered first element), unregister it, then let the parent cancel, complete
         * or continue pulling as appropriate.
         */
        override def onDownstreamFinish(cause: Throwable): Unit = {
          if (hasNextElement && nextElementKey == key) clearNextElement()
          if (firstPush()) firstPushCounter -= 1
          completeSubStream()
          if (parent.isClosed(out)) tryCancel(cause)
          if (parent.isClosed(in)) tryCompleteAll() else if (needToPull) pull(in)
        }

        setHandler(this)
      }
    }