public void run()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [221:289]


        public void run() {
            // Deserialization loop: drains entries from entryQueue, parses each entry into
            // messages and offers those messages to messageQueue. The loop ends when isRunning
            // is cleared or when the executing thread has been interrupted.
            isRunning = true;
            while (isRunning && !Thread.currentThread().isInterrupted()) {

                int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
                    @Override
                    public void accept(Entry entry) {

                        try {
                            long bytes = entry.getDataBuffer().readableBytes();
                            completedBytes += bytes;
                            // register stats for bytes read
                            metricsTracker.register_BYTES_READ(bytes);

                            // check if we have processed all entries in this split; an entry at
                            // or past the split's end position is skipped (it is still counted
                            // and released by the finally block below)
                            if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
                                return;
                            }

                            // set start time for time deserializing entries for stats
                            metricsTracker.start_ENTRY_DESERIALIZE_TIME();

                            try {
                                MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
                                        entry.getDataBuffer(), (message) -> {
                                            try {
                                                // start time for message queue read
                                                metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();

                                                // enqueue deserialize message from this entry,
                                                // polling until the queue accepts it
                                                while (!messageQueue.offer(message)) {
                                                    Thread.sleep(1);
                                                }

                                                // stats for how long a read from message queue took
                                                metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
                                                // stats for number of messages read
                                                metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();

                                            } catch (InterruptedException e) {
                                                // Interrupted while waiting for queue space, so
                                                // this message is not enqueued. Thread.sleep
                                                // cleared the interrupt status when it threw;
                                                // restore it so that later waits fail fast and
                                                // the outer loop stops instead of carrying on as
                                                // if no cancellation had been requested.
                                                Thread.currentThread().interrupt();
                                            }
                                        }, pulsarConnectorConfig.getMaxMessageSize());
                            } catch (IOException e) {
                                log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
                                throw new RuntimeException(e);
                            }
                            // stats for time spend deserializing entries
                            metricsTracker.end_ENTRY_DESERIALIZE_TIME();

                            // stats for num messages per entry
                            metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();

                        } finally {
                            // always account for the entry and release it, whether it was
                            // parsed, skipped, or failed to parse
                            entriesProcessed++;
                            entry.release();
                        }
                    }
                });

                // nothing was drained: back off briefly before polling the entry queue again
                if (read <= 0) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        // preserve the interrupt status for whoever is running this method
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }