private ContainerCommandResponseProto dispatchRequest()

in hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java [198:436]


  /**
   * Dispatches one container command to the handler registered for the
   * target container's type and returns the resulting response.
   *
   * <p>The method performs the following steps, in this order:
   * <ol>
   *   <li>increments the per-command operation counter (a WriteChunk that
   *       arrives with a {@code DispatcherContext} is counted only in the
   *       WRITE_DATA stage);</li>
   *   <li>validates the token when the context's op asks for it;</li>
   *   <li>for a WriteChunk in the COMMIT_DATA stage, records the container's
   *       block commit sequence id in the context's container-to-BCSID map
   *       if it is absent and removes the container from the missing
   *       container set;</li>
   *   <li>rejects non-read-only, non-CreateContainer commands that target a
   *       container in the missing container set;</li>
   *   <li>when the container does not exist, creates it implicitly for
   *       WriteChunk (WRITE_DATA or COMBINED stage), StreamInit,
   *       PutSmallFile and PutBlock;</li>
   *   <li>invokes the handler; if a non-read-only command fails with a
   *       result that cannot be ignored, marks the container unhealthy and
   *       triggers the close-container action;</li>
   *   <li>writes the audit and performance-audit entries.</li>
   * </ol>
   *
   * @param msg the request to execute; must not be null
   * @param dispatcherContext context of the call; may be null, in which
   *     case a WriteChunk is treated as being in the COMBINED stage
   * @return the handler's response, or an error response produced by
   *     {@code ContainerUtils.logAndReturnError}, {@code malformedRequest}
   *     or {@code unsupportedRequest} (the latter when the handler returns
   *     null)
   * @throws NullPointerException explicitly when a non-read-only command
   *     fails with a non-ignorable result while {@code container} is null;
   *     the {@code Preconditions} checks in the body may also throw
   */
  private ContainerCommandResponseProto dispatchRequest(
      ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
    Preconditions.checkNotNull(msg);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Command {}, trace ID: {} ", msg.getCmdType(),
          msg.getTraceID());
    }

    AuditAction action = getAuditAction(msg.getCmdType());
    EventType eventType = getEventType(msg);
    PerformanceStringBuilder perf = new PerformanceStringBuilder();

    ContainerType containerType;
    ContainerCommandResponseProto responseProto = null;
    // Every latency figure recorded below is measured from this instant.
    long startTime = Time.monotonicNowNanos();
    Type cmdType = msg.getCmdType();
    long containerID = msg.getContainerID();
    // May be null: the container can be missing or not created yet.
    Container container = getContainer(containerID);
    // "Write stage" means a WriteChunk in the WRITE_DATA stage or any
    // StreamInit (the latter regardless of the dispatcher context).
    boolean isWriteStage =
        (cmdType == Type.WriteChunk && dispatcherContext != null
            && dispatcherContext.getStage()
            == DispatcherContext.WriteChunkStage.WRITE_DATA)
            || (cmdType == Type.StreamInit);
    boolean isWriteCommitStage =
        (cmdType == Type.WriteChunk && dispatcherContext != null
            && dispatcherContext.getStage()
            == DispatcherContext.WriteChunkStage.COMMIT_DATA);

    // Count the operation once. The only case that is not counted is a
    // WriteChunk that carries a context but is not in the WRITE_DATA stage.
    if (dispatcherContext == null) {
      // increase all op not through ratis
      metrics.incContainerOpsMetrics(cmdType);
    } else if (isWriteStage) {
      // increase WriteChunk in only WRITE_STAGE
      metrics.incContainerOpsMetrics(cmdType);
    } else if (cmdType != Type.WriteChunk) {
      metrics.incContainerOpsMetrics(cmdType);
    }

    try {
      if (DispatcherContext.op(dispatcherContext).validateToken()) {
        validateToken(msg);
      }
    } catch (IOException ioe) {
      // Wrap the failure so that the client sees
      // BLOCK_TOKEN_VERIFICATION_FAILED with the original cause attached.
      // NOTE(review): unlike the failure paths further down, this one does
      // not call audit(...) -- confirm that this is intentional.
      final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
          + " for " + dispatcherContext + ": " + ioe.getMessage();
      final StorageContainerException sce = new StorageContainerException(
          s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
      return ContainerUtils.logAndReturnError(LOG, sce, msg);
    }
    // if the command gets executed other than Ratis, the default write stage
    // is WriteChunkStage.COMBINED
    boolean isCombinedStage =
        cmdType == Type.WriteChunk && (dispatcherContext == null
            || dispatcherContext.getStage()
            == DispatcherContext.WriteChunkStage.COMBINED);
    // Stays null when there is no dispatcher context.
    Map<Long, Long> container2BCSIDMap = null;
    if (dispatcherContext != null) {
      container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
    }
    if (isWriteCommitStage) {
      // Check whether the container ID exists in the loaded snapshot file.
      // If it does not, this is inferred to be a datanode restart in which
      // we are re-applying a transaction that was not captured in the
      // snapshot.
      // Just add it to the map and remove it from the missing container set,
      // as it might have been added to that set during "init".
      Preconditions.checkNotNull(container2BCSIDMap);
      if (container != null && container2BCSIDMap.get(containerID) == null) {
        container2BCSIDMap.put(
            containerID, container.getBlockCommitSequenceId());
        getMissingContainerSet().remove(containerID);
      }
    }
    // Mutating commands (other than CreateContainer) must not touch a
    // container that is recorded as missing on this datanode.
    if (cmdType != Type.CreateContainer && !HddsUtils.isReadOnly(msg)
        && getMissingContainerSet().contains(containerID)) {
      StorageContainerException sce = new StorageContainerException(
          "ContainerID " + containerID
              + " has been lost and cannot be recreated on this DataNode",
          ContainerProtos.Result.CONTAINER_MISSING);
      audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
      return ContainerUtils.logAndReturnError(LOG, sce, msg);
    }

    if (cmdType != Type.CreateContainer) {
      /**
       * Create Container should happen only as part of Write_Data phase of
       * writeChunk.
       * In EC, we are doing empty putBlock. In the partial stripe writes, if
       * file size is less than chunkSize*(ECData-1), we are making empty block
       * to get the container created in non writing nodes. If replica index is
       * >0 then we know it's for ec container.
       *
       * NOTE(review): this method does not inspect the replica index itself;
       * the implicit creation below is keyed only on the command type and
       * write stage.
       */
      if (container == null && ((isWriteStage || isCombinedStage)
          || cmdType == Type.PutSmallFile
          || cmdType == Type.PutBlock)) {
        // If the container does not exist, create one for WriteChunk,
        // StreamInit, PutSmallFile and PutBlock requests.
        responseProto = createContainer(msg);
        // The implicit creation is counted as a CreateContainer op; its
        // latency is measured from the start of this method.
        metrics.incContainerOpsMetrics(Type.CreateContainer);
        metrics.incContainerOpsLatencies(Type.CreateContainer,
                Time.monotonicNowNanos() - startTime);

        if (responseProto.getResult() != Result.SUCCESS) {
          StorageContainerException sce = new StorageContainerException(
              "ContainerID " + containerID + " creation failed",
              responseProto.getResult());
          audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
          return ContainerUtils.logAndReturnError(LOG, sce, msg);
        }
        // Sanity check on how we got here: a write-stage command with a
        // BCSID map, a call without dispatcher context, or a PutBlock.
        Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
            || dispatcherContext == null
            || cmdType == Type.PutBlock);
        if (container2BCSIDMap != null) {
          // adds this container to list of containers created in the pipeline
          // with initial BCSID recorded as 0.
          container2BCSIDMap.putIfAbsent(containerID, 0L);
        }
        // Re-read the container now that it has been created.
        container = getContainer(containerID);
      }

      // if container not found return error
      if (container == null) {
        StorageContainerException sce = new StorageContainerException(
            "ContainerID " + containerID + " does not exist",
            ContainerProtos.Result.CONTAINER_NOT_FOUND);
        audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
        return ContainerUtils.logAndReturnError(LOG, sce, msg);
      }
      containerType = getContainerType(container);
    } else {
      // CreateContainer: the container type comes from the request payload,
      // so the payload must be present.
      if (!msg.hasCreateContainer()) {
        audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
            new Exception("MALFORMED_REQUEST"));
        return malformedRequest(msg);
      }
      containerType = msg.getCreateContainer().getContainerType();
    }
    // Small performance optimization. We check if the operation is of type
    // write before trying to send CloseContainerAction.
    // NOTE(review): for CreateContainer, `container` is still whatever
    // getContainer returned at the top of the method and may be null here.
    if (!HddsUtils.isReadOnly(msg)) {
      sendCloseContainerActionIfNeeded(container);
    }
    Handler handler = getHandler(containerType);
    if (handler == null) {
      StorageContainerException ex = new StorageContainerException("Invalid " +
          "ContainerType " + containerType,
          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
      // log failure
      audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, ex);
      return ContainerUtils.logAndReturnError(LOG, ex, msg);
    }
    // Time spent in this method before the handler is invoked.
    perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime);
    responseProto = handler.handle(msg, container, dispatcherContext);
    // Measured from startTime, so it includes the pre-op time above.
    long opLatencyNs = Time.monotonicNowNanos() - startTime;
    if (responseProto != null) {
      metrics.incContainerOpsLatencies(cmdType, opLatencyNs);

      // If the request is of Write Type and the container operation
      // is unsuccessful, it implies the applyTransaction on the container
      // failed. All subsequent transactions on the container should fail and
      // hence replica will be marked unhealthy here. In this case, a close
      // container action will be sent to SCM to close the container.

      // ApplyTransaction called on closed Container will fail with Closed
      // container exception. In such cases, ignore the exception here
      // If the container is already marked unhealthy, no need to change the
      // state here.

      Result result = responseProto.getResult();
      if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
        // If the container is open/closing and the container operation
        // has failed, it should first be marked unhealthy and then the
        // close container action should be initiated. This also implies this
        // is the first transaction which has failed, so the container is
        // marked unhealthy right here.
        // Once container is marked unhealthy, all the subsequent write
        // transactions will fail with UNHEALTHY_CONTAINER exception.

        // Only a CreateContainer command can reach this point with a null
        // container: every other command type returned CONTAINER_NOT_FOUND
        // above when the container was null.
        if (container == null) {
          throw new NullPointerException(
              "Error on creating containers " + result + " " + responseProto
                  .getMessage());
        }
        // For container to be moved to unhealthy state here, the container can
        // only be in open, closing or recovering state.
        State containerState = container.getContainerData().getState();
        Preconditions.checkState(
            containerState == State.OPEN
                || containerState == State.CLOSING
                || containerState == State.RECOVERING);
        // mark and persist the container state to be unhealthy
        try {
          // TODO HDDS-7096 + HDDS-8781: Use on demand scanning for the open
          //  container instead.
          handler.markContainerUnhealthy(container,
              ScanResult.unhealthy(ScanResult.FailureType.WRITE_FAILURE,
                  new File(container.getContainerData().getContainerPath()),
                  new StorageContainerException(result)));
          LOG.info("Marked Container UNHEALTHY, ContainerID: {}", containerID);
        } catch (IOException ioe) {
          // just log the error here in case marking the container fails,
          // Return the actual failure response to the client
          LOG.error("Failed to mark container " + containerID + " UNHEALTHY. ",
              ioe);
        }
        // in any case, the in memory state of the container should be unhealthy
        Preconditions.checkArgument(
            container.getContainerData().getState() == State.UNHEALTHY);
        sendCloseContainerActionIfNeeded(container);
      }
      // An explicit CreateContainer that succeeded with a dispatcher context:
      // register the container in the context's BCSID map with BCSID 0
      // unless it is already present. container2BCSIDMap was read from the
      // same context near the top of the method.
      if (cmdType == Type.CreateContainer
          && result == Result.SUCCESS && dispatcherContext != null) {
        Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
        container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
      }
      if (result == Result.SUCCESS) {
        updateBCSID(container, dispatcherContext, cmdType);
        audit(action, eventType, msg, dispatcherContext, AuditEventStatus.SUCCESS, null);
      } else {
        //TODO HDDS-7096:
        // This is a too general place for on demand scanning.
        // Create a specific exception that signals for on demand scanning
        // and move this general scan to where it is more appropriate.
        // Add integration tests to test the full functionality.
        OnDemandContainerDataScanner.scanContainer(container);
        audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
            new Exception(responseProto.getMessage()));
      }
      perf.appendOpLatencyNanos(opLatencyNs);
      performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs);

      return responseProto;
    } else {
      // The handler returned no response: audit the failure and report the
      // request as unsupported.
      audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
          new Exception("UNSUPPORTED_REQUEST"));
      return unsupportedRequest(msg);
    }
  }