in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitter.java [48:125]
/**
 * Splits the collection into ranges of roughly {@code partitionRecordSize} documents each by
 * paginating over {@code _id} in ascending order.
 *
 * <p>The number of documents per split comes from the partition record size option when it is
 * set; otherwise it is derived from the partition size option divided by the average object
 * size, clamped to at least one document. Each split's upper bound is the {@code _id} found by
 * skipping that many documents from the previous split's bound. The first split starts at
 * {@code BSON_MIN_BOUNDARY} and the last split always ends at {@code BSON_MAX_BOUNDARY}, so the
 * whole key range stays covered even if the document count used to size the splits is stale.
 *
 * @param splitContext context providing the read options, namespace, collection statistics and
 *     the collection handle used to run the pagination queries
 * @return the paginated splits, or the result of {@code MongoSingleSplitter.split} when the
 *     collection looks empty or holds no more documents than fit into one split
 */
public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
    MongoReadOptions readOptions = splitContext.getReadOptions();
    MongoNamespace namespace = splitContext.getMongoNamespace();

    // If partition record size isn't present, we'll use the partition size option and average
    // object size to calculate number of records in each partitioned split.
    Integer partitionRecordSize = readOptions.getPartitionRecordSize();
    if (partitionRecordSize == null) {
        long avgObjSizeInBytes = splitContext.getAvgObjSize();
        if (avgObjSizeInBytes == 0) {
            LOG.info(
                    "{} seems to be an empty collection, Returning a single partition.",
                    namespace);
            return MongoSingleSplitter.split(splitContext);
        }
        long derivedRecordSize = readOptions.getPartitionSize().getBytes() / avgObjSizeInBytes;
        // Saturate instead of Math.toIntExact: an enormous ratio would otherwise throw.
        partitionRecordSize = (int) Math.min(Integer.MAX_VALUE, derivedRecordSize);
    }
    // At least one record per split: with 0, skip(0) would return the previous bound forever
    // (empty splits) and ceil(n / 0.0) would make the loop bound Integer.MAX_VALUE.
    int recordsPerSplit = Math.max(1, partitionRecordSize);

    long totalNumOfDocuments = splitContext.getCount();
    if (recordsPerSplit >= totalNumOfDocuments) {
        LOG.info(
                "Fewer documents ({}) than the number of documents per partition ({}), Returning a single partition.",
                totalNumOfDocuments,
                recordsPerSplit);
        return MongoSingleSplitter.split(splitContext);
    }

    int numberOfPartitions = (int) (Math.ceil(totalNumOfDocuments / (double) recordsPerSplit));

    BsonDocument lastUpperBound = null;
    List<MongoScanSourceSplit> paginatedSplits = new ArrayList<>();
    for (int splitNum = 0; splitNum < numberOfPartitions; splitNum++) {
        // The final split is left open-ended so that documents beyond a stale/underestimated
        // count are still covered. With an exact count the query would return null here anyway.
        boolean finalSplit = splitNum == numberOfPartitions - 1;
        BsonDocument currentUpperBound =
                finalSplit ? null : findUpperBound(splitContext, lastUpperBound, recordsPerSplit);
        paginatedSplits.add(
                new MongoScanSourceSplit(
                        String.format("%s_%d", namespace, splitNum),
                        namespace.getDatabaseName(),
                        namespace.getCollectionName(),
                        lastUpperBound != null ? lastUpperBound : BSON_MIN_BOUNDARY,
                        currentUpperBound != null ? currentUpperBound : BSON_MAX_BOUNDARY,
                        ID_HINT));
        if (currentUpperBound == null) {
            // Fewer documents remained than one split's worth; this split already ends at max.
            break;
        }
        lastUpperBound = currentUpperBound;
    }
    return paginatedSplits;
}

/**
 * Finds the {@code _id}-only document that lies {@code recordsPerSplit} positions after {@code
 * lowerBound} in ascending {@code _id} order.
 *
 * @param splitContext context supplying the collection to query
 * @param lowerBound previous split's upper bound (matched inclusively with {@code $gte}), or
 *     {@code null} for the first split
 * @param recordsPerSplit number of documents to skip; expected to be at least 1
 * @return the boundary document, or {@code null} when fewer documents remain
 */
private static BsonDocument findUpperBound(
        MongoSplitContext splitContext, BsonDocument lowerBound, int recordsPerSplit) {
    List<Bson> pipeline = new ArrayList<>();
    // We don't have to set a lower bound filter when generating the first split.
    if (lowerBound != null && lowerBound.containsKey(ID_FIELD)) {
        pipeline.add(
                Aggregates.match(
                        new BsonDocument(
                                ID_FIELD, new BsonDocument("$gte", lowerBound.get(ID_FIELD)))));
    }
    // A real $sort stage is required: $skip only yields a meaningful boundary over ordered
    // documents, and the $gte filter above assumes ascending _id order.
    pipeline.add(Aggregates.sort(Sorts.ascending(ID_FIELD)));
    pipeline.add(Aggregates.project(Projections.include(ID_FIELD)));
    pipeline.add(Aggregates.skip(recordsPerSplit));
    pipeline.add(Aggregates.limit(1));
    return splitContext.getMongoCollection().aggregate(pipeline).allowDiskUse(true).first();
}