in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [198:252]
/**
 * Performs one rebalance pass over the current cluster.
 *
 * <p>The pass refreshes the live-instance, blacklisted-instance and blacklisted-partition
 * counters, then bails out early (logging the reason) when this controller is not the leader,
 * auto-balancing is disabled, there are no live instances, or nothing has been assigned yet.
 * Otherwise it asks {@code rescaleInstanceToTopicPartitionMap} for a new assignment and, when
 * one is returned, converts it to ideal states and applies them.
 *
 * <p>The whole pass is timed with {@code _rebalanceTimer}; the timer context is closed on every
 * exit path, including exceptional ones.
 *
 * @param liveInstances       instances currently reported as live
 * @param excludedInstances   instance names passed through to the rescale step as exclusions
 * @param checkInstanceChange forwarded unchanged to {@code rescaleInstanceToTopicPartitionMap}
 * @param forced              forwarded unchanged to {@code rescaleInstanceToTopicPartitionMap}
 */
private synchronized void rebalanceCurrentCluster(List<LiveInstance> liveInstances,
    List<String> excludedInstances, boolean checkInstanceChange, boolean forced) {
  final Context timerContext = _rebalanceTimer.time();
  LOGGER.info("AutoRebalanceLiveInstanceChangeListener.rebalanceCurrentCluster() wakes up!");
  try {
    // Each counter is moved by (current size - present count), so its count ends up
    // equal to the current size. These two are refreshed even when not the leader.
    _numLiveInstances.inc(liveInstances.size() - _numLiveInstances.getCount());
    _numBlacklistedInstances.inc(
        excludedInstances.size() - _numBlacklistedInstances.getCount());

    if (!_helixManager.isLeader()) {
      LOGGER.info("Not leader, do nothing!");
      return;
    }

    // The partition blacklist is only fetched (and its counter refreshed) on the leader.
    final Set<TopicPartition> partitionBlacklist =
        _helixMirrorMakerManager.getTopicPartitionBlacklist();
    _numBlacklistedTopicPartitions.inc(
        partitionBlacklist.size() - _numBlacklistedTopicPartitions.getCount());

    if (!_helixMirrorMakerManager.isAutoBalancingEnabled()) {
      LOGGER.info("Is leader, but auto-balancing is disabled, do nothing!");
      return;
    }

    if (liveInstances.isEmpty()) {
      LOGGER.info("No live instances, do nothing!");
      return;
    }

    final Map<String, Set<TopicPartition>> currentAssignment =
        HelixUtils.getInstanceToTopicPartitionsMap(_helixManager);
    final Set<TopicPartition> unassigned = HelixUtils.getUnassignedPartitions(_helixManager);
    if (currentAssignment.isEmpty() && unassigned.isEmpty()) {
      LOGGER.info("No topic got assigned yet, do nothing!");
      return;
    }

    final Set<InstanceTopicPartitionHolder> rebalanced = rescaleInstanceToTopicPartitionMap(
        liveInstances, excludedInstances, currentAssignment, unassigned, partitionBlacklist,
        checkInstanceChange, forced);
    // A null result is treated as "nothing to change".
    if (rebalanced == null) {
      LOGGER.info("No assignment got changed, do nothing!");
      return;
    }

    LOGGER.info("New assignment: " + rebalanced);
    // The timestamp is recorded before the ideal states are built and applied.
    _lastRebalanceTimeMillis = System.currentTimeMillis();
    final Map<String, IdealState> idealStates =
        HelixUtils.getIdealStatesFromAssignment(rebalanced, partitionBlacklist);
    LOGGER.info("Trying to assign new IdealStatesMap!");
    assignIdealStates(_helixManager, idealStates);
    _helixMirrorMakerManager.updateCurrentServingInstance();
    _rebalanceRate.mark();
  } finally {
    // Stop the rebalance timer regardless of which path left the try block.
    timerContext.close();
  }
}