private Observable createBackPressuredConsumerObs()

in mantis-connectors/mantis-connector-kafka/src/main/java/io/mantisrx/connector/kafka/source/KafkaSource.java [121:254]


    private Observable<KafkaAckable> createBackPressuredConsumerObs(final MantisKafkaConsumer<?> mantisKafkaConsumer,
                                                                    final MantisKafkaSourceConfig kafkaSourceConfig) {
        CheckpointStrategy checkpointStrategy = mantisKafkaConsumer.getStrategy();
        final CheckpointTrigger trigger = mantisKafkaConsumer.getTrigger();
        final ConsumerMetrics consumerMetrics = mantisKafkaConsumer.getConsumerMetrics();
        final TopicPartitionStateManager partitionStateManager = mantisKafkaConsumer.getPartitionStateManager();
        int mantisKafkaConsumerId = mantisKafkaConsumer.getConsumerId();

        SyncOnSubscribe<Iterator<ConsumerRecord<String, byte[]>>, KafkaAckable> syncOnSubscribe = SyncOnSubscribe.createStateful(
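            // state factory (assumption, describing the code below): perform the initial poll and hand its record iterator to the generator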
            () -> {
                final ConsumerRecords<String, byte[]> records = mantisKafkaConsumer.poll(kafkaSourceConfig.getConsumerPollTimeoutMs());
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("topic listing for consumer {}", mantisKafkaConsumer.listTopics());
                }
                LOGGER.info("consumer subscribed to topic-partitions {}", mantisKafkaConsumer.assignment());
                return records.iterator();
            },
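            // generator: invoked once per downstream request; emits at most one KafkaAckable per call and returns the (possibly refreshed) iterator as the next state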
            (consumerRecordIterator, observer) -> {
                Iterator<ConsumerRecord<String, byte[]>> it = consumerRecordIterator;
                final Set<TopicPartition> partitions = mantisKafkaConsumer.assignment();

                if (trigger.shouldCheckpoint()) {
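                    // trigger fired: snapshot the consumed offsets for the assigned partitions and persist them via the checkpoint strategy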
                    long startTime = System.currentTimeMillis();

                    final Map<TopicPartition, OffsetAndMetadata> checkpoint =
                        partitionStateManager.createCheckpoint(partitions);
                    checkpointStrategy.persistCheckpoint(checkpoint);
                    long now = System.currentTimeMillis();
                    consumerMetrics.recordCheckpointDelay(now - startTime);
                    consumerMetrics.incrementCommitCount();
                    trigger.reset();
                }
                if (!done.get()) {
                    try {
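                        // refill the iterator with a fresh poll once the previous batch is fully consumed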
                        if (!it.hasNext()) {
                            final ConsumerRecords<String, byte[]> consumerRecords =
                                mantisKafkaConsumer.poll(kafkaSourceConfig.getConsumerPollTimeoutMs());
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("poll returned {} records", consumerRecords.count());
                            }
                            it = consumerRecords.iterator();
                        }

                        if (it.hasNext()) {
                            final ConsumerRecord<String, byte[]> m = it.next();
                            final TopicPartition topicPartition = new TopicPartition(m.topic(), m.partition());

                            consumerMetrics.incrementInCount();
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("updating read offset to " + m.offset() + " read " + m.value());
                            }

                            if (m.value() != null) {
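                                // feed the payload size to the checkpoint trigger, then (optionally) parse and emit the record wrapped as a KafkaAckable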
                                try {
                                    trigger.update(getPayloadSize(m));

                                    if (kafkaSourceConfig.getParseMessageInSource()) {
                                        final Parser parser = ParserType.parser(kafkaSourceConfig.getMessageParserType()).getParser();
                                        if (parser.canParse(m.value())) {
                                            final Map<String, Object> parsedKafkaValue = parser.parseMessage(m.value());
                                            final KafkaData kafkaData = new KafkaData(m, Optional.ofNullable(parsedKafkaValue), Optional.ofNullable(m.key()), mantisKafkaConsumerId);
                                            final KafkaAckable ackable = new KafkaAckable(kafkaData, ackSubject);
                                            // record offset consumed in TopicPartitionStateManager before onNext to avoid race condition with Ack being processed before the consume is recorded
                                            partitionStateManager.recordMessageRead(topicPartition, m.offset());
                                            consumerMetrics.recordReadOffset(topicPartition, m.offset());
                                            observer.onNext(ackable);
                                        } else {
                                            consumerMetrics.incrementParseFailureCount();
                                        }
                                    } else {
                                        final KafkaData kafkaData = new KafkaData(m, Optional.empty(), Optional.ofNullable(m.key()), mantisKafkaConsumerId);
                                        final KafkaAckable ackable = new KafkaAckable(kafkaData, ackSubject);
                                        // record offset consumed in TopicPartitionStateManager before onNext to avoid race condition with Ack being processed before the consume is recorded
                                        partitionStateManager.recordMessageRead(topicPartition, m.offset());
                                        consumerMetrics.recordReadOffset(topicPartition, m.offset());
                                        observer.onNext(ackable);
                                    }
                                } catch (ParseException pe) {
                                    consumerMetrics.incrementErrorCount();
                                    LOGGER.warn("failed to parse {}:{} message {}", m.topic(), m.partition(), m.value(), pe);
                                }
                            } else {
                                consumerMetrics.incrementKafkaMessageValueNullCount();
                            }
                        } else {
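                            // no record available: at the head of the partition(s), back off briefly before the next poll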
                            consumerMetrics.incrementWaitForDataCount();
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Reached head of partition, waiting for more data");
                            }
                            TimeUnit.MILLISECONDS.sleep(200);
                        }
                    } catch (TimeoutException toe) {
                        consumerMetrics.incrementWaitForDataCount();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Reached head of partition waiting for more data");
                        }
                    } catch (OffsetOutOfRangeException oore) {
                        LOGGER.warn("offsets out of range " + oore.partitions() + " will seek to beginning", oore);
                        final Set<TopicPartition> topicPartitionSet = oore.partitions();
                        for (TopicPartition tp : topicPartitionSet) {
                            LOGGER.info("partition {} consumer position {}", tp, mantisKafkaConsumer.position(tp));
                        }
                        mantisKafkaConsumer.seekToBeginning(oore.partitions().toArray(new TopicPartition[oore.partitions().size()]));
                    } catch (InvalidRecordException ire) {
                        consumerMetrics.incrementErrorCount();
                        LOGGER.warn("iterator error with invalid message. message will be dropped " + ire.getMessage());
                    } catch (KafkaException e) {
                        consumerMetrics.incrementErrorCount();
                        LOGGER.warn("Other Kafka exception, message will be dropped. " + e.getMessage());
                    } catch (InterruptedException ie) {
                        LOGGER.error("consumer interrupted", ie);
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        consumerMetrics.incrementErrorCount();
                        LOGGER.warn("caught exception", e);
                    }
                } else {
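                    // the done flag is set: stop emitting and close the underlying consumer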
                    mantisKafkaConsumer.close();
                }
                return it;
            },
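            // unsubscribe callback: make sure the Kafka consumer is closed when the subscriber goes away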
            consumerRecordIterator -> {
                LOGGER.info("closing Kafka consumer on unsubscribe" + mantisKafkaConsumer.toString());
                mantisKafkaConsumer.close();
            });
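        // run the blocking poll loop on its own thread and log/count lifecycle events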
        return Observable.create(syncOnSubscribe)
            .subscribeOn(Schedulers.newThread())
            .doOnUnsubscribe(() -> LOGGER.info("consumer {} stopped due to unsubscribe", mantisKafkaConsumerId))
            .doOnError((t) -> {
                LOGGER.error("consumer {} stopped due to error", mantisKafkaConsumerId, t);
                consumerMetrics.incrementErrorCount();
            })
            .doOnTerminate(() -> LOGGER.info("consumer {} terminated", mantisKafkaConsumerId));
    }
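
The backpressure here comes from RxJava 1's SyncOnSubscribe: the generator function above is invoked once per item requested by the downstream subscriber and emits at most one KafkaAckable per invocation, so the Kafka poll loop only advances as fast as the pipeline drains it. Below is a minimal, self-contained sketch of that same pattern against a plain in-memory iterator, assuming only RxJava 1.x on the classpath; the SyncOnSubscribeSketch class and its sample data are illustrative and not part of the connector.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import rx.Observable;
import rx.observables.SyncOnSubscribe;

public class SyncOnSubscribeSketch {

    public static void main(String[] args) {
        final List<String> batch = Arrays.asList("a", "b", "c");

        // state = the iterator; the generator emits at most one item per downstream request
        SyncOnSubscribe<Iterator<String>, String> onSubscribe =
            SyncOnSubscribe.<Iterator<String>, String>createStateful(
                batch::iterator,                    // state factory: runs once per subscriber
                (it, observer) -> {                 // generator: runs once per requested item
                    if (it.hasNext()) {
                        observer.onNext(it.next()); // at most one onNext per invocation
                    } else {
                        observer.onCompleted();
                    }
                    return it;                      // iterator carried over as the next state
                },
                it -> { /* release resources on unsubscribe, e.g. consumer.close() */ });

        Observable.create(onSubscribe)
            .subscribe(System.out::println);
    }
}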