public void start()

in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java [64:136]


    /**
     * Starts the cross-DC consumer from the supplied configuration.
     *
     * <p>In order, this method: fills {@code properties} through {@code ConfUtil} (first without
     * and then with a ZooKeeper client), verifies them, builds a {@link KafkaCrossDcConf}, creates
     * the consumer, configures a Jetty server exposing {@code /threads/*} and {@code /metrics/*}
     * when the configured port is positive, submits the consumer to a single-thread executor,
     * registers a JVM shutdown hook, starts the server (if any) and finally waits up to 30 seconds
     * on {@code startLatch}.
     *
     * @param properties configuration properties; passed to {@code ConfUtil.fillProperties}, which
     *     presumably updates the map in place. A {@code zkConnectString} entry must be present
     *     after the first fill.
     * @throws IllegalArgumentException if no {@code zkConnectString} is available
     * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the HTTP server fails to start or
     *     the calling thread is interrupted while waiting on the start latch
     */
    public void start(Map<String,Object> properties ) {

        ConfUtil.fillProperties(null, properties);

        log.info("Consumer startup config properties before adding additional properties from Zookeeper={}",
                SensitivePropRedactionUtils.flattenAndRedactForLogging(properties));

        String zkConnectString = (String) properties.get("zkConnectString");
        if (zkConnectString == null) {
            throw new IllegalArgumentException("zkConnectString not specified for consumer");
        }

        // The ZK client is only needed while filling the properties; try-with-resources closes it.
        try (SolrZkClient client = new SolrZkClient.Builder().withUrl(zkConnectString).withTimeout(15000, TimeUnit.MILLISECONDS).build()) {
            // update properties, potentially also from ZK
            ConfUtil.fillProperties(client, properties);
        }

        ConfUtil.verifyProperties(properties);

        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
        crossDcConsumer = getCrossDcConsumer(conf, startLatch);

        // jetty endpoint for /metrics
        int port = conf.getInt(PORT);
        if (port > 0) {
            log.info("Starting API endpoints...");
            server = new Server(port);
            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
            context.setContextPath("/");
            server.setHandler(context);
            context.addServlet(ThreadDumpServlet.class, "/threads/*");
            context.addServlet(MetricsServlet.class, "/metrics/*");
            context.setAttribute("com.codahale.metrics.servlets.MetricsServlet.registry", SharedMetricRegistries.getOrCreate(METRICS_REGISTRY));
            for (ServletMapping mapping : context.getServletHandler().getServletMappings()) {
                log.info(" - {}", mapping.getPathSpecs()[0]);
            }
        }

        // Start consumer thread

        log.info("Starting CrossDC Consumer {}", conf);

        ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
        consumerThreadExecutor.submit(crossDcConsumer);
        // Orderly shutdown: the already-submitted consumer task keeps running, but the worker
        // thread is released once it finishes instead of idling forever.
        consumerThreadExecutor.shutdown();

        // Register shutdown hook
        Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        // NOTE(review): if the server fails to start, the consumer task submitted above is still
        // running when the exception propagates - confirm callers clean up in that case.
        if (server != null) {
            try {
                server.start();
            } catch (Exception e) {
                throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
            }
        }
        try {
            // await() returns false on timeout; surface that instead of silently continuing.
            if (!startLatch.await(30, TimeUnit.SECONDS)) {
                log.warn("CrossDC Consumer start latch was not released within 30 seconds");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            if (server != null) {
                try {
                    server.stop();
                } catch (Exception e1) {
                    // Best effort: we are already failing with the exception thrown below.
                    log.warn("Failed to stop API server after interrupted startup", e1);
                }
            }
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
        }
    }