in rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java [53:69]
/**
 * Forwards one event to Kafka and asks the upstream subscription for the next one.
 *
 * <p>The payload is converted with {@code getProducerRecord} and handed to the producer.
 * Exactly one further element is requested per call: from the send callback once Kafka
 * reports the outcome, or directly from this method when the record could not be handed
 * to the producer at all. Failures are logged and never propagated to the caller.
 *
 * @param payload the event to publish
 */
public void onNext(T payload) {
    // True once producer.send(...) has returned, i.e. the callback below is registered
    // and is responsible for requesting the next element.
    boolean handedToProducer = false;
    try {
        ProducerRecord<?, ?> record = getProducerRecord(payload);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    LOGGER.warn("Can't send event to Kafka broker", e);
                }
                // Keep the stream moving whether or not the broker accepted the record.
                subscription.request(1);
            }
        });
        handedToProducer = true;
        // Push the record out now instead of waiting for the producer to batch it.
        producer.flush();
    } catch (RuntimeException e) {
        LOGGER.warn("Error sending event to kafka", e);
        if (!handedToProducer) {
            // Record conversion or send() itself failed, so the callback will never run.
            // Without this request the upstream would never emit again and the
            // destination would stall after a single bad event.
            subscription.request(1);
        }
        // If flush() failed after a successful send(), the callback still owns the
        // request; asking here as well would pull two elements for one onNext.
    }
}