override def createLogic()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1073:1190]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {

      lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

      private var agg: Out = null.asInstanceOf[Out]
      private var left: Long = max
      private var pending: In = null.asInstanceOf[In]
      private val contextPropagation = ContextPropagation()

      private def flush(): Unit = {
        if (agg != null) {
          push(out, agg)
          left = max
        }
        if (pending != null) {
          try {
            agg = seed(pending)
            left -= costFn(pending)
            pending = null.asInstanceOf[In]
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Supervision.Stop    => failStage(ex)
                case Supervision.Restart => restartState()
                case Supervision.Resume =>
                  pending = null.asInstanceOf[In]
              }
          }
        } else {
          agg = null.asInstanceOf[Out]
        }
      }

      override def preStart() = pull(in)

      def onPush(): Unit = {
        val elem = grab(in)
        val cost = costFn(elem)
        contextPropagation.suspendContext()

        if (agg == null) {
          try {
            agg = seed(elem)
            left -= cost
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Supervision.Stop => failStage(ex)
                case Supervision.Restart =>
                  restartState()
                case Supervision.Resume =>
              }
          }
        } else if (left < cost) {
          pending = elem
        } else {
          try {
            agg = aggregate(agg, elem)
            left -= cost
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Supervision.Stop => failStage(ex)
                case Supervision.Restart =>
                  restartState()
                case Supervision.Resume =>
              }
          }
        }

        if (isAvailable(out)) flush()
        if (pending == null) pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        if (agg == null) completeStage()
      }

      def onPull(): Unit = {
        if (agg == null) {
          if (isClosed(in)) completeStage()
          else if (!hasBeenPulled(in)) pull(in)
        } else if (isClosed(in)) {
          contextPropagation.resumeContext()
          push(out, agg)
          if (pending == null) completeStage()
          else {
            try {
              agg = seed(pending)
            } catch {
              case NonFatal(ex) =>
                decider(ex) match {
                  case Supervision.Stop   => failStage(ex)
                  case Supervision.Resume =>
                  case Supervision.Restart =>
                    restartState()
                    if (!hasBeenPulled(in)) pull(in)
                }
            }
            pending = null.asInstanceOf[In]
          }
        } else {
          contextPropagation.resumeContext()
          flush()
          if (!hasBeenPulled(in)) pull(in)
        }

      }

      private def restartState(): Unit = {
        agg = null.asInstanceOf[Out]
        left = max
        pending = null.asInstanceOf[In]
      }

      setHandlers(in, out, this)
    }