in hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java [1096:1219]
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
long index = trx.getLogEntry().getIndex();
try {
// Remove the stateMachine data once both followers have caught up. If any
// one of the followers is behind, the pending queue will max out at the
// configurable limit on pending request size and count, and will then
// block; the client will back off as a result.
removeStateMachineDataIfNeeded(index);
// If waitOnBothFollower is false, remove the entry from the cache
// as soon as it is applied, provided such an entry exists in the cache.
removeStateMachineDataIfMajorityFollowSync(index);
final DispatcherContext.Builder builder = DispatcherContext
.newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
.setTerm(trx.getLogEntry().getTerm())
.setLogIndex(index);
final Context context = (Context) trx.getStateMachineContext();
Objects.requireNonNull(context, "context == null");
long applyTxnStartTime = Time.monotonicNowNanos();
metrics.recordUntilApplyTransactionNs(applyTxnStartTime - context.getStartTime());
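// Bound the number of in-flight applyTransaction calls; the permit is
// released in the whenComplete block below.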
applyTransactionSemaphore.acquire();
metrics.incNumApplyTransactionsOps();
final ContainerCommandRequestProto requestProto = context.getLogProto();
final Type cmdType = requestProto.getCmdType();
// For WriteChunk, the user data must not be present here; applyTransaction
// only commits the chunk (COMMIT_DATA stage).
if (cmdType == Type.WriteChunk) {
Preconditions
.checkArgument(requestProto.getWriteChunk().getData().isEmpty());
builder.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
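// Pass the container-to-BCSID (block commit sequence id) map for commands
// that create containers or commit data, so the dispatcher can keep it
// up to date.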
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock || cmdType == Type.CreateContainer
|| cmdType == Type.StreamInit) {
builder.setContainer2BCSIDMap(container2BCSIDMap);
}
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
final Consumer<Throwable> exceptionHandler = e -> {
LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index
+ " for " + requestProto.getCmdType(), e);
stateMachineHealthy.compareAndSet(true, false);
unhealthyContainers.add(requestProto.getContainerID());
metrics.incNumApplyTransactionsFails();
applyTransactionFuture.completeExceptionally(e);
};
// Ensure the command gets executed in a separate thread from the
// stateMachineUpdater thread, which is the caller of applyTransaction here.
final CompletableFuture<ContainerCommandResponseProto> future =
applyTransaction(requestProto, builder.build(), exceptionHandler);
future.thenApply(r -> {
// TODO: add metrics for non-leader case
if (trx.getServerRole() == RaftPeerRole.LEADER) {
final long startTime = context.getStartTime();
metrics.incPipelineLatencyMs(cmdType,
(Time.monotonicNowNanos() - startTime) / 1000000L);
}
// Ignore close-container results when deciding whether to mark the
// stateMachine unhealthy.
if (r.getResult() != ContainerProtos.Result.SUCCESS
&& r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
&& r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
"gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
metrics.incNumApplyTransactionsFails();
// Since applyTransaction is now completed exceptionally, the
// exception will be caught by the stateMachineUpdater in Ratis
// before any further snapshot is taken, and the Ratis server
// will shut down.
applyTransactionFuture.completeExceptionally(sce);
stateMachineHealthy.compareAndSet(true, false);
unhealthyContainers.add(requestProto.getContainerID());
ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
+ "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
metrics.incNumBytesCommittedCount(
requestProto.getWriteChunk().getChunkData().getLen());
}
applyTransactionFuture.complete(r::toByteString);
// Add the entry to the applyTransactionCompletionMap only if the
// stateMachine is healthy, i.e. there have been no applyTransaction
// failures before.
if (isStateMachineHealthy()) {
final Long previous = applyTransactionCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
updateLastApplied();
}
}
return applyTransactionFuture;
}).whenComplete((r, t) -> {
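// Runs whether the dispatch succeeded or failed: route any uncaught
// throwable to the exception handler, then release the semaphore and
// record completion latency.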
if (t != null) {
exceptionHandler.accept(t);
}
applyTransactionSemaphore.release();
metrics.recordApplyTransactionCompletionNs(
Time.monotonicNowNanos() - applyTxnStartTime);
if (trx.getServerRole() == RaftPeerRole.LEADER) {
metrics.decPendingApplyTransactions();
}
});
return applyTransactionFuture;
} catch (InterruptedException e) {
metrics.incNumApplyTransactionsFails();
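// Restore the interrupt status so callers can observe the interruption.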
Thread.currentThread().interrupt();
return completeExceptionally(e);
} catch (Exception e) {
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
}
}
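
For reference, the completeExceptionally(e) calls in the catch blocks above are assumed to delegate to a small private helper that wraps the exception in an already-failed future; a minimal sketch of such a helper follows (the actual helper in ContainerStateMachine may differ in name and signature):

// Sketch only (assumed helper): returns a future that is already completed
// exceptionally with the given exception, so callers can return it directly.
private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
  final CompletableFuture<T> future = new CompletableFuture<>();
  future.completeExceptionally(e);
  return future;
}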