private synchronized void rebalanceCurrentCluster()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [198:252]


  /**
   * Runs one rebalance pass over the current cluster.
   *
   * <p>The pass is timed via {@code _rebalanceTimer} and proceeds as follows:
   * <ol>
   *   <li>refresh the live-instance and blacklisted-instance counters;</li>
   *   <li>stop if this controller is not the leader;</li>
   *   <li>refresh the blacklisted topic-partition counter;</li>
   *   <li>stop if auto-balancing is disabled, if there are no live instances, or if there is
   *       neither an assigned nor an unassigned topic partition;</li>
   *   <li>ask {@code rescaleInstanceToTopicPartitionMap} for a new assignment and stop if it
   *       returns {@code null};</li>
   *   <li>otherwise record the rebalance time, convert the assignment into ideal states, hand
   *       them to {@code assignIdealStates}, refresh the current serving instance and mark the
   *       rebalance rate meter.</li>
   * </ol>
   *
   * @param liveInstances currently live instances; its size feeds the live-instance counter
   * @param excludedInstances instances to exclude; its size feeds the blacklisted-instance
   *     counter
   * @param checkInstanceChange passed through unchanged to
   *     {@code rescaleInstanceToTopicPartitionMap}
   * @param forced passed through unchanged to {@code rescaleInstanceToTopicPartitionMap}
   */
  private synchronized void rebalanceCurrentCluster(List<LiveInstance> liveInstances,
      List<String> excludedInstances, boolean checkInstanceChange, boolean forced) {
    Context timerContext = _rebalanceTimer.time();
    LOGGER.info("AutoRebalanceLiveInstanceChangeListener.rebalanceCurrentCluster() wakes up!");
    try {
      // Each counter is incremented by (new size - current count), i.e. moved to the new size.
      _numLiveInstances.inc(
          liveInstances.size() - _numLiveInstances.getCount());
      _numBlacklistedInstances.inc(
          excludedInstances.size() - _numBlacklistedInstances.getCount());

      // Everything past the instance counters is leader-only work.
      if (!_helixManager.isLeader()) {
        LOGGER.info("Not leader, do nothing!");
        return;
      }

      Set<TopicPartition> blockedPartitions =
          _helixMirrorMakerManager.getTopicPartitionBlacklist();
      _numBlacklistedTopicPartitions.inc(
          blockedPartitions.size() - _numBlacklistedTopicPartitions.getCount());

      if (!_helixMirrorMakerManager.isAutoBalancingEnabled()) {
        LOGGER.info("Is leader, but auto-balancing is disabled, do nothing!");
        return;
      }

      if (liveInstances.isEmpty()) {
        LOGGER.info("No live instances, do nothing!");
        return;
      }

      // Snapshot of what is assigned per instance and what is still unassigned.
      Map<String, Set<TopicPartition>> partitionsByInstance =
          HelixUtils.getInstanceToTopicPartitionsMap(_helixManager);
      Set<TopicPartition> pendingPartitions =
          HelixUtils.getUnassignedPartitions(_helixManager);
      boolean nothingToBalance = partitionsByInstance.isEmpty() && pendingPartitions.isEmpty();
      if (nothingToBalance) {
        LOGGER.info("No topic got assigned yet, do nothing!");
        return;
      }

      Set<InstanceTopicPartitionHolder> proposedAssignment =
          rescaleInstanceToTopicPartitionMap(
              liveInstances, excludedInstances, partitionsByInstance, pendingPartitions,
              blockedPartitions, checkInstanceChange, forced);
      // A null result is treated as "assignment unchanged".
      if (proposedAssignment == null) {
        LOGGER.info("No assignment got changed, do nothing!");
        return;
      }

      LOGGER.info("New assignment: " + proposedAssignment);
      _lastRebalanceTimeMillis = System.currentTimeMillis();

      Map<String, IdealState> targetIdealStates =
          HelixUtils.getIdealStatesFromAssignment(proposedAssignment, blockedPartitions);
      LOGGER.info("Trying to assign new IdealStatesMap!");
      assignIdealStates(_helixManager, targetIdealStates);

      _helixMirrorMakerManager.updateCurrentServingInstance();
      _rebalanceRate.mark();
    } finally {
      // Stop the timer on every exit path, including the early returns above.
      timerContext.close();
    }
  }