in common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala [114:186]
@tailrec private def execute(
request: HttpRequestBase,
timeout: FiniteDuration,
maxConcurrent: Int,
maxResponse: ByteSize,
truncation: ByteSize,
retry: Boolean,
reschedule: Boolean = false)(implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = {
val start = Instant.now
Try(connection.execute(request)).map { response =>
val containerResponse = Option(response.getEntity)
.map { entity =>
val statusCode = response.getStatusLine.getStatusCode
val contentLength = entity.getContentLength
// Negative contentLength means unknown or overflow. We don't want to consume in either case.
if (contentLength >= 0) {
if (contentLength <= maxResponse.toBytes) {
// optimized route to consume the entire stream into a string
val str = EntityUtils.toString(entity, StandardCharsets.UTF_8) // consumes and closes the whole stream
Right(ContainerResponse(statusCode, str, None))
} else {
// only consume a bounded number of bytes according to the system limits
val str = new String(IOUtils.toByteArray(entity.getContent, truncation.toBytes), StandardCharsets.UTF_8)
EntityUtils.consumeQuietly(entity) // consume the rest of the stream to free the connection
Right(ContainerResponse(statusCode, str, Some(contentLength.B, maxResponse)))
}
} else {
EntityUtils.consumeQuietly(entity) // silently consume the whole stream to free the connection
Left(NoResponseReceived())
}
}
.getOrElse {
// entity is null
Left(NoResponseReceived())
}
response.close()
containerResponse
} recoverWith {
// The route to target socket as well as the target socket itself may need some time to be available -
// particularly on a loaded system.
// The following exceptions occur on such transient conditions. In addition, no data has been transmitted
// yet if these exceptions occur. For this reason, it is safe and reasonable to retry.
//
// HttpHostConnectException: no target socket is listening (yet).
case t: HttpHostConnectException => Failure(RetryableConnectionError(t))
//
// NoRouteToHostException: route to target host is not known (yet).
case t: NoRouteToHostException => Failure(RetryableConnectionError(t))
//In general with NoHttpResponseException it cannot be said if server has processed the request or not
//For some cases like in standalone mode setup it should be fine to retry
case t: NoHttpResponseException if ApacheBlockingContainerClient.clientConfig.retryNoHttpResponseException =>
Failure(RetryableConnectionError(t))
} match {
case Success(response) => response
case Failure(_: RetryableConnectionError) if reschedule =>
//propagate as a failed future; clients can retry at a different container
throw ContainerHealthError(tid, request.getURI.toString)
case Failure(t: RetryableConnectionError) if retry =>
if (timeout > Duration.Zero) {
Thread.sleep(50) // Sleep for 50 milliseconds
val newTimeout = timeout - (Instant.now.toEpochMilli - start.toEpochMilli).milliseconds
execute(request, newTimeout, maxConcurrent, maxResponse, truncation, retry = true)
} else {
logging.warn(this, s"POST failed with $t - no retry because timeout exceeded.")
Left(Timeout(t))
}
case Failure(t: Throwable) => Left(ConnectionError(t))
}
}