in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java [108:120]
/**
 * Dispatches a consumed record to the handler registered for its message type.
 *
 * <p>The message type is obtained via {@code getMessageType(record)} and used as the
 * lookup key into {@code handlers}. When no handler is registered for that type the
 * record is ignored and an INFO line is logged.
 *
 * <p>Any {@link Exception} raised while resolving the type, looking up the handler or
 * running the handler is caught here and logged at WARN together with the record
 * headers; it is not rethrown to the caller.
 *
 * @param record the consumed record to dispatch
 */
public void handle(ConsumerRecord<String, String> record) {
    try {
        String type = getMessageType(record);
        JsonRecordHandler<?> target = handlers.get(type);
        if (target == null) {
            // Nothing registered for this type: skip the record instead of failing.
            LOG.info("No handler for messageType={}. Ignoring message.", type);
            return;
        }
        target.accept(record);
    } catch (Exception e) {
        // Log and continue so that one failing record does not propagate an exception.
        LOG.warn("Error consuming message {}", record.headers(), e);
    }
}