in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ManagerControllerHelix.java [107:186]
/**
 * Handles an ONLINE assignment of the route {@code srcCluster -> dstCluster} (partition
 * {@code routePartition}) to this manager by configuring and starting a {@link ControllerInstance}.
 *
 * <p>If a controller instance already exists, the call is only accepted when it targets the same
 * route and that instance is already started; in that case it is a no-op. Otherwise the route is
 * validated against the controller configuration, the configuration is updated for the route, and a
 * new controller instance is created and started. If starting fails, the instance is asked to stop
 * and the reference is cleared only when the stop succeeds.
 *
 * @param srcCluster name of the source cluster of the route
 * @param dstCluster name of the destination cluster of the route
 * @param routePartition partition identifier of the route
 * @throws IllegalArgumentException if a different route is already assigned, if source and
 *     destination are the same cluster, if either cluster is not listed in the configuration, or
 *     if no ZooKeeper path is configured for either cluster
 * @throws IllegalStateException if the same route is already assigned but its controller instance
 *     has not been started
 * @throws RuntimeException if the controller instance fails to start; the original failure is
 *     attached as the cause
 */
private void handleRouteAssignmentOnline(String srcCluster, String dstCluster, String routePartition) {
  if (_currentControllerInstance != null) {
    boolean sameRoute = srcCluster.equals(_currentSrcCluster) && dstCluster.equals(_currentDstCluster)
        && routePartition.equals(_currentRoutePartition);
    if (!sameRoute) {
      String msg = String.format(
          "Invalid route partition assignment. Current route src=%s, dst=%s, partition=%s; new route src=%s, dst=%s, partition=%s",
          _currentSrcCluster, _currentDstCluster, _currentRoutePartition, srcCluster, dstCluster, routePartition);
      LOGGER.error(msg);
      throw new IllegalArgumentException(msg);
    }
    if (!_currentControllerInstance.isStarted()) {
      String msg = "Controller has already been initiated but not started yet";
      LOGGER.error(msg);
      throw new IllegalStateException(msg);
    }
    LOGGER.info("Controller has already been started");
    return;
  }

  // validate src and dst clusters in configuration
  if (srcCluster.equals(dstCluster)) {
    String msg = String.format("The source cluster %s cannot be the same as destination cluster", srcCluster);
    LOGGER.error(msg);
    throw new IllegalArgumentException(msg);
  }
  if (!_controllerConf.getSourceClusters().contains(srcCluster)) {
    String msg = String.format("The cluster %s is not a valid source cluster", srcCluster);
    LOGGER.error(msg);
    throw new IllegalArgumentException(msg);
  }
  if (!_controllerConf.getDestinationClusters().contains(dstCluster)) {
    String msg = String.format("The cluster %s is not a valid destination cluster", dstCluster);
    LOGGER.error(msg);
    throw new IllegalArgumentException(msg);
  }

  // Resolve and validate both ZooKeeper paths before touching the shared configuration, so that a
  // rejected assignment does not leave the configuration partially updated.
  String srcKafkaZkPath = (String) _controllerConf.getProperty(CONFIG_KAFKA_CLUSTER_KEY_PREFIX + srcCluster);
  if (srcKafkaZkPath == null) {
    String msg = "Failed to find configuration of ZooKeeper path for source cluster " + srcCluster;
    LOGGER.error(msg);
    throw new IllegalArgumentException(msg);
  }
  String destKafkaZkPath = (String) _controllerConf.getProperty(CONFIG_KAFKA_CLUSTER_KEY_PREFIX + dstCluster);
  if (destKafkaZkPath == null) {
    String msg = "Failed to find configuration of ZooKeeper path for destination cluster " + dstCluster;
    LOGGER.error(msg);
    throw new IllegalArgumentException(msg);
  }

  // All inputs are valid: apply the route-specific settings to the controller configuration.
  String clusterName = CONTROLLER_WORKER_HELIX_PREFIX + srcCluster + "-" + dstCluster + "-" + routePartition;
  _controllerConf.setSrcKafkaZkPath(srcKafkaZkPath);
  _controllerConf.setDestKafkaZkPath(destKafkaZkPath);
  _controllerConf.setHelixClusterName(clusterName);
  _controllerConf.setEnableSrcKafkaValidation("true");
  _controllerConf.setGroupId("ureplicator-" + srcCluster + "-" + dstCluster);
  _controllerConf.setSourceCluster(srcCluster);
  _controllerConf.setDestinationCluster(dstCluster);

  _currentControllerInstance = new ControllerInstance(this, _controllerConf);
  LOGGER.info("Starting controller instance for route {}", clusterName);
  try {
    _currentControllerInstance.start();
  } catch (Exception e) {
    String msg = "Failed to start controller instance. Roll back.";
    LOGGER.error(msg, e);
    if (_currentControllerInstance.stop()) {
      _currentControllerInstance = null;
    } else {
      // NOTE(review): the instance reference is kept when stop() fails while the _current* route
      // fields are not updated, so later assignments take the "already assigned" branch above.
      LOGGER.error("Failed to stop the controller instance.");
    }
    // Chain the original failure so callers can see the root cause, not just the rollback message.
    throw new RuntimeException(msg, e);
  }

  // Record the active route only after the instance has started successfully.
  _currentSrcCluster = srcCluster;
  _currentDstCluster = dstCluster;
  _currentRoutePartition = routePartition;
  LOGGER.info("Successfully started controller instance for route {}", clusterName);
}