public void send()

in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java [74:88]


    /**
     * Serializes the payload to a JSON string and sends it as a single record to
     * {@code topic}, waiting for the send result before returning.
     *
     * <p>Every entry of {@code properties} is converted to a record header via
     * {@code toHeader}; one additional header keyed by
     * {@code KafkaMessageInfo.KEY_MESSAGE_TYPE} carries the simple class name of the payload.
     *
     * <p>On any failure the exception is passed to {@code eventSender} and then rethrown
     * wrapped in a {@link MessagingException}. If the failure is an
     * {@link InterruptedException}, the calling thread's interrupt flag is restored first.
     *
     * @param payload the message object to serialize and send
     * @param properties key/value pairs to attach to the record as headers
     * @throws MessagingException if serialization or sending fails, or the thread is interrupted
     *         while waiting for the send result
     */
    public void send(T payload, Map<String, String> properties) throws MessagingException {
        try {
            ObjectWriter writer = mapper.writerFor(payload.getClass());
            String payloadSt = writer.writeValueAsString(payload);
            // Collect into an explicit ArrayList: Collectors.toList() makes no guarantee that the
            // returned list is mutable, and the message type header is appended below.
            List<Header> headerList = properties.entrySet().stream()
                    .map(this::toHeader)
                    .collect(Collectors.toCollection(() -> new java.util.ArrayList<Header>()));
            RecordHeader messageType = header(KafkaMessageInfo.KEY_MESSAGE_TYPE, payload.getClass().getSimpleName());
            headerList.add(messageType);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, PARTITION, null, payloadSt, headerList);
            // get() waits for the result of the send so that failures surface to the caller.
            RecordMetadata metadata = producer.send(record).get();
            LOG.info("Sent to topic={}, offset={}", topic, metadata.offset());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the interrupt status; re-assert it so
                // code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            eventSender.send(e);
            throw new MessagingException(format("Failed to send JSON message on topic %s", topic), e);
        }
    }