in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [221:289]
/**
 * Drains {@code entryQueue} in a loop and turns each entry into messages on {@code messageQueue}.
 *
 * <p>For every drained entry this method adds the entry's readable bytes to {@code completedBytes},
 * skips parsing when the entry's position is at or past the split's end position, otherwise parses
 * the entry and offers each resulting message to {@code messageQueue}, retrying every millisecond
 * until the offer is accepted. Every drained entry is counted in {@code entriesProcessed} and
 * released, whether or not it was parsed.
 *
 * <p>The loop ends when {@code isRunning} becomes false or when the thread is interrupted while
 * idling on an empty queue. The thread's interrupt status is always restored so that the code
 * running this task can observe the interruption.
 *
 * @throws RuntimeException wrapping the {@link IOException} if an entry cannot be parsed
 */
public void run() {
    isRunning = true;
    while (isRunning) {
        int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
            @Override
            public void accept(Entry entry) {
                try {
                    long bytes = entry.getDataBuffer().readableBytes();
                    completedBytes += bytes;
                    // register stats for bytes read
                    metricsTracker.register_BYTES_READ(bytes);
                    // check if we have processed all entries in this split
                    if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
                        // still counted and released by the finally block below
                        return;
                    }
                    // set start time for time deserializing entries for stats
                    metricsTracker.start_ENTRY_DESERIALIZE_TIME();
                    try {
                        MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
                                entry.getDataBuffer(), (message) -> {
                                    try {
                                        // start time for message queue enqueue wait
                                        metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
                                        // enqueue deserialized message from this entry,
                                        // polling until the queue accepts it
                                        while (!messageQueue.offer(message)) {
                                            Thread.sleep(1);
                                        }
                                        // stats for how long the enqueue into the message queue took
                                        metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
                                        // stats for number of messages read
                                        metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
                                    } catch (InterruptedException e) {
                                        // Thread.sleep cleared the interrupt status when it threw;
                                        // restore it so the idle sleep in the outer loop sees the
                                        // interruption and ends the run. The message being offered
                                        // at this point is not enqueued.
                                        Thread.currentThread().interrupt();
                                    }
                                }, pulsarConnectorConfig.getMaxMessageSize());
                    } catch (IOException e) {
                        log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
                        throw new RuntimeException(e);
                    }
                    // stats for time spent deserializing entries
                    metricsTracker.end_ENTRY_DESERIALIZE_TIME();
                    // stats for num messages per entry
                    metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
                } finally {
                    // always account for and release the entry, even on early return or failure
                    entriesProcessed++;
                    entry.release();
                }
            }
        });
        if (read <= 0) {
            // nothing to drain: back off briefly instead of spinning
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // keep the interrupt visible to whoever runs this task, then stop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}