in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/source/enumerator/KuduSourceEnumerator.java [225:251]
private void assignSplitsToReaders() {
final Iterator<Integer> awaitingSubtasks = readersAwaitingSplit.iterator();
while (awaitingSubtasks.hasNext()) {
final int awaitingSubtask = awaitingSubtasks.next();
// If the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!context.registeredReaders().containsKey(awaitingSubtask)) {
awaitingSubtasks.remove();
continue;
}
final KuduSourceSplit nextSplit = KuduSourceUtils.getNextSplit(unassigned);
if (nextSplit != null) {
context.assignSplit(nextSplit, awaitingSubtask);
awaitingSubtasks.remove();
pending.add(nextSplit);
} else {
if (boundedness == Boundedness.BOUNDED) {
awaitingSubtasks.remove();
context.signalNoMoreSplits(awaitingSubtask);
}
break;
}
}
}