in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java [64:136]
/**
 * Starts this consumer: resolves the configuration, optionally prepares the HTTP
 * endpoints, submits the consumer task to its own thread and waits for the start latch.
 *
 * <p>The given map is modified in place: it is filled once without a ZooKeeper client and
 * then a second time with a short-lived client built from {@code zkConnectString}, before
 * being verified and wrapped in a {@link KafkaCrossDcConf}.
 *
 * @param properties mutable configuration map; must contain {@code zkConnectString} after
 *     the first fill
 * @throws IllegalArgumentException if {@code zkConnectString} is not present
 * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the HTTP server fails to start,
 *     or if the calling thread is interrupted while waiting on the start latch
 */
public void start(Map<String,Object> properties ) {
    ConfUtil.fillProperties(null, properties);
    log.info("Consumer startup config properties before adding additional properties from Zookeeper={}",
        SensitivePropRedactionUtils.flattenAndRedactForLogging(properties));

    String zkConnectString = (String) properties.get("zkConnectString");
    if (zkConnectString == null) {
        // This class is the consumer; the message previously named the producer by mistake.
        throw new IllegalArgumentException("zkConnectString not specified for consumer");
    }

    // The ZK client is only needed while the properties are being filled, so it is closed right after.
    try (SolrZkClient client = new SolrZkClient.Builder().withUrl(zkConnectString).withTimeout(15000, TimeUnit.MILLISECONDS).build()) {
        // update properties, potentially also from ZK
        ConfUtil.fillProperties(client, properties);
    }

    ConfUtil.verifyProperties(properties);

    KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
    crossDcConsumer = getCrossDcConsumer(conf, startLatch);

    // jetty endpoint for /metrics (and /threads); only configured when a positive port is set
    int port = conf.getInt(PORT);
    if (port > 0) {
        log.info("Starting API endpoints...");
        server = new Server(port);
        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
        context.setContextPath("/");
        server.setHandler(context);
        context.addServlet(ThreadDumpServlet.class, "/threads/*");
        context.addServlet(MetricsServlet.class, "/metrics/*");
        context.setAttribute("com.codahale.metrics.servlets.MetricsServlet.registry", SharedMetricRegistries.getOrCreate(METRICS_REGISTRY));
        for (ServletMapping mapping : context.getServletHandler().getServletMappings()) {
            log.info(" - {}", mapping.getPathSpecs()[0]);
        }
    }

    // Start consumer thread
    log.info("Starting CrossDC Consumer {}", conf);
    ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
    consumerThreadExecutor.submit(crossDcConsumer);

    // Register shutdown hook
    Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
    Runtime.getRuntime().addShutdownHook(shutdownHook);

    // The server is started only after the consumer task has been submitted.
    if (server != null) {
        try {
            server.start();
        } catch (Exception e) {
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
        }
    }

    try {
        // await() returns false when the timeout elapses; report it instead of dropping it silently.
        // The method still returns normally in that case, as it did before.
        if (!startLatch.await(30, TimeUnit.SECONDS)) {
            log.warn("Start latch was not released within 30 seconds; continuing startup");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag for callers before tearing down the server.
        Thread.currentThread().interrupt();
        if (server != null) {
            try {
                server.stop();
            } catch (Exception e1) {
                // Best effort: the interruption below is the error that gets reported to the caller.
                log.warn("Failed to stop API server after startup was interrupted", e1);
            }
        }
        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
    }
}