def cachedHostConnectionPool[T: FromResponseUnmarshaller]()

in google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala [90:158]


  /**
   * Context-free variant of [[cachedHostConnectionPoolWithContext]]: every request is paired with a `Unit`
   * context, sent through the context-aware pool flow, and the resulting `Try` is unwrapped with `get`,
   * so a failed result is rethrown inside the final `map` stage rather than emitted downstream.
   * The materialized value is the one produced by the underlying pool flow.
   */
  def cachedHostConnectionPool[T: FromResponseUnmarshaller](
      host: String,
      port: Int = -1,
      https: Boolean = true,
      authenticate: Boolean = true,
      parallelism: Int = 1): Flow[HttpRequest, T, Future[HostConnectionPool]] = {
    val pooledFlow =
      cachedHostConnectionPoolWithContext[T, Unit](host, port, https, authenticate, parallelism).asFlow

    Flow[HttpRequest]
      // The context-aware flow needs a context; Unit serves as a placeholder.
      .map(request => (request, ()))
      .viaMat(pooledFlow)(Keep.right)
      // Drop the placeholder context and surface failures as exceptions.
      .map { case (result, _) => result.get }
  }

  /**
   * Builds a context-propagating flow over a cached host connection pool. Each request is passed through
   * `addStandardQuery`, optionally through `addAuth` (adds an Authorization header), sent through the pool,
   * and its response is [[Unmarshal]]led to `T`. The outcome is emitted as a `Try[T]` paired with the
   * context that accompanied the request.
   *
   * When the [[FromResponseUnmarshaller]] fails with a [[pekko.stream.connectors.google.util.Retry]], the same
   * request is re-submitted via `RetryFlow.withBackoff`, configured from the resolved retry settings. If a
   * `Retry` failure is still what is finally emitted, it is replaced by a failure holding the exception the
   * `Retry` wraps.
   *
   * @param host target host of the connection pool
   * @param port target port; `-1` selects 443 when `https` is set and 80 otherwise
   * @param https whether the HTTPS variant of the connection pool is used
   * @param authenticate whether requests pass through `addAuth` before being sent
   * @param parallelism parallelism passed to the `mapAsyncUnordered` stage that unmarshals responses
   */
  def cachedHostConnectionPoolWithContext[T: FromResponseUnmarshaller, Ctx](
      host: String,
      port: Int = -1,
      https: Boolean = true,
      authenticate: Boolean = true,
      parallelism: Int = 1): FlowWithContext[HttpRequest, Ctx, Try[T], Ctx, Future[HostConnectionPool]] =
    FlowWithContext.fromTuples {
      Flow.fromMaterializer { (mat, attr) =>
        implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr)
        val reqSettings = settings.requestSettings

        // An explicitly given port wins; otherwise fall back to the default for the chosen scheme.
        val targetPort =
          if (port != -1) port
          else if (https) 443
          else 80

        val withStandardQuery = FlowWithContext[HttpRequest, Ctx].map(addStandardQuery)

        val withAuth =
          if (!authenticate) FlowWithContext[HttpRequest, Ctx]
          else FlowWithContext[HttpRequest, Ctx].mapAsync(1)(addAuth)

        // Pick the pool variant from the (proxy, scheme) combination. The guards hide exhaustiveness
        // from the compiler, hence the defensive catch-all at the end.
        val poolFlow = reqSettings.forwardProxy match {
          case None if https =>
            http.cachedHostConnectionPoolHttps[Ctx](host, targetPort)
          case None if !https =>
            http.cachedHostConnectionPool[Ctx](host, targetPort)
          case Some(proxy) if https =>
            http.cachedHostConnectionPoolHttps[Ctx](host, targetPort, proxy.connectionContext, proxy.poolSettings)
          case Some(proxy) if !https =>
            http.cachedHostConnectionPool[Ctx](host, targetPort, proxy.poolSettings)
          case _ => throw new RuntimeException(s"illegal proxy settings with https=$https")
        }

        val unmarshalling = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {
          case (response, ctx) =>
            Future
              .fromTry(response)
              .flatMap(Unmarshal(_).to[T])(ExecutionContexts.parasitic)
              // Capture success and failure alike as a value, so the future always completes with a Try
              // paired with its context.
              .transform(outcome => Success((outcome, ctx)))(ExecutionContexts.parasitic)
        }

        val pipeline = withStandardQuery
          .via(withAuth)
          .viaMat(poolFlow)(Keep.right)
          .via(unmarshalling)
          .asFlow

        val RetrySettings(maxRetries, minBackoff, maxBackoff, randomFactor) = reqSettings.retrySettings

        val retrying = RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, pipeline) {
          // Only a failure wrapped in Retry re-submits the same input; everything else is final.
          case (input, (Failure(Retry(_)), _)) => Some(input)
          case _                               => None
        }

        // Unwrap a Retry that is still present in the final result, exposing the underlying exception.
        retrying.map {
          case (Failure(Retry(cause)), ctx) => (Failure(cause), ctx)
          case other                        => other
        }
      }
    }