in ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java [834:886]
/**
 * Appends the transaction of a client request to the local log and registers
 * the request as pending.
 *
 * <p>After the test-injection hook, all work is done while holding this
 * object's monitor. The steps are, in order:
 * <ol>
 *   <li>{@code checkLeaderState}; a non-null result is returned as-is;</li>
 *   <li>record the log index future of {@code context} in {@code writeIndexCache};</li>
 *   <li>acquire a pending-request permit from the leader state;</li>
 *   <li>assert that the server is RUNNING and append {@code context} to the log;</li>
 *   <li>add the request to the pending requests and notify the senders.</li>
 * </ol>
 * Whenever one of these steps rejects the request, {@code cacheEntry} is failed
 * before the method returns.
 *
 * @param request    the client request; must not be null
 * @param context    the transaction context to append to the log
 * @param cacheEntry the cache entry to fail when the request is rejected
 * @return the future of the pending request on success; otherwise the reply
 *         future of {@code cacheEntry}, the future from {@code checkLeaderState},
 *         or a completed future holding an exception reply
 * @throws NullPointerException if {@code request} is null
 */
private CompletableFuture<RaftClientReply> appendTransaction(
    RaftClientRequest request, TransactionContextImpl context, CacheEntry cacheEntry) {
  Objects.requireNonNull(request, "request == null");
  CodeInjectionForTesting.execute(APPEND_TRANSACTION, getId(),
      request.getClientId(), request, context, cacheEntry);

  final PendingRequest pending;
  synchronized (this) {
    final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
    if (reply != null) {
      return reply;
    }

    // append the message to its local log
    final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
    writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());

    final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
    if (permit == null) {
      cacheEntry.failWithException(new ResourceUnavailableException(
          getMemberId() + ": Failed to acquire a pending write request for " + request));
      return cacheEntry.getReplyFuture();
    }

    // NOTE(review): on the two exception paths below the permit acquired above
    // is never passed to addPendingRequest -- confirm whether it has to be
    // released explicitly.
    try {
      assertLifeCycleState(LifeCycle.States.RUNNING);
      state.appendLog(context);
    } catch (StateMachineException e) {
      // the StateMachineException is thrown by the SM in the preAppend stage.
      // Return the exception in a RaftClientReply.
      final RaftClientReply exceptionReply = newExceptionReply(request, e);
      cacheEntry.failWithReply(exceptionReply);
      // leader will step down here
      if (e.leaderShouldStepDown() && getInfo().isLeader()) {
        leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
      }
      return CompletableFuture.completedFuture(exceptionReply);
    } catch (ServerNotReadyException e) {
      // The server is not in the RUNNING state. Fail the cache entry as in the
      // StateMachineException case so that it is not left incomplete; every
      // other rejection path of this method fails the entry as well.
      final RaftClientReply exceptionReply = newExceptionReply(request, e);
      cacheEntry.failWithReply(exceptionReply);
      return CompletableFuture.completedFuture(exceptionReply);
    }

    // put the request into the pending queue
    pending = leaderState.addPendingRequest(permit, request, context);
    if (pending == null) {
      cacheEntry.failWithException(new ResourceUnavailableException(
          getMemberId() + ": Failed to add a pending write request for " + request));
      return cacheEntry.getReplyFuture();
    }
    leaderState.notifySenders();
  }
  return pending.getFuture();
}