def apply()

in http-core/src/main/scala/org/apache/pekko/http/impl/engine/client/OutgoingConnectionBlueprint.scala [66:138]


  def apply(
      hostHeader: headers.Host,
      settings: ClientConnectionSettings,
      log: LoggingAdapter): Http.ClientLayer = {
    import settings._

    val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val renderingContextCreation = b.add {
        Flow[HttpRequest].map { request =>
          val sendEntityTrigger =
            request.headers.collectFirst { case headers.Expect.`100-continue` => Promise[NotUsed]().future }
          RequestRenderingContext(request, hostHeader, sendEntityTrigger)
        }
      }

      val bypassFanout = b.add(Broadcast[RequestRenderingContext](2, eagerCancel = true))

      val terminationMerge = b.add(TerminationMerge)

      val requestRendering: Flow[RequestRenderingContext, ByteString, NotUsed] = {
        val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
        Flow[RequestRenderingContext].flatMapConcat(requestRendererFactory.renderToSource).named("renderer")
      }

      val bypass = Flow[RequestRenderingContext].map { ctx =>
        HttpResponseParser.ResponseContext(ctx.request.method, ctx.sendEntityTrigger.map(_.asInstanceOf[Promise[Unit]]))
      }

      val responseParsingMerge = b.add {
        // the initial header parser we initially use for every connection,
        // will not be mutated, all "shared copy" parsers copy on first-write into the header cache
        val rootParser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings, log))
        new ResponseParsingMerge(rootParser)
      }

      val responsePrep = Flow[List[ParserOutput.ResponseOutput]]
        .mapConcat(ConstantFun.scalaIdentityFunction)
        .via(new PrepareResponse(parserSettings))

      val terminationFanout = b.add(Broadcast[HttpResponse](2))

      val logger =
        b.add(Flow[ByteString].mapError { case t => log.debug(s"Outgoing request stream error {}", t); t }.named(
          "errorLogger"))
      val wrapTls = b.add(Flow[ByteString].map(SendBytes(_)))

      val collectSessionBytes = b.add(Flow[SslTlsInbound].collect { case s: SessionBytes => s })

      renderingContextCreation.out ~> bypassFanout.in
      bypassFanout.out(0)          ~> terminationMerge.in0
      terminationMerge.out         ~> requestRendering ~> logger ~> wrapTls

      bypassFanout.out(1) ~> bypass ~> responseParsingMerge.in1
      collectSessionBytes ~> responseParsingMerge.in0

      responseParsingMerge.out ~> responsePrep ~> terminationFanout.in
      terminationFanout.out(0) ~> terminationMerge.in1

      BidiShape(
        renderingContextCreation.in,
        wrapTls.out,
        collectSessionBytes.in,
        terminationFanout.out(1))
    })

    One2OneBidiFlow[HttpRequest, HttpResponse](
      -1,
      outputTruncationException = new UnexpectedConnectionClosureException(_)).atop(
      core).atop(
      logTLSBidiBySetting("client-plain-text", settings.logUnencryptedNetworkBytes))
  }