in rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaSource.java [60:96]
public KafkaSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
this.sent = new AtomicLong();
this.requested = new AtomicLong();
this.receiveExecutor = Executors.newSingleThreadExecutor();
this.finished = new AtomicBoolean(false);
Runnable receiver = new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
consumer.subscribe(Arrays.asList(topic));
while (!finished.get()) {
try {
if (sent.get() < requested.get()) {
ConsumerRecords<String, T> records = consumer.poll(100);
records.forEach(record -> handleRecord(record));
} else {
synchronized (this) {
try {
wait(1000);
} catch (InterruptedException e) {
finished.set(true);
}
}
}
} catch (RuntimeException e) {
subscriber.onError(e);
}
}
subscriber.onComplete();
consumer.close();
}
};
this.receiveExecutor.submit(receiver);
}