in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [147:208]
/**
 * Builds a consumer from a Kafka {@link ConsumerConfig}, backed by a Pulsar client.
 *
 * <p>For the key and the value independently: when the supplied schema is {@code null}, the
 * deserializer class named in the configuration is instantiated, configured with the original
 * properties, and wrapped in a {@code PulsarKafkaSchema}; otherwise the supplied schema is used
 * and the corresponding deserializer setting is marked as ignored on the config.
 *
 * <p>The Pulsar client is created against the first entry of {@code bootstrap.servers}.
 *
 * @param consumerConfig Kafka consumer configuration to read settings from
 * @param keySchema schema for record keys, or {@code null} to use the configured key deserializer
 * @param valueSchema schema for record values, or {@code null} to use the configured value
 *        deserializer
 * @throws NullPointerException if neither a group id nor a client id is configured
 * @throws IllegalArgumentException if {@code bootstrap.servers} contains no entry
 * @throws RuntimeException wrapping the {@link PulsarClientException} raised when the Pulsar
 *         client cannot be built
 */
private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
    this.consumerCfg = consumerConfig;

    if (keySchema == null) {
        Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
        // second argument is the "isKey" flag of Deserializer#configure
        kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
        this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
    } else {
        this.keySchema = keySchema;
        consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    }

    if (valueSchema == null) {
        Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
        // NOTE(review): the value deserializer is also configured with isKey = true;
        // kept as-is to preserve behavior — confirm whether false was intended.
        kafkaValueDeserializer.configure(consumerConfig.originals(), true);
        this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
    } else {
        this.valueSchema = valueSchema;
        consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
    }

    // kafka removes group id for the restore consumer but adds client id,
    // so fall back to the client id when no group id is configured
    String configuredGroupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
    groupId = configuredGroupId != null
            ? configuredGroupId
            : consumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG);
    Preconditions.checkNotNull(groupId, "groupId cannot be null");

    isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
    strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
    log.info("Offset reset strategy has been assigned value {}", strategy);

    // Only the first bootstrap server is used as the Pulsar service URL. Fail with a
    // descriptive error instead of an IndexOutOfBoundsException when none is configured.
    List<String> bootstrapServers = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
    Preconditions.checkArgument(bootstrapServers != null && !bootstrapServers.isEmpty(),
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " must contain at least one service URL");
    String serviceUrl = bootstrapServers.get(0);

    // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
    // NOTE(review): values() may already contain Kafka's own default for this key, in which
    // case the 1000 fallback is never taken — confirm against AbstractConfig#values.
    if (consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)) {
        maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
    } else {
        maxRecordsInSinglePoll = 1000;
    }

    // raw cast bridges List<ConsumerInterceptor> to the field's declared type
    interceptors = (List) consumerConfig.getConfiguredInstances(
            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);

    // Copy the user-supplied settings into a Properties object for the Pulsar client builder.
    // NOTE(review): raw config values are logged at INFO; these may include credentials —
    // confirm this is acceptable for the deployments using this adapter.
    this.properties = new Properties();
    log.info("config originals: {}", consumerConfig.originals());
    consumerConfig.originals().forEach((k, v) -> {
        log.info("Setting k = {} v = {}", k, v);
        // properties do not allow null values
        if (k != null && v != null) {
            properties.put(k, v);
        }
    });

    ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
    // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
    // all the acknowledgments sent to broker within a short time frame
    clientBuilder.enableTcpNoDelay(false);
    try {
        client = clientBuilder.serviceUrl(serviceUrl).build();
    } catch (PulsarClientException e) {
        // keep the cause so the underlying client failure stays visible to the caller
        throw new RuntimeException(e);
    }
}