public void start()

in pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java [119:152]


    public void start() {
        runnerThread = new Thread(() -> {
            LOG.info("Starting kafka source");
            consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
            LOG.info("Kafka source started.");
            ConsumerRecords<String, byte[]> consumerRecords;
            while (running) {
                consumerRecords = consumer.poll(1000);
                CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                int index = 0;
                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
                    consume(record);
                    futures[index] = record.getCompletableFuture();
                    index++;
                }
                if (!kafkaSourceConfig.isAutoCommitEnabled()) {
                    try {
                        CompletableFuture.allOf(futures).get();
                        consumer.commitSync();
                    } catch (InterruptedException ex) {
                        break;
                    } catch (ExecutionException ex) {
                        LOG.error("Error while processing records", ex);
                        break;
                    }
                }
            }
        });
        runnerThread.setUncaughtExceptionHandler((t, e) -> LOG.error("[{}] Error while consuming records", t.getName(), e));
        runnerThread.setName("Kafka Source Thread");
        runnerThread.start();
    }