in stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala [813:908]
def this(outputPorts: Int, partitioner: T => Int) = this(outputPorts, partitioner, false)
val in: Inlet[T] = Inlet[T]("Partition.in")
val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i => Outlet[T]("Partition.out" + i)) // FIXME BC make this immutable.IndexedSeq as type + Vector as concrete impl
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var outPendingElem: Any = null
private var outPendingIdx: Int = _
private var downstreamRunning = outputPorts
def onPush() = {
val elem = grab(in)
val idx =
try {
val i = partitioner(elem)
if (i < 0 || i >= outputPorts)
throw PartitionOutOfBoundsException(
s"partitioner must return an index in the range [0,${outputPorts - 1}]. returned: [$i] for input " +
s"[${elem.getClass.getName}].")
else i
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Restart => pull(in)
case Supervision.Resume => pull(in)
}
Int.MinValue
}
if (idx != Int.MinValue) {
if (!isClosed(out(idx))) {
if (isAvailable(out(idx))) {
push(out(idx), elem)
pullIfAnyOutIsAvailable()
} else {
outPendingElem = elem
outPendingIdx = idx
}
} else
pullIfAnyOutIsAvailable()
}
}
private def pullIfAnyOutIsAvailable(): Unit = {
if (out.exists(isAvailable(_)))
pull(in)
}
override def onUpstreamFinish(): Unit = {
if (outPendingElem == null) completeStage()
}
setHandler(in, this)
out.iterator.zipWithIndex.foreach {
case (o, idx) =>
setHandler(
o,
new OutHandler {
override def onPull() = {
if (outPendingElem != null) {
val elem = outPendingElem.asInstanceOf[T]
if (idx == outPendingIdx) {
push(o, elem)
outPendingElem = null
if (isClosed(in))
completeStage()
else if (!hasBeenPulled(in))
pull(in)
}
} else if (!hasBeenPulled(in))
pull(in)
}
override def onDownstreamFinish(cause: Throwable) =
if (eagerCancel) cancelStage(cause)
else {
downstreamRunning -= 1
if (downstreamRunning == 0)
cancelStage(cause)
else if (outPendingElem != null && idx == outPendingIdx) {
outPendingElem = null
if (isClosed(in))
cancelStage(cause)
else if (!hasBeenPulled(in))
pull(in)
}
}
})
}
}