public RecordsWithSplitIds fetch()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/source/reader/KuduSourceSplitReader.java [54:87]


    /**
     * Reads all rows of the next pending split and returns them as one batch.
     *
     * <p>The wake-up flag is cleared on entry. When {@code KuduSourceUtils.getNextSplit(splits)}
     * yields no split, an empty result is returned. Otherwise a scanner is rebuilt from the split's
     * serialized scan token and drained; only after the scan is exhausted is the split marked as
     * finished in the returned batch.
     *
     * <p>If the wake-up flag is observed while rows are being consumed, the rows collected so far
     * are discarded, the split is added back to {@code splits}, and an empty result is returned, so
     * no rows of a partially read split are emitted.
     *
     * @return the rows of the fully scanned split with that split marked as finished, or an empty
     *     result when there is no split to read or the fetch was woken up
     * @throws IOException propagated from building, reading from, or closing the Kudu scanner
     */
    public RecordsWithSplitIds<RowResult> fetch() throws IOException {
        // Reset a wake-up request that was raised before this fetch started.
        wakeUpFlag.compareAndSet(true, false);

        final KuduSourceSplit currentSplit = KuduSourceUtils.getNextSplit(splits);
        if (currentSplit == null) {
            return new RecordsBySplits.Builder<RowResult>().build();
        }

        // The split id does not change during the scan, so resolve it once instead of per row.
        final String splitId = currentSplit.splitId();
        final KuduScanner scanner =
                KuduScanToken.deserializeIntoScanner(
                        currentSplit.getSerializedScanToken(), kuduClient);
        final RecordsBySplits.Builder<RowResult> builder = new RecordsBySplits.Builder<>();

        try {
            while (scanner.hasMoreRows()) {
                for (RowResult row : scanner.nextRows()) {
                    if (wakeUpFlag.get()) {
                        LOG.debug("Wakeup signal received inside row iteration, stopping fetch.");
                        // Hand the split back (presumably getNextSplit removed it from the
                        // queue) and return a fresh, empty batch so the partially collected
                        // rows are dropped. The scanner is closed by the finally block below.
                        splits.add(currentSplit);
                        return new RecordsBySplits.Builder<RowResult>().build();
                    }
                    builder.add(splitId, row);
                }
            }
            // Mark the split as completed only after every row has been read.
            builder.addFinishedSplit(splitId);
        } finally {
            // Single cleanup point for every exit path: normal completion, wake-up, or exception.
            scanner.close();
        }

        return builder.build();
    }