in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/MongoSourceEnumerator.java [88:121]
private void assignSplits() {
final Iterator<Integer> awaitingReader = readersAwaitingSplit.iterator();
while (awaitingReader.hasNext()) {
int nextAwaiting = awaitingReader.next();
// if the reader that requested another split has failed in the meantime, remove
// it from the list of waiting readers
if (!context.registeredReaders().containsKey(nextAwaiting)) {
awaitingReader.remove();
continue;
}
// close idle readers
if (splitAssigner.noMoreSplits() && boundedness == Boundedness.BOUNDED) {
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
LOG.info(
"All scan splits have been assigned, closing idle reader {}", nextAwaiting);
continue;
}
Optional<MongoSourceSplit> split = splitAssigner.getNext();
if (split.isPresent()) {
final MongoSourceSplit mongoSplit = split.get();
context.assignSplit(mongoSplit, nextAwaiting);
awaitingReader.remove();
LOG.info("Assign split {} to subtask {}", mongoSplit, nextAwaiting);
break;
} else {
// there is no available splits by now, skip assigning
break;
}
}
}