in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [668:746]
/**
 * Creates the stage logic for the asynchronous fold: every element grabbed from `in` is
 * combined with the current accumulator through `f`, and the accumulator is pushed to
 * `out` once `in` has been closed.
 *
 * Failures (an exception thrown by `f`, a failed future, or a future completed with
 * `null`) are handed to the decider taken from the mandatory `SupervisionStrategy`
 * attribute: Stop fails the stage, Restart resets the accumulator to `zero`, and any
 * other directive leaves the accumulator unchanged.
 *
 * @param inheritedAttributes attributes from which the supervision decider is read
 * @return the logic instance, registered as handler for both `in` and `out`
 */
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with InHandler with OutHandler {
    val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

    // Most recent successfully folded value; starts at `zero`.
    private var aggregator: Out = zero

    // Future of the latest fold step; pre-completed with the initial accumulator so that
    // an upstream finish before any element still has a value to dispatch.
    private var aggregating: Future[Out] = Future.successful(aggregator)

    // Restart directive: discard what has been folded so far.
    private def onRestart(@unused t: Throwable): Unit = {
      aggregator = zero
    }

    // Shared tail of every callback outcome: with `in` closed the accumulator is emitted
    // and the stage completes; otherwise the next element is requested, but only when
    // `out` is available and `in` has not been pulled already.
    private def emitFinalOrPull(): Unit =
      if (!isClosed(in)) {
        if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
      } else {
        push(out, aggregator)
        completeStage()
      }

    // Applies the supervision decision for a failure reported through the async callback.
    private def superviseAsyncFailure(ex: Throwable): Unit =
      decider(ex) match {
        case Supervision.Stop => failStage(ex)
        case directive =>
          if (directive == Supervision.Restart) onRestart(ex)
          emitFinalOrPull()
      }

    // Receives the outcome of `aggregating`; exposed as a plain function via `invoke _`.
    private val futureCB = getAsyncCallback[Try[Out]] {
      case Success(update) if update != null =>
        aggregator = update
        emitFinalOrPull()
      case Failure(cause) =>
        superviseAsyncFailure(cause)
      case Success(_) =>
        // Only reachable for a null element: the guarded case above took every other Success.
        superviseAsyncFailure(ReactiveStreamsCompliance.elementMustNotBeNullException)
    }.invoke _

    // Starts the next fold step; anything thrown synchronously is supervised right here.
    def onPush(): Unit =
      try {
        aggregating = f(aggregator, grab(in))
        handleAggregatingValue()
      } catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Supervision.Stop =>
              failStage(ex)
            case Supervision.Restart =>
              onRestart(ex)
              tryPull(in)
            case _ =>
              tryPull(in) // just ignore on Resume
          }
      }

    // Dispatches the current future again so its outcome is handled with `in` closed.
    override def onUpstreamFinish(): Unit = handleAggregatingValue()

    def onPull(): Unit = {
      if (!hasBeenPulled(in)) tryPull(in)
    }

    // Feeds the outcome of `aggregating` to the callback: immediately when the future
    // already holds a value, otherwise once it completes.
    private def handleAggregatingValue(): Unit =
      aggregating.value match {
        case Some(outcome) => futureCB(outcome)
        case _             => aggregating.onComplete(futureCB)(ExecutionContexts.parasitic)
      }

    setHandlers(in, out, this)

    override def toString =
      s"FoldAsync.Logic(completed=${aggregating.isCompleted})"
  }