in stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala [84:276]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val contextPropagation = pekko.stream.impl.ContextPropagation()
private final class Contextual[T](context: AnyRef, val element: T) {
private var suspended = false
def suspend(): Unit =
if (!suspended) {
suspended = true
contextPropagation.suspendContext()
}
def resume(): Unit =
if (suspended) {
suspended = false
contextPropagation.resumeContext(context)
}
}
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var partitionsInProgress: mutable.Set[Partition] = _
private var buffer: mutable.Queue[(Partition, Contextual[Holder[In, Out]])] = _
private val futureCB = getAsyncCallback[Holder[In, Out]](holder =>
holder.out match {
case Success(_) => pushNextIfPossible()
case Failure(ex) =>
holder.supervisionDirectiveFor(decider, ex) match {
// fail fast as if supervision says so
case Supervision.Stop => failStage(ex)
case _ => pushNextIfPossible()
}
})
override def preStart(): Unit = {
partitionsInProgress = mutable.Set()
buffer = mutable.Queue()
}
override def onPull(): Unit = pushNextIfPossible()
override def onPush(): Unit = {
try {
val element = grab(in)
val partition = partitioner(element)
val wrappedInput = new Contextual(
contextPropagation.currentContext(),
new Holder[In, Out](element, NotYetThere, futureCB))
buffer.enqueue(partition -> wrappedInput)
if (canStartNextElement(partition)) {
processElement(partition, wrappedInput)
} else {
wrappedInput.suspend()
}
} catch {
case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex)
}
pullIfNeeded()
}
override def onUpstreamFinish(): Unit = if (idle()) completeStage()
private def processElement(partition: Partition, wrappedInput: Contextual[Holder[In, Out]]): Unit = {
import wrappedInput.{ element => holder }
val future = f(holder.in, partition)
partitionsInProgress += partition
future.value match {
case None => future.onComplete(holder)(ExecutionContexts.parasitic)
case Some(v) =>
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread
holder.setOut(v)
v match {
// this optimization also requires us to stop the stage to fail fast if the decider says so:
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex)
case _ => pushNextIfPossible()
}
}
}
private val pushNextIfPossible: () => Unit =
if (orderedOutput) pushNextIfPossibleOrdered _
else pushNextIfPossibleUnordered _
private def pushNextIfPossibleOrdered(): Unit =
if (partitionsInProgress.isEmpty) {
drainQueue()
pullIfNeeded()
} else {
while (buffer.nonEmpty && !(buffer.front._2.element.out eq NotYetThere) && isAvailable(out)) {
val (partition, wrappedInput) = buffer.dequeue()
import wrappedInput.{ element => holder }
partitionsInProgress -= partition
holder.out match {
case Success(elem) =>
if (elem != null) {
push(out, elem)
pullIfNeeded()
} else {
// elem is null
pullIfNeeded()
}
case Failure(NonFatal(ex)) =>
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
// onComplete callback has run
case Supervision.Stop =>
failStage(ex)
case _ =>
// try next element
}
case Failure(ex) =>
// fatal exception in buffer, not sure that it can actually happen, but for good measure
throw ex
}
}
drainQueue()
}
private def pushNextIfPossibleUnordered(): Unit =
if (partitionsInProgress.isEmpty) {
drainQueue()
pullIfNeeded()
} else {
buffer = buffer.filter { case (partition, wrappedInput) =>
import wrappedInput.{ element => holder }
if ((holder.out eq NotYetThere) || !isAvailable(out)) {
true
} else {
partitionsInProgress -= partition
holder.out match {
case Success(elem) =>
if (elem != null) {
push(out, elem)
}
case Failure(NonFatal(ex)) =>
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
// onComplete callback has run
case Supervision.Stop =>
failStage(ex)
case _ =>
// try next element
}
case Failure(ex) =>
// fatal exception in buffer, not sure that it can actually happen, but for good measure
throw ex
}
false
}
}
pullIfNeeded()
drainQueue()
}
private def drainQueue(): Unit = {
if (buffer.nonEmpty) {
buffer.foreach {
case (partition, wrappedInput) =>
if (canStartNextElement(partition)) {
wrappedInput.resume()
processElement(partition, wrappedInput)
}
}
}
}
private def pullIfNeeded(): Unit =
if (isClosed(in) && idle()) completeStage()
else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in)
// else already pulled and waiting for next element
private def idle(): Boolean = buffer.isEmpty
private def canStartNextElement(partition: Partition): Boolean =
!partitionsInProgress.contains(partition) && partitionsInProgress.size < parallelism
setHandlers(in, out, this)
}