in helix-core/src/main/java/org/apache/helix/tools/DefaultIdealStateCalculator.java [432:593]
public static Map<String, Object> calculateNextIdealState(List<String> newInstances,
Map<String, Object> previousIdealState) {
// Obtain the master / slave assignment info maps
Collections.sort(newInstances);
Map<String, List<Integer>> previousMasterAssignmentMap =
(Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap =
(Map<String, Map<String, List<Integer>>>) (previousIdealState.get("SlaveAssignmentMap"));
List<String> oldInstances = new ArrayList<String>();
for (String oldInstance : previousMasterAssignmentMap.keySet()) {
oldInstances.add(oldInstance);
}
int previousInstanceNum = previousMasterAssignmentMap.size();
int partitions = (Integer) (previousIdealState.get("partitions"));
// TODO: take weight into account when calculate this
int totalMasterParitionsToMove =
partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum;
int remain = totalMasterParitionsToMove % previousInstanceNum;
// Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1)
// partitions.
// And we should first choose those (numMastersFromEachNode + 1) moves from the instances that
// has more
// master partitions
List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
// For corresponding moved slave partitions, keep track of their original location; the new node
// does not
// need to migrate all of them.
Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
// Make sure that the instances that holds more master partitions are put in front
List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
for (String oldInstance : previousMasterAssignmentMap.keySet()) {
List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
if (masterAssignmentList.size() > numMastersFromEachNode) {
bigList.add(oldInstance);
} else {
smallList.add(oldInstance);
}
}
// "sort" the list, such that the nodes that has more master partitions moves more partitions to
// the
// new added batch of instances.
bigList.addAll(smallList);
int totalSlaveMoves = 0;
for (String oldInstance : bigList) {
List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
int numToChoose = numMastersFromEachNode;
if (remain > 0) {
numToChoose = numMastersFromEachNode + 1;
remain--;
}
// randomly remove numToChoose of master partitions to the new added nodes
ArrayList<Integer> masterPartionsMoved = new ArrayList<Integer>();
randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose);
masterPartitionListToMove.addAll(masterPartionsMoved);
Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved,
slavePartitionsToMoveMap);
// Make sure that for old instances, the slave placement map is evenly distributed
// Trace the "local slave moves", which should together contribute to most of the slave
// migrations
int movesWithinInstance =
migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
// System.out.println("local moves: "+ movesWithinInstance);
totalSlaveMoves += movesWithinInstance;
}
// System.out.println("local slave moves total: "+ totalSlaveMoves);
// calculate the master /slave assignment for the new added nodes
// We already have the list of master partitions that will migrate to new batch of instances,
// shuffle the partitions and assign them to new instances
Collections.shuffle(masterPartitionListToMove, new Random(12345));
for (int i = 0; i < newInstances.size(); i++) {
String newInstance = newInstances.get(i);
List<Integer> masterPartitionList = new ArrayList<Integer>();
for (int j = 0; j < masterPartitionListToMove.size(); j++) {
if (j % newInstances.size() == i) {
masterPartitionList.add(masterPartitionListToMove.get(j));
}
}
Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
for (String oldInstance : oldInstances) {
slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
}
// Build the slave assignment map for the new instance, based on the saved information
// about those slave partition locations in slavePartitionsToMoveMap
for (Integer x : masterPartitionList) {
for (String oldInstance : slavePartitionsToMoveMap.keySet()) {
List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
if (slaves.contains(x)) {
slavePartitionMap.get(oldInstance).add(x);
}
}
}
// add entry for other new instances into the slavePartitionMap
List<String> otherNewInstances = new ArrayList<String>();
for (String instance : newInstances) {
if (!instance.equalsIgnoreCase(newInstance)) {
otherNewInstances.add(instance);
}
}
// Make sure that slave partitions are evenly distributed
migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
// Update the result in the result map. We can reuse the input previousIdealState map as
// the result.
previousMasterAssignmentMap.put(newInstance, masterPartitionList);
nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
}
/*
* // Print content of the master/ slave assignment maps
* for(String instanceName : previousMasterAssignmentMap.keySet())
* {
* System.out.println(instanceName+":");
* for(Integer x : previousMasterAssignmentMap.get(instanceName))
* {
* System.out.print(x+" ");
* }
* System.out.println("\nmaster partition moved:");
* System.out.println();
* System.out.println("Slave assignment:");
* Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
* for(String slaveName : slaveAssignmentMap.keySet())
* {
* System.out.print("\t" + slaveName +":\n\t" );
* for(Integer x : slaveAssignmentMap.get(slaveName))
* {
* System.out.print(x + " ");
* }
* System.out.println("\n");
* }
* }
* System.out.println("Master partitions migrated to new instances");
* for(Integer x : masterPartitionListToMove)
* {
* System.out.print(x+" ");
* }
* System.out.println();
* System.out.println("Slave partitions migrated to new instances");
* for(String oldInstance : slavePartitionsToMoveMap.keySet())
* {
* System.out.print(oldInstance + ": ");
* for(Integer x : slavePartitionsToMoveMap.get(oldInstance))
* {
* System.out.print(x+" ");
* }
* System.out.println();
* }
*/
return previousIdealState;
}