in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java [165:197]
/**
 * Rebuilds {@code _currentServingInstance} from the current Helix view.
 *
 * <p>A holder is created for every live instance that is not blacklisted, and the
 * topic partitions Helix reports for that instance are attached to it. The serving
 * list is then cleared and refilled. When {@code getMaxWorkingInstances()} is
 * positive, up to {@code (eligible instances - max working instances)} holders that
 * currently serve no topic partitions are left out as standby hosts; every other
 * holder is added to the serving list. A non-positive limit disables standby
 * selection, so all eligible instances are added.
 */
public synchronized void updateCurrentServingInstance() {
  synchronized (_currentServingInstance) {
    // Read the cluster state first; the serving list is only touched after
    // all of these lookups have returned.
    Map<String, Set<TopicPartition>> assignedPartitions =
        HelixUtils.getInstanceToTopicPartitionsMap(_helixZkManager);
    List<String> liveInstanceNames = HelixUtils.liveInstances(_helixZkManager);
    Set<String> blacklisted = new HashSet<>(getBlacklistedInstances());

    // One holder per live, non-blacklisted instance.
    Map<String, InstanceTopicPartitionHolder> holdersByName = new HashMap<>();
    for (String name : liveInstanceNames) {
      if (blacklisted.contains(name)) {
        continue;
      }
      holdersByName.put(name, new InstanceTopicPartitionHolder(name));
    }

    // Attach the assigned partitions; assignments for instances without a
    // holder (not live, or blacklisted) are ignored.
    for (Map.Entry<String, Set<TopicPartition>> assignment : assignedPartitions.entrySet()) {
      InstanceTopicPartitionHolder holder = holdersByName.get(assignment.getKey());
      if (holder != null) {
        holder.addTopicPartitions(assignment.getValue());
      }
    }

    _currentServingInstance.clear();

    // Number of idle instances that may be withheld as standby. This can be
    // negative when fewer instances are eligible than the configured limit,
    // in which case nothing is withheld.
    int maxWorkingInstances = _controllerConf.getMaxWorkingInstances();
    int standbyQuota = 0;
    if (maxWorkingInstances > 0) {
      standbyQuota = holdersByName.size() - maxWorkingInstances;
    }

    int standbyCount = 0;
    for (InstanceTopicPartitionHolder holder : holdersByName.values()) {
      boolean keepAsStandby =
          standbyCount < standbyQuota && holder.getNumServingTopicPartitions() <= 0;
      if (keepAsStandby) {
        // Idle instance held back as a standby host.
        standbyCount++;
        continue;
      }
      _currentServingInstance.add(holder);
    }
  }
}