def createLogic()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [668:746]


  /**
   * Creates the stage logic for `FoldAsync`.
   *
   * The logic keeps the latest successfully folded value in `aggregator` and the future of the most
   * recent fold step in `aggregating`. Each incoming element is combined with the current aggregate
   * through `f`; the next element is only pulled once the resulting future has been handled. When
   * upstream finishes, the current aggregate is emitted and the stage completes.
   *
   * Failures (a throwing `f`, a failed future, or a `null` result) are passed to the supervision
   * decider: `Stop` fails the stage, `Restart` resets the fold state to `zero`, and any other
   * directive keeps the current aggregate and carries on.
   *
   * @param inheritedAttributes attributes from which the mandatory `SupervisionStrategy` is read
   * @return the logic, which is also installed as the handler for both `in` and `out`
   */
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

      // Latest successfully folded value; this is what gets emitted once upstream has finished.
      private var aggregator: Out = zero
      // Future of the most recent fold step. It starts out completed with `zero` so that an
      // upstream finishing without any element still yields a value.
      private var aggregating: Future[Out] = Future.successful(aggregator)

      private def onRestart(@unused t: Throwable): Unit = {
        aggregator = zero
        // Forget the previous fold step as well. `onUpstreamFinish` replays `aggregating`, so a stale
        // completed future would otherwise overwrite the freshly reset aggregate (e.g. when `f`
        // throws on the last element and the decider says Restart).
        aggregating = Future.successful(zero)
      }

      // Shared tail of every handled fold step: emit the final aggregate if upstream is done,
      // otherwise request the next element when downstream demand is present.
      private def emitOrPullNext(): Unit =
        if (isClosed(in)) {
          // `emit` pushes immediately when `out` has been pulled and otherwise waits for the next
          // pull, so an upstream that finishes before downstream signalled demand does not cause a
          // push on a port that is not available.
          emit(out, aggregator, () => completeStage())
        } else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)

      // Async callback (exposed as a plain function) that receives the outcome of a fold step.
      private val futureCB = getAsyncCallback[Try[Out]] {
        case Success(update) if update != null =>
          aggregator = update
          emitOrPullNext()

        case other =>
          val ex = other match {
            case Failure(t) => t
            case Success(null) =>
              ReactiveStreamsCompliance.elementMustNotBeNullException
            case Success(_) =>
              throw new IllegalArgumentException() // won't happen, compiler exhaustiveness check pleaser
          }

          decider(ex) match {
            case Supervision.Stop => failStage(ex)
            case supervision =>
              if (supervision == Supervision.Restart) onRestart(ex)
              emitOrPullNext()
          }
      }.invoke _

      def onPush(): Unit =
        try {
          aggregating = f(aggregator, grab(in))
          handleAggregatingValue()
        } catch {
          // `f` (or the hand-off of its future) threw synchronously; no new future was stored.
          case NonFatal(ex) =>
            decider(ex) match {
              case Supervision.Stop => failStage(ex)
              case supervision =>
                // On Resume the element is simply dropped and the aggregate is kept.
                if (supervision == Supervision.Restart) onRestart(ex)
                tryPull(in)
            }
        }

      // Re-dispatches the latest fold step so that the callback sees `in` as closed and emits.
      override def onUpstreamFinish(): Unit = handleAggregatingValue()

      def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

      // Hands the outcome of `aggregating` to `futureCB`, right away if it is already available.
      private def handleAggregatingValue(): Unit =
        aggregating.value match {
          case Some(result) => futureCB(result) // already completed
          case _            => aggregating.onComplete(futureCB)(ExecutionContexts.parasitic)
        }

      setHandlers(in, out, this)

      override def toString =
        s"FoldAsync.Logic(completed=${aggregating.isCompleted})"
    }