in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java [64:115]
/**
 * Builds the consumer and every collaborator it owns: the worker thread pool, the Solr client,
 * the message processor, the Kafka consumer with its partition manager, and the Kafka sink used
 * to resubmit failed requests.
 *
 * <p>Kafka consumer properties are assembled in a fixed order: explicit settings first, then
 * security properties, then the configuration's additional properties. Later entries overwrite
 * earlier ones with the same key, so the additional properties win.
 *
 * @param conf cross-DC configuration supplying topic names, Kafka settings and thread count
 * @param startLatch latch stored on this instance; the constructor itself does not touch it
 */
public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
  // Accept "a, b" as well as "a,b": whitespace is not legal in a Kafka topic name, so an
  // untrimmed entry could never match a real topic.
  this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).trim().split("\\s*,\\s*");
  this.startLatch = startLatch;

  final Properties kafkaConsumerProps = new Properties();
  kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
  kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
  kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
  kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  // Auto-commit is switched off here; offsets are therefore not committed by the Kafka client itself.
  kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
  kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
  kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
  kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
  KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
  // Must stay last so that user-supplied properties override everything set above.
  kafkaConsumerProps.putAll(conf.getAdditionalProperties());

  // Fixed-size pool (core == max) whose threads are started eagerly.
  final int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
  final ThreadFactory workerFactory = runnable -> new Thread(runnable, "KafkaCrossDcConsumerWorker");
  executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, workerFactory);
  executor.prestartAllCoreThreads();
  // NOTE(review): if any step below throws, the worker threads started above are never shut
  // down. Cleaning up here depends on how the fields are declared - confirm and handle.

  solrClient = createSolrClient(conf);
  messageProcessor = createSolrMessageProcessor();

  // Never log the raw properties: security and additional properties may carry credentials.
  if (log.isInfoEnabled()) {
    log.info("Creating Kafka consumer with configuration {}", redactSensitiveForLogging(kafkaConsumerProps));
  }
  kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
  partitionManager = new PartitionManager(kafkaConsumer);

  // Create producer for resubmitting failed requests
  log.info("Creating Kafka resubmit producer");
  this.kafkaMirroringSink = createKafkaMirroringSink(conf);
  log.info("Created Kafka resubmit producer");
}

/**
 * Returns a key-sorted copy of {@code props} that is safe to log: the value of every key whose
 * name contains "password", "secret" or "jaas" (case-insensitive) is replaced by a placeholder.
 * The input is not modified.
 */
private static java.util.Map<String, Object> redactSensitiveForLogging(Properties props) {
  final java.util.Map<String, Object> loggable = new java.util.TreeMap<>();
  for (java.util.Map.Entry<Object, Object> entry : props.entrySet()) {
    final String key = String.valueOf(entry.getKey());
    final String lowered = key.toLowerCase(java.util.Locale.ROOT);
    final boolean sensitive =
        lowered.contains("password") || lowered.contains("secret") || lowered.contains("jaas");
    loggable.put(key, sensitive ? "<redacted>" : entry.getValue());
  }
  return loggable;
}