public void run()

in org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaSubscription.java [52:72]


    /**
     * Polls the Kafka consumer in a loop and hands every received record to the callback.
     *
     * <p>The loop keeps going for as long as {@code running} is true. Each polled record is
     * converted with {@code toReceived} and passed to {@code callback}. The consumer is closed
     * when the method exits, whether normally or through an exception.
     *
     * <p>A {@code WakeupException} is treated in two ways: if {@code running} is still true it is
     * logged as an error and rethrown; otherwise it is only logged at debug level and the method
     * returns normally. NOTE(review): presumably the stop path clears {@code running} and then
     * wakes the consumer up to interrupt the blocking poll - confirm against the rest of the class.
     *
     * <p>Any other throwable (including one thrown by the callback) is logged and rethrown.
     */
    public void run() {
        try {
            while (running) {
                // Poll with a one-hour timeout; no shorter timeout bounds a single iteration.
                ConsumerRecords<String, byte[]> records = consumer.poll(ofHours(1));
                records.forEach(record -> callback.accept(toReceived(record)));
            }
        } catch (WakeupException e) {
            if (running) {
                // Wakeup while the subscription is still flagged as running: report and propagate.
                LOG.error("WakeupException while running {}", e.getMessage(), e);
                throw e;
            }
            // Wakeup after the running flag was cleared: nothing to propagate.
            LOG.debug("WakeupException while stopping {}", e.getMessage(), e);
        } catch (Throwable t) {
            // Parameterized logging, consistent with the other log statements in this method;
            // the trailing throwable is logged with its stack trace.
            LOG.error("Catch Throwable {} closing subscription", t.getMessage(), t);
            throw t;
        } finally {
            // Close the network connections and sockets
            consumer.close();
        }
    }