in pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java [119:152]
public void start() {
runnerThread = new Thread(() -> {
LOG.info("Starting kafka source");
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
ConsumerRecords<String, byte[]> consumerRecords;
while (running) {
consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
int index = 0;
for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
consume(record);
futures[index] = record.getCompletableFuture();
index++;
}
if (!kafkaSourceConfig.isAutoCommitEnabled()) {
try {
CompletableFuture.allOf(futures).get();
consumer.commitSync();
} catch (InterruptedException ex) {
break;
} catch (ExecutionException ex) {
LOG.error("Error while processing records", ex);
break;
}
}
}
});
runnerThread.setUncaughtExceptionHandler((t, e) -> LOG.error("[{}] Error while consuming records", t.getName(), e));
runnerThread.setName("Kafka Source Thread");
runnerThread.start();
}