in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [302:340]
/**
 * Performs a single read attempt against {@code cursor}.
 *
 * <p>Nothing happens unless {@code outstandingReadsRequests} is positive. When it is, exactly one of
 * the following takes place:
 * <ul>
 *   <li>{@code isDone} is set, because the cursor reports no more entries or its read position has
 *       reached or passed the split's end position;</li>
 *   <li>the failed-attempt metric is bumped, because {@code entryQueue} has no free capacity;</li>
 *   <li>entries are skipped, because the current ledger carries an offload context while
 *       {@code readOffloaded} is false;</li>
 *   <li>an asynchronous read of up to {@code maxBatchSize} entries is issued, with this object
 *       passed along to {@code asyncReadEntries}.</li>
 * </ul>
 * The last two outcomes are both counted as successful read attempts.
 */
public void run() {
    if (outstandingReadsRequests.get() <= 0) {
        // No read has been requested; leave the cursor alone.
        return;
    }

    boolean splitExhausted = !cursor.hasMoreEntries()
            || ((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
    if (splitExhausted) {
        isDone = true;
        return;
    }

    // Never ask for more entries than the queue can currently take.
    int freeSlots = entryQueue.capacity() - entryQueue.size();
    int batchSize = Math.min(maxBatchSize, freeSlots);
    if (batchSize <= 0) {
        // stats for failed read request because entry queue is full
        metricsTracker.incr_READ_ATTEMPTS_FAIL();
        return;
    }

    ReadOnlyCursorImpl readOnlyCursor = (ReadOnlyCursorImpl) cursor;
    boolean skipOffloadedLedger = !readOffloaded && readOnlyCursor.getCurrentLedgerInfo().hasOffloadContext();
    if (skipOffloadedLedger) {
        log.warn(
                "Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
                readOnlyCursor.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());

        long ledgerEntryCount = readOnlyCursor.getCurrentLedgerInfo().getEntries();
        PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
        // NOTE(review): if entry ids are zero-based, the "+ 1" skips one entry more than remains in
        // this ledger - confirm against the semantics of skipEntries before relying on it.
        long entriesToSkip = (ledgerEntryCount - readPosition.getEntryId()) + 1;
        cursor.skipEntries(Math.toIntExact(entriesToSkip));

        // Skipped entries are accounted for as if they had been processed.
        entriesProcessed += entriesToSkip;
    } else {
        // Consume one outstanding request before handing the read off to the cursor.
        outstandingReadsRequests.decrementAndGet();
        cursor.asyncReadEntries(batchSize, this, System.nanoTime());
    }

    // stats for successful read request
    metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
}