in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1754:1884]
override def initialAttributes = DefaultAttributes.groupedWeightedWithin and SourceLocation.forLambda(costFn)
val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private val buf: VectorBuilder[T] = new VectorBuilder
private var pending: T = null.asInstanceOf[T]
private var pendingWeight: Long = 0L
// True if:
// - buf is nonEmpty
// AND
// - (timer fired
// OR
// totalWeight >= maxWeight
// OR
// pending != null
// OR
// upstream completed)
private var pushEagerly = false
private var groupEmitted = true
private var finished = false
private var totalWeight = 0L
private var totalNumber = 0
private var hasElements = false
private val contextPropagation = ContextPropagation()
override def preStart() = {
scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
pull(in)
}
private def nextElement(elem: T): Unit = {
groupEmitted = false
val cost = costFn(elem)
if (cost < 0L)
failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
else {
hasElements = true
// if there is place (both weight and number) for `elem` in the current group
if (totalWeight + cost <= maxWeight && totalNumber + 1 <= maxNumber) {
buf += elem
totalWeight += cost
totalNumber += 1;
// if potentially there is a place (both weight and number) for one more element in the current group
if (totalWeight < maxWeight && totalNumber < maxNumber) pull(in)
else {
if (!isAvailable(out)) {
// we should emit group when downstream becomes available
pushEagerly = true
// we want to pull anyway, since we allow for zero weight elements
// but since `emitGroup()` will pull internally (by calling `startNewGroup()`)
// we also have to pull if downstream hasn't yet requested an element.
pull(in)
} else {
scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
emitGroup()
}
}
} else {
// if there is a single heavy element that weighs more than the limit
if (totalWeight == 0L && totalNumber == 0) {
buf += elem
totalWeight += cost
totalNumber += 1;
pushEagerly = true
} else {
pending = elem
pendingWeight = cost
}
scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
tryCloseGroup()
}
}
}
private def tryCloseGroup(): Unit = {
if (isAvailable(out)) emitGroup()
else if (pending != null || finished) pushEagerly = true
}
private def emitGroup(): Unit = {
groupEmitted = true
contextPropagation.resumeContext()
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
else if (pending != null) emit(out, Vector(pending), () => completeStage())
else completeStage()
}
private def startNewGroup(): Unit = {
if (pending != null) {
totalWeight = pendingWeight
totalNumber = 1
pendingWeight = 0L
buf += pending
pending = null.asInstanceOf[T]
groupEmitted = false
} else {
totalWeight = 0L
totalNumber = 0
hasElements = false
}
pushEagerly = false
if (isAvailable(in)) nextElement(grab(in))
else if (!hasBeenPulled(in)) pull(in)
}
override def onPush(): Unit = {
contextPropagation.suspendContext()
if (pending == null) nextElement(grab(in)) // otherwise keep the element for next round
}
override def onPull(): Unit = if (pushEagerly) emitGroup()
override def onUpstreamFinish(): Unit = {
finished = true
if (groupEmitted) completeStage()
else tryCloseGroup()
}
override protected def onTimer(timerKey: Any) = if (hasElements) {
if (isAvailable(out)) emitGroup()
else pushEagerly = true
}
setHandlers(in, out, this)
}