private CompletableFuture appendTransaction()

in ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java [834:886]


  /**
   * Appends the transaction of a client write request to the local log and
   * registers the request as pending.
   *
   * <p>While holding this server's monitor, the method
   * (1) checks the leader state,
   * (2) records the log index future of the context in the write index cache,
   * (3) acquires a pending-request permit,
   * (4) asserts that the server is RUNNING and appends the context to the log,
   * (5) adds the request to the leader's pending requests and notifies the senders.
   *
   * <p>Every failure path handled in this method completes {@code cacheEntry}
   * (with either an exception or an exception reply) before returning,
   * so that the entry is never left uncompleted by a failed attempt.
   *
   * @param request the client request; must not be null.
   * @param context the transaction context to be appended to the log.
   * @param cacheEntry the retry cache entry associated with the request.
   * @return the non-null future returned by {@code checkLeaderState} if there is one;
   *         the reply future of {@code cacheEntry} if a permit cannot be acquired
   *         or the request cannot be added to the pending requests;
   *         a completed future holding an exception reply if appending to the log
   *         fails with a {@link StateMachineException} or a {@link ServerNotReadyException};
   *         otherwise, the future of the newly added pending request.
   */
  private CompletableFuture<RaftClientReply> appendTransaction(
      RaftClientRequest request, TransactionContextImpl context, CacheEntry cacheEntry) {
    Objects.requireNonNull(request, "request == null");
    CodeInjectionForTesting.execute(APPEND_TRANSACTION, getId(),
        request.getClientId(), request, context, cacheEntry);

    final PendingRequest pending;
    synchronized (this) {
      // A non-null reply means that the leader state check has already produced the result.
      final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
      if (reply != null) {
        return reply;
      }

      // append the message to its local log
      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
      writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());

      final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
      if (permit == null) {
        cacheEntry.failWithException(new ResourceUnavailableException(
            getMemberId() + ": Failed to acquire a pending write request for " + request));
        return cacheEntry.getReplyFuture();
      }
      // NOTE(review): on the two exception paths below, the acquired permit is never
      // passed to addPendingRequest -- confirm whether it has to be released explicitly.
      try {
        assertLifeCycleState(LifeCycle.States.RUNNING);
        state.appendLog(context);
      } catch (StateMachineException e) {
        // the StateMachineException is thrown by the SM in the preAppend stage.
        // Return the exception in a RaftClientReply.
        final RaftClientReply exceptionReply = newExceptionReply(request, e);
        cacheEntry.failWithReply(exceptionReply);
        // leader will step down here
        if (e.leaderShouldStepDown() && getInfo().isLeader()) {
          leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
        }
        return CompletableFuture.completedFuture(exceptionReply);
      } catch (ServerNotReadyException e) {
        // The server is not in the RUNNING state.
        // Fail the cache entry in the same way as the StateMachineException case above;
        // otherwise, the entry would be left uncompleted although this attempt has failed.
        final RaftClientReply exceptionReply = newExceptionReply(request, e);
        cacheEntry.failWithReply(exceptionReply);
        return CompletableFuture.completedFuture(exceptionReply);
      }

      // put the request into the pending queue
      pending = leaderState.addPendingRequest(permit, request, context);
      if (pending == null) {
        cacheEntry.failWithException(new ResourceUnavailableException(
            getMemberId() + ": Failed to add a pending write request for " + request));
        return cacheEntry.getReplyFuture();
      }
      leaderState.notifySenders();
    }

    // The future is obtained outside the synchronized block.
    return pending.getFuture();
  }