private void assignSplitsToReaders()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/source/enumerator/KuduSourceEnumerator.java [225:251]


    private void assignSplitsToReaders() {
        final Iterator<Integer> awaitingSubtasks = readersAwaitingSplit.iterator();

        while (awaitingSubtasks.hasNext()) {
            final int awaitingSubtask = awaitingSubtasks.next();

            // If the reader that requested another split has failed in the meantime, remove
            // it from the list of waiting readers
            if (!context.registeredReaders().containsKey(awaitingSubtask)) {
                awaitingSubtasks.remove();
                continue;
            }

            final KuduSourceSplit nextSplit = KuduSourceUtils.getNextSplit(unassigned);
            if (nextSplit != null) {
                context.assignSplit(nextSplit, awaitingSubtask);
                awaitingSubtasks.remove();
                pending.add(nextSplit);
            } else {
                if (boundedness == Boundedness.BOUNDED) {
                    awaitingSubtasks.remove();
                    context.signalNoMoreSplits(awaitingSubtask);
                }
                break;
            }
        }
    }