public KafkaSubscription()

in rcomp-kafka/src/main/java/org/apache/karaf/rcomp/kafka/KafkaSource.java [60:96]


        /**
         * Creates the subscription and starts a background receive task on a
         * dedicated single-thread executor.
         *
         * <p>The task subscribes the enclosing source's {@code consumer} to
         * {@code topic} and then loops until {@code finished} becomes true.
         * While fewer records have been sent than requested it polls the
         * consumer and passes every record to {@code handleRecord}; otherwise
         * it pauses for up to one second before re-checking demand.
         *
         * <p>Exactly one terminal signal is delivered to the subscriber:
         * {@code onError} if a {@link RuntimeException} is raised while
         * subscribing, polling or handling records, otherwise
         * {@code onComplete} once the loop ends. The consumer is closed in all
         * cases.
         *
         * @param subscriber the downstream subscriber that receives the
         *                   terminal signal (records are delivered through
         *                   {@code handleRecord}, which is defined outside
         *                   this constructor)
         */
        public KafkaSubscription(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
            this.sent = new AtomicLong();
            this.requested = new AtomicLong();
            this.receiveExecutor = Executors.newSingleThreadExecutor();
            this.finished = new AtomicBoolean(false);
            Runnable receiver = new Runnable() {

                @SuppressWarnings("unchecked")
                @Override
                public void run() {
                    // Remember an interrupt so it can be restored after cleanup;
                    // restoring it earlier would make the terminal signal and
                    // consumer.close() run with a pending interrupt.
                    boolean interrupted = false;
                    try {
                        RuntimeException failure = null;
                        try {
                            consumer.subscribe(Arrays.asList(topic));
                            while (!finished.get()) {
                                if (sent.get() < requested.get()) {
                                    ConsumerRecords<String, T> records = consumer.poll(100);
                                    records.forEach(record -> handleRecord(record));
                                } else {
                                    // No outstanding demand. The monitor is this anonymous
                                    // Runnable; nothing visible in this block notifies it,
                                    // so this acts as a pause of up to one second.
                                    synchronized (this) {
                                        try {
                                            wait(1000);
                                        } catch (InterruptedException e) {
                                            interrupted = true;
                                            finished.set(true);
                                        }
                                    }
                                }
                            }
                        } catch (RuntimeException e) {
                            // A failure is terminal: stop receiving so that no
                            // further signal follows onError.
                            failure = e;
                            finished.set(true);
                        }

                        if (failure == null) {
                            subscriber.onComplete();
                        } else {
                            subscriber.onError(failure);
                        }
                    } finally {
                        try {
                            // Always release the consumer, even if the terminal
                            // signal itself throws.
                            consumer.close();
                        } finally {
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            };
            this.receiveExecutor.submit(receiver);
        }