static void sendRequestWithRetry()

in ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java [70:141]


  static void sendRequestWithRetry(PendingClientRequest pending, RaftClientImpl client) {
    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
    if (f.isDone()) {
      return;
    }

    final RaftClientRequest request = pending.newRequest();
    final int attemptCount = pending.getAttemptCount();

    final ClientId clientId = client.getId();
    LOG.debug("{}: attempt #{} send~ {}", clientId, attemptCount, request);
    client.getClientRpc().sendRequestAsyncUnordered(request).whenCompleteAsync((reply, e) -> {
      try {
        LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount, reply);
        final RaftException replyException = reply != null? reply.getException(): null;
        reply = client.handleLeaderException(request, reply);
        if (reply != null) {
          client.handleReply(request, reply);
          f.complete(reply);
          return;
        }

        final Throwable cause = replyException != null ? replyException : e;
        if (client.isClosed()) {
          f.completeExceptionally(new AlreadyClosedException(client + " is closed"));
          return;
        }

        final ClientRetryEvent event = pending.newClientRetryEvent(request, cause);
        RetryPolicy retryPolicy = client.getRetryPolicy();
        final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
        if (!action.shouldRetry()) {
          f.completeExceptionally(client.noMoreRetries(event));
          return;
        }

        if (e != null) {
          if (LOG.isTraceEnabled()) {
            LOG.trace(clientId + ": attempt #" + attemptCount + " failed~ " + request, e);
          } else {
            LOG.debug("{}: attempt #{} failed {} with {}", clientId, attemptCount, request, e.toString());
          }
          e = JavaUtils.unwrapCompletionException(e);

          if (e instanceof IOException) {
            if (e instanceof NotLeaderException) {
              client.handleNotLeaderException(request, (NotLeaderException) e, null);
            } else if (e instanceof GroupMismatchException) {
              f.completeExceptionally(e);
              return;
            } else {
              client.handleIOException(request, (IOException) e);
            }
          } else {
            if (!client.getClientRpc().handleException(request.getServerId(), e, false)) {
              f.completeExceptionally(e);
              return;
            }
          }
        }

        final TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
        LOG.debug("schedule~ attempt #{} with sleep {} and policy {} for {}",
            attemptCount, sleepTime, retryPolicy, request);
        client.getScheduler().onTimeout(sleepTime,
            () -> sendRequestWithRetry(pending, client), LOG, () -> clientId + ": Failed~ to retry " + request);
      } catch (Exception ex) {
        LOG.error(clientId + ": Failed " + request, ex);
        f.completeExceptionally(ex);
      }
    });
  }