public boolean hasNext()

in flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java [213:271]


    /**
     * Reports whether more rows can be read from this reader.
     *
     * <p>Returns {@code false} immediately once a configured row limit has been reached.
     * Otherwise one of two paths is taken:
     *
     * <ul>
     *   <li>Asynchronous path (taken when both {@code deserializeArrowToRowBatchAsync} and
     *       {@code asyncThreadStarted} are set): if the current batch is missing or exhausted,
     *       polls {@code rowBatchBlockingQueue} every 5 ms until a batch can be taken, or until
     *       {@code eos} is set and the queue is empty.
     *   <li>Synchronous path: under {@code clientLock}, if the scan has not ended and the
     *       current batch is missing or exhausted, advances {@code offset} by the rows read
     *       from the previous batch, closes it, and requests the next batch from the client.
     * </ul>
     *
     * @return {@code true} if the current batch or a newly obtained one should be read; {@code
     *     false} when the row limit is hit or the scan has ended
     * @throws DorisRuntimeException if the calling thread is interrupted while waiting for a
     *     batch on the asynchronous path; the thread's interrupt status is restored first
     */
    public boolean hasNext() {
        if (readOptions.getRowLimit() != null && readRowCount >= readOptions.getRowLimit()) {
            return false;
        }

        boolean hasNext = false;
        if (deserializeArrowToRowBatchAsync && asyncThreadStarted) {
            // support deserialize Arrow to RowBatch asynchronously
            if (rowBatch == null || !rowBatch.hasNext()) {
                try {
                    while (!eos.get() || !rowBatchBlockingQueue.isEmpty()) {
                        if (!rowBatchBlockingQueue.isEmpty()) {
                            rowBatch = rowBatchBlockingQueue.take();
                            hasNext = true;
                            break;
                        }
                        // wait for rowBatch put in queue or eos change
                        Thread.sleep(5);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status so callers further up the stack can observe
                    // it, and stop waiting instead of silently spinning on an interrupted thread.
                    Thread.currentThread().interrupt();
                    throw new DorisRuntimeException(e);
                }
            } else {
                hasNext = true;
            }
        } else {
            clientLock.lock();
            try {
                // Arrow data was acquired synchronously during the iterative process
                if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) {
                    if (rowBatch != null) {
                        // The next request must start after the rows already consumed.
                        offset += rowBatch.getReadRowCount();
                        rowBatch.close();
                    }
                    TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
                    nextBatchParams.setContextId(contextId);
                    nextBatchParams.setOffset(offset);
                    TScanBatchResult nextResult = client.getNext(nextBatchParams);
                    eos.set(nextResult.isEos());
                    if (!eos.get()) {
                        rowBatch = new RowBatch(nextResult, schema).readArrow();
                    } else {
                        LOG.info(
                                "Scan finished, tablets: {}, offset: {}",
                                partition.getTabletIds(),
                                offset);
                    }
                }
                hasNext = !eos.get();
            } finally {
                clientLock.unlock();
            }
        }
        return hasNext;
    }