in google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala [90:158]
def cachedHostConnectionPool[T: FromResponseUnmarshaller](
host: String,
port: Int = -1,
https: Boolean = true,
authenticate: Boolean = true,
parallelism: Int = 1): Flow[HttpRequest, T, Future[HostConnectionPool]] =
Flow[HttpRequest]
.map((_, ()))
.viaMat(cachedHostConnectionPoolWithContext[T, Unit](host, port, https, authenticate, parallelism).asFlow)(
Keep.right)
.map(_._1.get)
/**
* Creates a cached host connection pool that sends requests and emits the [[Unmarshal]]led response.
* If `authenticate = true` adds an Authorization header to each request.
* Retries the request if the [[FromResponseUnmarshaller]] throws a [[pekko.stream.connectors.google.util.Retry]].
*/
def cachedHostConnectionPoolWithContext[T: FromResponseUnmarshaller, Ctx](
host: String,
port: Int = -1,
https: Boolean = true,
authenticate: Boolean = true,
parallelism: Int = 1): FlowWithContext[HttpRequest, Ctx, Try[T], Ctx, Future[HostConnectionPool]] =
FlowWithContext.fromTuples {
Flow.fromMaterializer { (mat, attr) =>
implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr)
val p = if (port == -1) if (https) 443 else 80 else port
val uriFlow = FlowWithContext[HttpRequest, Ctx].map(addStandardQuery)
val authFlow =
if (authenticate)
FlowWithContext[HttpRequest, Ctx].mapAsync(1)(addAuth)
else
FlowWithContext[HttpRequest, Ctx]
val requestFlow = settings.requestSettings.forwardProxy match {
case None if !https =>
http.cachedHostConnectionPool[Ctx](host, p)
case Some(proxy) if !https =>
http.cachedHostConnectionPool[Ctx](host, p, proxy.poolSettings)
case None if https =>
http.cachedHostConnectionPoolHttps[Ctx](host, p)
case Some(proxy) if https =>
http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings)
case _ => throw new RuntimeException(s"illegal proxy settings with https=$https")
}
val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {
case (res, ctx) =>
Future
.fromTry(res)
.flatMap(Unmarshal(_).to[T])(ExecutionContexts.parasitic)
.transform(Success(_))(ExecutionContexts.parasitic)
.zip(Future.successful(ctx))
}
val flow = uriFlow.via(authFlow).viaMat(requestFlow)(Keep.right).via(unmarshalFlow).asFlow
val RetrySettings(maxRetries, minBackoff, maxBackoff, randomFactor) = settings.requestSettings.retrySettings
RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, flow) {
case (in, (Failure(Retry(_)), _)) => Some(in)
case _ => None
}.map {
case (Failure(Retry(ex)), ctx) => (Failure(ex), ctx)
case x => x
}
}
}