public CompletableFuture applyTransaction()

in hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java [1096:1219]


  /**
   * Applies the log entry carried by {@code trx}: dispatches its container
   * command through the asynchronous {@code applyTransaction} overload and
   * maps the command result onto the returned future.
   *
   * <p>A permit is taken from {@code applyTransactionSemaphore} before the
   * command is dispatched. The permit is released by the completion callback
   * of the dispatched command, or by this method itself when dispatching
   * fails synchronously, so a failed dispatch does not leak a permit.
   *
   * @param trx the transaction to apply; its state machine context must be a
   *     non-null {@code Context}
   * @return a future that completes with the response (exposed through
   *     {@code toByteString}) when the command result is acceptable, or
   *     completes exceptionally otherwise; if this method fails before the
   *     command is dispatched, the result of {@code completeExceptionally(e)}
   */
  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
    long index = trx.getLogEntry().getIndex();
    // True while this method, rather than the async completion callback,
    // is responsible for returning the semaphore permit.
    boolean permitHeldHere = false;
    try {
      // Remove the stateMachine data once both followers have caught up. If any
      // one of the follower is behind, the pending queue will max out as
      // configurable limit on pending request size and count and then will
      // block and client will backoff as a result of that.
      removeStateMachineDataIfNeeded(index);
      // if waitOnBothFollower is false, remove the entry from the cache
      // as soon as its applied and such entry exists in the cache.
      removeStateMachineDataIfMajorityFollowSync(index);
      final DispatcherContext.Builder builder = DispatcherContext
          .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
          .setTerm(trx.getLogEntry().getTerm())
          .setLogIndex(index);

      final Context context = (Context) trx.getStateMachineContext();
      // Validate before the first dereference so that a missing context is
      // reported with a descriptive message.
      Objects.requireNonNull(context, "context == null");
      long applyTxnStartTime = Time.monotonicNowNanos();
      metrics.recordUntilApplyTransactionNs(applyTxnStartTime - context.getStartTime());
      applyTransactionSemaphore.acquire();
      permitHeldHere = true;
      metrics.incNumApplyTransactionsOps();

      final ContainerCommandRequestProto requestProto = context.getLogProto();
      final Type cmdType = requestProto.getCmdType();
      // Make sure that in write chunk, the user data is not set
      if (cmdType == Type.WriteChunk) {
        Preconditions
            .checkArgument(requestProto.getWriteChunk().getData().isEmpty());
        builder.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
      }
      if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
          || cmdType == Type.PutBlock || cmdType == Type.CreateContainer
          || cmdType == Type.StreamInit) {
        builder.setContainer2BCSIDMap(container2BCSIDMap);
      }
      CompletableFuture<Message> applyTransactionFuture =
          new CompletableFuture<>();
      // Shared failure path: log, mark the state machine and the container
      // unhealthy, count the failure and fail the returned future.
      final Consumer<Throwable> exceptionHandler = e -> {
        LOG.error("{}: failed to applyTransaction at logIndex {} for {}",
            getGroupId(), index, requestProto.getCmdType(), e);
        stateMachineHealthy.compareAndSet(true, false);
        unhealthyContainers.add(requestProto.getContainerID());
        metrics.incNumApplyTransactionsFails();
        applyTransactionFuture.completeExceptionally(e);
      };

      // Ensure the command gets executed in a separate thread than
      // stateMachineUpdater thread which is calling applyTransaction here.
      final CompletableFuture<ContainerCommandResponseProto> future =
          applyTransaction(requestProto, builder.build(), exceptionHandler);
      future.thenApply(r -> {
        // TODO: add metrics for non-leader case
        if (trx.getServerRole() == RaftPeerRole.LEADER) {
          final long startTime = context.getStartTime();
          metrics.incPipelineLatencyMs(cmdType,
              (Time.monotonicNowNanos() - startTime) / 1000000L);
        }
        // ignore close container exception while marking the stateMachine
        // unhealthy
        if (r.getResult() != ContainerProtos.Result.SUCCESS
            && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
            && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
            && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
          StorageContainerException sce =
              new StorageContainerException(r.getMessage(), r.getResult());
          LOG.error(
              "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
                  + "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
              r.getMessage(), r.getResult());
          metrics.incNumApplyTransactionsFails();
          // Since the applyTransaction now is completed exceptionally,
          // before any further snapshot is taken , the exception will be
          // caught in stateMachineUpdater in Ratis and ratis server will
          // shutdown.
          applyTransactionFuture.completeExceptionally(sce);
          stateMachineHealthy.compareAndSet(true, false);
          unhealthyContainers.add(requestProto.getContainerID());
          ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug(
                "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
                    + "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
                r.getMessage(), r.getResult());
          }
          // NOTE(review): for PutSmallFile the length is still read from the
          // WriteChunk sub-message; presumably that is unset for this command
          // type, so the recorded byte count may be zero - confirm.
          if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
            metrics.incNumBytesCommittedCount(
                requestProto.getWriteChunk().getChunkData().getLen());
          }
          applyTransactionFuture.complete(r::toByteString);
          // add the entry to the applyTransactionCompletionMap only if the
          // stateMachine is healthy i.e, there has been no applyTransaction
          // failures before.
          if (isStateMachineHealthy()) {
            final Long previous = applyTransactionCompletionMap
                .put(index, trx.getLogEntry().getTerm());
            Preconditions.checkState(previous == null);
            updateLastApplied();
          }
        }
        return applyTransactionFuture;
      }).whenComplete((r, t) -> {
        if (t != null) {
          exceptionHandler.accept(t);
        }
        applyTransactionSemaphore.release();
        metrics.recordApplyTransactionCompletionNs(
            Time.monotonicNowNanos() - applyTxnStartTime);
        if (trx.getServerRole() == RaftPeerRole.LEADER) {
          metrics.decPendingApplyTransactions();
        }
      });
      // The whenComplete callback registered above now owns the permit and
      // releases it, whether it already ran inline or runs later.
      permitHeldHere = false;
      return applyTransactionFuture;
    } catch (InterruptedException e) {
      metrics.incNumApplyTransactionsFails();
      // Restore the interrupt flag for the calling thread.
      Thread.currentThread().interrupt();
      return completeExceptionally(e);
    } catch (Exception e) {
      metrics.incNumApplyTransactionsFails();
      return completeExceptionally(e);
    } finally {
      // Dispatch failed after acquire() but before the completion callback
      // was attached: give the permit back here so it is not leaked.
      if (permitHeldHere) {
        applyTransactionSemaphore.release();
      }
    }
  }