public void onNext()

in rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaDestination.java [53:69]


    /**
     * Sends one payload to Kafka and requests the next element from the subscription.
     *
     * <p>The payload is converted with {@code getProducerRecord(payload)}, handed to the producer
     * together with a completion callback, and the producer is flushed before this method
     * returns. Exactly one {@code subscription.request(1)} is intended per invocation:
     * <ul>
     *   <li>when {@code send()} returns normally, the completion callback issues the request
     *       (whether or not the send itself succeeded);</li>
     *   <li>when record creation or {@code send()} throws before the callback is registered,
     *       the request is issued from the error path so the stream does not stall.</li>
     * </ul>
     *
     * <p>Runtime exceptions are logged and never propagated to the caller.
     *
     * @param payload the element to publish
     */
    public void onNext(T payload) {
        // Becomes true once send() has returned normally, i.e. once the completion callback
        // is responsible for requesting the next element.
        boolean callbackRegistered = false;
        try {
            ProducerRecord<?, ?> record = getProducerRecord(payload);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        LOGGER.warn("Can't send event to Kafka broker", e);
                    }
                    // Ask for the next element regardless of the outcome: a failed send is
                    // only logged and must not stop consumption.
                    subscription.request(1);
                }
            });
            callbackRegistered = true;
            // NOTE(review): flushing after every record blocks until the record has been sent,
            // which effectively serialises publishing; kept to preserve existing behaviour.
            producer.flush();
        } catch (RuntimeException e) {
            LOGGER.warn("Error sending event to kafka", e);
            if (!callbackRegistered) {
                // The failure happened before the callback was handed to the producer, so
                // nobody else will signal demand. Without this request the upstream would
                // never deliver another element. When flush() is what failed, the callback
                // is already registered and will issue the request itself.
                subscription.request(1);
            }
        }
    }