private void initializeProperties()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/WorkerInstance.java [289:335]


  /**
   * Resolves the ZooKeeper connection strings used by this worker and, in federated mode,
   * overrides the consumer/producer connection properties for the given cluster pair.
   *
   * <p>Both modes start from the {@code Constants.ZK_SERVER} value of the consumer config (for
   * the commit ZooKeeper) and of the producer config (for {@code topicObserverZk}).
   *
   * <ul>
   *   <li>Federated mode ({@code workerConf.getFederatedEnabled()}): the secure config file is
   *       merged into the consumer and/or producer properties when the respective cluster is
   *       listed as secure, bootstrap servers are set from
   *       {@code getClusterBootstrapServers}, and both ZooKeeper strings are replaced by the
   *       per-cluster entries in {@code clusterProps}.</li>
   *   <li>Non-federated mode: {@code Constants.COMMIT_ZOOKEEPER_SERVER_CONFIG} from the consumer
   *       config takes precedence, falling back to the {@code Constants.ZK_SERVER} value.</li>
   * </ul>
   *
   * <p>On success the resolved commit ZooKeeper string is written back into
   * {@code consumerProps} under {@code Constants.COMMIT_ZOOKEEPER_SERVER_CONFIG}.
   *
   * @param srcCluster source cluster name; must be non-empty in federated mode
   * @param dstCluster destination cluster name; must be non-empty in federated mode
   * @throws RuntimeException if federated mode is enabled and either cluster name is empty
   * @throws IllegalArgumentException if no commit ZooKeeper connection string can be resolved
   */
  private void initializeProperties(String srcCluster, String dstCluster) {
    String commitZk = consumerProps.getProperty(Constants.ZK_SERVER, "");
    topicObserverZk = producerProps.getProperty(Constants.ZK_SERVER, "");
    // override properties for federated
    if (workerConf.getFederatedEnabled()) {
      if (StringUtils.isEmpty(srcCluster) || StringUtils.isEmpty(dstCluster)) {
        // String.format uses %s conversions; SLF4J-style "{}" would be printed literally and
        // the cluster names would never appear in the message.
        throw new RuntimeException(
            String.format(
                "srcCluster and dstCluster are required for federated mode. current value: %s - %s",
                srcCluster, dstCluster)
        );
      }

      // A cluster is treated as secure only when the feature flag is on AND it is listed.
      boolean isSrcClusterSecure = workerConf.getSecureFeatureEnabled() && workerConf.getSecureClustersSet().contains(srcCluster);
      boolean isDstClusterSecure = workerConf.getSecureFeatureEnabled() && workerConf.getSecureClustersSet().contains(dstCluster);

      if (isSrcClusterSecure) {
        consumerProps.putAll(WorkerUtils.loadProperties(workerConf.getSecureConfigFile()));
      }

      if (isDstClusterSecure) {
        producerProps.putAll(WorkerUtils.loadProperties(workerConf.getSecureConfigFile()));
      }

      // Set after merging the secure config so the per-cluster bootstrap servers win over any
      // bootstrap entry the secure config file may carry.
      // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG is also used as the key on the consumer side.
      consumerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterBootstrapServers(srcCluster, isSrcClusterSecure));
      producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterBootstrapServers(dstCluster, isDstClusterSecure));

      commitZk = clusterProps
          .getProperty(Constants.FEDERATED_CLUSTER_ZK_CONFIG_PREFIX + srcCluster, "");
      // NOTE(review): unlike commitZk, topicObserverZk is not checked for blank here —
      // confirm whether an empty destination ZooKeeper string is acceptable downstream.
      topicObserverZk = clusterProps
          .getProperty(Constants.FEDERATED_CLUSTER_ZK_CONFIG_PREFIX + dstCluster, "");
      if (StringUtils.isBlank(commitZk)) {
        throw new IllegalArgumentException(
            String.format("Failed to find %s in  config file %s:",
                Constants.FEDERATED_CLUSTER_ZK_CONFIG_PREFIX + srcCluster,
                workerConf.getClusterConfigFile()));
      }
    } else {
      // Explicit commit ZooKeeper setting wins; otherwise fall back to the ZK_SERVER value.
      commitZk = consumerProps.getProperty(Constants.COMMIT_ZOOKEEPER_SERVER_CONFIG, commitZk);
      if (StringUtils.isBlank(commitZk)) {
        throw new IllegalArgumentException(
            "Failed to find commitZk server, property zkServer or commit.zookeeper.connect is required in Consumer config file :"
                + workerConf.getConsumerConfigFile());
      }
    }
    consumerProps.setProperty(Constants.COMMIT_ZOOKEEPER_SERVER_CONFIG, commitZk);
  }