def this()

in stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala [813:908]


  def this(outputPorts: Int, partitioner: T => Int) = this(outputPorts, partitioner, false)

  val in: Inlet[T] = Inlet[T]("Partition.in")
  val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i => Outlet[T]("Partition.out" + i)) // FIXME BC make this immutable.IndexedSeq as type + Vector as concrete impl
  override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
      private var outPendingElem: Any = null
      private var outPendingIdx: Int = _
      private var downstreamRunning = outputPorts

      def onPush() = {
        val elem = grab(in)
        val idx =
          try {
            val i = partitioner(elem)
            if (i < 0 || i >= outputPorts)
              throw PartitionOutOfBoundsException(
                s"partitioner must return an index in the range [0,${outputPorts - 1}]. returned: [$i] for input " +
                s"[${elem.getClass.getName}].")
            else i
          } catch {
            case NonFatal(ex) =>
              decider(ex) match {
                case Supervision.Stop    => failStage(ex)
                case Supervision.Restart => pull(in)
                case Supervision.Resume  => pull(in)
              }
              Int.MinValue
          }

        if (idx != Int.MinValue) {
          if (!isClosed(out(idx))) {
            if (isAvailable(out(idx))) {
              push(out(idx), elem)
              pullIfAnyOutIsAvailable()
            } else {
              outPendingElem = elem
              outPendingIdx = idx
            }

          } else
            pullIfAnyOutIsAvailable()
        }
      }

      private def pullIfAnyOutIsAvailable(): Unit = {
        if (out.exists(isAvailable(_)))
          pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        if (outPendingElem == null) completeStage()
      }

      setHandler(in, this)

      out.iterator.zipWithIndex.foreach {
        case (o, idx) =>
          setHandler(
            o,
            new OutHandler {
              override def onPull() = {
                if (outPendingElem != null) {
                  val elem = outPendingElem.asInstanceOf[T]
                  if (idx == outPendingIdx) {
                    push(o, elem)
                    outPendingElem = null
                    if (isClosed(in))
                      completeStage()
                    else if (!hasBeenPulled(in))
                      pull(in)
                  }
                } else if (!hasBeenPulled(in))
                  pull(in)
              }

              override def onDownstreamFinish(cause: Throwable) =
                if (eagerCancel) cancelStage(cause)
                else {
                  downstreamRunning -= 1
                  if (downstreamRunning == 0)
                    cancelStage(cause)
                  else if (outPendingElem != null && idx == outPendingIdx) {
                    outPendingElem = null
                    if (isClosed(in))
                      cancelStage(cause)
                    else if (!hasBeenPulled(in))
                      pull(in)
                  }
                }
            })
      }
    }