in helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithmV2.java [77:172]
/**
 * Adjusts an existing node-to-partition mapping towards the per-instance target counts derived
 * from the instance and fault zone weights.
 *
 * <p>The method works in three steps:
 * <ol>
 *   <li>Partitions held by nodes that are not in {@code _instanceFaultZone} are collected for
 *       reassignment and those nodes are removed from the mapping.</li>
 *   <li>Every remaining node holding more partitions than its target is trimmed; the trimmed
 *       partitions are collected for reassignment as well.</li>
 *   <li>The collected partitions are handed to {@code partitionDealing}, retrying with an
 *       increasing adjustment until nothing is left or {@code MAX_ADJUSTMENT} is exceeded.</li>
 * </ol>
 *
 * @param nodeToPartitionMap current node -> partition list mapping. It is modified in place:
 *                           inactive nodes are removed (and their lists cleared), and the list of
 *                           an over-target node is cleared and replaced by a new, trimmed list.
 * @param randomSeed         seed used to shuffle both the instance processing order and the
 *                           partitions of each trimmed instance, so that identical inputs produce
 *                           identical results.
 * @return {@code false} if the mapping contains no replica or {@code _replica} exceeds the number
 *         of fault zones; otherwise {@code true} only if no partition is left waiting for
 *         reassignment once the dealing rounds are over.
 */
public boolean computeMapping(Map<Node, List<String>> nodeToPartitionMap, int randomSeed) {
  // Partitions taken away from their current node that still need a new owner.
  TreeMap<String, Integer> toBeReassigned = new TreeMap<>();

  // Count all replicas currently present in the mapping.
  long totalReplicaCount = 0;
  for (List<String> partitions : nodeToPartitionMap.values()) {
    totalReplicaCount += partitions.size();
  }
  if (totalReplicaCount == 0 || _replica > _faultZoneWeight.size()) {
    return false;
  }

  // instance -> target (ideal) partition count
  Map<Node, Float> targetPartitionCount = new HashMap<>();
  for (Node liveInstance : _instanceFaultZone.keySet()) {
    long zoneWeight = _faultZoneWeight.get(_instanceFaultZone.get(liveInstance));
    float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) / zoneWeight;
    // 1. If replica count == fault zone count, the fault zone weight does not matter: every zone
    //    gets an equal share of the replicas.
    // 2. Otherwise the zone's share is proportional to its weight.
    float zonePartitions;
    if (_replica == _faultZoneWeight.size()) {
      zonePartitions = ((float) totalReplicaCount) / _faultZoneWeight.size();
    } else {
      zonePartitions = ((float) totalReplicaCount) * zoneWeight / _totalWeight;
    }
    targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions);
  }

  int totalOverflows = 0;
  Map<Node, Integer> maxZoneOverflows = new HashMap<>();
  if (_mode.equals(Mode.MINIMIZE_MOVEMENT)) {
    // Keeping the spikes where possible hurts evenness, so only do this in MINIMIZE_MOVEMENT mode.
    // Calculate the expected spikes and distribute them to the zones according to zone weight.
    // Take the remainder in long arithmetic and narrow afterwards: a cast binds tighter than '%',
    // so casting first would truncate a replica count larger than Integer.MAX_VALUE.
    totalOverflows = (int) (totalReplicaCount % _instanceFaultZone.size());
    for (Node faultZone : _faultZoneWeight.keySet()) {
      float zoneWeight = _faultZoneWeight.get(faultZone);
      maxZoneOverflows.put(faultZone,
          (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight));
    }
  }

  // Clean up the existing mapping: drop every node that is not an active instance and queue its
  // partitions for reassignment.
  Iterator<Node> nodeIter = nodeToPartitionMap.keySet().iterator();
  while (nodeIter.hasNext()) {
    Node instance = nodeIter.next();
    if (!_instanceFaultZone.containsKey(instance)) {
      List<String> partitions = nodeToPartitionMap.get(instance);
      addToReAssignPartition(toBeReassigned, partitions);
      partitions.clear();
      nodeIter.remove();
    }
  }

  List<Node> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet());
  // Different resources (seeds) should shuffle the nodes in different ways.
  Collections.shuffle(orderedInstances, new Random(randomSeed));
  for (Node instance : orderedInstances) {
    if (!nodeToPartitionMap.containsKey(instance)) {
      continue;
    }
    Node faultZone = _instanceFaultZone.get(instance);
    // Cut off the partitions exceeding the target partition count.
    List<String> partitions = nodeToPartitionMap.get(instance);
    int target = (int) (Math.floor(targetPartitionCount.get(instance)));
    if (partitions.size() > target) {
      Integer maxZoneOverflow = maxZoneOverflows.get(faultZone);
      if (maxZoneOverflow != null && maxZoneOverflow > 0 && totalOverflows > 0) {
        // The fault zone still has overflow capacity AND there is overflow budget left overall:
        // let this instance keep up to ceil(target) partitions.
        // NOTE(review): the budget is consumed even when the target is a whole number and
        // ceil == floor; kept as is to preserve the existing assignment results.
        target = (int) (Math.ceil(targetPartitionCount.get(instance)));
        maxZoneOverflows.put(faultZone, maxZoneOverflow - 1);
        totalOverflows--;
      }
      // Shuffle the partitions to pick the exceeding ones randomly. The seed depends only on the
      // instance and randomSeed, so the same inputs always produce the same result.
      Collections.shuffle(partitions, new Random(instance.hashCode() * 31 + randomSeed));
      addToReAssignPartition(toBeReassigned, partitions.subList(target, partitions.size()));
      // Keep the first 'target' partitions on the instance. They are copied before the original
      // list is cleared because subList is only a view of that list.
      List<String> remainingPartitions = new ArrayList<>(partitions.subList(0, target));
      partitions.clear();
      nodeToPartitionMap.put(instance, remainingPartitions);
    }
    // Record what the instance keeps in its fault zone's partition list.
    _faultZonePartitionMap.get(faultZone).addAll(nodeToPartitionMap.get(instance));
  }

  // Reassign if any instances have space left.
  // Assign partitions according to the target capacity, capped at "Math.floor(target) + adjustment";
  // the adjustment grows by one on every round until everything is placed or MAX_ADJUSTMENT is
  // exceeded.
  int adjustment = 0;
  while (!toBeReassigned.isEmpty() && adjustment <= MAX_ADJUSTMENT) {
    partitionDealing(_instanceFaultZone.keySet(), toBeReassigned, _faultZonePartitionMap,
        _instanceFaultZone, nodeToPartitionMap, targetPartitionCount, randomSeed, adjustment++);
  }
  return toBeReassigned.isEmpty();
}