in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala [66:138]
def apply(
hostHeader: headers.Host,
settings: ClientConnectionSettings,
log: LoggingAdapter): Http.ClientLayer = {
import settings._
val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val renderingContextCreation = b.add {
Flow[HttpRequest].map { request =>
val sendEntityTrigger =
request.headers.collectFirst { case headers.Expect.`100-continue` => Promise[NotUsed]().future }
RequestRenderingContext(request, hostHeader, sendEntityTrigger)
}
}
val bypassFanout = b.add(Broadcast[RequestRenderingContext](2, eagerCancel = true))
val terminationMerge = b.add(TerminationMerge)
val requestRendering: Flow[RequestRenderingContext, ByteString, NotUsed] = {
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
Flow[RequestRenderingContext].flatMapConcat(requestRendererFactory.renderToSource).named("renderer")
}
val bypass = Flow[RequestRenderingContext].map { ctx =>
HttpResponseParser.ResponseContext(ctx.request.method, ctx.sendEntityTrigger.map(_.asInstanceOf[Promise[Unit]]))
}
val responseParsingMerge = b.add {
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings, log))
new ResponseParsingMerge(rootParser)
}
val responsePrep = Flow[List[ParserOutput.ResponseOutput]]
.mapConcat(ConstantFun.scalaIdentityFunction)
.via(new PrepareResponse(parserSettings))
val terminationFanout = b.add(Broadcast[HttpResponse](2))
val logger =
b.add(Flow[ByteString].mapError { case t => log.debug(s"Outgoing request stream error {}", t); t }.named(
"errorLogger"))
val wrapTls = b.add(Flow[ByteString].map(SendBytes(_)))
val collectSessionBytes = b.add(Flow[SslTlsInbound].collect { case s: SessionBytes => s })
renderingContextCreation.out ~> bypassFanout.in
bypassFanout.out(0) ~> terminationMerge.in0
terminationMerge.out ~> requestRendering ~> logger ~> wrapTls
bypassFanout.out(1) ~> bypass ~> responseParsingMerge.in1
collectSessionBytes ~> responseParsingMerge.in0
responseParsingMerge.out ~> responsePrep ~> terminationFanout.in
terminationFanout.out(0) ~> terminationMerge.in1
BidiShape(
renderingContextCreation.in,
wrapTls.out,
collectSessionBytes.in,
terminationFanout.out(1))
})
One2OneBidiFlow[HttpRequest, HttpResponse](
-1,
outputTruncationException = new UnexpectedConnectionClosureException(_)).atop(
core).atop(
logTLSBidiBySetting("client-plain-text", settings.logUnencryptedNetworkBytes))
}