in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala [1400:1468]
/** Default attributes of this stage: the `mapAsyncUnordered` defaults combined with the source location of `f`. */
override def initialAttributes =
  DefaultAttributes.mapAsyncUnordered.and(SourceLocation.forLambda(f))
// Flow shape built from this stage's `in` and `out` ports (both declared outside this excerpt).
override val shape = FlowShape(in, out)
/**
 * Creates the stage logic: applies `f` to every element grabbed from `in` and pushes each
 * successful, non-null result to `out` when its future completes, so results are emitted in
 * completion order. At most `parallelism` elements are outstanding at a time, counting both
 * futures still running and completed results waiting in the buffer.
 *
 * Failures (thrown by `f` or carried by a failed future) are passed to the supervision decider
 * taken from `inheritedAttributes`: `Supervision.Stop` fails the stage, any other directive drops
 * the element and processing continues.
 *
 * @param inheritedAttributes attributes providing the mandatory `SupervisionStrategy` and the
 *                            settings handed to `BufferImpl`
 * @return the logic instance handling both `in` and `out`
 */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with InHandler with OutHandler {
    override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"

    // Maps a failure to a supervision directive.
    val decider =
      inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

    // Futures that have been started and whose result has not yet been handled by futureCompleted.
    private var inFlight = 0
    // Completed results waiting for downstream demand; assigned in preStart.
    private var buffer: BufferImpl[Out] = _

    // Total outstanding work: running futures plus buffered results.
    private[this] def todo = inFlight + buffer.used

    override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes)

    /**
     * Handles the outcome of one future. Always releases one `inFlight` slot first, then either
     * emits/buffers the element, drops it (null result or non-stopping failure), or fails the stage.
     */
    def futureCompleted(result: Try[Out]): Unit = {
      // Evaluated lazily at each use, i.e. after inFlight has been decremented below.
      def isCompleted = isClosed(in) && todo == 0
      inFlight -= 1
      result match {
        case Success(elem) if elem != null =>
          if (isAvailable(out)) {
            if (!hasBeenPulled(in)) tryPull(in)
            push(out, elem)
            if (isCompleted) completeStage()
          } else buffer.enqueue(elem) // no demand yet: keep the result until onPull
        case Success(_) =>
          // Null result: nothing is emitted, the slot is simply freed.
          if (isCompleted) completeStage()
          else if (!hasBeenPulled(in)) tryPull(in)
        case Failure(ex) =>
          if (decider(ex) == Supervision.Stop) failStage(ex)
          else if (isCompleted) completeStage()
          else if (!hasBeenPulled(in)) tryPull(in)
      }
    }

    // Async callback used to hand results of not-yet-completed futures back to futureCompleted.
    private val futureCB = getAsyncCallback(futureCompleted)
    private val invokeFutureCB: Try[Out] => Unit = futureCB.invoke

    override def onPush(): Unit = {
      try {
        val future = f(grab(in))
        // Dereference the future BEFORE counting it. If `f` returned null this throws here, and
        // the element is never added to `inFlight`. Previously the counter was incremented first,
        // so a resumed failure at this point left `inFlight` permanently too high: `todo` could
        // never reach 0 and the stage would not complete after upstream finished.
        val completedValue = future.value
        inFlight += 1
        completedValue match {
          case None => future.onComplete(invokeFutureCB)(pekko.dispatch.ExecutionContexts.parasitic)
          // Already completed: handle the result directly instead of going through the callback.
          case Some(v) => futureCompleted(v)
        }
      } catch {
        // Non-stopping directives drop the element; the pull below keeps the stream moving.
        case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex)
      }
      if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
    }

    override def onUpstreamFinish(): Unit = {
      // Only complete right away when nothing is running or buffered; otherwise completion is
      // triggered later from futureCompleted or onPull.
      if (todo == 0) completeStage()
    }

    override def onPull(): Unit = {
      if (!buffer.isEmpty) push(out, buffer.dequeue())
      // Snapshot after the possible dequeue so the freed buffer slot is taken into account.
      val leftTodo = todo
      if (isClosed(in) && leftTodo == 0) completeStage()
      else if (leftTodo < parallelism && !hasBeenPulled(in)) tryPull(in)
    }

    setHandlers(in, out, this)
  }