in sqs/src/main/scala/org/apache/pekko/stream/connectors/sqs/impl/BalancingMapAsync.scala [55:143]
override def initialAttributes = name("BalancingMapAsync")
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var buffer: Buffer[Holder[Out]] = _
var parallelism = maxParallelism
private val futureCB = getAsyncCallback[Holder[Out]](holder =>
holder.elem match {
case Success(value) =>
parallelism = balancingF(value, parallelism)
pushNextIfPossible()
case Failure(ex) =>
holder.supervisionDirectiveFor(decider, ex) match {
// fail fast as if supervision says so
case Supervision.Stop => failStage(ex)
case _ => pushNextIfPossible()
}
})
override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes)
override def onPull(): Unit = pushNextIfPossible()
override def onPush(): Unit = {
try {
val future = f(grab(in))
val holder = new Holder[Out](NotYetThere, futureCB)
buffer.enqueue(holder)
future.value match {
case None => future.onComplete(holder)(pekko.dispatch.ExecutionContexts.parasitic)
case Some(v) =>
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread
holder.setElem(v)
v match {
// this optimization also requires us to stop the stage to fail fast if the decider says so:
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex)
case _ => pushNextIfPossible()
}
}
} catch {
// this logic must only be executed if f throws, not if the future is failed
case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex)
}
pullIfNeeded()
}
override def onUpstreamFinish(): Unit = if (buffer.isEmpty) completeStage()
@tailrec
private def pushNextIfPossible(): Unit =
if (buffer.isEmpty) {
if (isClosed(in)) completeStage()
else pullIfNeeded()
} else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order
else if (isAvailable(out)) {
val holder = buffer.dequeue()
holder.elem match {
case Success(elem) =>
push(out, elem)
pullIfNeeded()
case Failure(NonFatal(ex)) =>
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
// onComplete callback has run
case Supervision.Stop => failStage(ex)
case _ =>
// try next element
pushNextIfPossible()
}
case Failure(_) => // fatal
}
}
private def pullIfNeeded(): Unit =
if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in)
setHandlers(in, out, this)
}