def apply[I, O]()

in http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala [50:123]


  /**
   * Builds a [[BidiFlow]] backed by a fresh [[One2OneBidi]] stage.
   *
   * @param maxPending limit on the number of elements that may have been pushed towards the
   *                   wrapped flow without a matching element having come back; `-1` disables
   *                   the limit
   * @param outputTruncationException creates the failure raised when the wrapped flow finishes
   *                                  while elements are still pending; it receives the number of
   *                                  pending elements
   * @param unexpectedOutputException creates the failure raised when the wrapped flow emits an
   *                                  element while nothing is pending; it receives that element
   * @return the stage wrapped via `BidiFlow.fromGraph`
   */
  def apply[I, O](
      maxPending: Int,
      outputTruncationException: Int => Throwable = OutputTruncationException(_),
      unexpectedOutputException: Any => Throwable = UnexpectedOutputException(_)): BidiFlow[I, I, O, O, NotUsed] = {
    val stage = new One2OneBidi[I, O](maxPending, outputTruncationException, unexpectedOutputException)
    BidiFlow.fromGraph(stage)
  }

  /*
   *    +--------------------+
   * ~> | in       toWrapped | ~>
   *    |                    |
   * <~ | out    fromWrapped | <~
   *    +--------------------+
   */
  /**
   * Bidirectional stage that forwards elements from `in` to `toWrapped` and from `fromWrapped`
   * to `out`, while counting how many forwarded elements have not yet been matched by an
   * element coming back on `fromWrapped`.
   *
   *  - `in` is only pulled while that count is below `maxPending` (or `maxPending` is `-1`).
   *  - An element arriving on `fromWrapped` while the count is zero fails the stage with
   *    `unexpectedOutputException(element)`.
   *  - `fromWrapped` finishing while the count is non-zero fails the stage with
   *    `outputTruncationException(count)`.
   *
   * NOTE(review): the count starts at 0 and never goes negative, so a `maxPending` that is
   * `<= 0` and not `-1` never passes the capacity check and `in` is never pulled.
   *
   * @param maxPending upper bound for the pending count, `-1` meaning no bound
   * @param outputTruncationException maps the pending count to the failure used on truncation
   * @param unexpectedOutputException maps an unmatched element to the failure used for it
   */
  class One2OneBidi[I, O](
      maxPending: Int,
      outputTruncationException: Int => Throwable,
      unexpectedOutputException: Any => Throwable) extends GraphStage[BidiShape[I, I, O, O]] {
    val in: Inlet[I] = Inlet[I]("One2OneBidi.in")
    val out: Outlet[O] = Outlet[O]("One2OneBidi.out")
    val toWrapped: Outlet[I] = Outlet[I]("One2OneBidi.toWrapped")
    val fromWrapped: Inlet[O] = Inlet[O]("One2OneBidi.fromWrapped")

    override def initialAttributes: Attributes = Attributes.name("One2OneBidi")
    val shape: BidiShape[I, I, O, O] = BidiShape(in, toWrapped, fromWrapped, out)

    override def toString: String = "One2OneBidi"

    override def createLogic(effectiveAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        // Number of elements pushed to `toWrapped` without a matching push from `fromWrapped` yet.
        private var pending = 0
        // Set when a pull on `toWrapped` was not forwarded to `in` because the limit was reached.
        private var deferredPull = false

        // True when another element may be requested from `in`.
        private def hasCapacity: Boolean = maxPending == -1 || pending < maxPending

        // Forwards a previously withheld pull to `in`, unless `in` has been closed meanwhile.
        private def resumeDeferredPull(): Unit =
          if (deferredPull) {
            deferredPull = false
            if (!isClosed(in)) pull(in)
          }

        setHandler(in,
          new InHandler {
            override def onPush(): Unit = {
              // Count the element before handing it over to the wrapped flow.
              pending += 1
              val element = grab(in)
              push(toWrapped, element)
            }
            override def onUpstreamFinish(): Unit = complete(toWrapped)
          })

        setHandler(toWrapped,
          new OutHandler {
            override def onPull(): Unit = {
              if (hasCapacity) pull(in)
              else deferredPull = true
            }
            override def onDownstreamFinish(): Unit = cancel(in)
          })

        setHandler(fromWrapped,
          new InHandler {
            override def onPush(): Unit = {
              val element = grab(fromWrapped)
              if (pending <= 0) {
                // Nothing is outstanding, so this element has no matching input.
                failStage(unexpectedOutputException(element))
              } else {
                pending -= 1
                push(out, element)
                // A slot has been freed: serve the pull that was held back, if any.
                resumeDeferredPull()
              }
            }
            override def onUpstreamFinish(): Unit = {
              if (pending <= 0) completeStage()
              else failStage(outputTruncationException(pending))
            }
          })

        setHandler(out,
          new OutHandler {
            override def onPull(): Unit = pull(fromWrapped)
            override def onDownstreamFinish(): Unit = cancel(fromWrapped)
          })
      }
  }