in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/AdminHelper.java [87:116]
public Map<String, Boolean> setControllerAutobalancing(String srcCluster, String dstCluster, boolean enable) {
HashMap<String, Boolean> retVal = new HashMap<>();
String pipelineFilter = "";
if (StringUtils.isNotEmpty(srcCluster) && StringUtils.isNotEmpty(dstCluster)) {
pipelineFilter = ControllerUtils.getPipelineName(srcCluster, dstCluster);
}
Map<String, PriorityQueue<InstanceTopicPartitionHolder>> map = helixManager.getPipelineToInstanceMap();
for(String pipeline : map.keySet()) {
if (pipelineFilter.isEmpty() || pipelineFilter.equals(pipeline)) {
PriorityQueue<InstanceTopicPartitionHolder> routes =
helixManager.getPipelineToInstanceMap().get(pipeline);
if (routes != null) {
for (InstanceTopicPartitionHolder route : routes) {
String instanceName = route.getInstanceName();
try {
boolean notified = helixManager.notifyControllerAutobalancing(instanceName, enable);
retVal.put(instanceName, notified);
if(!notified) {
LOGGER.warn("Failed to notify instanceName : {}, enable : {} , but we will continue", instanceName, enable);
}
} catch (ControllerException ex) {
LOGGER.warn("Failed to notify instanceName : {}, enable : {} , but we will continue", instanceName, enable);
retVal.put(instanceName, false);
}
}
}
}
}
return retVal;
}