in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [1110:1177]
public InstanceTopicPartitionHolder createNewRoute(String pipeline, int routeId) throws Exception {
if (_availableControllerList.isEmpty()) {
LOGGER.info("No available controller!");
throw new Exception("No available controller!");
}
if (_workerHelixManager.getAvailableWorkerList().isEmpty()) {
LOGGER.info("No available worker!");
throw new Exception("No available worker!");
}
String instanceName = _availableControllerList.get(0);
InstanceTopicPartitionHolder instance = new InstanceTopicPartitionHolder(instanceName,
new TopicPartition(pipeline, routeId));
if (!isPipelineExisted(pipeline)) {
setEmptyResourceConfig(pipeline);
LOGGER
.info("Create new pipeline {} partition {} to instance {}", pipeline, routeId, instance);
_helixAdmin.addResource(_helixClusterName, pipeline,
IdealStateBuilder.buildCustomIdealStateFor(pipeline, String.valueOf(routeId), instance));
} else {
LOGGER.info("Expanding pipeline {} new partition {} to instance {}", pipeline, routeId,
instance);
_helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
IdealStateBuilder.expandCustomIdealStateFor(
_helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
pipeline, String.valueOf(routeId), instance));
LOGGER.info("New IdealState: {}",
_helixAdmin.getResourceIdealState(_helixClusterName, pipeline));
}
String[] srcDst = pipeline.split(SEPARATOR);
String routeString = String.format("%s-%s-%s", srcDst[1], srcDst[2], routeId);
String controllerWorkerHelixClusterName = "controller-worker-" + routeString;
HelixManager spectator = HelixManagerFactory.getZKHelixManager(controllerWorkerHelixClusterName,
_instanceId,
InstanceType.SPECTATOR,
_helixZkURL);
long ts1 = System.currentTimeMillis();
while (true) {
try {
spectator.connect();
break;
} catch (Exception e) {
// Do nothing
}
if (System.currentTimeMillis() - ts1 > 60000) {
throw new Exception(String.format("Controller %s failed to set up new route cluster %s!",
instanceName, controllerWorkerHelixClusterName));
}
Thread.sleep(1000);
}
_availableControllerList.remove(instanceName);
_pipelineToInstanceMap.put(pipeline, new PriorityQueue<>(1,
InstanceTopicPartitionHolder.totalWorkloadComparator(_controllerWorkloadSnapshot.getPipelineWorkloadMap())));
_pipelineToInstanceMap.get(pipeline).add(instance);
_assignedControllerCount.inc();
_workerHelixManager.addTopicToMirrorMaker(instance, pipeline, routeId);
// register metrics
maybeRegisterMetrics(routeString);
spectator.disconnect();
return instance;
}