in atlas-pekko/src/main/scala/com/netflix/atlas/pekko/PekkoHttpClient.scala [149:217]
/** Returns true only when `t` itself is a `ConnectException`; causes are not inspected. */
private def isConnectException(t: Throwable): Boolean = t match {
  case _: ConnectException => true
  case _                   => false
}
/**
  * Default implementation based on Pekko `Http()`.
  *
  * Each request sent through this client gets an access logger created with
  * `AccessLogger.newClientLogger(name, request)` which is completed with the outcome
  * of every attempt.
  *
  * @param name
  *     Passed to `AccessLogger.newClientLogger` for every request.
  * @param system
  *     Actor system used for the `Http()` extension, the execution context of future
  *     callbacks, and the default `ConnectionPoolSettings`.
  */
private[pekko] class HttpClientImpl(name: String)(implicit val system: ActorSystem)
    extends PekkoHttpClient {

  private implicit val ec: ExecutionContext = system.dispatcher

  private val http = Http()

  /** Sends a single request by delegating to `Http().singleRequest`. */
  protected def doSingleRequest(request: HttpRequest): Future[HttpResponse] = {
    http.singleRequest(request)
  }

  /**
    * Sends a single request via `doSingleRequest` and completes an access logger with
    * the resulting `Try` once the future finishes. No retries are performed here.
    */
  override def singleRequest(request: HttpRequest): Future[HttpResponse] = {
    val accessLogger = AccessLogger.newClientLogger(name, request)
    doSingleRequest(request).andThen { case t => accessLogger.complete(t) }
  }

  /** Creates the underlying pool flow by delegating to `Http().superPool`. */
  protected def superPoolFlow[C](
    connectionContext: HttpsConnectionContext,
    settings: ConnectionPoolSettings
  ): Flow[(HttpRequest, C), (Try[HttpResponse], C), NotUsed] = {
    http.superPool(connectionContext = connectionContext, settings = settings)
  }

  /**
    * Creates a flow that sends requests through the pool, logs every attempt to an access
    * logger, and retries based on `config.shouldRetry`.
    *
    * Retries inside the pekko pool are disabled. When `settings.maxRetries` is positive
    * they are performed by `RetryFlow.withBackoff` using the connection backoff values
    * from the settings; otherwise the pool flow is used directly.
    *
    * @param config
    *     Supplies the optional connection context and pool settings, falling back to
    *     `Http().defaultClientHttpsContext` and `ConnectionPoolSettings(system)`, as well
    *     as the `shouldRetry` predicate.
    * @return
    *     Flow pairing each response, or failure, with the context the caller supplied
    *     with the request.
    */
  override def superPool[C](
    config: ClientConfig
  ): Flow[(HttpRequest, C), (Try[HttpResponse], C), NotUsed] = {
    val connectionContext = config.connectionContext.getOrElse(http.defaultClientHttpsContext)
    val settings = config.settings.getOrElse(ConnectionPoolSettings(system))

    // All retries will be handled in this flow, disable in the pekko pool
    val pekkoSettings = settings.withMaxRetries(0)
    val clientFlow =
      superPoolFlow[Context[C]](connectionContext = connectionContext, settings = pekkoSettings)
        .map {
          case (response, context) =>
            // Runs once per attempt, the same logger instance travels with the request
            context.accessLogger.complete(response)
            response -> context
        }

    // Retry requests if needed with instrumentation via common IPC
    val retryFlow =
      RetryFlow.withBackoff[(HttpRequest, Context[C]), (Try[HttpResponse], Context[C]), NotUsed](
        minBackoff = settings.baseConnectionBackoff,
        maxBackoff = settings.maxConnectionBackoff,
        randomFactor = 0.25,
        maxRetries = settings.maxRetries,
        flow = clientFlow
      ) {
        case (request, (response, _)) if config.shouldRetry(response) =>
          // The response of this attempt is dropped by the retry stage and never reaches
          // the caller, so nobody else can consume its entity. Discard it here, pekko-http
          // requires the entity of every response to be consumed or discarded, otherwise
          // the connection stays back-pressured.
          response.foreach(_.discardEntityBytes())
          Some(request)
        case _ =>
          None
      }

    // Map caller context to internal version that includes access logger
    Flow[(HttpRequest, C)]
      .map {
        case (request, callerContext) =>
          val accessLogger = AccessLogger
            .newClientLogger(name, request)
            .withMaxAttempts(settings.maxRetries + 1)
          request -> Context(accessLogger, callerContext)
      }
      .via(if (settings.maxRetries > 0) retryFlow else clientFlow)
      .map {
        // Strip the internal wrapper so the caller only sees its own context
        case (response, context) => response -> context.callerContext
      }
  }
}