in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/WorkerInstance.java [289:335]
private void initializeProperties(String srcCluster, String dstCluster) {
String commitZk = consumerProps.getProperty(Constants.ZK_SERVER, "");
topicObserverZk = producerProps.getProperty(Constants.ZK_SERVER, "");
// override properties for federated
if (workerConf.getFederatedEnabled()) {
if (StringUtils.isEmpty(srcCluster) || StringUtils.isEmpty(dstCluster)) {
throw new RuntimeException(
String.format(
"srcCluster and dstCluster are required for federated mode. current value: {} - {}",
srcCluster, dstCluster)
);
}
boolean isSrcClusterSecure = workerConf.getSecureFeatureEnabled() && workerConf.getSecureClustersSet().contains(srcCluster);
boolean isDstClusterSecure = workerConf.getSecureFeatureEnabled() && workerConf.getSecureClustersSet().contains(dstCluster);
if (isSrcClusterSecure) {
consumerProps.putAll(WorkerUtils.loadProperties(workerConf.getSecureConfigFile()));
}
if (isDstClusterSecure) {
producerProps.putAll(WorkerUtils.loadProperties(workerConf.getSecureConfigFile()));
}
consumerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterBootstrapServers(srcCluster, isSrcClusterSecure));
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterBootstrapServers(dstCluster, isDstClusterSecure));
commitZk = clusterProps
.getProperty(Constants.FEDERATED_CLUSTER_ZK_CONFIG_PREFIX + srcCluster, "");
topicObserverZk = clusterProps
.getProperty(Constants.FEDERATED_CLUSTER_ZK_CONFIG_PREFIX + dstCluster, "");
if (StringUtils.isBlank(commitZk)) {
throw new IllegalArgumentException(
String.format("Failed to find %s in config file %s:",
Constants.FEDERATED_CLUSTER_ZK_CONFIG_PREFIX + srcCluster,
workerConf.getClusterConfigFile()));
}
} else {
commitZk = consumerProps.getProperty(Constants.COMMIT_ZOOKEEPER_SERVER_CONFIG, commitZk);
if (StringUtils.isBlank(commitZk)) {
throw new IllegalArgumentException(
"Failed to find commitZk server, property zkServer or commit.zookeeper.connect is required in Consumer config file :"
+ workerConf.getConsumerConfigFile());
}
}
consumerProps.setProperty(Constants.COMMIT_ZOOKEEPER_SERVER_CONFIG, commitZk);
}