in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/source/reader/KuduSourceSplitReader.java [54:87]
public RecordsWithSplitIds<RowResult> fetch() throws IOException {
    // Reads all rows of the next pending split and returns them together with the
    // "split finished" marker. Returns an empty batch when no split is pending or
    // when a wake-up is observed mid-scan (the split is then re-queued untouched).
    // Throws IOException if deserializing the scan token or scanning fails.

    // Clear any wake-up request left over from before this fetch started.
    wakeUpFlag.compareAndSet(true, false);

    final KuduSourceSplit currentSplit = KuduSourceUtils.getNextSplit(splits);
    if (currentSplit == null) {
        return new RecordsBySplits.Builder<RowResult>().build();
    }

    final String splitId = currentSplit.splitId();
    final KuduScanner scanner =
            KuduScanToken.deserializeIntoScanner(
                    currentSplit.getSerializedScanToken(), kuduClient);
    final RecordsBySplits.Builder<RowResult> builder = new RecordsBySplits.Builder<>();
    try {
        while (scanner.hasMoreRows()) {
            for (RowResult row : scanner.nextRows()) {
                if (wakeUpFlag.get()) {
                    LOG.debug("Wakeup signal received inside row iteration, stopping fetch.");
                    // Re-queue the split so a later fetch reads it again. Rows buffered
                    // so far are deliberately dropped: the split will be re-read by a
                    // new scanner, so emitting them now would produce duplicates.
                    splits.add(currentSplit);
                    // The scanner is closed by the finally block below; closing it here
                    // as well would close it twice.
                    return new RecordsBySplits.Builder<RowResult>().build();
                }
                builder.add(splitId, row);
            }
        }
        // Mark the split as completed only after every row has been consumed.
        builder.addFinishedSplit(splitId);
    } finally {
        // Single point of scanner cleanup for normal, wake-up and exceptional exits.
        scanner.close();
    }
    return builder.build();
}