in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java [58:129]
/**
 * Resolves the consumer configuration and launches the CrossDC consumer on a background thread.
 *
 * <p>The configuration is assembled in the following order:
 * <ol>
 *   <li>the entries already present in {@code properties};</li>
 *   <li>for every key in {@code KafkaCrossDcConf.CONFIG_PROPERTIES}, the JVM system property of
 *       the same name overwrites the entry when it is set;</li>
 *   <li>if the CrossDC properties node exists in ZooKeeper, its content is parsed as a
 *       {@link Properties} file and passed, together with the map, to
 *       {@code KafkaCrossDcConf.readZkProps}.</li>
 * </ol>
 *
 * <p>After the consumer task has been submitted, this method waits up to 30 seconds on
 * {@code startLatch} before returning. A timeout is logged as a warning but does not fail
 * the call.
 *
 * @param properties mutable configuration map; it is updated in place with system property
 *     overrides and is handed to {@code KafkaCrossDcConf.readZkProps}
 * @throws IllegalArgumentException if {@code zkConnectString}, the bootstrap servers or the
 *     topic name is missing from the resolved configuration
 * @throws SolrException if reading the properties node from ZooKeeper fails
 *     ({@code SERVER_ERROR}), or if the calling thread is interrupted while talking to
 *     ZooKeeper or while waiting for the consumer to start ({@code SERVICE_UNAVAILABLE})
 */
public void start(Map<String,Object> properties ) {
    // JVM system properties take precedence over the values supplied by the caller.
    for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
        String val = System.getProperty(configKey.getKey());
        if (val != null) {
            properties.put(configKey.getKey(), val);
        }
    }

    log.info("Consumer startup config properties before adding additional properties from Zookeeper={}",
        SensitivePropRedactionUtils.flattenAndRedactForLogging(properties));

    String zkConnectString = (String) properties.get("zkConnectString");
    if (zkConnectString == null) {
        throw new IllegalArgumentException("zkConnectString not specified for Consumer");
    }

    // Location of the properties node; overridable through a system property.
    String zkPropsPath = System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
        CrossDcConf.CROSSDC_PROPERTIES);

    try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
        try {
            if (client.exists(zkPropsPath, true)) {
                byte[] data = client.getData(zkPropsPath, null, null, true);
                Properties zkProps = new Properties();
                zkProps.load(new ByteArrayInputStream(data));
                KafkaCrossDcConf.readZkProps(properties, zkProps);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
        } catch (Exception e) {
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
        }
    }

    String bootstrapServers = (String) properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
    if (bootstrapServers == null) {
        throw new IllegalArgumentException("bootstrapServers not specified for Consumer");
    }

    String topicName = (String) properties.get(TOPIC_NAME);
    if (topicName == null) {
        throw new IllegalArgumentException("topicName not specified for Consumer");
    }

    KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
    crossDcConsumer = getCrossDcConsumer(conf, startLatch);

    // Start consumer thread
    log.info("Starting CrossDC Consumer {}", conf);
    ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
    consumerThreadExecutor.submit(crossDcConsumer);
    // The executor is never handed out, so no further tasks can arrive. shutdown() lets the
    // already submitted consumer task run to completion and then releases the worker thread
    // instead of leaving it parked forever.
    consumerThreadExecutor.shutdown();

    // Register shutdown hook
    Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    try {
        // NOTE(review): startLatch is presumably counted down by the consumer once it is
        // running - confirm in getCrossDcConsumer / the consumer implementation.
        boolean started = startLatch.await(30, TimeUnit.SECONDS);
        if (!started) {
            log.warn("CrossDC Consumer did not signal startup within 30 seconds");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
    }
}