in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/WorkerHelixManager.java [208:254]
/**
 * Assigns up to {@code numWorkersToAdd} currently available workers to the route
 * {@code pipeline@routeId} and records the assignment locally.
 *
 * <p>Workers are taken from the front of {@code _availableWorkerList}. When fewer workers are
 * available than requested, only the available ones are assigned. When none are available the
 * method logs a warning and returns without touching Helix or any local bookkeeping.
 *
 * <p>The Helix ideal state of resource {@code pipeline} is then updated in one of three ways:
 * <ul>
 *   <li>no ideal state and the pipeline does not exist: an empty resource config is set and a
 *       new resource is added with the route as its partition;</li>
 *   <li>ideal state exists and already has the route as a partition: the partition is expanded
 *       with the new instances;</li>
 *   <li>otherwise: the route is added to the ideal state as a new partition.</li>
 * </ul>
 * Finally the instances are appended to {@code _routeToInstanceMap}, removed from
 * {@code _availableWorkerList}, and handed to {@code controller}.
 *
 * <p>The whole operation runs while holding both the instance monitor ({@code synchronized})
 * and {@code _lock}.
 *
 * @param controller holder that receives the newly assigned instance names via
 *     {@code addWorkers}
 * @param pipeline name of the Helix resource the route belongs to
 * @param routeId route id; its string form is used as the partition name
 * @param numWorkersToAdd maximum number of workers to assign; values {@code <= 0} select no
 *     workers
 * @throws Exception declared by the signature; which of the invoked helpers raises it is not
 *     visible from this method
 */
public synchronized void addWorkersToMirrorMaker(InstanceTopicPartitionHolder controller, String pipeline,
    int routeId, int numWorkersToAdd) throws Exception {
  LOGGER.info("Trying to add {} workers to route: {}@{}", numWorkersToAdd, pipeline, routeId);
  _lock.lock();
  try {
    if (_availableWorkerList.isEmpty()) {
      LOGGER.warn("No available worker!");
      return;
    }

    // Take the first N available workers, N clamped to [0, number available].
    int takeCount = Math.max(0, Math.min(numWorkersToAdd, _availableWorkerList.size()));
    List<String> instances = new ArrayList<>(_availableWorkerList.subList(0, takeCount));
    LOGGER.info("Add {} instance to route {}: {}", instances.size(), pipeline + SEPARATOR + routeId, instances);

    String partition = String.valueOf(routeId);

    // NOTE(review): the ideal state is fetched again for every use below (up to three reads per
    // call), so the null check and the later dereference act on separate reads. Consider reading
    // it once into a local.
    boolean idealStateMissing = _helixAdmin.getResourceIdealState(_helixClusterName, pipeline) == null;

    if (idealStateMissing && !isPipelineExisted(pipeline)) {
      // Brand-new pipeline: create the resource with this route as its partition.
      setEmptyResourceConfig(pipeline);
      _helixAdmin.addResource(_helixClusterName, pipeline,
          IdealStateBuilder.buildCustomIdealStateFor(pipeline, partition, instances));
    } else if (!idealStateMissing
        && _helixAdmin.getResourceIdealState(_helixClusterName, pipeline).getPartitionSet().contains(partition)) {
      LOGGER.info("Topic {} Partition {} exists", pipeline, routeId);
      _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
          IdealStateBuilder.expandInstanceCustomIdealStateFor(
              _helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
              pipeline, partition, instances, _conf.getMaxNumWorkersPerRoute()));
    } else {
      // Reached either when the ideal state exists without this route, or when the first read
      // returned null although the pipeline exists
      // (this can happen when manager-controller idealstate is finished but manager-worker
      // idealstate not exist). Only the former case logs.
      if (!idealStateMissing) {
        LOGGER.info("Topic {} Partition {} does not exist", pipeline, routeId);
      }
      _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
          IdealStateBuilder.expandCustomIdealStateFor(
              _helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
              pipeline, partition, instances, _conf.getMaxNumWorkersPerRoute()));
    }

    // Local bookkeeping: record the assignment and retire the workers from the free pool.
    _routeToInstanceMap
        .computeIfAbsent(new TopicPartition(pipeline, routeId), key -> new ArrayList<>())
        .addAll(instances);
    _availableWorkerList.removeAll(instances);
    controller.addWorkers(instances);
  } finally {
    _lock.unlock();
  }
}