private PulsarKafkaConsumer()

in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [147:208]


    /**
     * Builds the consumer from an already-parsed Kafka {@link ConsumerConfig} and creates the
     * Pulsar client it will use.
     *
     * <p>For the key and for the value independently: when no schema is supplied, the deserializer
     * class named in the config is instantiated, configured and wrapped in a
     * {@link PulsarKafkaSchema}; when a schema is supplied it is used as-is and the corresponding
     * deserializer config entry is marked as ignored.
     *
     * <p>The group id falls back to the client id when no group id is configured. The Pulsar
     * service URL is the first entry of the bootstrap servers list.
     *
     * @param consumerConfig parsed consumer configuration
     * @param keySchema      schema for record keys, or {@code null} to use the configured key deserializer
     * @param valueSchema    schema for record values, or {@code null} to use the configured value deserializer
     * @throws IllegalArgumentException if the bootstrap servers list is missing or empty
     * @throws RuntimeException         wrapping the {@link PulsarClientException} if the client cannot be built
     */
    private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
        this.consumerCfg = consumerConfig;
        if (keySchema == null) {
            Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            // isKey = true: this deserializer handles record keys
            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
            this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
        } else {
            this.keySchema = keySchema;
            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }

        if (valueSchema == null) {
            Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
            // isKey = false: this deserializer handles record values, so it must pick up the
            // value-specific settings rather than the key-specific ones
            kafkaValueDeserializer.configure(consumerConfig.originals(), false);
            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
        } else {
            this.valueSchema = valueSchema;
            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }

        // kafka removes group id for the restore consumer but adds client id
        String configuredGroupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
        groupId = configuredGroupId != null
                ? configuredGroupId
                : consumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        Preconditions.checkNotNull(groupId, "groupId cannot be null");
        isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
        strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
        log.info("Offset reset strategy has been assigned value {}", strategy);

        // Only the first bootstrap server entry is used as the Pulsar service URL. Fail with a clear
        // message instead of an IndexOutOfBoundsException when the list is empty.
        List<String> bootstrapServers = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalArgumentException(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " must contain at least one service URL");
        }
        String serviceUrl = bootstrapServers.get(0);

        // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
        if (consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)) {
            maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
        } else {
            maxRecordsInSinglePoll = 1000;
        }

        interceptors = (List) consumerConfig.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);

        this.properties = new Properties();
        // Log only the key names: the values are passed on to the Pulsar client builder and may
        // include credentials, which must not end up in the logs.
        log.info("config original keys: {}", consumerConfig.originals().keySet());
        consumerConfig.originals().forEach((k, v) -> {
            // properties do not allow null values
            if (k != null && v != null) {
                properties.put(k, v);
            }
        });
        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
        // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
        // all the acknowledgments sent to broker within a short time frame
        clientBuilder.enableTcpNoDelay(false);
        try {
            client = clientBuilder.serviceUrl(serviceUrl).build();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }