in server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java [114:393]
/**
 * Issues one attempt of a request against the service and arranges for any follow-up attempt (retry or redirect)
 * by scheduling another call to this method on {@code connectExec}.
 *
 * The work runs inside the callback passed to {@code whenServiceReady}. The outcome of the overall request is
 * reported exclusively through {@code retVal}: it is set to the handler's value on success, or to an exception
 * when the request fails and no further attempt is made. If {@code retVal} has been canceled by the time the
 * callback runs, nothing is done; if it is canceled while a request is in flight, the in-flight response future
 * is canceled too.
 *
 * Retries are scheduled after a backoff computed by {@code computeBackoffMs(retryPolicy, attemptNumber)} and
 * always start with an empty redirect chain. Valid redirects are followed immediately, without incrementing the
 * attempt number, and extend the redirect chain.
 *
 * @param requestBuilder    builds the request for the picked service location and supplies the request timeout
 * @param handler           response handler; wrapped in an {@code ObjectOrErrorResponseHandler} for each attempt
 * @param retVal            future that receives the final value or exception
 * @param attemptNumber     attempt counter for this call; {@code attemptNumber + 1} is the number used in log
 *                          messages and in {@code shouldTry} checks for a subsequent attempt
 * @param redirectLocations "Location" header values already followed in the current redirect chain
 */
private <IntermediateType, FinalType> void tryRequest(
    final RequestBuilder requestBuilder,
    final HttpResponseHandler<IntermediateType, FinalType> handler,
    final SettableFuture<FinalType> retVal,
    final long attemptNumber,
    final ImmutableSet<String> redirectLocations
)
{
  whenServiceReady(
      serviceLocations -> {
        if (retVal.isCancelled()) {
          // Return early if the caller canceled the return future.
          return;
        }

        final ServiceLocation serviceLocation = pick(serviceLocations);
        final long nextAttemptNumber = attemptNumber + 1;

        if (serviceLocation == null) {
          // Null location means the service is not currently available. Trigger a retry, if retryable.
          if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) {
            final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

            log.info(
                "Service [%s] not available on attempt #%d; retrying in %,d ms.",
                serviceName,
                nextAttemptNumber,
                backoffMs
            );

            connectExec.schedule(
                () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                backoffMs,
                TimeUnit.MILLISECONDS
            );
          } else {
            retVal.setException(new ServiceNotAvailableException(serviceName));
          }

          return;
        }

        final Request request = requestBuilder.build(serviceLocation);

        log.debug("Service [%s] request [%s %s] starting.", serviceName, request.getMethod(), request.getUrl());

        final ListenableFuture<Either<StringFullResponseHolder, FinalType>> responseFuture = httpClient.go(
            request,
            new ObjectOrErrorResponseHandler<>(handler),
            requestBuilder.getTimeout()
        );

        // Add cancellation listener on the return future to ensure that responseFuture is canceled too.
        retVal.addListener(
            () -> {
              if (retVal.isCancelled()) {
                responseFuture.cancel(true);
              }
            },
            Execs.directExecutor()
        );

        Futures.addCallback(
            responseFuture,
            new FutureCallback<>()
            {
              @Override
              public void onSuccess(@Nullable final Either<StringFullResponseHolder, FinalType> result)
              {
                try {
                  // result can be null if the HttpClient encounters a problem midstream on an unfinished response.
                  if (result != null && result.isValue()) {
                    handleResultValue(result.valueOrThrow());
                  } else {
                    final StringFullResponseHolder errorHolder = result != null ? result.error() : null;

                    if (errorHolder != null && isRedirect(errorHolder.getResponse().getStatus())) {
                      handleRedirect(errorHolder);
                    } else if (shouldTry(nextAttemptNumber)
                               && (errorHolder == null || retryPolicy.retryHttpResponse(errorHolder.getResponse()))) {
                      handleRetryableErrorResponse(errorHolder);
                    } else if (errorHolder != null) {
                      // Nonretryable server response.
                      retVal.setException(new HttpResponseException(errorHolder));
                    } else {
                      // Nonretryable null result from the HTTP client.
                      retVal.setException(new RpcException(buildErrorMessage(request, null, -1, nextAttemptNumber)));
                    }
                  }
                }
                catch (Throwable t) {
                  // It's a bug if this happens. The purpose of this line is to help us debug what went wrong.
                  retVal.setException(new RpcException(t, "Service [%s] handler exited unexpectedly", serviceName));
                }
              }

              @Override
              public void onFailure(final Throwable t)
              {
                try {
                  if (shouldTry(nextAttemptNumber) && retryPolicy.retryThrowable(t)) {
                    final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                    if (retryPolicy.retryLoggable()) {
                      // log as INFO level if the retry is loggable
                      log.noStackTrace().info(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber));
                    } else if (log.isDebugEnabled()) {
                      // log as DEBUG level if the debug log is enabled
                      log.noStackTrace().debug(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber));
                    } else {
                      // If none of the above is valid, we log the error message every tenth time we retry. It seems like
                      // a good balance between making the logs not too verbose when the retry is due to the same cause
                      // and enriching logs with useful information, if we keep retrying due to the same reason
                      if (nextAttemptNumber > 0 && nextAttemptNumber % 10 == 0) {
                        log.noStackTrace().info(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber));
                      }
                    }

                    connectExec.schedule(
                        () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                        backoffMs,
                        TimeUnit.MILLISECONDS
                    );
                  } else {
                    retVal.setException(new RpcException(t, buildErrorMessage(request, null, -1, nextAttemptNumber)));
                  }
                }
                catch (Throwable t2) {
                  // It's a bug if this happens. Report the handler's own failure (t2) as the cause, since that is
                  // what needs debugging; keep the original request failure (t) attached as a suppressed exception.
                  // The identity check avoids the IllegalArgumentException thrown on self-suppression.
                  if (t2 != t) {
                    t2.addSuppressed(t);
                  }
                  retVal.setException(new RpcException(t2, "Service [%s] handler exited unexpectedly", serviceName));
                }
              }

              /**
               * Handles a successful result value: logs completion and sets it on the return future.
               */
              private void handleResultValue(final FinalType value)
              {
                if (nextAttemptNumber > 1) {
                  // There were retries. Log at INFO level to provide the user some closure.
                  log.info(
                      "Service [%s] request [%s %s] completed.",
                      serviceName,
                      request.getMethod(),
                      request.getUrl()
                  );
                } else {
                  // No retries. Log at debug level to avoid cluttering the logs.
                  log.debug(
                      "Service [%s] request [%s %s] completed.",
                      serviceName,
                      request.getMethod(),
                      request.getUrl()
                  );
                }

                // Will not throw, because we checked result.isValue() earlier.
                retVal.set(value);
              }

              /**
               * Handles retryable HTTP error responses from the server: logs according to the retry policy and
               * schedules the next attempt after the computed backoff.
               */
              private void handleRetryableErrorResponse(final StringFullResponseHolder errorHolder)
              {
                // Retryable server response (or null errorHolder, which means null result, which can happen
                // if the HttpClient encounters an exception in the midst of response processing).
                final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                if (retryPolicy.retryLoggable()) {
                  log.noStackTrace().info(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber));
                } else if (log.isDebugEnabled()) {
                  log.noStackTrace().debug(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber));
                } else {
                  // Otherwise only log every tenth attempt, to keep repeated retries from flooding the logs.
                  if (nextAttemptNumber > 0 && nextAttemptNumber % 10 == 0) {
                    log.noStackTrace().info(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber));
                  }
                }

                connectExec.schedule(
                    () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                    backoffMs,
                    TimeUnit.MILLISECONDS
                );
              }

              /**
               * Handles HTTP redirect responses from the server. A redirect to an unparseable URL fails the request
               * immediately. A redirect to a known location is followed unless it forms a loop or the chain has
               * reached MAX_REDIRECTS. Loops, too-long chains, and redirects to unknown locations are treated as
               * "service not available".
               */
              private void handleRedirect(final StringFullResponseHolder errorHolder)
              {
                // Redirect. Update preferredLocationNoPath if appropriate, then reissue.
                final String newUri = errorHolder.getResponse().headers().get("Location");
                final ServiceLocation redirectLocationNoPath = serviceLocationNoPathFromUri(newUri);

                if (redirectLocationNoPath == null) {
                  // Redirect to invalid URL. Something is wrong with the server: fail immediately
                  // without retries.
                  retVal.setException(
                      new RpcException(
                          "Service [%s] redirected to invalid URL [%s]",
                          serviceName,
                          newUri
                      )
                  );
                } else if (serviceLocations.getLocations()
                                           .stream()
                                           .anyMatch(loc -> serviceLocationMatches(loc, redirectLocationNoPath))) {
                  // Valid redirect, to a server that is one of the known locations.
                  final boolean isRedirectLoop = redirectLocations.contains(newUri);
                  final boolean isRedirectChainTooLong = redirectLocations.size() >= MAX_REDIRECTS;

                  if (isRedirectLoop || isRedirectChainTooLong) {
                    // Treat redirect loops, or too-long redirect chains, as unavailable services.
                    if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) {
                      final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                      log.info(
                          "Service [%s] issued too many redirects on attempt #%d; retrying in %,d ms.",
                          serviceName,
                          nextAttemptNumber,
                          backoffMs
                      );

                      connectExec.schedule(
                          () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                          backoffMs,
                          TimeUnit.MILLISECONDS
                      );
                    } else {
                      retVal.setException(new ServiceNotAvailableException(serviceName, "issued too many redirects"));
                    }
                  } else {
                    // Valid redirect. Follow it without incrementing the attempt number.
                    preferredLocationNoPath.set(redirectLocationNoPath);

                    final ImmutableSet<String> newRedirectLocations =
                        ImmutableSet.<String>builder().addAll(redirectLocations).add(newUri).build();

                    connectExec.submit(
                        () -> tryRequest(requestBuilder, handler, retVal, attemptNumber, newRedirectLocations)
                    );
                  }
                } else {
                  // Redirect to a server that is not one of the known locations. Treat service as unavailable.
                  if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) {
                    final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                    log.info(
                        "Service [%s] issued redirect to unknown URL [%s] on attempt #%d; retrying in %,d ms.",
                        serviceName,
                        newUri,
                        nextAttemptNumber,
                        backoffMs
                    );

                    connectExec.schedule(
                        () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                        backoffMs,
                        TimeUnit.MILLISECONDS
                    );
                  } else {
                    retVal.setException(
                        new ServiceNotAvailableException(
                            serviceName,
                            "issued redirect to unknown URL [%s]",
                            newUri
                        )
                    );
                  }
                }
              }
            },
            connectExec
        );
      },
      retVal
  );
}