in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java [89:106]
public void run() {
    // Poll loop of this poller: repeatedly polls the consumer and passes every
    // received record to handle() until the running flag becomes false.
    //
    // - WakeupException ends the loop (presumably raised by a shutdown path
    //   calling consumer.wakeup() -- confirm against the close/stop method).
    // - Any other Exception is reported through eventSender, logged, followed
    //   by sleepAfterError(), and the loop then polls again.
    // - The consumer is closed in a finally block so it is released even when
    //   something escapes the loop (an Error, or an exception thrown from
    //   within the error handling itself).
    LOG.info("Start poller");
    try {
        while (running) {
            try {
                consumer.poll(ofHours(1)).forEach(this::handle);
            } catch (WakeupException e) {
                LOG.debug("Waked up {}", e.getMessage(), e);
                this.running = false;
            } catch (Exception e) {
                eventSender.send(e);
                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
                sleepAfterError();
                // Continue as KafkaConsumer should handle the error transparently
            }
        }
    } finally {
        // Always release the consumer, whatever the reason for leaving the loop.
        consumer.close();
    }
    LOG.info("Stopped poller");
}