private void assignSplits()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java [88:121]


    private void assignSplits() {
        final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();

        while (awaitingReader.hasNext()) {
            int nextAwaiting = awaitingReader.next();
            // if the reader that requested another split has failed in the meantime, remove
            // it from the list of waiting readers
            if (!context.registeredReaders().containsKey(nextAwaiting)) {
                awaitingReader.remove();
                continue;
            }

            // close idle readers
            if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
                context.signalNoMoreSplits(nextAwaiting);
                awaitingReader.remove();
                LOG.info(
                        "All scan splits have been assigned, closing idle reader {}", nextAwaiting);
                continue;
            }

            Optional<MongoSourceSplit> split = splitAssigner.getNext();
            if (split.isPresent()) {
                final MongoSourceSplit mongoSplit = split.get();
                context.assignSplit(mongoSplit, nextAwaiting);
                awaitingReader.remove();
                LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
                break;
            } else {
                // there is no available splits by now, skip assigning
                break;
            }
        }
    }