public void start()

in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java [58:129]


    public void start(Map<String,Object> properties ) {

        for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
            String val = System.getProperty(configKey.getKey());
            if (val != null) {
                properties.put(configKey.getKey(), val);
            }
        }

        log.info("Consumer startup config properties before adding additional properties from Zookeeper={}",
                SensitivePropRedactionUtils.flattenAndRedactForLogging(properties));

        String zkConnectString = (String) properties.get("zkConnectString");
        if (zkConnectString == null) {
            throw new IllegalArgumentException("zkConnectString not specified for producer");
        }

        try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {

            try {
                if (client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
                    CrossDcConf.CROSSDC_PROPERTIES), true)) {
                    byte[] data = client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
                        CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
                    Properties zkProps = new Properties();
                    zkProps.load(new ByteArrayInputStream(data));

                    KafkaCrossDcConf.readZkProps(properties, zkProps);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
            } catch (Exception e) {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
            }
        }

        String bootstrapServers = (String) properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
        if (bootstrapServers == null) {
            throw new IllegalArgumentException("bootstrapServers not specified for Consumer");
        }

        String topicName = (String) properties.get(TOPIC_NAME);
        if (topicName == null) {
            throw new IllegalArgumentException("topicName not specified for Consumer");
        }

        //server = new Server();
        //ServerConnector connector = new ServerConnector(server);
        //connector.setPort(port);
        //server.setConnectors(new Connector[] {connector})
        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
        crossDcConsumer = getCrossDcConsumer(conf, startLatch);

        // Start consumer thread

        log.info("Starting CrossDC Consumer {}", conf);

        ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
        consumerThreadExecutor.submit(crossDcConsumer);

        // Register shutdown hook
        Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        try {
            startLatch.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
        }
    }