private PulsarKafkaConsumer()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [138:181]


    private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {

        if (keySchema == null) {
            Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
        } else {
            this.keySchema = keySchema;
            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }

        if (valueSchema == null) {
            Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            kafkaValueDeserializer.configure(consumerConfig.originals(), true);
            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
        } else {
            this.valueSchema = valueSchema;
            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }

        groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
        isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
        strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
        log.info("Offset reset strategy has been assigned value {}", strategy);

        String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);

        // there is not this config in kafka 0.9, so use default value.
        maxRecordsInSinglePoll = 1000;

        this.properties = new Properties();
        consumerConfig.originals().forEach(properties::put);
        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
        // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
        // all the acknowledgments sent to broker within a short time frame
        clientBuilder.enableTcpNoDelay(false);
        try {
            client = clientBuilder.serviceUrl(serviceUrl).build();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }