private ZNRecord computeBestPartitionAssignment()

in helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java [104:202]


  private ZNRecord computeBestPartitionAssignment(List<String> allNodes, List<String> liveNodes,
      Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
    // Round 1: Calculate mapping using the base strategy.
    // Note to use all nodes for minimizing the influence of live node changes to mapping.
    ZNRecord origAssignment = getBaseRebalanceStrategy()
        .computePartitionAssignment(allNodes, allNodes, currentMapping, clusterData);
    Map<String, List<String>> origPartitionMap = origAssignment.getListFields();

    // For logging only
    String eventId = clusterData.getClusterEventId();

    // Try to re-assign if the original map is not empty
    if (!origPartitionMap.isEmpty()) {
      Map<String, List<Node>> finalPartitionMap = null;
      Topology allNodeTopo =
          new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
              clusterData.getClusterConfig(), true);
      // Transform current assignment to instance->partitions map, and get total partitions
      Map<Node, List<String>> nodeToPartitionMap =
          convertPartitionMap(origPartitionMap, allNodeTopo);
      // Round 2: Rebalance mapping using card dealing algorithm. For ensuring evenness distribution.
      CardDealingAdjustmentAlgorithmV2 cardDealer = getCardDealingAlgorithm(allNodeTopo);
      if (cardDealer.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) {
        // Round 3: Reorder preference Lists to ensure participants' orders (so as the states) are uniform.
        finalPartitionMap = shufflePreferenceList(nodeToPartitionMap);
        if (!liveNodes.containsAll(allNodes)) {
          try {
            // Round 4: Re-mapping the partitions on non-live nodes using consistent hashing for reducing movement.
            ConsistentHashingAdjustmentAlgorithm hashPlacement =
                new ConsistentHashingAdjustmentAlgorithm(allNodeTopo, liveNodes);
            if (hashPlacement.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) {
              // Since mapping is changed by hashPlacement, need to adjust nodes order.
              Map<String, List<Node>> adjustedPartitionMap =
                  convertAssignment(nodeToPartitionMap);
              for (String partition : adjustedPartitionMap.keySet()) {
                List<Node> preSelectedList = finalPartitionMap.get(partition);
                Set<Node> adjustedNodeList =
                    new HashSet<>(adjustedPartitionMap.get(partition));
                List<Node> finalNodeList = adjustedPartitionMap.get(partition);
                int index = 0;
                // 1. Add the ones in pre-selected node list first, in order
                for (Node node : preSelectedList) {
                  if (adjustedNodeList.remove(node)) {
                    finalNodeList.set(index++, node);
                  }
                }
                // 2. Add the rest of nodes to the map
                for (Node node : adjustedNodeList) {
                  finalNodeList.set(index++, node);
                }
              }
              finalPartitionMap = adjustedPartitionMap;
            } else {
              // Adjustment failed, the final partition map is not valid
              finalPartitionMap = null;
            }
          } catch (ExecutionException e) {
            LogUtil.logError(_logger, eventId,
                "Failed to perform consistent hashing partition assigner.", e);
            finalPartitionMap = null;
          }
        }
      }

      if (null != finalPartitionMap) {
        ZNRecord result = new ZNRecord(_resourceName);
        Map<String, List<String>> resultPartitionMap = new HashMap<>();
        for (String partitionName : finalPartitionMap.keySet()) {
          List<String> instanceNames = new ArrayList<>();
          for (Node node : finalPartitionMap.get(partitionName)) {
            if (node instanceof InstanceNode) {
              instanceNames.add(((InstanceNode) node).getInstanceName());
            } else {
              LogUtil.logError(_logger, eventId,
                  String.format("Selected node is not associated with an instance: %s", node));
            }
          }
          resultPartitionMap.put(partitionName, instanceNames);
        }
        result.setListFields(resultPartitionMap);
        return result;
      }
    }

    // Force even is not possible, fallback to use default strategy
    if (_logger.isDebugEnabled()) {
      LogUtil.logDebug(_logger, eventId,
          "Force even distribution is not possible, using the default strategy: "
              + getBaseRebalanceStrategy().getClass().getSimpleName());
    }

    if (liveNodes.equals(allNodes)) {
      return origAssignment;
    } else {
      // need to re-calculate since node list is different.
      return getBaseRebalanceStrategy()
          .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
    }
  }