in helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java [104:202]
/**
 * Computes the partition assignment for this resource, trying an evenly distributed mapping
 * first and falling back to the base rebalance strategy when that attempt does not succeed.
 *
 * <p>The base strategy is first invoked with {@code allNodes} supplied as both node lists. When
 * its list fields are non-empty, the card dealing algorithm is applied to that mapping, the
 * preference lists are shuffled, and, if {@code liveNodes} does not contain every entry of
 * {@code allNodes}, the consistent hashing adjustment is applied on top. If any of these steps
 * reports failure (or the consistent hashing step throws {@link ExecutionException}), the base
 * strategy's result is returned instead.
 *
 * @param allNodes       names of all nodes; used to build the topology and for the first base
 *                       strategy call
 * @param liveNodes      names of the live nodes; used for the consistent hashing adjustment and
 *                       for the fallback base strategy call
 * @param currentMapping passed through unchanged to the base strategy
 * @param clusterData    provides the assignable instance config map, the cluster config and the
 *                       cluster event id (the latter is used for logging only)
 * @return a new record named after the resource whose list fields map each partition to the
 *         selected instance names, or the base strategy's record when falling back
 */
private ZNRecord computeBestPartitionAssignment(List<String> allNodes, List<String> liveNodes,
    Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
  // Step 1: ask the base strategy for a mapping. All nodes are passed as the live list as well,
  // to minimize the influence of live node changes on the mapping.
  ZNRecord baseAssignment = getBaseRebalanceStrategy()
      .computePartitionAssignment(allNodes, allNodes, currentMapping, clusterData);
  Map<String, List<String>> basePartitionMap = baseAssignment.getListFields();

  // Only used to tag log entries.
  String eventId = clusterData.getClusterEventId();

  // Re-assignment is only attempted when the base mapping has content.
  if (!basePartitionMap.isEmpty()) {
    Topology fullTopology =
        new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
            clusterData.getClusterConfig(), true);
    // Turn the partition -> instances mapping into a node -> partitions mapping.
    Map<Node, List<String>> partitionsByNode =
        convertPartitionMap(basePartitionMap, fullTopology);
    // Stays null (or is reset to null) whenever an adjustment step does not succeed.
    Map<String, List<Node>> evenPartitionMap = null;

    // Step 2: even out the mapping with the card dealing algorithm.
    CardDealingAdjustmentAlgorithmV2 dealer = getCardDealingAlgorithm(fullTopology);
    if (dealer.computeMapping(partitionsByNode, _resourceName.hashCode())) {
      // Step 3: reorder the preference lists so that participants' orders (and therefore the
      // states) are uniform.
      evenPartitionMap = shufflePreferenceList(partitionsByNode);

      boolean someNodesNotLive = !liveNodes.containsAll(allNodes);
      if (someNodesNotLive) {
        try {
          // Step 4: re-map the partitions on non-live nodes using consistent hashing, for
          // reducing movement.
          ConsistentHashingAdjustmentAlgorithm hashAdjuster =
              new ConsistentHashingAdjustmentAlgorithm(fullTopology, liveNodes);
          if (!hashAdjuster.computeMapping(partitionsByNode, _resourceName.hashCode())) {
            // The adjustment failed, so the shuffled map is no longer valid.
            evenPartitionMap = null;
          } else {
            // The mapping was changed by the adjustment, so the node order of every partition
            // has to be re-established against the shuffled preference lists.
            Map<String, List<Node>> remappedPartitionMap = convertAssignment(partitionsByNode);
            for (Map.Entry<String, List<Node>> remapped : remappedPartitionMap.entrySet()) {
              List<Node> preferredOrder = evenPartitionMap.get(remapped.getKey());
              // The list is rewritten in place; the set tracks which nodes still need a slot.
              List<Node> orderedNodes = remapped.getValue();
              Set<Node> unplacedNodes = new HashSet<>(orderedNodes);
              int slot = 0;
              // First the nodes that were already pre-selected, in their pre-selected order.
              for (Node candidate : preferredOrder) {
                if (unplacedNodes.remove(candidate)) {
                  orderedNodes.set(slot, candidate);
                  slot++;
                }
              }
              // Then whatever nodes remain.
              for (Node leftover : unplacedNodes) {
                orderedNodes.set(slot, leftover);
                slot++;
              }
            }
            evenPartitionMap = remappedPartitionMap;
          }
        } catch (ExecutionException e) {
          LogUtil.logError(_logger, eventId,
              "Failed to perform consistent hashing partition assigner.", e);
          evenPartitionMap = null;
        }
      }
    }

    if (evenPartitionMap != null) {
      // Translate the node based mapping into instance names and wrap it in a new record.
      ZNRecord evenAssignment = new ZNRecord(_resourceName);
      Map<String, List<String>> instancesByPartition = new HashMap<>();
      for (Map.Entry<String, List<Node>> selection : evenPartitionMap.entrySet()) {
        List<String> instanceNames = new ArrayList<>();
        for (Node selected : selection.getValue()) {
          if (!(selected instanceof InstanceNode)) {
            // Nodes that are not instances are reported and left out of the list.
            LogUtil.logError(_logger, eventId,
                String.format("Selected node is not associated with an instance: %s", selected));
            continue;
          }
          instanceNames.add(((InstanceNode) selected).getInstanceName());
        }
        instancesByPartition.put(selection.getKey(), instanceNames);
      }
      evenAssignment.setListFields(instancesByPartition);
      return evenAssignment;
    }
  }

  // Forcing an even distribution was not possible; fall back to the default strategy.
  if (_logger.isDebugEnabled()) {
    LogUtil.logDebug(_logger, eventId,
        "Force even distribution is not possible, using the default strategy: "
            + getBaseRebalanceStrategy().getClass().getSimpleName());
  }

  if (!liveNodes.equals(allNodes)) {
    // The node lists differ, so the base mapping has to be computed again with the live nodes.
    return getBaseRebalanceStrategy()
        .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData);
  }
  return baseAssignment;
}