public KafkaCrossDcConsumer(KafkaCrossDcConf, CountDownLatch)

in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java [64:115]


  public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {

    this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
    this.startLatch = startLatch;
    final Properties kafkaConsumerProps = new Properties();

    // Connection and consumer-group settings
    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
    kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));

    // Start new consumer groups from the earliest offset and commit offsets manually
    kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // Fetch sizing and latency tuning
    kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
    kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));

    // Apply security settings and any additional user-supplied consumer properties
    KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
    kafkaConsumerProps.putAll(conf.getAdditionalProperties());
    // Fixed-size worker pool for processing consumed records; threads are given a
    // descriptive name and prestarted so they are ready before the first poll.
    int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
    executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("KafkaCrossDcConsumerWorker");
        return t;
      }
    });
    executor.prestartAllCoreThreads();

    // Solr client and message processor that handle the consumed mirrored requests
    solrClient = createSolrClient(conf);
    messageProcessor = createSolrMessageProcessor();

    log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
    kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
    partitionManager = new PartitionManager(kafkaConsumer);
    // Create producer for resubmitting failed requests
    log.info("Creating Kafka resubmit producer");
    this.kafkaMirroringSink = createKafkaMirroringSink(conf);
    log.info("Created Kafka resubmit producer");

  }
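
The configuration above disables auto-commit and prestarts a fixed-size worker pool, which implies a poll/dispatch/commit loop driven elsewhere in the class. The following is a minimal, self-contained sketch of that pattern using only the standard Kafka client and JDK APIs. The class SimplePollLoop, the handleRecord method, the topic name, broker address, and group id are illustrative stand-ins and not part of KafkaCrossDcConsumer; the real consumer coordinates offset commits through PartitionManager rather than committing blindly after dispatch.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative stand-in for the poll/dispatch/commit pattern implied by the
// configuration above; not the actual KafkaCrossDcConsumer run loop.
public class SimplePollLoop {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "crossdc-example");         // illustrative group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

    // Fixed-size worker pool with named threads, mirroring the executor setup above.
    ExecutorService workers = Executors.newFixedThreadPool(4, r -> {
      Thread t = new Thread(r);
      t.setName("ExamplePollLoopWorker");
      return t;
    });

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("example-topic")); // illustrative topic name
      // Loop forever for illustration; a real consumer needs a shutdown path
      // (consumer.wakeup() plus executor shutdown).
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          workers.submit(() -> handleRecord(record));
        }
        // Simplified: commit after dispatch. The real consumer tracks per-partition
        // offsets and commits only after records have been processed.
        consumer.commitSync();
      }
    }
  }

  private static void handleRecord(ConsumerRecord<String, String> record) {
    // Placeholder for request processing (KafkaCrossDcConsumer forwards updates to Solr).
    System.out.println("offset " + record.offset() + ": " + record.value());
  }
}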