private void tryRequest()

in server/src/main/java/org/apache/druid/rpc/ServiceClientImpl.java [114:393]


  /**
   * Makes a single attempt at the request and, depending on the outcome, either resolves {@code retVal} or
   * arranges another attempt by calling itself again on {@code connectExec}.
   *
   * <p>All work happens inside the callback passed to {@code whenServiceReady}. If {@code retVal} has already
   * been canceled, nothing is done. Otherwise a location is chosen with {@code pick}, and:
   * <ul>
   *   <li>no location (null): retried after a backoff if {@code retryPolicy.retryNotAvailable()} and
   *       {@code shouldTry} allow it, otherwise {@code retVal} fails with {@code ServiceNotAvailableException};</li>
   *   <li>successful response: {@code retVal} is set to the response value;</li>
   *   <li>redirect: followed immediately, without consuming an attempt, when the target matches one of the known
   *       locations and neither a redirect loop nor a chain of {@code MAX_REDIRECTS} is detected; an invalid
   *       redirect URL fails immediately; every other redirect is treated like an unavailable service;</li>
   *   <li>other error response, null result, or failed response future: retried after a backoff if
   *       {@code shouldTry} and the retry policy allow it, otherwise {@code retVal} fails with
   *       {@code HttpResponseException} or {@code RpcException}.</li>
   * </ul>
   *
   * <p>Response callbacks run on {@code connectExec}.
   *
   * @param requestBuilder    builds the concrete request for the picked location and supplies the request timeout
   * @param handler           response handler; wrapped in an {@code ObjectOrErrorResponseHandler} before use
   * @param retVal            future handed back to the caller; canceling it also cancels the in-flight response
   *                          future
   * @param attemptNumber     number of attempts made before this one; incremented on retries, not on redirects
   * @param redirectLocations redirect URIs already followed during the current attempt; reset to empty on retry
   */
  private <IntermediateType, FinalType> void tryRequest(
      final RequestBuilder requestBuilder,
      final HttpResponseHandler<IntermediateType, FinalType> handler,
      final SettableFuture<FinalType> retVal,
      final long attemptNumber,
      final ImmutableSet<String> redirectLocations
  )
  {
    whenServiceReady(
        serviceLocations -> {
          if (retVal.isCancelled()) {
            // Return early if the caller canceled the return future.
            return;
          }

          final ServiceLocation serviceLocation = pick(serviceLocations);
          final long nextAttemptNumber = attemptNumber + 1;

          if (serviceLocation == null) {
            // Null location means the service is not currently available. Trigger a retry, if retryable.
            if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) {
              final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

              log.info(
                  "Service [%s] not available on attempt #%d; retrying in %,d ms.",
                  serviceName,
                  nextAttemptNumber,
                  backoffMs
              );

              connectExec.schedule(
                  () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                  backoffMs,
                  TimeUnit.MILLISECONDS
              );
            } else {
              retVal.setException(new ServiceNotAvailableException(serviceName));
            }

            return;
          }

          final Request request = requestBuilder.build(serviceLocation);
          ListenableFuture<Either<StringFullResponseHolder, FinalType>> responseFuture;

          log.debug("Service [%s] request [%s %s] starting.", serviceName, request.getMethod(), request.getUrl());

          responseFuture = httpClient.go(
              request,
              new ObjectOrErrorResponseHandler<>(handler),
              requestBuilder.getTimeout()
          );

          // Add cancellation listener on the return future to ensure that responseFuture is canceled too.
          final ListenableFuture<Either<StringFullResponseHolder, FinalType>> theResponseFuture = responseFuture;

          retVal.addListener(
              () -> {
                if (retVal.isCancelled()) {
                  theResponseFuture.cancel(true);
                }
              },
              Execs.directExecutor()
          );

          Futures.addCallback(
              responseFuture,
              new FutureCallback<>()
              {
                /**
                 * Dispatches a completed response: value, redirect, retryable error, or terminal error.
                 */
                @Override
                public void onSuccess(@Nullable final Either<StringFullResponseHolder, FinalType> result)
                {
                  try {
                    // result can be null if the HttpClient encounters a problem midstream on an unfinished response.
                    if (result != null && result.isValue()) {
                      // valueOrThrow will not throw, because result.isValue() was just checked.
                      handleResultValue(result.valueOrThrow());
                    } else {
                      final StringFullResponseHolder errorHolder = result != null ? result.error() : null;

                      if (errorHolder != null && isRedirect(errorHolder.getResponse().getStatus())) {
                        handleRedirect(errorHolder);
                      } else if (shouldTry(nextAttemptNumber)
                                 && (errorHolder == null || retryPolicy.retryHttpResponse(errorHolder.getResponse()))) {
                        handleRetryableErrorResponse(errorHolder);
                      } else if (errorHolder != null) {
                        // Nonretryable server response.
                        retVal.setException(new HttpResponseException(errorHolder));
                      } else {
                        // Nonretryable null result from the HTTP client.
                        retVal.setException(new RpcException(buildErrorMessage(request, null, -1, nextAttemptNumber)));
                      }
                    }
                  }
                  catch (Throwable t) {
                    // It's a bug if this happens. The purpose of this line is to help us debug what went wrong.
                    retVal.setException(new RpcException(t, "Service [%s] handler exited unexpectedly", serviceName));
                  }
                }

                /**
                 * Handles a failed response future: retries after a backoff if allowed, otherwise fails retVal.
                 */
                @Override
                public void onFailure(final Throwable t)
                {
                  try {
                    if (shouldTry(nextAttemptNumber) && retryPolicy.retryThrowable(t)) {
                      final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                      if (retryPolicy.retryLoggable()) {
                        // log as INFO level if the retry is loggable
                        log.noStackTrace().info(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber));
                      } else if (log.isDebugEnabled()) {
                        // log as DEBUG level if the debug log is enabled
                        log.noStackTrace().debug(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber));
                      } else {
                        // If none of the above is valid, we log the error message every tenth time we retry. It seems like
                        // a good balance between making the logs not too verbose when the retry is due to the same cause
                        // and enriching logs with useful information, if we keep retrying due to the same reason
                        if (nextAttemptNumber > 0 && nextAttemptNumber % 10 == 0) {
                          log.noStackTrace().info(t, buildErrorMessage(request, null, backoffMs, nextAttemptNumber));
                        }
                      }

                      connectExec.schedule(
                          () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                          backoffMs,
                          TimeUnit.MILLISECONDS
                      );
                    } else {
                      retVal.setException(new RpcException(t, buildErrorMessage(request, null, -1, nextAttemptNumber)));
                    }
                  }
                  catch (Throwable t2) {
                    // It's a bug if this happens. Report the unexpected handler failure (t2) as the cause, since
                    // that is what needs debugging, and keep the original request failure (t) as suppressed.
                    // The guards avoid addSuppressed itself throwing on a null or self-referencing argument.
                    if (t != null && t2 != t) {
                      t2.addSuppressed(t);
                    }
                    retVal.setException(new RpcException(t2, "Service [%s] handler exited unexpectedly", serviceName));
                  }
                }

                /**
                 * Handles HTTP 2xx responses from the server.
                 */
                private void handleResultValue(final FinalType value)
                {
                  if (nextAttemptNumber > 1) {
                    // There were retries. Log at INFO level to provide the user some closure.
                    log.info(
                        "Service [%s] request [%s %s] completed.",
                        serviceName,
                        request.getMethod(),
                        request.getUrl()
                    );
                  } else {
                    // No retries. Log at debug level to avoid cluttering the logs.
                    log.debug(
                        "Service [%s] request [%s %s] completed.",
                        serviceName,
                        request.getMethod(),
                        request.getUrl()
                    );
                  }

                  retVal.set(value);
                }

                /**
                 * Handles retryable HTTP error responses from the server.
                 */
                private void handleRetryableErrorResponse(final StringFullResponseHolder errorHolder)
                {
                  // Retryable server response (or null errorHolder, which means null result, which can happen
                  // if the HttpClient encounters an exception in the midst of response processing).
                  final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);
                  if (retryPolicy.retryLoggable()) {
                    log.noStackTrace().info(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber));
                  } else if (log.isDebugEnabled()) {
                    log.noStackTrace().debug(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber));
                  } else {
                    // Same throttling as in onFailure: only every tenth attempt is logged at INFO level.
                    if (nextAttemptNumber > 0 && nextAttemptNumber % 10 == 0) {
                      log.noStackTrace().info(buildErrorMessage(request, errorHolder, backoffMs, nextAttemptNumber));
                    }
                  }
                  connectExec.schedule(
                      () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                      backoffMs,
                      TimeUnit.MILLISECONDS
                  );
                }

                /**
                 * Handles HTTP redirect responses from the server.
                 */
                private void handleRedirect(final StringFullResponseHolder errorHolder)
                {
                  // Redirect. Update preferredLocationNoPath if appropriate, then reissue.
                  final String newUri = errorHolder.getResponse().headers().get("Location");
                  final ServiceLocation redirectLocationNoPath = serviceLocationNoPathFromUri(newUri);

                  if (redirectLocationNoPath == null) {
                    // Redirect to invalid URL. Something is wrong with the server: fail immediately
                    // without retries.
                    retVal.setException(
                        new RpcException(
                            "Service [%s] redirected to invalid URL [%s]",
                            serviceName,
                            newUri
                        )
                    );
                  } else if (serviceLocations.getLocations()
                                             .stream()
                                             .anyMatch(loc -> serviceLocationMatches(loc, redirectLocationNoPath))) {
                    // Valid redirect, to a server that is one of the known locations.
                    final boolean isRedirectLoop = redirectLocations.contains(newUri);
                    final boolean isRedirectChainTooLong = redirectLocations.size() >= MAX_REDIRECTS;

                    if (isRedirectLoop || isRedirectChainTooLong) {
                      // Treat redirect loops, or too-long redirect chains, as unavailable services.
                      if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) {
                        final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                        log.info(
                            "Service [%s] issued too many redirects on attempt #%d; retrying in %,d ms.",
                            serviceName,
                            nextAttemptNumber,
                            backoffMs
                        );

                        connectExec.schedule(
                            () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                            backoffMs,
                            TimeUnit.MILLISECONDS
                        );
                      } else {
                        retVal.setException(new ServiceNotAvailableException(serviceName, "issued too many redirects"));
                      }
                    } else {
                      // Valid redirect. Follow it without incrementing the attempt number.
                      preferredLocationNoPath.set(redirectLocationNoPath);
                      final ImmutableSet<String> newRedirectLocations =
                          ImmutableSet.<String>builder().addAll(redirectLocations).add(newUri).build();
                      connectExec.submit(
                          () -> tryRequest(requestBuilder, handler, retVal, attemptNumber, newRedirectLocations)
                      );
                    }
                  } else {
                    // Redirect to a server that is not one of the known locations. Treat service as unavailable.
                    if (retryPolicy.retryNotAvailable() && shouldTry(nextAttemptNumber)) {
                      final long backoffMs = computeBackoffMs(retryPolicy, attemptNumber);

                      log.info(
                          "Service [%s] issued redirect to unknown URL [%s] on attempt #%d; retrying in %,d ms.",
                          serviceName,
                          newUri,
                          nextAttemptNumber,
                          backoffMs
                      );

                      connectExec.schedule(
                          () -> tryRequest(requestBuilder, handler, retVal, nextAttemptNumber, ImmutableSet.of()),
                          backoffMs,
                          TimeUnit.MILLISECONDS
                      );
                    } else {
                      retVal.setException(
                          new ServiceNotAvailableException(
                              serviceName,
                              "issued redirect to unknown URL [%s]",
                              newUri
                          )
                      );
                    }
                  }
                }
              },
              connectExec
          );
        },
        retVal
    );
  }