in rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaSource.java [107:116]
private void handleRecord(ConsumerRecord<String, T> record) {
System.out.println("Handling message " + record);
if (type == ConsumerRecord.class) {
subscriber.onNext((T)record);
} else {
subscriber.onNext((T)record.value());
}
consumer.commitAsync();
sent.incrementAndGet();
}