in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java [73:107]
static Collection<MongoScanSourceSplit> split(
        MongoSplitContext splitContext,
        BiFunction<MongoSplitContext, Integer, List<BsonDocument>> sampler) {
    // Splits a collection by sampling. It estimates how many documents fit into one partition
    // (configured partition size / average document size), derives the number of partitions,
    // asks `sampler` for enough sample documents to bound them, and turns those samples into
    // splits via createSplits. It falls back to a single split when the reported average
    // document size is 0 or when all documents fit into one partition.
    MongoReadOptions readOptions = splitContext.getReadOptions();
    MongoNamespace namespace = splitContext.getMongoNamespace();

    long totalNumDocuments = splitContext.getCount();
    long partitionSizeInBytes = readOptions.getPartitionSize().getBytes();
    int samplesPerPartition = readOptions.getSamplesPerPartition();

    long avgObjSizeInBytes = splitContext.getAvgObjSize();
    if (avgObjSizeInBytes == 0L) {
        LOG.info(
                "{} seems to be an empty collection, Returning a single partition.", namespace);
        return MongoSingleSplitter.split(splitContext);
    }

    // If the average document is larger than the configured partition size, the integer
    // division yields 0. Dividing by 0 below would give Infinity, which an (int) cast saturates
    // to Integer.MAX_VALUE, and the sample count would then silently overflow. A partition
    // always holds at least one document, so clamp to 1.
    long numDocumentsPerPartition = Math.max(1L, partitionSizeInBytes / avgObjSizeInBytes);
    if (numDocumentsPerPartition >= totalNumDocuments) {
        LOG.info(
                "Fewer documents ({}) than the number of documents per partition ({}), Returning a single partition.",
                totalNumDocuments,
                numDocumentsPerPartition);
        return MongoSingleSplitter.split(splitContext);
    }

    long numberOfPartitions =
            (long) Math.ceil(totalNumDocuments * 1.0d / numDocumentsPerPartition);
    // N samples divide the data into N + 1 partitions.
    // Compute in long and narrow exactly. An absurdly large request fails fast with an
    // ArithmeticException instead of handing a wrapped-around (negative) count to the sampler.
    int numberOfSamples =
            Math.toIntExact(Math.multiplyExact(samplesPerPartition, numberOfPartitions) - 1);
    List<BsonDocument> samples = sampler.apply(splitContext, numberOfSamples);
    return createSplits(samples, samplesPerPartition, namespace);
}