in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [138:181]
/**
 * Creates a consumer from an already-parsed Kafka consumer configuration.
 *
 * <p>For the key and the value independently: when the supplied schema is {@code null}, the
 * deserializer class named in the configuration is instantiated, configured with the original
 * configuration entries and wrapped in a {@link PulsarKafkaSchema}; otherwise the supplied schema
 * is used as-is and the matching deserializer setting is passed to {@code consumerConfig.ignore}.
 *
 * <p>Only the first entry of {@code BOOTSTRAP_SERVERS_CONFIG} is used as the Pulsar service URL.
 *
 * @param consumerConfig the Kafka consumer configuration to read settings from
 * @param keySchema schema used for record keys, or {@code null} to use the configured key deserializer
 * @param valueSchema schema used for record values, or {@code null} to use the configured value
 *        deserializer
 * @throws RuntimeException wrapping the {@link PulsarClientException} if the Pulsar client cannot be built
 */
private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
    if (keySchema == null) {
        Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
        // isKey = true: this instance deserializes record keys.
        kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
        this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
    } else {
        this.keySchema = keySchema;
        consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    }

    if (valueSchema == null) {
        Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
        // isKey = false: this instance deserializes record values, so it must not be
        // configured as a key deserializer.
        kafkaValueDeserializer.configure(consumerConfig.originals(), false);
        this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
    } else {
        this.valueSchema = valueSchema;
        consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
    }

    groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
    isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
    strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
    log.info("Offset reset strategy has been assigned value {}", strategy);

    // Only the first configured bootstrap server is used as the Pulsar service URL.
    String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);

    // Kafka 0.9 has no configuration for this limit, so a fixed default value is used.
    maxRecordsInSinglePoll = 1000;

    // Copy the original (unparsed) configuration entries so they can be handed to the client builder.
    this.properties = new Properties();
    consumerConfig.originals().forEach(properties::put);
    ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);

    // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
    // all the acknowledgments sent to broker within a short time frame
    clientBuilder.enableTcpNoDelay(false);

    try {
        client = clientBuilder.serviceUrl(serviceUrl).build();
    } catch (PulsarClientException e) {
        // Constructors here do not declare checked exceptions; rethrow unchecked with the cause preserved.
        throw new RuntimeException(e);
    }
}