in src/main/java/com/uber/rss/StreamServer.java [121:159]
/**
 * Sets up {@code this.serviceRegistry} according to the registry type reported by the
 * given server config.
 *
 * <ul>
 *   <li>ZooKeeper: uses {@code ZooKeeperFaultTolerantServiceRegistry} when a non-empty backup
 *       server string is configured, otherwise {@code ZooKeeperServiceRegistry}.</li>
 *   <li>In-memory: uses a new {@code InMemoryServiceRegistry}.</li>
 *   <li>Standalone: uses a {@code StandaloneServiceRegistryClient} when a non-empty registry
 *       server is configured; otherwise only logs a message and leaves
 *       {@code this.serviceRegistry} unassigned by this method.</li>
 * </ul>
 *
 * @param serverConfig configuration supplying the registry type and connection settings
 * @throws RuntimeException if the configured registry type matches none of the known types
 */
private void createServiceRegistry(StreamServerConfig serverConfig) {
  switch (serverConfig.getServiceRegistryType()) {
    case ServiceRegistry.TYPE_ZOOKEEPER: {
      String backupServers = serverConfig.getZooKeeperServersBackup();
      boolean hasBackup = backupServers != null && !backupServers.isEmpty();
      if (hasBackup) {
        // Primary servers first, backup second.
        this.serviceRegistry = ZooKeeperFaultTolerantServiceRegistry.createTimingInstance(
            Arrays.asList(serverConfig.getZooKeeperServers(), backupServers),
            serverConfig.getNetworkTimeout(),
            serverConfig.getNetworkRetries());
      } else {
        this.serviceRegistry = ZooKeeperServiceRegistry.createTimingInstance(
            serverConfig.getZooKeeperServers(),
            serverConfig.getNetworkTimeout(),
            serverConfig.getNetworkRetries());
      }
      break;
    }
    case ServiceRegistry.TYPE_INMEMORY: {
      this.serviceRegistry = new InMemoryServiceRegistry();
      break;
    }
    case ServiceRegistry.TYPE_STANDALONE: {
      String configuredServer = serverConfig.getRegistryServer();
      if (configuredServer == null || configuredServer.isEmpty()) {
        // Nothing to connect to yet; no registry client is created in this method.
        logger.info("Registry server is not specified, will use localhost as registry server and create registry "
            + "client when local stream server is started (need to get port at that time)");
        break;
      }
      ServerHostAndPort endpoint = ServerHostAndPort.fromString(configuredServer);
      logger.info(String.format("Creating registry client connecting to registry server: %s:%s",
          endpoint.getHost(), endpoint.getPort()));
      this.serviceRegistry = new StandaloneServiceRegistryClient(
          endpoint.getHost(),
          endpoint.getPort(),
          serverConfig.getNetworkTimeout(),
          "streamServer");
      break;
    }
    default:
      throw new RuntimeException("Unknown service registry type: " + serverConfig.getServiceRegistryType());
  }
}