in crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java [89:130]
private Producer<String, MirroredSolrRequest> initProducer() {
// Initialize and return Kafka producer
Properties kafkaProducerProps = new Properties();
log.info("Starting CrossDC Producer {}", conf);
kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
if (retries != null) {
kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries));
}
kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS));
kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time that causes consumer to be kicked out
kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));
kafkaProducerProps.put("key.serializer", StringSerializer.class.getName());
kafkaProducerProps.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps);
kafkaProducerProps.putAll(conf.getAdditionalProperties());
if (log.isDebugEnabled()) {
log.debug("Kafka Producer props={}", kafkaProducerProps);
}
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Producer<String, MirroredSolrRequest> producer;
try {
producer = new KafkaProducer<>(kafkaProducerProps);
} finally {
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
}
return producer;
}