private Producer initProducer()

in crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java [89:130]


    /**
     * Builds the Kafka producer used by this sink from the current {@code conf}.
     *
     * <p>The producer properties are assembled in this order: bootstrap servers,
     * {@code acks=all}, the optional retry count, the numeric tuning settings,
     * compression type, serializers, security properties, and finally the additional
     * properties from the configuration. Because the additional properties are copied
     * in last, they replace any earlier entry with the same key.
     *
     * <p>The calling thread's context class loader is set to {@code null} while the
     * {@link KafkaProducer} is constructed and is restored afterwards, even when
     * construction throws.
     *
     * @return a newly created producer keyed by {@code String} with
     *         {@code MirroredSolrRequest} values
     * @throws NumberFormatException if a retry count is configured but is not a valid integer
     */
    private Producer<String, MirroredSolrRequest> initProducer() {
        log.info("Starting CrossDC Producer {}", conf);

        final Properties props = new Properties();

        // Connection and delivery guarantees.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // The retry count is optional; when it is not configured the key is left unset.
        final String configuredRetries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
        if (configuredRetries != null) {
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.valueOf(configuredRetries));
        }

        // Numeric tuning settings.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
        props.put(ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS));
        // The request timeout should be less than the time that causes the consumer to be kicked out.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));

        // Serializers for the record key and value.
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());

        // Security settings first, then the additional properties, which win on key collisions.
        KafkaCrossDcConf.addSecurityProps(conf, props);
        props.putAll(conf.getAdditionalProperties());

        if (log.isDebugEnabled()) {
            log.debug("Kafka Producer props={}", props);
        }

        // NOTE(review): presumably the context class loader is cleared so that Kafka loads its
        // classes through its own class loader rather than the caller's - confirm.
        final Thread currentThread = Thread.currentThread();
        final ClassLoader savedContextClassLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(null);
        try {
            return new KafkaProducer<>(props);
        } finally {
            // Always put the caller's class loader back, including on construction failure.
            currentThread.setContextClassLoader(savedContextClassLoader);
        }
    }