override def initialAttributes = DefaultAttributes.groupedWeightedWithin and SourceLocation.forLambda()

in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1754:1884]


  override def initialAttributes = DefaultAttributes.groupedWeightedWithin and SourceLocation.forLambda(costFn)

  val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {

      private val buf: VectorBuilder[T] = new VectorBuilder
      private var pending: T = null.asInstanceOf[T]
      private var pendingWeight: Long = 0L
      // True if:
      // - buf is nonEmpty
      //       AND
      // - (timer fired
      //        OR
      //    totalWeight >= maxWeight
      //        OR
      //    pending != null
      //        OR
      //    upstream completed)
      private var pushEagerly = false
      private var groupEmitted = true
      private var finished = false
      private var totalWeight = 0L
      private var totalNumber = 0
      private var hasElements = false
      private val contextPropagation = ContextPropagation()

      override def preStart() = {
        scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
        pull(in)
      }

      private def nextElement(elem: T): Unit = {
        groupEmitted = false
        val cost = costFn(elem)
        if (cost < 0L)
          failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
        else {
          hasElements = true
          // if there is place (both weight and number) for `elem` in the current group
          if (totalWeight + cost <= maxWeight && totalNumber + 1 <= maxNumber) {
            buf += elem
            totalWeight += cost
            totalNumber += 1;

            // if potentially there is a place (both weight and number) for one more element in the current group
            if (totalWeight < maxWeight && totalNumber < maxNumber) pull(in)
            else {
              if (!isAvailable(out)) {
                // we should emit group when downstream becomes available
                pushEagerly = true
                // we want to pull anyway, since we allow for zero weight elements
                // but since `emitGroup()` will pull internally (by calling `startNewGroup()`)
                // we also have to pull if downstream hasn't yet requested an element.
                pull(in)
              } else {
                scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
                emitGroup()
              }
            }
          } else {
            // if there is a single heavy element that weighs more than the limit
            if (totalWeight == 0L && totalNumber == 0) {
              buf += elem
              totalWeight += cost
              totalNumber += 1;
              pushEagerly = true
            } else {
              pending = elem
              pendingWeight = cost
            }
            scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
            tryCloseGroup()
          }
        }
      }

      private def tryCloseGroup(): Unit = {
        if (isAvailable(out)) emitGroup()
        else if (pending != null || finished) pushEagerly = true
      }

      private def emitGroup(): Unit = {
        groupEmitted = true
        contextPropagation.resumeContext()
        push(out, buf.result())
        buf.clear()
        if (!finished) startNewGroup()
        else if (pending != null) emit(out, Vector(pending), () => completeStage())
        else completeStage()
      }

      private def startNewGroup(): Unit = {
        if (pending != null) {
          totalWeight = pendingWeight
          totalNumber = 1
          pendingWeight = 0L
          buf += pending
          pending = null.asInstanceOf[T]
          groupEmitted = false
        } else {
          totalWeight = 0L
          totalNumber = 0
          hasElements = false
        }
        pushEagerly = false
        if (isAvailable(in)) nextElement(grab(in))
        else if (!hasBeenPulled(in)) pull(in)
      }

      override def onPush(): Unit = {
        contextPropagation.suspendContext()
        if (pending == null) nextElement(grab(in)) // otherwise keep the element for next round
      }

      override def onPull(): Unit = if (pushEagerly) emitGroup()

      override def onUpstreamFinish(): Unit = {
        finished = true
        if (groupEmitted) completeStage()
        else tryCloseGroup()
      }

      override protected def onTimer(timerKey: Any) = if (hasElements) {
        if (isAvailable(out)) emitGroup()
        else pushEagerly = true
      }

      setHandlers(in, out, this)
    }