public boolean advanceNextPosition()

in presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java [381:440]


    public boolean advanceNextPosition() {

        if (readEntries == null) {
            // start deserialize thread
            deserializeEntries = new DeserializeEntries();
            deserializeEntries.start();

            readEntries = new ReadEntries();
            readEntries.run();
        }

        if (currentMessage != null) {
            currentMessage.release();
            currentMessage = null;
        }

        while (true) {
            if (readEntries.hasFinished()) {
                return false;
            }

            if ((messageQueue.capacity() - messageQueue.size()) > 0) {
                readEntries.run();
            }

            currentMessage = messageQueue.poll();
            if (currentMessage != null) {
                break;
            } else {
                try {
                    Thread.sleep(1);
                    // stats for time spent wait to read from message queue because its empty
                    metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        //start time for deseralizing record
        metricsTracker.start_RECORD_DESERIALIZE_TIME();

        if (this.schemaHandler instanceof KeyValueSchemaHandler) {
            ByteBuf keyByteBuf = null;
            if (this.currentMessage.getKeyBytes().isPresent()) {
                keyByteBuf = this.currentMessage.getKeyBytes().get();
            }
            currentRecord = this.schemaHandler.deserialize(keyByteBuf,
                    this.currentMessage.getData(), this.currentMessage.getSchemaVersion());
        } else {
            currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData(),
                    this.currentMessage.getSchemaVersion());
        }
        metricsTracker.incr_NUM_RECORD_DESERIALIZED();

        // stats for time spend deserializing
        metricsTracker.end_RECORD_DESERIALIZE_TIME();

        return true;
    }