in hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java [378:496]
/**
 * Sends {@code request} to the datanodes of this client's pipeline, retrying
 * on the next datanode when an attempt fails, and returns the first
 * successful (validator-approved) response.
 *
 * Datanode try-order, in priority:
 * <ol>
 *   <li>For GetBlock/GetSmallFile, the pipeline leader is tried first;
 *       if a previous GetBlock for the same block was served by a cached DN
 *       (and topology-aware read is off), that DN is tried first instead.</li>
 *   <li>Otherwise, topology order when {@code topologyAwareRead} is set,
 *       else a shuffled copy of the pipeline nodes to spread read load.</li>
 *   <li>If any node is not IN_SERVICE, the list is re-sorted so IN_SERVICE
 *       nodes come first (see {@code sortDatanodeByOperationalState}).</li>
 * </ol>
 *
 * @param request    command to execute on the pipeline.
 * @param validators optional response validators; each may throw
 *                   {@link IOException} to reject a response and force a
 *                   retry on the next datanode.
 * @return reply holding the successful response and every datanode contacted.
 * @throws IOException          if all datanodes fail, or the thread is
 *                              interrupted while waiting for a response
 *                              (interrupt status is restored first).
 * @throws SCMSecurityException if a datanode rejects the Ozone block token
 *                              (gRPC UNAUTHENTICATED).
 */
private XceiverClientReply sendCommandWithRetry(
    ContainerCommandRequestProto request, List<Validator> validators)
    throws IOException {
  ContainerCommandResponseProto responseProto = null;
  IOException ioException = null;

  // In case of an exception or an error, we will try to read from the
  // datanodes in the pipeline in a round-robin fashion.
  XceiverClientReply reply = new XceiverClientReply(null);
  List<DatanodeDetails> datanodeList = null;

  // Extract the block ID for block-read commands so we can steer the read
  // toward the leader or the DN that served this block before.
  DatanodeBlockID blockID = null;
  if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
    blockID = request.getGetBlock().getBlockID();
  } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
    blockID = request.getReadChunk().getBlockID();
  } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
    blockID = request.getGetSmallFile().getBlock().getBlockID();
  }

  if (blockID != null) {
    // For block-ID commands other than ReadChunk, try the leader first.
    if (request.getCmdType() != ContainerProtos.Type.ReadChunk) {
      datanodeList = pipeline.getNodes();
      int getBlockDNLeaderIndex = datanodeList.indexOf(pipeline.getLeaderNode());
      if (getBlockDNLeaderIndex > 0) {
        // Pull the leader DN to the top of the DN list.
        Collections.swap(datanodeList, 0, getBlockDNLeaderIndex);
      }
    }
    // Check if the DN to which the GetBlock command was sent has been cached;
    // if so (and topology-aware read is off), try that DN first. NOTE(review):
    // this overrides the leader preference above when both apply.
    DatanodeDetails cachedDN = getBlockDNcache.get(blockID);
    if (cachedDN != null && !topologyAwareRead) {
      datanodeList = pipeline.getNodes();
      int getBlockDNCacheIndex = datanodeList.indexOf(cachedDN);
      if (getBlockDNCacheIndex > 0) {
        // Pull the cached DN to the top of the DN list.
        Collections.swap(datanodeList, 0, getBlockDNCacheIndex);
      }
    }
  }

  if (datanodeList == null) {
    if (topologyAwareRead) {
      // Nodes pre-sorted by network distance to this client.
      datanodeList = pipeline.getNodesInOrder();
    } else {
      datanodeList = pipeline.getNodes();
      // Shuffle datanode list so that clients do not read in the same order
      // every time.
      Collections.shuffle(datanodeList);
    }
  }

  // Prefer healthy replicas: if any node is not IN_SERVICE (e.g.
  // decommissioning or in maintenance), re-sort so IN_SERVICE nodes are
  // tried first.
  boolean allInService = datanodeList.stream()
      .allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE);
  if (!allInService) {
    datanodeList = sortDatanodeByOperationalState(datanodeList);
  }

  for (DatanodeDetails dn : datanodeList) {
    try {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Executing command {} on datanode {}",
            processForDebug(request), dn);
      }
      // In case the command gets retried on a 2nd datanode,
      // sendCommandAsyncCall will create a new channel and async stub
      // in case these don't exist for the specific datanode.
      reply.addDatanode(dn);
      responseProto = sendCommandAsync(request, dn).getResponse().get();
      // A validator rejecting the response throws IOException, which is
      // caught below and triggers a retry on the next datanode.
      if (validators != null && !validators.isEmpty()) {
        for (Validator validator : validators) {
          validator.accept(request, responseProto);
        }
      }
      // Remember which DN answered this block so future reads of the same
      // block go to it first.
      if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
        DatanodeBlockID getBlockID = request.getGetBlock().getBlockID();
        getBlockDNcache.put(getBlockID, dn);
      }
      break;
    } catch (IOException e) {
      ioException = e;
      responseProto = null;
      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to execute command {} on datanode {}",
            processForDebug(request), dn, e);
      }
    } catch (ExecutionException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Failed to execute command {} on datanode {}",
            processForDebug(request), dn, e);
      }
      // An UNAUTHENTICATED gRPC status means the block token was rejected;
      // retrying on another DN cannot help, so fail fast.
      if (Status.fromThrowable(e.getCause()).getCode()
          == Status.UNAUTHENTICATED.getCode()) {
        throw new SCMSecurityException("Failed to authenticate with "
            + "GRPC XceiverServer with Ozone block token.");
      }
      ioException = new IOException(e);
    } catch (InterruptedException e) {
      LOG.error("Command execution was interrupted ", e);
      Thread.currentThread().interrupt();
      // FIX: previously the loop continued after an interrupt; with the
      // interrupt flag set, every remaining Future.get() fails immediately,
      // and if no DN had produced an IOException, ioException stayed null
      // and Objects.requireNonNull below threw a bare NullPointerException.
      // Record a proper cause-preserving IOException and stop retrying.
      ioException = new IOException(
          "Interrupted while executing command " + request.getCmdType()
              + " on the pipeline", e);
      break;
    }
  }

  if (responseProto != null) {
    reply.setResponse(CompletableFuture.completedFuture(responseProto));
    return reply;
  } else {
    // Every attempt failed; by construction one of the catch blocks above
    // must have recorded an exception.
    Objects.requireNonNull(ioException,
        "no response and no recorded exception after trying all datanodes");
    String message = "Failed to execute command {}";
    if (LOG.isDebugEnabled()) {
      LOG.debug(message + " on the pipeline {}.",
          processForDebug(request), pipeline);
    } else {
      LOG.warn(message + " on the pipeline {}.",
          request.getCmdType(), pipeline);
    }
    throw ioException;
  }
}