in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1295:1388]
override def initialAttributes = DefaultAttributes.mapAsync and SourceLocation.forLambda(f)
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var buffer: BufferImpl[Holder[Out]] = _
private val futureCB = getAsyncCallback[Holder[Out]](holder =>
holder.elem match {
case Success(_) => pushNextIfPossible()
case Failure(ex) =>
holder.supervisionDirectiveFor(decider, ex) match {
// fail fast as if supervision says so
case Supervision.Stop => failStage(ex)
case _ => pushNextIfPossible()
}
})
override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes)
override def onPull(): Unit = pushNextIfPossible()
override def onPush(): Unit = {
try {
val future = f(grab(in))
val holder = new Holder[Out](NotYetThere, futureCB)
buffer.enqueue(holder)
future.value match {
case None => future.onComplete(holder)(pekko.dispatch.ExecutionContexts.parasitic)
case Some(v) =>
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread
holder.setElem(v)
v match {
// this optimization also requires us to stop the stage to fail fast if the decider says so:
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex)
case _ => pushNextIfPossible()
}
}
} catch {
// this logic must only be executed if f throws, not if the future is failed
case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex)
}
pullIfNeeded()
}
override def onUpstreamFinish(): Unit = if (buffer.isEmpty) completeStage()
@tailrec
private def pushNextIfPossible(): Unit =
if (buffer.isEmpty) pullIfNeeded()
else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order
else if (isAvailable(out)) {
val holder = buffer.dequeue()
holder.elem match {
case Success(elem) =>
if (elem != null) {
push(out, elem)
pullIfNeeded()
} else {
// elem is null
pullIfNeeded()
pushNextIfPossible()
}
case Failure(NonFatal(ex)) =>
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
// onComplete callback has run
case Supervision.Stop => failStage(ex)
case _ =>
// try next element
pushNextIfPossible()
}
case Failure(ex) =>
// fatal exception in buffer, not sure that it can actually happen, but for good measure
throw ex
}
}
private def pullIfNeeded(): Unit = {
if (isClosed(in) && buffer.isEmpty) completeStage()
else if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in)
// else already pulled and waiting for next element
}
setHandlers(in, out, this)
}