in org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java [52:72]
public void run() {
try {
for (;running;) {
ConsumerRecords<String, byte[]> records = consumer.poll(ofHours(1));
records.forEach(record -> callback.accept(toReceived(record)));
}
} catch (WakeupException e) {
if (running) {
LOG.error("WakeupException while running {}", e.getMessage(), e);
throw e;
} else {
LOG.debug("WakeupException while stopping {}", e.getMessage(), e);
}
} catch(Throwable t) {
LOG.error(format("Catch Throwable %s closing subscription", t.getMessage()), t);
throw t;
} finally {
// Close the network connections and sockets
consumer.close();
}
}