public boolean computeMapping()

in helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java [77:172]


  /**
   * Adjusts the given node-to-partition mapping in place so that each live instance holds roughly
   * its weight-proportional share of the replicas.
   *
   * <p>Nodes that are not present in {@code _instanceFaultZone} are removed from the mapping and
   * their partitions are queued for reassignment. Instances holding more than their target count
   * have the excess partitions, picked by a seeded shuffle, queued as well. The queued partitions
   * are then handed to {@code partitionDealing} repeatedly, with an {@code adjustment} that grows by
   * one per pass up to {@code MAX_ADJUSTMENT}, until nothing is left to reassign.
   *
   * @param nodeToPartitionMap node -> partitions currently assigned to it. Modified in place:
   *     entries for inactive nodes are removed (their lists cleared), and the lists of trimmed
   *     instances are cleared and replaced with new lists
   * @param randomSeed seed for the instance and partition shuffles, so that identical inputs
   *     produce identical results
   * @return {@code true} if every queued partition was reassigned; {@code false} if the mapping
   *     holds no replicas, {@code _replica} exceeds the number of fault zones, or some partitions
   *     could not be reassigned
   */
  public boolean computeMapping(Map<Node, List<String>> nodeToPartitionMap, int randomSeed) {
    // Records exceed partitions (partitions taken off a node that still need a new home)
    TreeMap<String, Integer> toBeReassigned = new TreeMap<>();

    // Calculate total partitions that need to be calculated. This includes partitions currently on
    // inactive nodes, since those will be reassigned to live instances below.
    long totalReplicaCount = 0;
    for (List<String> partitions : nodeToPartitionMap.values()) {
      totalReplicaCount += partitions.size();
    }
    if (totalReplicaCount == 0 || _replica > _faultZoneWeight.size()) {
      return false;
    }

    // instance -> target (ideal) partition count
    Map<Node, Float> targetPartitionCount = new HashMap<>();
    for (Node liveInstance : _instanceFaultZone.keySet()) {
      long zoneWeight = _faultZoneWeight.get(_instanceFaultZone.get(liveInstance));
      float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) / zoneWeight;
      // 1. if replica = fault zone, fault zone weight does not count, so calculate according to fault zone count.
      // 2. else, should consider fault zone weight to calculate expected threshold.
      float zonePartitions;
      if (_replica == _faultZoneWeight.size()) {
        zonePartitions = ((float) totalReplicaCount) / _faultZoneWeight.size();
      } else {
        zonePartitions = ((float) totalReplicaCount) * zoneWeight / _totalWeight;
      }
      targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions);
    }

    int totalOverflows = 0;
    // fault zone -> number of its instances still allowed to round their target up
    Map<Node, Integer> maxZoneOverflows = new HashMap<>();
    // Skip when there are no live instances: the remainder below would divide by zero, and with no
    // instances there is nobody to grant an overflow slot to anyway.
    if (_mode.equals(Mode.MINIMIZE_MOVEMENT) && !_instanceFaultZone.isEmpty()) {
      // Note that keep the spikes if possible will hurt evenness. So only do this for MINIMIZE_MOVEMENT mode

      // Calculate the expected spikes.
      // Take the remainder in long arithmetic before narrowing: casting first would truncate counts
      // above Integer.MAX_VALUE. The remainder is smaller than the instance count, so it fits in an int.
      totalOverflows = (int) (totalReplicaCount % _instanceFaultZone.size());
      // Assign spikes to each zone according to zone weight
      for (Node faultZone : _faultZoneWeight.keySet()) {
        float zoneWeight = _faultZoneWeight.get(faultZone);
        maxZoneOverflows.put(faultZone,
            (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
      }
    }
    Iterator<Node> nodeIter = nodeToPartitionMap.keySet().iterator();
    while (nodeIter.hasNext()) {
      Node instance = nodeIter.next();
      // Cleanup the existing mapping. Remove all non-active nodes from the mapping
      if (!_instanceFaultZone.containsKey(instance)) {
        List<String> partitions = nodeToPartitionMap.get(instance);
        addToReAssignPartition(toBeReassigned, partitions);
        partitions.clear();
        // Iterator.remove avoids a ConcurrentModificationException while iterating the key set
        nodeIter.remove();
      }
    }

    List<Node> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet());
    // Different resource should shuffle nodes in different ways.
    Collections.shuffle(orderedInstances, new Random(randomSeed));
    for (Node instance : orderedInstances) {
      if (!nodeToPartitionMap.containsKey(instance)) {
        continue;
      }
      // Cut off the exceed partitions compared with target partition count.
      List<String> partitions = nodeToPartitionMap.get(instance);
      int target = (int) (Math.floor(targetPartitionCount.get(instance)));
      if (partitions.size() > target) {
        Integer maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance));
        if (maxZoneOverflow != null && maxZoneOverflow > 0 && totalOverflows > 0) {
          // When fault zone has overflow capacity AND there are still remaining overflow partitions
          target = (int) (Math.ceil(targetPartitionCount.get(instance)));
          maxZoneOverflows.put(_instanceFaultZone.get(instance), maxZoneOverflow - 1);
          totalOverflows--;
        }

        // Shuffle partitions to randomly pickup exceed ones. Ensure the algorithm generates consistent results when the inputs are the same.
        Collections.shuffle(partitions, new Random(instance.hashCode() * 31 + randomSeed));
        addToReAssignPartition(toBeReassigned, partitions.subList(target, partitions.size()));

        // Put the remaining partitions to the assignment, and record in fault zone partition list.
        // Copy the kept prefix before clearing, because subList is a view backed by 'partitions'.
        List<String> remainingPartitions = new ArrayList<>(partitions.subList(0, target));
        partitions.clear();
        nodeToPartitionMap.put(instance, remainingPartitions);
      }
      // NOTE(review): assumes _faultZonePartitionMap already holds an entry for every fault zone;
      // it is populated outside this method.
      _faultZonePartitionMap.get(_instanceFaultZone.get(instance))
          .addAll(nodeToPartitionMap.get(instance));
    }

    // Reassign if any instances have space left.
    // Assign partition according to the target capacity, CAP at "Math.floor(target) + adjustment"
    int adjustment = 0;
    while (!toBeReassigned.isEmpty() && adjustment <= MAX_ADJUSTMENT) {
      partitionDealing(_instanceFaultZone.keySet(), toBeReassigned, _faultZonePartitionMap,
          _instanceFaultZone, nodeToPartitionMap, targetPartitionCount, randomSeed, adjustment++);
    }
    return toBeReassigned.isEmpty();
  }