in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java [273:507]
/**
 * Assigns instances to replica-groups and partitions while keeping as much of the existing assignment
 * ({@code _existingInstancePartitions}) as possible.
 *
 * <p>The work happens in three phases:
 * <ol>
 *   <li>Every replica-group is mapped to one pool, preferring the pool that already holds most of its existing
 *   instances; no pool hosts more than ceil(numReplicaGroups / numPools) replica-groups.</li>
 *   <li>Every replica-group is filled with instances from its pool: existing instances that are still in the pool
 *   are kept first, and the vacant slots are filled with the pool's remaining instances.</li>
 *   <li>The instances of every replica-group are distributed over the partitions and written into
 *   {@code instancePartitions}.</li>
 * </ol>
 *
 * @param poolToInstanceConfigsMap candidate instances grouped by pool
 * @param instancePartitions receives the selected instances for every (partition, replica-group)
 * @param pools pools that replica-groups may be placed on; an empty list makes the integer division by the pool
 *        count throw {@link ArithmeticException}
 * @param tableNameHash hash of the table name, used to rotate the pool order so tie-breaks differ across tables
 */
private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
    InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
  int numReplicaGroups = getNumReplicaGroups();

  // Phase 1: replica-group -> pool. Also records the existing instances of every replica-group.
  List<Set<String>> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups);
  Map<Integer, List<Integer>> poolToReplicaGroupIdsMap =
      assignReplicaGroupsToPoolsWithMinimumMovement(poolToInstanceConfigsMap, pools, tableNameHash,
          numReplicaGroups, replicaGroupIdToExistingInstancesMap);
  LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
      _tableNameWithType);
  int numInstancesPerReplicaGroup =
      getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
  LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
      _tableNameWithType);
  int numPartitions = getNumPartitions();
  int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
  LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
      numPartitions, numInstancesPerPartition, _tableNameWithType);

  // Phase 2: pick the instances of every replica-group, keeping existing ones first
  List<List<String>> replicaGroupIdToInstancesMap =
      selectReplicaGroupInstancesWithMinimumMovement(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap,
          replicaGroupIdToExistingInstancesMap, numReplicaGroups, numInstancesPerReplicaGroup);

  // Phase 3: spread the instances of every replica-group over the partitions
  int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
  for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
    List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
    boolean hasExistingReplicaGroup = replicaGroupId < existingNumReplicaGroups;
    if (numPartitions == 1) {
      assignSinglePartitionWithMinimumMovement(instancePartitions, replicaGroupId, instancesInReplicaGroup,
          numInstancesPerReplicaGroup, hasExistingReplicaGroup);
    } else if (hasExistingReplicaGroup) {
      assignPartitionsWithMinimumMovement(instancePartitions, replicaGroupId, instancesInReplicaGroup,
          numInstancesPerReplicaGroup, numPartitions, numInstancesPerPartition);
    } else {
      assignPartitionsRoundRobin(instancePartitions, replicaGroupId, instancesInReplicaGroup,
          numInstancesPerReplicaGroup, numPartitions, numInstancesPerPartition);
    }
  }
}

/**
 * Maps every replica-group to exactly one pool. A replica-group prefers the pool holding the most occurrences of its
 * existing instances (summed over all existing partitions). Ties are broken by a pool order rotated by the table
 * name hash and the replica-group id.
 *
 * @param replicaGroupIdToExistingInstancesMap output list; one set is appended per replica-group id, holding its
 *        existing instances (empty for replica-groups beyond the existing assignment)
 * @return pool -> ids of the replica-groups placed on that pool, ordered by pool
 */
private Map<Integer, List<Integer>> assignReplicaGroupsToPoolsWithMinimumMovement(
    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, List<Integer> pools, int tableNameHash,
    int numReplicaGroups, List<Set<String>> replicaGroupIdToExistingInstancesMap) {
  int numPools = pools.size();
  Map<String, Integer> instanceToPoolMap = new HashMap<>();
  for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
    int pool = entry.getKey();
    for (InstanceConfig instanceConfig : entry.getValue()) {
      instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
    }
  }
  Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
  // Ceiling division: the per-pool capacity always leaves room for every replica-group
  int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
  int startIndex = Math.abs(tableNameHash % numPools);
  int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
  int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
  for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
    // Gather the existing instances of this replica-group and how many of them fall into each pool
    Set<String> existingInstanceSet = new HashSet<>();
    replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
    Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
    if (replicaGroupId < existingNumReplicaGroups) {
      for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
        List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
        existingInstanceSet.addAll(existingInstances);
        for (String existingInstance : existingInstances) {
          Integer existingPool = instanceToPoolMap.get(existingInstance);
          // Instances that are no longer in any pool do not count towards any pool
          if (existingPool != null) {
            poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
          }
        }
      }
    }
    // Sort the pools by number of existing instances (descending), then by the rotated pool index.
    // Triple stores (pool, numExistingInstances, poolIndex) for sorting
    List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
    for (int i = 0; i < numPools; i++) {
      int pool = pools.get((startIndex + replicaGroupId + i) % numPools);
      triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
    }
    triples.sort((o1, o2) -> {
      int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
      return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
    });
    // Place the replica-group on the most preferred pool that still has capacity
    for (Triple<Integer, Integer, Integer> triple : triples) {
      int pool = triple.getLeft();
      List<Integer> replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>());
      if (replicaGroupIds.size() < maxReplicaGroupsPerPool) {
        replicaGroupIds.add(replicaGroupId);
        break;
      }
    }
  }
  return poolToReplicaGroupIdsMap;
}

/**
 * Selects {@code numInstancesPerReplicaGroup} instances for every replica-group from the pool it was placed on.
 * Existing instances of a replica-group that are still in the pool are kept. The vacant slots are then filled with
 * the pool's other instances, always topping up the replica-group that currently has the fewest instances.
 *
 * @return list indexed by replica-group id holding the selected instances
 */
private static List<List<String>> selectReplicaGroupInstancesWithMinimumMovement(
    Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, Map<Integer, List<Integer>> poolToReplicaGroupIdsMap,
    List<Set<String>> replicaGroupIdToExistingInstancesMap, int numReplicaGroups, int numInstancesPerReplicaGroup) {
  List<List<String>> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups);
  for (int i = 0; i < numReplicaGroups; i++) {
    replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup));
  }
  for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
    // For each pool, keep the existing instances that are still alive within each replica-group
    int pool = entry.getKey();
    List<Integer> replicaGroupIds = entry.getValue();
    List<String> newInstances = new ArrayList<>();
    for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) {
      String instanceName = instanceConfig.getInstanceName();
      boolean isExistingInstance = false;
      for (int replicaGroupId : replicaGroupIds) {
        List<String> instances = replicaGroupIdToInstancesMap.get(replicaGroupId);
        if (instances.size() == numInstancesPerReplicaGroup) {
          continue;
        }
        if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) {
          instances.add(instanceName);
          isExistingInstance = true;
          break;
        }
      }
      if (!isExistingInstance) {
        newInstances.add(instanceName);
      }
    }
    // Fill the vacant positions with the new instances. Always fill the replica-group with the fewest instances, so
    // the replica-groups effectively take turns and get instances with similar picking priority.
    int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size();
    for (int replicaGroupId : replicaGroupIds) {
      numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size();
    }
    // NOTE(review): assumes getNumInstancesPerReplicaGroup guaranteed the pool has at least
    // numInstancesPerReplicaGroup * replicaGroupIds.size() instances; otherwise newInstances.get(i) throws
    // IndexOutOfBoundsException — confirm.
    for (int i = 0; i < numInstancesToFill; i++) {
      int leastNumInstances = Integer.MAX_VALUE;
      int replicaGroupIdWithLeastInstances = -1;
      for (int replicaGroupId : replicaGroupIds) {
        int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size();
        if (numInstances < leastNumInstances) {
          leastNumInstances = numInstances;
          replicaGroupIdWithLeastInstances = replicaGroupId;
        }
      }
      replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i));
    }
  }
  return replicaGroupIdToInstancesMap;
}

/**
 * Writes partition 0 of one replica-group when the table has a single partition. For a replica-group that exists
 * in the existing assignment, the final instance list comes from
 * {@code selectInstancesWithMinimumMovement}; otherwise the replica-group's instances are used as-is.
 */
private void assignSinglePartitionWithMinimumMovement(InstancePartitions instancePartitions, int replicaGroupId,
    List<String> instancesInReplicaGroup, int numInstancesPerReplicaGroup, boolean hasExistingReplicaGroup) {
  if (hasExistingReplicaGroup) {
    List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
    LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
    List<String> instances =
        selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
    LOGGER.info(
        "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}",
        instances, replicaGroupId, _tableNameWithType, existingInstances);
    instancePartitions.setInstances(0, replicaGroupId, instances);
  } else {
    LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, "
        + "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType);
    instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup);
  }
}

/**
 * Writes every partition of one replica-group that exists in the existing assignment. Existing instances are kept
 * at their original position within the partition, up to a per-instance partition cap. The remaining slots go to
 * the replica-group's instances that currently serve the fewest partitions; ties are broken by position in
 * {@code instancesInReplicaGroup}.
 */
private void assignPartitionsWithMinimumMovement(InstancePartitions instancePartitions, int replicaGroupId,
    List<String> instancesInReplicaGroup, int numInstancesPerReplicaGroup, int numPartitions,
    int numInstancesPerPartition) {
  int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
  // Ceiling of numPartitions / numInstancesPerReplicaGroup: the most partitions an existing instance may be retained
  // in. NOTE(review): this cap ignores numInstancesPerPartition. When each partition holds more than one instance it
  // is below the balanced share ceil(numPartitions * numInstancesPerPartition / numInstancesPerReplicaGroup), so
  // fewer existing instances are retained in place and more slots go through the least-loaded fill below — confirm
  // this trade-off is intended.
  int maxNumPartitionsPerInstance =
      (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
  Map<String, Integer> instanceToNumPartitionsMap = Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
  for (String instance : instancesInReplicaGroup) {
    instanceToNumPartitionsMap.put(instance, 0);
  }
  List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions);
  List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions);
  List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions);
  for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
    // Initialize the list with empty (null) positions to fill
    List<String> instances = new ArrayList<>(numInstancesPerPartition);
    for (int i = 0; i < numInstancesPerPartition; i++) {
      instances.add(null);
    }
    partitionIdToInstancesMap.add(instances);
    Set<String> instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition);
    partitionIdToInstanceSetMap.add(instanceSet);
    // Keep the existing instances that are still in the replica-group, at their original position
    if (partitionId < existingNumPartitions) {
      List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
      partitionIdToExistingInstancesMap.add(existingInstances);
      int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
      for (int i = 0; i < numInstancesToCheck; i++) {
        String existingInstance = existingInstances.get(i);
        Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
        if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) {
          instances.set(i, existingInstance);
          instanceSet.add(existingInstance);
          instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
        }
      }
    }
  }
  // Fill the vacant positions with the instances that serve the fewest partitions
  for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
    List<String> instances = partitionIdToInstancesMap.get(partitionId);
    Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId);
    int numInstancesToFill = numInstancesPerPartition - instanceSet.size();
    if (numInstancesToFill > 0) {
      // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting
      List<Triple<String, Integer, Integer>> triples = new ArrayList<>(numInstancesPerReplicaGroup);
      for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
        String instance = instancesInReplicaGroup.get(i);
        if (!instanceSet.contains(instance)) {
          triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i));
        }
      }
      triples.sort((o1, o2) -> {
        int result = Integer.compare(o1.getMiddle(), o2.getMiddle());
        return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
      });
      int instanceIdToFill = 0;
      for (int i = 0; i < numInstancesPerPartition; i++) {
        if (instances.get(i) == null) {
          String instance = triples.get(instanceIdToFill++).getLeft();
          instances.set(i, instance);
          instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1);
        }
      }
    }
    if (partitionId < existingNumPartitions) {
      LOGGER.info(
          "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
          instances, replicaGroupId, partitionId, _tableNameWithType,
          partitionIdToExistingInstancesMap.get(partitionId));
    } else {
      LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
          + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
    }
    instancePartitions.setInstances(partitionId, replicaGroupId, instances);
  }
}

/**
 * Writes every partition of a replica-group that has no existing assignment. Each partition takes the next
 * {@code numInstancesPerPartition} instances of the replica-group, wrapping around to the first instance.
 */
private void assignPartitionsRoundRobin(InstancePartitions instancePartitions, int replicaGroupId,
    List<String> instancesInReplicaGroup, int numInstancesPerReplicaGroup, int numPartitions,
    int numInstancesPerPartition) {
  int instanceIdInReplicaGroup = 0;
  for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
    List<String> instances = new ArrayList<>(numInstancesPerPartition);
    for (int i = 0; i < numInstancesPerPartition; i++) {
      instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup));
      instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
    }
    LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
        + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
    instancePartitions.setInstances(partitionId, replicaGroupId, instances);
  }
}