public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext)

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoPaginationSplitter.java [48:125]


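    /**
     * Splits the collection into pagination-based splits, each covering a contiguous
     * {@code _id} range that holds roughly {@code partitionRecordSize} documents.
     */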
    public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
        MongoReadOptions readOptions = splitContext.getReadOptions();
        MongoNamespace namespace = splitContext.getMongoNamespace();

        // If the partition record size isn't set, derive it from the partition size option
        // and the average object size: records per split = partition size / avg object size.
        Integer partitionRecordSize = readOptions.getPartitionRecordSize();
        if (partitionRecordSize == null) {
            long avgObjSizeInBytes = splitContext.getAvgObjSize();
            if (avgObjSizeInBytes == 0) {
                LOG.info(
                        "{} seems to be an empty collection; returning a single partition.",
                        namespace);
                return MongoSingleSplitter.split(splitContext);
            }

            partitionRecordSize =
                    Math.toIntExact(readOptions.getPartitionSize().getBytes() / avgObjSizeInBytes);
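            // Illustrative numbers: a 64 MiB partition size with a 1 KiB average
            // object size yields roughly 65,536 records per split.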
        }

        long totalNumOfDocuments = splitContext.getCount();

        if (partitionRecordSize >= totalNumOfDocuments) {
            LOG.info(
                    "Fewer documents ({}) than the number of documents per partition ({}); returning a single partition.",
                    totalNumOfDocuments,
                    partitionRecordSize);
            return MongoSingleSplitter.split(splitContext);
        }

        int numberOfPartitions =
                (int) (Math.ceil(totalNumOfDocuments / (double) partitionRecordSize));
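        // Ceiling division, e.g. 1,000,000 documents at 65,536 records per split
        // yields 16 partitions, the last one holding only the remainder.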

        BsonDocument lastUpperBound = null;
        List<MongoScanSourceSplit> paginatedSplits = new ArrayList<>();

        for (int splitNum = 0; splitNum < numberOfPartitions; splitNum++) {
            List<Bson> pipeline = new ArrayList<>();

            pipeline.add(Aggregates.project(Projections.include(ID_FIELD)));
            pipeline.add(Aggregates.sort(Sorts.ascending(ID_FIELD)));

            // The first split has no lower bound, so we can skip the $match stage;
            // every later split starts at the previous split's upper bound.
            if (lastUpperBound != null) {
                BsonDocument matchFilter = new BsonDocument();
                if (lastUpperBound.containsKey(ID_FIELD)) {
                    matchFilter.put(
                            ID_FIELD, new BsonDocument("$gte", lastUpperBound.get(ID_FIELD)));
                }
                pipeline.add(Aggregates.match(matchFilter));
            }
            pipeline.add(Aggregates.skip(partitionRecordSize));
            pipeline.add(Aggregates.limit(1));
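            // The resulting pipeline (in shell notation, values filled in at runtime):
            //   [ { $project: { _id: 1 } }, { $sort: { _id: 1 } },
            //     { $match: { _id: { $gte: <lastUpperBound._id> } } },
            //     { $skip: <partitionRecordSize> }, { $limit: 1 } ]
            // i.e. skip this split's quota of documents and read the next _id, which
            // becomes this split's exclusive upper bound and the next split's lower bound.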

            BsonDocument currentUpperBound =
                    splitContext
                            .getMongoCollection()
                            .aggregate(pipeline)
                            .allowDiskUse(true)
                            .first();

            paginatedSplits.add(
                    new MongoScanSourceSplit(
                            String.format("%s_%d", namespace, splitNum),
                            namespace.getDatabaseName(),
                            namespace.getCollectionName(),
                            lastUpperBound != null ? lastUpperBound : BSON_MIN_BOUNDARY,
                            currentUpperBound != null ? currentUpperBound : BSON_MAX_BOUNDARY,
                            ID_HINT));

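            // A null upper bound means we paged past the end of the collection;
            // the final split stays open-ended (BSON_MAX_BOUNDARY) and we stop here.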
            if (currentUpperBound == null) {
                break;
            }
            lastUpperBound = currentUpperBound;
        }

        return paginatedSplits;
    }
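
For context, a minimal sketch of how this splitter might be driven. The splitter and split
types are the ones from the excerpt above; how the MongoSplitContext is obtained here is an
assumption for illustration, not the connector's actual enumerator code.

    // Hypothetical caller; createSplitContext(...) stands in for however the
    // enumerator builds the context from read options and collection statistics.
    MongoSplitContext splitContext = createSplitContext(readOptions, namespace);
    Collection<MongoScanSourceSplit> splits = MongoPaginationSplitter.split(splitContext);
    for (MongoScanSourceSplit split : splits) {
        LOG.info("Generated split: {}", split.splitId());
    }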