in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [381:440]
public boolean advanceNextPosition() {
    // Moves the cursor to the next message and deserializes it into currentRecord.
    // Returns true when a message was polled from messageQueue and deserialized,
    // false once readEntries.hasFinished() reports completion.
    // Throws RuntimeException (wrapping InterruptedException) if the calling thread is
    // interrupted while waiting on an empty queue; the interrupt flag is restored first.

    if (readEntries == null) {
        // First call: start the deserialize thread, then kick off the initial read.
        deserializeEntries = new DeserializeEntries();
        deserializeEntries.start();
        readEntries = new ReadEntries();
        readEntries.run();
    }

    // Release the message handed out by the previous call before fetching a new one.
    if (currentMessage != null) {
        currentMessage.release();
        currentMessage = null;
    }

    while (true) {
        if (readEntries.hasFinished()) {
            return false;
        }

        // Only request more entries while the queue still has free slots.
        if ((messageQueue.capacity() - messageQueue.size()) > 0) {
            readEntries.run();
        }

        currentMessage = messageQueue.poll();
        if (currentMessage != null) {
            break;
        }

        try {
            Thread.sleep(1);
            // stats for time spent waiting to read from the message queue because it is empty
            metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(1);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still
            // observe the cancellation request after the exception is wrapped.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // start time for deserializing record
    metricsTracker.start_RECORD_DESERIALIZE_TIME();

    if (this.schemaHandler instanceof KeyValueSchemaHandler) {
        // Key/value schemas take the key bytes as a separate argument; null when absent.
        ByteBuf keyByteBuf = null;
        if (this.currentMessage.getKeyBytes().isPresent()) {
            keyByteBuf = this.currentMessage.getKeyBytes().get();
        }
        currentRecord = this.schemaHandler.deserialize(keyByteBuf,
                this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
    } else {
        currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData(),
                this.currentMessage.getSchemaVersion());
    }

    metricsTracker.incr_NUM_RECORD_DESERIALIZED();

    // stats for time spent deserializing
    metricsTracker.end_RECORD_DESERIALIZE_TIME();

    return true;
}