protected List chooseDatanodesInternal()

in hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java [207:372]


  /**
   * Chooses {@code nodesRequired} additional datanodes, spreading them over
   * racks, and verifies the resulting placement.
   *
   * <p>The selection runs in two passes. The first pass picks nodes from
   * racks that do not yet hold a used node, up to the number of additional
   * racks the placement needs. If that does not yield enough nodes, a second
   * pass also considers the racks that already hold used nodes. Finally the
   * combined placement (used nodes plus chosen nodes) is validated; the
   * request fails only if the new placement has a higher mis-replication
   * count than the used nodes alone.
   *
   * @param usedNodes nodes that already belong to the placement; may be
   *                  {@code null}, which is treated as empty
   * @param excludedNodes nodes that must not be chosen; may be {@code null}
   * @param favoredNodes preferred nodes; may be {@code null}. Only entries
   *                     accepted by {@code isValidNode} and not present in
   *                     {@code excludedNodes} are passed on to the selection
   * @param nodesRequired number of nodes to choose; must be greater than 0
   * @param metadataSizeRequired forwarded to {@code isValidNode} and
   *                             {@code chooseNodesFromRacks} (presumably the
   *                             metadata space needed on a node - confirm)
   * @param dataSizeRequired forwarded to {@code isValidNode} and
   *                         {@code chooseNodesFromRacks} (presumably the
   *                         data space needed on a node - confirm)
   * @return exactly {@code nodesRequired} chosen nodes; {@code usedNodes}
   *         are not included
   * @throws SCMException if {@code nodesRequired} is not positive, if there
   *                      are not enough candidate nodes or racks, if the
   *                      selection returns too few nodes, or if the new
   *                      placement is more mis-replicated than the used
   *                      nodes alone
   */
  protected List<DatanodeDetails> chooseDatanodesInternal(
          List<DatanodeDetails> usedNodes,
          final List<DatanodeDetails> excludedNodes,
          final List<DatanodeDetails> favoredNodes,
          final int nodesRequired, final long metadataSizeRequired,
          final long dataSizeRequired) throws SCMException {
    if (nodesRequired <= 0) {
      // Fixed: the two literals used to concatenate into "biggerthan".
      String errorMsg = "num of nodes required to choose should bigger " +
          "than 0, but the given num is " + nodesRequired;
      throw new SCMException(errorMsg, null);
    }
    if (metrics != null) {
      metrics.incrDatanodeRequestCount(nodesRequired);
    }
    int excludedNodesCount = excludedNodes == null ? 0 : excludedNodes.size();
    int usedNodesCount = usedNodes == null ? 0 : usedNodes.size();

    // Candidate nodes = all nodes at the deepest topology level, minus the
    // excluded and the already used ones.
    List<Node> availableNodes = networkTopology.getNodes(
        networkTopology.getMaxLevel());
    int totalNodesCount = availableNodes.size();
    if (excludedNodes != null) {
      availableNodes.removeAll(excludedNodes);
    }
    if (usedNodes != null) {
      availableNodes.removeAll(usedNodes);
    }
    if (availableNodes.size() < nodesRequired) {
      throw new SCMException("No enough datanodes to choose. " +
          "TotalNodes = " + totalNodesCount +
          " AvailableNodes = " + availableNodes.size() +
          " RequiredNodes = " + nodesRequired +
          " ExcludedNodes = " + excludedNodesCount +
          " UsedNodes = " + usedNodesCount,
          FAILED_TO_FIND_SUITABLE_NODE);
    }

    // Keep only the favored nodes that are valid and not excluded, in random
    // order.
    List<DatanodeDetails> mutableFavoredNodes = new ArrayList<>();
    if (favoredNodes != null) {
      for (DatanodeDetails datanodeDetails : favoredNodes) {
        if (isValidNode(datanodeDetails, metadataSizeRequired,
            dataSizeRequired)) {
          mutableFavoredNodes.add(datanodeDetails);
        }
      }
      Collections.shuffle(mutableFavoredNodes);
    }
    if (excludedNodes != null) {
      mutableFavoredNodes.removeAll(excludedNodes);
    }
    if (usedNodes == null) {
      usedNodes = Collections.emptyList();
    }

    List<Node> racks = getAllRacks();
    // usedRacksCntMap maps a rack to the number of usedNodes it contains
    Map<Node, Integer> usedRacksCntMap = new HashMap<>();
    for (Node node : usedNodes) {
      Node rack = networkTopology.getAncestor(node, RACK_LEVEL);
      if (rack != null) {
        usedRacksCntMap.merge(rack, 1, Math::addExact);
      }
    }

    List<Node> unavailableRacks = findRacksWithOnlyExcludedNodes(excludedNodes,
        usedRacksCntMap);
    for (Node rack : unavailableRacks) {
      racks.remove(rack);
    }

    // The target replication factor is what is already placed plus what is
    // being requested now.
    int requiredReplicationFactor = usedNodes.size() + nodesRequired;
    int numberOfRacksRequired = getRequiredRackCount(requiredReplicationFactor,
        unavailableRacks.size());
    // Never ask for more new racks than nodes being requested.
    int additionalRacksRequired =
        Math.min(nodesRequired, numberOfRacksRequired - usedRacksCntMap.size());
    LOG.debug("Additional nodes required: {}. Additional racks required: {}.",
        nodesRequired, additionalRacksRequired);
    int maxReplicasPerRack = getMaxReplicasPerRack(requiredReplicationFactor,
            numberOfRacksRequired);
    LOG.debug("According to required replication factor: {}, and total number" +
            " of racks required: {}, max replicas per rack is {}.",
        requiredReplicationFactor, numberOfRacksRequired, maxReplicasPerRack);
    // For excluded nodes, we sort their racks at rear
    racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);

    List<Node> unavailableNodes = new ArrayList<>(usedNodes);
    if (excludedNodes != null) {
      unavailableNodes.addAll(excludedNodes);
    }

    LOG.debug("Available racks excluding racks with used nodes: {}.", racks);
    if (racks.size() < additionalRacksRequired) {
      // Fixed: a space was missing between the rack count and "is less than".
      String reason = "Number of existing racks: " + racks.size()
              + " is less than additional required number of racks to choose: "
              + additionalRacksRequired + " do not match.";
      // Fixed: a space was missing between the reason and "Total number".
      LOG.warn("Placement policy cannot choose the enough racks. {}"
                      + " Total number of Required Racks: {} Used Racks Count:" +
                      " {}, Required Nodes count: {}",
              reason, numberOfRacksRequired, usedRacksCntMap.size(),
              nodesRequired);
      throw new SCMException(reason,
              FAILED_TO_FIND_SUITABLE_NODE);
    }

    // First pass: one node from each of additionalRacksRequired distinct
    // racks. A single sweep over the rack list is requested so that every
    // chosen node comes from a different rack; the size check below relies on
    // that. Previously maxReplicasPerRack was passed here as well as in the
    // last argument, which allowed repeated sweeps.
    // NOTE(review): assumes the seventh argument of chooseNodesFromRacks
    // bounds the number of sweeps over the racks (the fallback pass below
    // passes Integer.MAX_VALUE in that position) - confirm against the helper.
    Set<DatanodeDetails> chosenNodes = new LinkedHashSet<>(
        chooseNodesFromRacks(racks, unavailableNodes,
            mutableFavoredNodes, additionalRacksRequired,
            metadataSizeRequired, dataSizeRequired, 1,
            usedRacksCntMap, maxReplicasPerRack));

    if (chosenNodes.size() < additionalRacksRequired) {
      String reason = "Chosen nodes size from Unique Racks: " + chosenNodes
              .size() + ", but required nodes to choose from Unique Racks: "
              + additionalRacksRequired + " do not match.";
      LOG.warn("Placement policy could not choose the enough nodes from " +
                      "available racks. {} Available racks count: {},"
                      + " Excluded nodes count: {}, UsedNodes count: {}",
              reason, racks.size(), excludedNodesCount, usedNodesCount);
      throw new SCMException(reason,
              SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
    }

    // Second pass: still short of nodes, so also consider the racks that
    // already hold used nodes, with no limit on the seventh argument.
    if (chosenNodes.size() < nodesRequired) {
      // NOTE(review): the used racks are added both before and after the
      // sort; kept as-is because sortRackWithExcludedNodes is not visible
      // here - confirm whether the first addAll is needed.
      racks.addAll(usedRacksCntMap.keySet());
      racks = sortRackWithExcludedNodes(racks, excludedNodes, usedRacksCntMap);
      racks.addAll(usedRacksCntMap.keySet());
      LOG.debug("Available racks considering racks with used and exclude " +
              "nodes: {}.", racks);
      chosenNodes.addAll(chooseNodesFromRacks(racks, unavailableNodes,
              mutableFavoredNodes, nodesRequired - chosenNodes.size(),
              metadataSizeRequired, dataSizeRequired,
              Integer.MAX_VALUE, usedRacksCntMap, maxReplicasPerRack));
    }

    if (nodesRequired != chosenNodes.size()) {
      String reason = "Chosen nodes size: " + chosenNodes
              .size() + ", but required nodes to choose: "
              + nodesRequired + " do not match.";
      // Fixed: the value logged is the total node count, so label it as such.
      LOG.warn("Placement policy could not choose the enough nodes."
               + " {} Total nodes count: {}, Excluded nodes count: {},"
               + " Used nodes count: {}",
              reason, totalNodesCount, excludedNodesCount, usedNodesCount);
      throw new SCMException(reason,
              SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
    }
    List<DatanodeDetails> result = new ArrayList<>(chosenNodes);

    // Validate the placement formed by the used nodes plus the new choice.
    List<DatanodeDetails> newPlacement =
        new ArrayList<>(usedNodes.size() + result.size());
    newPlacement.addAll(usedNodes);
    newPlacement.addAll(chosenNodes);
    ContainerPlacementStatus placementStatus =
        validateContainerPlacement(newPlacement, requiredReplicationFactor);
    if (!placementStatus.isPolicySatisfied()) {
      // An unsatisfied policy is tolerated as long as the new placement is
      // not more mis-replicated than the used nodes were on their own.
      ContainerPlacementStatus initialPlacementStatus =
          validateContainerPlacement(usedNodes, requiredReplicationFactor);
      if (initialPlacementStatus.misReplicationCount()
              < placementStatus.misReplicationCount()) {
        String errorMsg = "ContainerPlacementPolicy not met. Misreplication" +
                " Reason: " + placementStatus.misReplicatedReason() +
                " Initial Used nodes mis-replication Count: " +
                initialPlacementStatus.misReplicationCount() +
                " Used nodes + Chosen nodes mis-replication Count: " +
                placementStatus.misReplicationCount();
        throw new SCMException(errorMsg, FAILED_TO_FIND_SUITABLE_NODE);
      }
    }
    LOG.info("Chosen nodes: {}. isPolicySatisfied: {}.", result,
        placementStatus.isPolicySatisfied());
    return result;
  }