public void run()

in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java [89:106]


    public void run() {
        LOG.info("Start poller");
        while(running) {
            try {
                consumer.poll(ofHours(1)).forEach(this::handle);
            } catch (WakeupException e) {
                LOG.debug("Waked up {}", e.getMessage(), e);
                this.running = false;
            } catch (Exception e) {
                eventSender.send(e);
                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
                sleepAfterError();
                // Continue as KafkaConsumer should handle the error transparently
            }
        }
        consumer.close();
        LOG.info("Stopped poller");
    }