in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java [74:88]
public void send(T payload, Map<String, String> properties) throws MessagingException {
try {
ObjectWriter writer = mapper.writerFor(payload.getClass());
String payloadSt = writer.writeValueAsString(payload);
List<Header> headerList = properties.entrySet().stream().map(this::toHeader).collect(Collectors.toList());
RecordHeader messageType = header(KafkaMessageInfo.KEY_MESSAGE_TYPE, payload.getClass().getSimpleName());
headerList.add(messageType);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, PARTITION, null, payloadSt, headerList);
RecordMetadata metadata = producer.send(record).get();
LOG.info("Sent to topic={}, offset={}", topic, metadata.offset());
} catch (Exception e) {
eventSender.send(e);
throw new MessagingException(format("Failed to send JSON message on topic %s", topic), e);
}
}