static Collection split()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSampleSplitter.java [73:107]


    /**
     * Splits the collection described by {@code splitContext} into partitions whose boundaries are
     * taken from sampled documents.
     *
     * <p>The target number of documents per partition is derived from the configured partition
     * size and the collection's average object size. A single split is returned via {@code
     * MongoSingleSplitter.split(splitContext)} in two cases:
     *
     * <ul>
     *   <li>the average object size is 0 (the collection looks empty);
     *   <li>all documents fit into one partition.
     * </ul>
     *
     * <p>Otherwise {@code samplesPerPartition * numberOfPartitions - 1} samples are requested from
     * {@code sampler}. The samples are handed to {@code createSplits} together with {@code
     * samplesPerPartition}.
     *
     * @param splitContext context supplying the read options, namespace, document count and
     *     average object size of the collection
     * @param sampler function that is given the context and the requested number of samples and
     *     returns the sampled documents
     * @return the splits built by {@code createSplits}, or a single split as described above
     * @throws IllegalArgumentException if the number of samples required by the current
     *     configuration does not fit into an {@code int}
     */
    static Collection<MongoScanSourceSplit> split(
            MongoSplitContext splitContext,
            BiFunction<MongoSplitContext, Integer, List<BsonDocument>> sampler) {
        MongoReadOptions readOptions = splitContext.getReadOptions();
        MongoNamespace namespace = splitContext.getMongoNamespace();

        long totalNumDocuments = splitContext.getCount();
        long partitionSizeInBytes = readOptions.getPartitionSize().getBytes();
        int samplesPerPartition = readOptions.getSamplesPerPartition();

        long avgObjSizeInBytes = splitContext.getAvgObjSize();
        if (avgObjSizeInBytes == 0L) {
            LOG.info(
                    "{} seems to be an empty collection, Returning a single partition.", namespace);
            return MongoSingleSplitter.split(splitContext);
        }

        // If the average document is larger than the configured partition size, the integer
        // division yields 0. That would divide by zero below, producing Integer.MAX_VALUE
        // partitions and an overflowed (negative) sample count. Every partition holds at least
        // one document.
        long numDocumentsPerPartition = Math.max(1L, partitionSizeInBytes / avgObjSizeInBytes);
        if (numDocumentsPerPartition >= totalNumDocuments) {
            LOG.info(
                    "Fewer documents ({}) than the number of documents per partition ({}), Returning a single partition.",
                    totalNumDocuments,
                    numDocumentsPerPartition);
            return MongoSingleSplitter.split(splitContext);
        }

        // Exact ceiling division on longs: no precision loss through double, no int truncation.
        long numberOfPartitions =
                totalNumDocuments / numDocumentsPerPartition
                        + (totalNumDocuments % numDocumentsPerPartition == 0 ? 0 : 1);

        // N samples divide the data into N + 1 ranges, so this yields
        // samplesPerPartition * numberOfPartitions ranges. createSplits is given
        // samplesPerPartition, presumably to group them into numberOfPartitions splits
        // (NOTE(review): confirm against createSplits).
        long numberOfSamples =
                Math.multiplyExact((long) samplesPerPartition, numberOfPartitions) - 1;
        if (numberOfSamples > Integer.MAX_VALUE) {
            // The sampler only accepts an int count; fail loudly instead of silently wrapping.
            throw new IllegalArgumentException(
                    String.format(
                            "Too many samples (%d) required to split %s into %d partitions; "
                                    + "consider increasing the partition size.",
                            numberOfSamples, namespace, numberOfPartitions));
        }

        List<BsonDocument> samples = sampler.apply(splitContext, (int) numberOfSamples);

        return createSplits(samples, samplesPerPartition, namespace);
    }