private XceiverClientReply sendCommandWithRetry()

in hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java [378:496]


  private XceiverClientReply sendCommandWithRetry(
      ContainerCommandRequestProto request, List<Validator> validators)
      throws IOException {
    ContainerCommandResponseProto responseProto = null;
    IOException ioException = null;

    // In case of an exception or an error, we will try to read from the
    // datanodes in the pipeline in a round-robin fashion.
    XceiverClientReply reply = new XceiverClientReply(null);
    List<DatanodeDetails> datanodeList = null;

    DatanodeBlockID blockID = null;
    if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
      blockID = request.getGetBlock().getBlockID();
    } else if  (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
      blockID = request.getReadChunk().getBlockID();
    } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
      blockID = request.getGetSmallFile().getBlock().getBlockID();
    }

    if (blockID != null) {
      if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
        datanodeList = pipeline.getNodes();
        int getBlockDNLeaderIndex = datanodeList.indexOf(pipeline.getLeaderNode());
        if (getBlockDNLeaderIndex > 0) {
          // Pull the leader DN to the top of the DN list
          Collections.swap(datanodeList, 0, getBlockDNLeaderIndex);
        }
      }
      // Check if the DN to which the GetBlock command was sent has been cached.
      DatanodeDetails cachedDN = getBlockDNcache.get(blockID);
      if (cachedDN != null && !topologyAwareRead) {
        datanodeList = pipeline.getNodes();
        int getBlockDNCacheIndex = datanodeList.indexOf(cachedDN);
        if (getBlockDNCacheIndex > 0) {
          // Pull the Cached DN to the top of the DN list
          Collections.swap(datanodeList, 0, getBlockDNCacheIndex);
        }
      }
    }
    if (datanodeList == null) {
      if (topologyAwareRead) {
        datanodeList = pipeline.getNodesInOrder();
      } else {
        datanodeList = pipeline.getNodes();
        // Shuffle datanode list so that clients do not read in the same order
        // every time.
        Collections.shuffle(datanodeList);
      }
    }

    boolean allInService = datanodeList.stream()
        .allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE);
    if (!allInService) {
      datanodeList = sortDatanodeByOperationalState(datanodeList);
    }

    for (DatanodeDetails dn : datanodeList) {
      try {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Executing command {} on datanode {}",
              processForDebug(request), dn);
        }
        // In case the command gets retried on a 2nd datanode,
        // sendCommandAsyncCall will create a new channel and async stub
        // in case these don't exist for the specific datanode.
        reply.addDatanode(dn);
        responseProto = sendCommandAsync(request, dn).getResponse().get();
        if (validators != null && !validators.isEmpty()) {
          for (Validator validator : validators) {
            validator.accept(request, responseProto);
          }
        }
        if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
          DatanodeBlockID getBlockID = request.getGetBlock().getBlockID();
          getBlockDNcache.put(getBlockID, dn);
        }
        break;
      } catch (IOException e) {
        ioException = e;
        responseProto = null;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Failed to execute command {} on datanode {}",
              processForDebug(request), dn, e);
        }
      } catch (ExecutionException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Failed to execute command {} on datanode {}",
              processForDebug(request), dn, e);
        }
        if (Status.fromThrowable(e.getCause()).getCode()
            == Status.UNAUTHENTICATED.getCode()) {
          throw new SCMSecurityException("Failed to authenticate with "
              + "GRPC XceiverServer with Ozone block token.");
        }

        ioException = new IOException(e);
      } catch (InterruptedException e) {
        LOG.error("Command execution was interrupted ", e);
        Thread.currentThread().interrupt();
      }
    }

    if (responseProto != null) {
      reply.setResponse(CompletableFuture.completedFuture(responseProto));
      return reply;
    } else {
      Objects.requireNonNull(ioException);
      String message = "Failed to execute command {}";
      if (LOG.isDebugEnabled()) {
        LOG.debug(message + " on the pipeline {}.",
                processForDebug(request), pipeline);
      } else {
        LOG.warn(message + " on the pipeline {}.",
                request.getCmdType(), pipeline);
      }
      throw ioException;
    }
  }