private Set rescaleInstanceToTopicPartitionMap()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [254:369]


  /**
   * Recomputes the instance-to-topic-partition assignment after a change in live instances.
   *
   * <p>Instances that are still live keep their current partitions. The workload of each removed
   * instance is handed wholesale to one idle instance while idle capacity remains. Otherwise its
   * partitions are added to the pool redistributed by {@code balancePartitions}. When
   * {@code _maxWorkingInstances} is positive, surplus working instances are drained into that pool.
   * When it is non-positive, the number of working instances is unlimited. Every removed instance
   * is finally given an empty holder so its previous assignment gets cleared.
   *
   * <p>NOTE(review): {@code idleInstances} returned by {@code getIdleInstanceSet} is mutated here;
   * this assumes that helper returns a fresh, caller-owned mutable set.
   *
   * @param liveInstances current live instances
   * @param excludedInstances instance names passed to {@code getLiveInstanceName}; presumably
   *     filtered out of the live set
   * @param instanceToTopicPartitionMap current assignment keyed by instance name
   * @param unassignedTopicPartitions partitions that currently have no owner and must be assigned
   * @param blacklistedTopicPartitions passed through to {@code balancePartitions}
   * @param checkInstanceChange when true, return {@code null} early if no instance change is seen
   * @param forced when true, skip the early exit; also passed through to {@code balancePartitions}
   * @return the new set of instance assignments (including empty holders for removed instances),
   *     or {@code null} when the assignment did not change
   */
  private Set<InstanceTopicPartitionHolder> rescaleInstanceToTopicPartitionMap(
      List<LiveInstance> liveInstances,
      List<String> excludedInstances,
      Map<String, Set<TopicPartition>> instanceToTopicPartitionMap,
      Set<TopicPartition> unassignedTopicPartitions,
      Set<TopicPartition> blacklistedTopicPartitions,
      boolean checkInstanceChange,
      boolean forced) {
    Set<String> liveInstanceNames = getLiveInstanceName(liveInstances, excludedInstances);
    // Presumably the instances present in the current assignment but missing from
    // liveInstanceNames (dead or excluded); their workload has to move elsewhere.
    Set<String> removedInstances = getRemovedInstanceSet(liveInstanceNames,
        instanceToTopicPartitionMap.keySet());
    Set<String> idleInstances = getIdleInstanceSet(liveInstanceNames, instanceToTopicPartitionMap);

    // Number of idle instances that may be brought into service this round. Any non-positive
    // _maxWorkingInstances means "no limit", consistent with the early-exit and trimming checks
    // below (treating only 0 as unlimited made a negative value block every idle instance).
    int numIdleInstancesToAssign;
    if (_maxWorkingInstances <= 0) {
      numIdleInstancesToAssign = idleInstances.size();
    } else {
      int retainedWorkingInstances = instanceToTopicPartitionMap.size() - removedInstances.size();
      numIdleInstancesToAssign = Math.max(0,
          Math.min(_maxWorkingInstances - retainedWorkingInstances, idleInstances.size()));
    }

    // Move the counter to the number of instances that will remain idle by adding the difference
    // from its current value. numIdleInstancesToAssign is never negative here.
    _numIdleInstances.inc(
        idleInstances.size() - numIdleInstancesToAssign - _numIdleInstances.getCount());

    boolean withinWorkingLimit = _maxWorkingInstances <= 0
        || _maxWorkingInstances >= instanceToTopicPartitionMap.size();
    if (!forced && numIdleInstancesToAssign <= 0 && checkInstanceChange
        && removedInstances.isEmpty() && withinWorkingLimit) {
      // no instance change
      return null;
    }

    LOGGER.info("Trying to rescale cluster with " + liveInstanceNames.size() + " live instances ("
        + excludedInstances.size() + " excluded - " + excludedInstances + "), using "
        + numIdleInstancesToAssign
        + " out of " + idleInstances.size() + " idle instances, and removed " + removedInstances
        .size()
        + " instances - " + Arrays.toString(removedInstances.toArray(new String[0]))
        + " for unassigned partitions: "
        + unassignedTopicPartitions);
    Set<InstanceTopicPartitionHolder> instances = new HashSet<>();
    List<TopicPartition> tpiNeedsToBeAssigned = new ArrayList<>(unassignedTopicPartitions);

    boolean assignmentChanged = false;
    for (Map.Entry<String, Set<TopicPartition>> entry : instanceToTopicPartitionMap.entrySet()) {
      String instanceName = entry.getKey();
      Set<TopicPartition> servedPartitions = entry.getValue();
      if (servedPartitions.isEmpty()) {
        // it is an idle instance, do nothing
        continue;
      }
      if (!removedInstances.contains(instanceName)) {
        // keep the instance assignment as it is
        InstanceTopicPartitionHolder instance = new InstanceTopicPartitionHolder(instanceName);
        instance.addTopicPartitions(servedPartitions);
        instances.add(instance);
      } else if (!idleInstances.isEmpty() && numIdleInstancesToAssign > 0) {
        // hand the whole workload of the removed instance to one idle instance
        String idleInstanceToAssign = idleInstances.iterator().next();
        idleInstances.remove(idleInstanceToAssign);
        numIdleInstancesToAssign--;
        InstanceTopicPartitionHolder instance = new InstanceTopicPartitionHolder(
            idleInstanceToAssign);
        instance.addTopicPartitions(servedPartitions);
        instances.add(instance);
        assignmentChanged = true;
        LOGGER.info(
            "Move workload from instance " + instanceName + " to " + idleInstanceToAssign + ": "
                + servedPartitions);
      } else {
        // no idle capacity left: spread the workload across all instances
        tpiNeedsToBeAssigned.addAll(servedPartitions);
      }
    }

    if (_maxWorkingInstances > 0 && instances.size() > _maxWorkingInstances) {
      // More working instances than allowed: drain the surplus into the pool to be rebalanced.
      // Which instances get drained depends on HashSet iteration order.
      Iterator<InstanceTopicPartitionHolder> iter = instances.iterator();
      while (instances.size() > _maxWorkingInstances && iter.hasNext()) {
        InstanceTopicPartitionHolder itph = iter.next();
        LOGGER.info("Move workload from instance " + itph.getInstanceName() + " to become idle: "
            + itph.getServingTopicPartitionSet());
        tpiNeedsToBeAssigned.addAll(itph.getServingTopicPartitionSet());
        iter.remove();
      }
    } else if (numIdleInstancesToAssign > 0) {
      // put more idle instances to the list for rebalancing
      for (String instanceName : idleInstances) {
        if (numIdleInstancesToAssign <= 0) {
          break;
        }
        instances.add(new InstanceTopicPartitionHolder(instanceName));
        forced = true; // force to rebalance so as to assign partitions to the idle instances
        numIdleInstancesToAssign--;
      }
    }

    if (balancePartitions(instances, tpiNeedsToBeAssigned, blacklistedTopicPartitions, forced)) {
      assignmentChanged = true;
    }

    // add empty task for the removed/excluded instances so their old assignment is cleared
    for (String ri : removedInstances) {
      instances.add(new InstanceTopicPartitionHolder(ri));
      assignmentChanged = true;
    }
    return assignmentChanged ? instances : null;
  }