in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/assigner/MongoScanSplitAssigner.java [88:109]
public Optional<MongoSourceSplit> getNext() {
if (!remainingScanSplits.isEmpty()) {
// return remaining splits firstly
MongoScanSourceSplit split = remainingScanSplits.poll();
assignedScanSplits.put(split.splitId(), split);
return Optional.of(split);
} else {
// it's turn for next collection
String nextCollection = remainingCollections.poll();
if (nextCollection != null) {
// split the given collection into chunks (scan splits)
Collection<MongoScanSourceSplit> splits =
MongoSplitters.split(
mongoClient, readOptions, new MongoNamespace(nextCollection));
remainingScanSplits.addAll(splits);
alreadyProcessedCollections.add(nextCollection);
return getNext();
} else {
return Optional.empty();
}
}
}