private boolean balancePartitions()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [465:592]


  /**
   * Rebalances topic partitions across the given worker instances.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>move stuck partitions via {@code moveStuckPartitions};</li>
   *   <li>look up lag for every non-blacklisted partition in {@code partitionsToBeAssigned} and
   *       for every partition already served by an instance;</li>
   *   <li>rebalance and assign lagging partitions, weighting lagging partitions 1.0 and all
   *       others 0.0;</li>
   *   <li>mark at most {@code (int) (instances.size() * _maxDedicatedInstancesRatio)} instances
   *       that serve a lagging partition as dedicated, moving their non-lagging partitions back
   *       into the to-be-assigned pool;</li>
   *   <li>rebalance and assign non-lagging partitions across the non-dedicated instances, with
   *       lagging partitions and just-moved stuck partitions passed as pinned.</li>
   * </ol>
   *
   * @param instances worker instances that may receive partitions
   * @param partitionsToBeAssigned partitions to place on instances; only read, not modified
   * @param blacklistedTopicPartitions entries of {@code partitionsToBeAssigned} that are skipped
   * @param forced passed through to {@code removeOverloadedParitions} for the non-lagging pass
   * @return {@code true} if stuck partitions were moved or either assignment pass reported a
   *     change; {@code false} otherwise, including when {@code instances} is empty
   */
  private boolean balancePartitions(
      Set<InstanceTopicPartitionHolder> instances,
      List<TopicPartition> partitionsToBeAssigned,
      Set<TopicPartition> blacklistedTopicPartitions,
      boolean forced) {
    if (instances.isEmpty()) {
      LOGGER.error("No workers to take " + partitionsToBeAssigned.size() + " partitions");
      return false;
    }

    boolean assignmentChanged = false;

    Set<TopicPartition> stuckPartitionsMoved = moveStuckPartitions(instances);
    if (stuckPartitionsMoved != null && !stuckPartitionsMoved.isEmpty()) {
      assignmentChanged = true;
    }

    // Collect lag information for all topic partitions (unassigned and currently served).
    final Map<TopicPartition, Long> lagTimeMap = new HashMap<>();
    List<TopicPartition> laggingPartitions = new ArrayList<>();
    List<TopicPartition> nonLaggingPartitions = new ArrayList<>();
    for (TopicPartition tp : partitionsToBeAssigned) {
      if (blacklistedTopicPartitions.contains(tp)) {
        // Parameterized logging uses "{}"; "%s" is not substituted and drops the partition.
        LOGGER.info("{} in blacklist, skip assignment", tp);
        continue;
      }

      TopicPartitionLag lagInfo = _helixMirrorMakerManager.calculateLagTime(tp);
      if (lagInfo != null && lagInfo.getLag() > 0) {
        lagTimeMap.put(tp, lagInfo.getLag());
        laggingPartitions.add(tp);
      } else {
        nonLaggingPartitions.add(tp);
      }
    }
    for (InstanceTopicPartitionHolder instance : instances) {
      for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
        TopicPartitionLag lagInfo = _helixMirrorMakerManager.calculateLagTime(tp);
        if (lagInfo != null && lagInfo.getLag() > 0) {
          lagTimeMap.put(tp, lagInfo.getLag());
        }
      }
    }
    LOGGER.info("balancePartitions: Current lagging partitions: {}", lagTimeMap);

    // Re-distribute the lagging partitions first: only lagging partitions carry weight here.
    ITopicWorkloadWeighter laggingPartitionWeighter = (TopicPartition tp) ->
        lagTimeMap.containsKey(tp) ? 1.0 : 0.0;

    TreeSet<InstanceTopicPartitionHolder> instancesSortedByLag = new TreeSet<>(
        InstanceTopicPartitionHolder
            .perPartitionWorkloadComparator(_helixMirrorMakerManager.getWorkloadInfoRetriever(),
                laggingPartitionWeighter));
    instancesSortedByLag.addAll(instances);
    List<TopicPartition> reassignedLaggingPartitions = removeOverloadedParitions(
        instancesSortedByLag,
        laggingPartitions, null, true, laggingPartitionWeighter);
    laggingPartitions.addAll(reassignedLaggingPartitions);

    if (assignPartitions(instancesSortedByLag, laggingPartitions)) {
      assignmentChanged = true;
    }

    // Dedicated instances serve only lagging partitions; at most maxDedicated of them.
    int maxDedicated = (int) (instances.size() * _maxDedicatedInstancesRatio);
    TreeSet<InstanceTopicPartitionHolder> orderedInstances = new TreeSet<>(
        InstanceTopicPartitionHolder
            .perPartitionWorkloadComparator(_helixMirrorMakerManager.getWorkloadInfoRetriever(),
                null));
    // NOTE(review): the iteration order below follows perPartitionWorkloadComparator with the
    // lag weighter; the original intent is that instances with lag come after lag-free ones.
    List<InstanceTopicPartitionHolder> dedicatedInstances = new ArrayList<>();
    for (InstanceTopicPartitionHolder instance : instancesSortedByLag) {
      // ">=" enforces the cap; ">" would allow maxDedicated + 1 dedicated instances.
      if (dedicatedInstances.size() >= maxDedicated) {
        orderedInstances.add(instance);
        continue;
      }

      boolean hasLag = false;
      for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
        if (lagTimeMap.containsKey(tp)) {
          hasLag = true;
          break;
        }
      }
      if (!hasLag) {
        orderedInstances.add(instance);
        continue;
      }

      // This instance becomes dedicated to lagging partitions only.
      dedicatedInstances.add(instance);
      // Iterate over a snapshot because the instance's serving set is modified in the loop.
      for (TopicPartition tp : new ArrayList<>(instance.getServingTopicPartitionSet())) {
        if (!lagTimeMap.containsKey(tp)) {
          instance.removeTopicPartition(tp);
          nonLaggingPartitions.add(tp);
        }
      }
    }
    if (!dedicatedInstances.isEmpty()) {
      LOGGER.info("balancePartitions: dedicated instances: {}", dedicatedInstances);
    }

    // If the current assignment is overloaded on some workers, re-assign some of their
    // partitions. Lagging partitions get 1.0 extra weight per 60 lag units, capped at +120
    // (one minute / two hours if getLag() is in seconds -- TODO confirm the unit).
    ITopicWorkloadWeighter adjustedWeighter = (TopicPartition tp) -> {
      Long lag = lagTimeMap.get(tp);
      return lag == null ? 1.0 : 1.0 + Math.min(120, lag / 60);
    };

    // Lagging partitions and just-moved stuck partitions should not be moved again in this pass.
    Set<TopicPartition> pinnedPartitions = new HashSet<>(lagTimeMap.keySet());
    if (stuckPartitionsMoved != null) {
      pinnedPartitions.addAll(stuckPartitionsMoved);
    }

    List<TopicPartition> reassignedPartitions = removeOverloadedParitions(orderedInstances,
        nonLaggingPartitions, pinnedPartitions, forced, adjustedWeighter);
    nonLaggingPartitions.addAll(reassignedPartitions);
    if (assignPartitions(orderedInstances, nonLaggingPartitions)) {
      assignmentChanged = true;
    }

    return assignmentChanged;
  }