in http-core/src/main/scala/org/apache/pekko/http/impl/util/One2OneBidiFlow.scala [50:123]
def apply[I, O](
    maxPending: Int,
    outputTruncationException: Int => Throwable = OutputTruncationException(_),
    unexpectedOutputException: Any => Throwable = UnexpectedOutputException(_)): BidiFlow[I, I, O, O, NotUsed] = {
  // Build the stage first, then lift it into a BidiFlow.
  //  - maxPending: upper bound on elements handed to the wrapped flow that have not yet
  //    produced an output; the stage treats -1 as "no bound".
  //  - outputTruncationException: builds the failure used when the wrapped flow finishes
  //    while elements are still pending; it receives the number of pending elements.
  //  - unexpectedOutputException: builds the failure used when the wrapped flow emits an
  //    element although nothing is pending; it receives the offending element.
  val stage = new One2OneBidi[I, O](maxPending, outputTruncationException, unexpectedOutputException)
  BidiFlow.fromGraph(stage)
}
/*
 *    +--------------------+
 * ~> | in       toWrapped | ~>
 *    |                    |
 * <~ | out    fromWrapped | <~
 *    +--------------------+
 */
/**
 * Bidirectional graph stage that keeps a count of the elements forwarded to a wrapped flow
 * (`in` ~> `toWrapped`) for which no element has come back yet (`fromWrapped` ~> `out`).
 *
 * Behaviour implemented by the stage logic:
 *  - A pull from `toWrapped` is passed on to `in` only while the pending count is below
 *    `maxPending`, or always when `maxPending` is `-1`; otherwise the pull is remembered and
 *    replayed once an element arrives on `fromWrapped`.
 *  - An element arriving on `fromWrapped` while nothing is pending fails the stage with
 *    `unexpectedOutputException(element)`.
 *  - Completion of `fromWrapped` while elements are still pending fails the stage with
 *    `outputTruncationException(pendingCount)`; with nothing pending the stage completes.
 *
 * @param maxPending maximum number of pending elements; `-1` disables the limit
 * @param outputTruncationException creates the failure for a truncated output, given the pending count
 * @param unexpectedOutputException creates the failure for an unexpected output, given the element
 * @tparam I type of the elements travelling from `in` to `toWrapped`
 * @tparam O type of the elements travelling from `fromWrapped` to `out`
 */
class One2OneBidi[I, O](
    maxPending: Int,
    outputTruncationException: Int => Throwable,
    unexpectedOutputException: Any => Throwable) extends GraphStage[BidiShape[I, I, O, O]] {

  val in: Inlet[I] = Inlet[I]("One2OneBidi.in")
  val out: Outlet[O] = Outlet[O]("One2OneBidi.out")
  val toWrapped: Outlet[I] = Outlet[I]("One2OneBidi.toWrapped")
  val fromWrapped: Inlet[O] = Inlet[O]("One2OneBidi.fromWrapped")

  override def initialAttributes: Attributes = Attributes.name("One2OneBidi")

  val shape: BidiShape[I, I, O, O] = BidiShape(in, toWrapped, fromWrapped, out)

  override def toString: String = "One2OneBidi"

  override def createLogic(effectiveAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Number of elements pushed to `toWrapped` that have not been matched by an output yet.
    private var pendingCount = 0
    // True when `toWrapped` asked for an element but the pull was held back by the limit.
    private var pullDeferred = false

    // Replays a held-back pull on `in`, unless `in` has been closed in the meantime.
    private def replayDeferredPull(): Unit =
      if (pullDeferred) {
        pullDeferred = false
        if (!isClosed(in)) pull(in)
      }

    // Input side: count the element as pending, then hand it to the wrapped flow.
    private val inputHandler: InHandler = new InHandler {
      override def onPush(): Unit = {
        pendingCount += 1
        val request = grab(in)
        push(toWrapped, request)
      }

      override def onUpstreamFinish(): Unit = complete(toWrapped)
    }

    // Demand from the wrapped flow: forward it only while there is capacity left.
    private val toWrappedHandler: OutHandler = new OutHandler {
      override def onPull(): Unit = {
        val hasCapacity = maxPending == -1 || pendingCount < maxPending
        if (hasCapacity) pull(in)
        else pullDeferred = true
      }

      override def onDownstreamFinish(): Unit = cancel(in)
    }

    // Output of the wrapped flow: every element must correspond to a pending input.
    private val fromWrappedHandler: InHandler = new InHandler {
      override def onPush(): Unit = {
        val response = grab(fromWrapped)
        if (pendingCount <= 0) failStage(unexpectedOutputException(response))
        else {
          pendingCount -= 1
          push(out, response)
          replayDeferredPull()
        }
      }

      override def onUpstreamFinish(): Unit =
        if (pendingCount <= 0) completeStage()
        else failStage(outputTruncationException(pendingCount))
    }

    // Output side: demand and cancellation are passed straight through to `fromWrapped`.
    private val outputHandler: OutHandler = new OutHandler {
      override def onPull(): Unit = pull(fromWrapped)

      override def onDownstreamFinish(): Unit = cancel(fromWrapped)
    }

    setHandler(in, inputHandler)
    setHandler(toWrapped, toWrappedHandler)
    setHandler(fromWrapped, fromWrappedHandler)
    setHandler(out, outputHandler)
  }
}