private void replicaGroupBasedMinimumMovement()

in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java [273:507]


  /**
   * Selects instances for every replica-group and partition while keeping as much of the existing assignment
   * ({@code _existingInstancePartitions}) as possible.
   *
   * <p>The selection runs in three phases:
   * <ol>
   *   <li>Each replica-group is assigned to a pool, preferring the pool that already hosts the most of that
   *   replica-group's existing instances. Ties are broken by a rotation of {@code pools} derived from
   *   {@code tableNameHash}. At most {@code ceil(numReplicaGroups / numPools)} replica-groups go to one pool.</li>
   *   <li>Within each pool, existing instances that are still among the candidates stay in their replica-group.
   *   Vacant positions are filled with the remaining candidates, always topping up the replica-group that currently
   *   has the fewest instances.</li>
   *   <li>Instances of each replica-group are assigned to partitions. For replica-groups that existed before,
   *   existing (partition, position) assignments are kept where possible, subject to a per-instance partition cap
   *   that keeps the load balanced.</li>
   * </ol>
   *
   * @param poolToInstanceConfigsMap candidate instances grouped by pool
   * @param instancePartitions output; filled via {@code setInstances(partitionId, replicaGroupId, instances)}
   * @param pools pools to select from; their order, rotated by {@code tableNameHash}, is the tie-breaker in phase 1
   * @param tableNameHash hash of the table name, used to rotate the pool order when breaking ties
   */
  private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
      InstancePartitions instancePartitions, List<Integer> pools, int tableNameHash) {
    int numPools = pools.size();
    int numReplicaGroups = getNumReplicaGroups();

    // Reverse index: instance name -> pool containing that instance
    Map<String, Integer> instanceToPoolMap = new HashMap<>();
    for (Map.Entry<Integer, List<InstanceConfig>> entry : poolToInstanceConfigsMap.entrySet()) {
      int pool = entry.getKey();
      for (InstanceConfig instanceConfig : entry.getValue()) {
        instanceToPoolMap.put(instanceConfig.getInstanceName(), pool);
      }
    }

    // Phase 1: calculate the mapping from pool to replica-groups assigned to the pool
    List<Set<String>> replicaGroupIdToExistingInstancesMap = new ArrayList<>(numReplicaGroups);
    Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
    // ceil(numReplicaGroups / numPools): total capacity covers every replica-group, so each one finds a pool
    int maxReplicaGroupsPerPool = (numReplicaGroups + numPools - 1) / numPools;
    // abs() is safe: |tableNameHash % numPools| < numPools, so the operand is never Integer.MIN_VALUE
    int startIndex = Math.abs(tableNameHash % numPools);

    int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
    int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
      // For each replica-group, gather number of existing instances within each pool
      Set<String> existingInstanceSet = new HashSet<>();
      replicaGroupIdToExistingInstancesMap.add(existingInstanceSet);
      Map<Integer, Integer> poolToNumExistingInstancesMap = new TreeMap<>();
      if (replicaGroupId < existingNumReplicaGroups) {
        for (int partitionId = 0; partitionId < existingNumPartitions; partitionId++) {
          List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
          existingInstanceSet.addAll(existingInstances);
          for (String existingInstance : existingInstances) {
            Integer existingPool = instanceToPoolMap.get(existingInstance);
            // Instances that are no longer candidates have no pool and are ignored
            if (existingPool != null) {
              poolToNumExistingInstancesMap.merge(existingPool, 1, Integer::sum);
            }
          }
        }
      }
      // Sort the pools based on the number of existing instances in the pool in descending order, then use the table
      // name hash (rotated pool order) to break ties
      // Triple stores (pool, numExistingInstances, poolIndex) for sorting
      List<Triple<Integer, Integer, Integer>> triples = new ArrayList<>(numPools);
      for (int i = 0; i < numPools; i++) {
        int pool = pools.get((startIndex + replicaGroupId + i) % numPools);
        triples.add(Triple.of(pool, poolToNumExistingInstancesMap.getOrDefault(pool, 0), i));
      }
      triples.sort((o1, o2) -> {
        int result = Integer.compare(o2.getMiddle(), o1.getMiddle());
        return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
      });
      // Place the replica-group in the most preferred pool that still has room
      for (Triple<Integer, Integer, Integer> triple : triples) {
        int pool = triple.getLeft();
        List<Integer> replicaGroupIds = poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>());
        if (replicaGroupIds.size() < maxReplicaGroupsPerPool) {
          replicaGroupIds.add(replicaGroupId);
          break;
        }
      }
    }
    LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap,
        _tableNameWithType);

    int numInstancesPerReplicaGroup =
        getNumInstancesPerReplicaGroup(poolToInstanceConfigsMap, poolToReplicaGroupIdsMap);
    LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup,
        _tableNameWithType);
    int numPartitions = getNumPartitions();
    int numInstancesPerPartition = getNumInstancesPerPartition(numInstancesPerReplicaGroup);
    LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}",
        numPartitions, numInstancesPerPartition, _tableNameWithType);

    // Phase 2: pick the instances of each replica-group
    List<List<String>> replicaGroupIdToInstancesMap = new ArrayList<>(numReplicaGroups);
    for (int i = 0; i < numReplicaGroups; i++) {
      replicaGroupIdToInstancesMap.add(new ArrayList<>(numInstancesPerReplicaGroup));
    }
    for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) {
      // For each pool, keep the existing instances that are still alive within each replica-group
      int pool = entry.getKey();
      List<Integer> replicaGroupIds = entry.getValue();
      List<String> newInstances = new ArrayList<>();
      for (InstanceConfig instanceConfig : poolToInstanceConfigsMap.get(pool)) {
        String instanceName = instanceConfig.getInstanceName();
        boolean isExistingInstance = false;
        for (int replicaGroupId : replicaGroupIds) {
          List<String> instances = replicaGroupIdToInstancesMap.get(replicaGroupId);
          if (instances.size() == numInstancesPerReplicaGroup) {
            // Replica-group already full; an existing instance that does not fit is treated as a new instance
            continue;
          }
          if (replicaGroupIdToExistingInstancesMap.get(replicaGroupId).contains(instanceName)) {
            instances.add(instanceName);
            isExistingInstance = true;
            break;
          }
        }
        if (!isExistingInstance) {
          newInstances.add(instanceName);
        }
      }
      // Fill the vacant positions with the new instances. First fill the replica groups with the least instances, then
      // use round-robin to assign instances to each replica-group so that they get instances with similar picking
      // priority.
      int numInstancesToFill = numInstancesPerReplicaGroup * replicaGroupIds.size();
      for (int replicaGroupId : replicaGroupIds) {
        numInstancesToFill -= replicaGroupIdToInstancesMap.get(replicaGroupId).size();
      }
      for (int i = 0; i < numInstancesToFill; i++) {
        int leastNumInstances = Integer.MAX_VALUE;
        int replicaGroupIdWithLeastInstances = -1;
        for (int replicaGroupId : replicaGroupIds) {
          int numInstances = replicaGroupIdToInstancesMap.get(replicaGroupId).size();
          if (numInstances < leastNumInstances) {
            leastNumInstances = numInstances;
            replicaGroupIdWithLeastInstances = replicaGroupId;
          }
        }
        replicaGroupIdToInstancesMap.get(replicaGroupIdWithLeastInstances).add(newInstances.get(i));
      }
    }

    // Phase 3: assign the instances of each replica-group to partitions
    if (numPartitions == 1) {
      // A single partition receives every instance of the replica-group
      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
        if (replicaGroupId < existingNumReplicaGroups) {
          List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
          LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
          List<String> instances =
              selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
          LOGGER.info(
              "Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, existing instances: {}",
              instances, replicaGroupId, _tableNameWithType, existingInstances);
          instancePartitions.setInstances(0, replicaGroupId, instances);
        } else {
          LOGGER.info("Selecting instances: {} for replica-group: {}, partition: 0 for table: {}, "
              + "there is no existing instances", instancesInReplicaGroup, replicaGroupId, _tableNameWithType);
          instancePartitions.setInstances(0, replicaGroupId, instancesInReplicaGroup);
        }
      }
    } else {
      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
        List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
        if (replicaGroupId < existingNumReplicaGroups) {
          // A replica-group has numPartitions * numInstancesPerPartition slots spread across
          // numInstancesPerReplicaGroup instances, so a balanced assignment has each instance serving at most
          // ceil(numPartitions * numInstancesPerPartition / numInstancesPerReplicaGroup) partitions. Ignoring
          // numInstancesPerPartition here would under-count the cap and evict existing instances even when the
          // existing assignment is already balanced.
          int maxNumPartitionsPerInstance =
              (numPartitions * numInstancesPerPartition + numInstancesPerReplicaGroup - 1)
                  / numInstancesPerReplicaGroup;
          Map<String, Integer> instanceToNumPartitionsMap =
              Maps.newHashMapWithExpectedSize(numInstancesPerReplicaGroup);
          for (String instance : instancesInReplicaGroup) {
            instanceToNumPartitionsMap.put(instance, 0);
          }

          List<List<String>> partitionIdToInstancesMap = new ArrayList<>(numPartitions);
          List<Set<String>> partitionIdToInstanceSetMap = new ArrayList<>(numPartitions);
          List<List<String>> partitionIdToExistingInstancesMap = new ArrayList<>(existingNumPartitions);
          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            // Initialize the list with empty positions to fill
            List<String> instances = new ArrayList<>(numInstancesPerPartition);
            for (int i = 0; i < numInstancesPerPartition; i++) {
              instances.add(null);
            }
            partitionIdToInstancesMap.add(instances);
            Set<String> instanceSet = Sets.newHashSetWithExpectedSize(numInstancesPerPartition);
            partitionIdToInstanceSetMap.add(instanceSet);

            // Keep the existing instances that are still alive, at their original positions
            if (partitionId < existingNumPartitions) {
              List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
              partitionIdToExistingInstancesMap.add(existingInstances);
              int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
              for (int i = 0; i < numInstancesToCheck; i++) {
                String existingInstance = existingInstances.get(i);
                Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
                // Skip instances no longer in this replica-group, instances at the cap, and duplicate entries (which
                // would otherwise place the same instance twice in one partition)
                if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance
                    && !instanceSet.contains(existingInstance)) {
                  instances.set(i, existingInstance);
                  instanceSet.add(existingInstance);
                  instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
                }
              }
            }
          }

          // Fill the vacant positions with instance that serves the least partitions
          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            List<String> instances = partitionIdToInstancesMap.get(partitionId);
            Set<String> instanceSet = partitionIdToInstanceSetMap.get(partitionId);
            int numInstancesToFill = numInstancesPerPartition - instanceSet.size();
            if (numInstancesToFill > 0) {
              // Triple stores (instance, numPartitionsOnInstance, instanceIndex) for sorting
              List<Triple<String, Integer, Integer>> triples = new ArrayList<>(numInstancesPerReplicaGroup);
              for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
                String instance = instancesInReplicaGroup.get(i);
                if (!instanceSet.contains(instance)) {
                  triples.add(Triple.of(instance, instanceToNumPartitionsMap.get(instance), i));
                }
              }
              // Least-loaded first; ties broken by position within the replica-group
              triples.sort((o1, o2) -> {
                int result = Integer.compare(o1.getMiddle(), o2.getMiddle());
                return result != 0 ? result : Integer.compare(o1.getRight(), o2.getRight());
              });
              int instanceIdToFill = 0;
              for (int i = 0; i < numInstancesPerPartition; i++) {
                if (instances.get(i) == null) {
                  String instance = triples.get(instanceIdToFill++).getLeft();
                  instances.set(i, instance);
                  instanceToNumPartitionsMap.put(instance, instanceToNumPartitionsMap.get(instance) + 1);
                }
              }
            }

            if (partitionId < existingNumPartitions) {
              LOGGER.info(
                  "Selecting instances: {} for replica-group: {}, partition: {} for table: {}, existing instances: {}",
                  instances, replicaGroupId, partitionId, _tableNameWithType,
                  partitionIdToExistingInstancesMap.get(partitionId));
            } else {
              LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
                  + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
            }
            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
          }
        } else {
          // Assign consecutive instances within a replica-group to each partition
          int instanceIdInReplicaGroup = 0;
          for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            List<String> instances = new ArrayList<>(numInstancesPerPartition);
            for (int i = 0; i < numInstancesPerPartition; i++) {
              instances.add(instancesInReplicaGroup.get(instanceIdInReplicaGroup));
              instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup;
            }
            LOGGER.info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}, "
                + "there is no existing instances", instances, replicaGroupId, partitionId, _tableNameWithType);
            instancePartitions.setInstances(partitionId, replicaGroupId, instances);
          }
        }
      }
    }
  }