public void run()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [302:340]


        /**
         * Performs one step of read scheduling for this split.
         *
         * <p>Nothing happens unless {@code outstandingReadsRequests} is positive. Otherwise the
         * method either marks the cursor as done, records a failed attempt because the entry
         * queue has no free capacity, skips the remainder of an offloaded ledger, or issues one
         * asynchronous read with this object as the callback.
         */
        public void run() {
            // No pending read requests: nothing to schedule on this invocation.
            if (outstandingReadsRequests.get() <= 0) {
                return;
            }

            // Finished once the cursor has no more entries or its read position has reached the
            // split's end position. The position is only queried when entries remain.
            boolean splitExhausted = !cursor.hasMoreEntries()
                    || ((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
            if (splitExhausted) {
                isDone = true;
                return;
            }

            // Never request more entries than the queue currently has room for.
            int freeSlots = entryQueue.capacity() - entryQueue.size();
            int entriesToRequest = Math.min(maxBatchSize, freeSlots);
            if (entriesToRequest <= 0) {
                // stats for failed read request because entry queue is full
                metricsTracker.incr_READ_ATTEMPTS_FAIL();
                return;
            }

            ReadOnlyCursorImpl readOnlyCursor = (ReadOnlyCursorImpl) cursor;

            // check if ledger is offloaded; the ledger info is only consulted when offloaded
            // reads are disabled
            boolean skipOffloadedLedger =
                    !readOffloaded && readOnlyCursor.getCurrentLedgerInfo().hasOffloadContext();
            if (skipOffloadedLedger) {
                log.warn(
                        "Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
                        readOnlyCursor.getCurrentLedgerInfo().getLedgerId(),
                        pulsarSplit.getTableName());

                // Skip from the current read position through the end of the current ledger and
                // account for the skipped entries as processed.
                long ledgerEntryCount = readOnlyCursor.getCurrentLedgerInfo().getEntries();
                long currentEntryId = ((PositionImpl) cursor.getReadPosition()).getEntryId();
                long skipCount = (ledgerEntryCount - currentEntryId) + 1;
                cursor.skipEntries(Math.toIntExact(skipCount));

                entriesProcessed += skipCount;
            } else {
                // Consume one outstanding request before handing the read to the cursor; this
                // object receives the result, with the issue time passed as the context.
                outstandingReadsRequests.decrementAndGet();
                cursor.asyncReadEntries(entriesToRequest, this, System.nanoTime());
            }

            // stats for successful read request (counted for both the skip and the read path)
            metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
        }