in hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java [207:372]
protected List<DatanodeDetails> chooseDatanodesInternal(
    List<DatanodeDetails> usedNodes,
    final List<DatanodeDetails> excludedNodes,
    final List<DatanodeDetails> favoredNodes,
    final int nodesRequired, final long metadataSizeRequired,
    final long dataSizeRequired) throws SCMException {
  // Chooses exactly nodesRequired datanodes, in two passes over the racks:
  //   1. pick up to additionalRacksRequired nodes from racks that hold no
  //      used node, with maxReplicasPerRack passed as the per-call limit;
  //   2. if more nodes are still needed, re-add the used racks and pick the
  //      remainder with Integer.MAX_VALUE passed as that limit.
  // The combined placement (usedNodes + chosen nodes) is then validated and
  // rejected only if it is more mis-replicated than usedNodes alone.
  //
  // usedNodes, excludedNodes and favoredNodes may each be null; a null
  // usedNodes is replaced by an empty list. metadataSizeRequired and
  // dataSizeRequired are only forwarded to isValidNode and
  // chooseNodesFromRacks.
  //
  // Throws SCMException when nodesRequired <= 0, when too few nodes or racks
  // are available, when fewer nodes than requested could be chosen, or when
  // the resulting placement is worse than the initial one.
  if (nodesRequired <= 0) {
    String errorMsg = "num of nodes required to choose should be bigger " +
        "than 0, but the given num is " + nodesRequired;
    // NOTE(review): the result code is null here; confirm callers do not
    // dereference SCMException's result code for this failure.
    throw new SCMException(errorMsg, null);
  }
  if (metrics != null) {
    metrics.incrDatanodeRequestCount(nodesRequired);
  }
  int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
  int usedNodesCount = usedNodes == null ? 0 : usedNodes.size();
  // NOTE(review): the list returned by getNodes is mutated below via
  // removeAll; this assumes getNodes hands back a fresh, mutable list rather
  // than a view of the topology's internal state - confirm.
  List<Node> availableNodes = networkTopology.getNodes(
      networkTopology.getMaxLevel());
  // Captured before any removal, so this is the total, not the available,
  // node count.
  int totalNodesCount = availableNodes.size();
  if (excludedNodes != null) {
    availableNodes.removeAll(excludedNodes);
  }
  if (usedNodes != null) {
    availableNodes.removeAll(usedNodes);
  }
  if (availableNodes.size() < nodesRequired) {
    throw new SCMException("No enough datanodes to choose. " +
        "TotalNodes = " + totalNodesCount +
        " AvailableNodes = " + availableNodes.size() +
        " RequiredNodes = " + nodesRequired +
        " ExcludedNodes = " + excludedNodesCount +
        " UsedNodes = " + usedNodesCount,
        FAILED_TO_FIND_SUITABLE_NODE);
  }
  List<DatanodeDetails> mutableFavoredNodes = new ArrayList<>();
  if (favoredNodes != null) {
    // Generate mutableFavoredNodes, only stores valid favoredNodes
    for (DatanodeDetails datanodeDetails : favoredNodes) {
      if (isValidNode(datanodeDetails, metadataSizeRequired,
          dataSizeRequired)) {
        mutableFavoredNodes.add(datanodeDetails);
      }
    }
    // Randomize the order so the same favored node is not always first.
    Collections.shuffle(mutableFavoredNodes);
  }
  if (excludedNodes != null) {
    mutableFavoredNodes.removeAll(excludedNodes);
  }
  if (usedNodes == null) {
    usedNodes = Collections.emptyList();
  }
  List<Node> racks = getAllRacks();
  // usedRacksCntMap maps a rack to the number of usedNodes it contains
  Map<Node, Integer> usedRacksCntMap = new HashMap<>();
  for (Node node : usedNodes) {
    Node rack = networkTopology.getAncestor(node, RACK_LEVEL);
    // Used nodes for which the topology reports no rack are not counted.
    if (rack != null) {
      usedRacksCntMap.merge(rack, 1, Math::addExact);
    }
  }
  List<Node> unavailableRacks = findRacksWithOnlyExcludedNodes(excludedNodes,
      usedRacksCntMap);
  for (Node rack : unavailableRacks) {
    racks.remove(rack);
  }
  // Replication factor after this call: existing replicas plus new ones.
  int requiredReplicationFactor = usedNodes.size() + nodesRequired;
  int numberOfRacksRequired = getRequiredRackCount(requiredReplicationFactor,
      unavailableRacks.size());
  // Never ask for more new racks than nodes being chosen.
  int additionalRacksRequired =
      Math.min(nodesRequired, numberOfRacksRequired - usedRacksCntMap.size());
  LOG.debug("Additional nodes required: {}. Additional racks required: {}.",
      nodesRequired, additionalRacksRequired);
  int maxReplicasPerRack = getMaxReplicasPerRack(requiredReplicationFactor,
      numberOfRacksRequired);
  LOG.debug("According to required replication factor: {}, and total number" +
      " of racks required: {}, max replicas per rack is {}.",
      requiredReplicationFactor, numberOfRacksRequired, maxReplicasPerRack);
  // For excluded nodes, we sort their racks at rear
  racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
  // Nodes that must not be picked again: already used plus excluded.
  List<Node> unavailableNodes = new ArrayList<>(usedNodes);
  if (excludedNodes != null) {
    unavailableNodes.addAll(excludedNodes);
  }
  LOG.debug("Available racks excluding racks with used nodes: {}.", racks);
  if (racks.size() < additionalRacksRequired) {
    String reason = "Number of existing racks: " + racks.size()
        + " is less than additional required number of racks to choose: "
        + additionalRacksRequired + ".";
    LOG.warn("Placement policy cannot choose the enough racks. {} "
        + "Total number of Required Racks: {} Used Racks Count:" +
        " {}, Required Nodes count: {}",
        reason, numberOfRacksRequired, usedRacksCntMap.size(),
        nodesRequired);
    throw new SCMException(reason,
        FAILED_TO_FIND_SUITABLE_NODE);
  }
  // First pass: one node per additional rack. LinkedHashSet keeps the
  // selection order and drops duplicates.
  Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>(
      chooseNodesFromRacks(racks, unavailableNodes,
          mutableFavoredNodes, additionalRacksRequired,
          metadataSizeRequired, dataSizeRequired, maxReplicasPerRack,
          usedRacksCntMap, maxReplicasPerRack));
  if (chosenNodes.size() < additionalRacksRequired) {
    String reason = "Chosen nodes size from Unique Racks: " + chosenNodes
        .size() + ", but required nodes to choose from Unique Racks: "
        + additionalRacksRequired + " do not match.";
    LOG.warn("Placement policy could not choose the enough nodes from " +
        "available racks. {} Available racks count: {},"
        + " Excluded nodes count: {}, UsedNodes count: {}",
        reason, racks.size(), excludedNodesCount, usedNodesCount);
    throw new SCMException(reason,
        SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
  }
  if (chosenNodes.size() < nodesRequired) {
    // Second pass: racks that already hold used nodes become candidates.
    // NOTE(review): the used racks are added both before and after the
    // re-sort; whether both additions are needed depends on how
    // sortRackWithExcludedNodes treats used racks - confirm before removing
    // either one.
    racks.addAll(usedRacksCntMap.keySet());
    racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
    racks.addAll(usedRacksCntMap.keySet());
    LOG.debug("Available racks considering racks with used and exclude " +
        "nodes: {}.", racks);
    chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
        mutableFavoredNodes, nodesRequired - chosenNodes.size(),
        metadataSizeRequired, dataSizeRequired,
        Integer.MAX_VALUE, usedRacksCntMap, maxReplicasPerRack));
  }
  List<DatanodeDetails> result = new ArrayList<>(chosenNodes);
  if (nodesRequired != chosenNodes.size()) {
    String reason = "Chosen nodes size: " + chosenNodes
        .size() + ", but required nodes to choose: "
        + nodesRequired + " do not match.";
    // The count logged here is totalNodesCount, i.e. taken before excluded
    // and used nodes were removed, hence the "Total" label.
    LOG.warn("Placement policy could not choose the enough nodes."
        + " {} Total nodes count: {}, Excluded nodes count: {},"
        + " Used nodes count: {}",
        reason, totalNodesCount, excludedNodesCount, usedNodesCount);
    throw new SCMException(reason,
        SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
  }
  // Validate the placement that would exist after this call.
  List<DatanodeDetails> newPlacement =
      new ArrayList<>(usedNodes.size() + result.size());
  newPlacement.addAll(usedNodes);
  newPlacement.addAll(chosenNodes);
  ContainerPlacementStatus placementStatus =
      validateContainerPlacement(newPlacement, requiredReplicationFactor);
  if (!placementStatus.isPolicySatisfied()) {
    // An unsatisfied policy is tolerated unless adding the chosen nodes
    // raises the mis-replication count above that of usedNodes alone.
    ContainerPlacementStatus initialPlacementStatus =
        validateContainerPlacement(usedNodes, requiredReplicationFactor);
    if (initialPlacementStatus.misReplicationCount()
        < placementStatus.misReplicationCount()) {
      String errorMsg = "ContainerPlacementPolicy not met. Misreplication" +
          " Reason: " + placementStatus.misReplicatedReason() +
          " Initial Used nodes mis-replication Count: " +
          initialPlacementStatus.misReplicationCount() +
          " Used nodes + Chosen nodes mis-replication Count: " +
          placementStatus.misReplicationCount();
      throw new SCMException(errorMsg, FAILED_TO_FIND_SUITABLE_NODE);
    }
  }
  LOG.info("Chosen nodes: {}. isPolicySatisfied: {}.", result,
      placementStatus.isPolicySatisfied());
  return result;
}