in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala [303:374]
/** Initial attributes of this stage: it is named "ResponseParsingMerge". */
override def initialAttributes: Attributes = Attributes.name("ResponseParsingMerge")
// Fan-in shape of this stage: the two inputs `dataIn` and `bypassIn` are merged into the single output `responseOut`.
val shape = new FanInShape2(dataIn, bypassIn, responseOut)
/**
 * Builds the stage logic that alternates between the two inputs:
 *
 *  - an element from `bypassIn` is handed to the parser as the context for the next response,
 *  - elements from `dataIn` are fed to the parser as session bytes.
 *
 * After every parser invocation `drainParser` collects the parser's outputs, emits them as one
 * list on `responseOut` and then decides, based on the parser's last signal, which input to pull
 * next or whether to complete the stage.
 */
override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
  // A single (private) parser copy serves all responses of one connection;
  // it builds a cache of all header instances seen on that connection.
  val parser = rootParser.createShallowCopy()

  // true while the stage waits for `bypassIn` (set by getNextMethod),
  // false while it waits for `dataIn` (set by getNextData)
  var waitingForMethod = true

  // Set once `dataIn` has finished and a final parser output has been passed to `emit` together
  // with a `completeStage()` continuation. getNextData then leaves completion to that continuation.
  var completeStagePending = false

  setHandler(bypassIn,
    new InHandler {
      // Install the grabbed element as context for the next response, then run the parser
      // on an empty chunk and process whatever it reports.
      override def onPush(): Unit = {
        parser.setContextForNextResponse(grab(bypassIn))
        drainParser(parser.parseBytes(ByteString.empty))
      }

      // Only completes the stage when it is currently waiting for `bypassIn`; otherwise nothing happens here.
      override def onUpstreamFinish(): Unit =
        if (waitingForMethod) completeStage()
    })

  setHandler(dataIn,
    new InHandler {
      // Feed the grabbed session bytes to the parser and process its outputs.
      override def onPush(): Unit =
        drainParser(parser.parseSessionBytes(grab(dataIn)))

      // `||` short-circuits: the parser is only asked when the stage is not waiting for `bypassIn`.
      // If the parser does not agree to finish right away, its next output is emitted first and
      // the stage is completed from the emit continuation.
      override def onUpstreamFinish(): Unit =
        if (waitingForMethod || parser.onUpstreamFinish()) completeStage()
        else {
          completeStagePending = true
          emit(responseOut, parser.onPull() :: Nil, () => completeStage())
        }
    })

  setHandler(responseOut, eagerTerminateOutput)

  // Continuation: switch to waiting for `bypassIn`; complete the stage if that input is already closed.
  // NOTE(review): unlike getNextData, this path does not consult completeStagePending — confirm that is intended.
  val getNextMethod = () => {
    waitingForMethod = true
    if (!isClosed(bypassIn)) pull(bypassIn)
    else completeStage()
  }

  // Continuation: switch to waiting for `dataIn`. When that input is already closed the stage is
  // completed here, unless a completion is already pending from dataIn's onUpstreamFinish.
  val getNextData = () => {
    waitingForMethod = false
    if (!isClosed(dataIn)) pull(dataIn)
    else if (!completeStagePending) completeStage()
  }

  /**
   * Pulls outputs from the parser until it reports one of the control signals
   * `NeedNextRequestMethod`, `StreamEnd` or `NeedMoreData`. All other outputs are appended to `b`;
   * on a control signal the collected outputs are emitted (if any) and the matching continuation runs.
   *
   * @param current the parser output to inspect in this step
   * @param b       accumulator for the outputs gathered so far
   */
  @tailrec def drainParser(current: ResponseOutput, b: ListBuffer[ResponseOutput] = ListBuffer.empty): Unit = {
    // Emit the collected outputs as a single element and run `continuation` afterwards;
    // with nothing collected the continuation runs immediately.
    def flush(collected: List[ResponseOutput], continuation: () => Unit): Unit =
      collected match {
        case Nil => continuation()
        case _   => emit(responseOut, collected, continuation)
      }

    current match {
      case NeedNextRequestMethod => flush(b.result(), getNextMethod)
      case StreamEnd             => flush(b.result(), () => completeStage())
      case NeedMoreData          => flush(b.result(), getNextData)
      case message               => drainParser(parser.onPull(), b += message)
    }
  }

  // The stage starts out waiting for the first element on `bypassIn`.
  override def preStart(): Unit = getNextMethod()
}