public Collection<SnapshotSplit> split(SplitContext splitContext)

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/assigners/splitters/SampleBucketSplitStrategy.java [78:169]
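
This strategy samples the collection and feeds the sampled _id values into MongoDB's $bucketAuto aggregation stage, which groups them into evenly sized buckets; the bucket boundaries then become the bounds of the snapshot splits.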


    public Collection<SnapshotSplit> split(SplitContext splitContext) {
        // Use long arithmetic so a large configured chunk size cannot overflow an int.
        long chunkSizeInBytes = splitContext.getChunkSizeMB() * 1024L * 1024L;

        long sizeInBytes = splitContext.getSizeInBytes();
        long count = splitContext.getDocumentCount();

        // If the collection's total uncompressed size is less than the chunk
        // size, treat the entire collection as a single chunk.
        if (sizeInBytes < chunkSizeInBytes) {
            return SingleSplitStrategy.INSTANCE.split(splitContext);
        }

        // Integer division rounds down, so add one chunk for the remainder.
        int numChunks = (int) (sizeInBytes / chunkSizeInBytes) + 1;
        int numberOfSamples;
        if (count < DEFAULT_SAMPLING_THRESHOLD) {
            // Full sampling if the document count is below the sampling threshold.
            numberOfSamples = (int) count;
        } else {
            numberOfSamples = Math.min(numChunks * splitContext.getSamplesPerChunk(), (int) count);
        }
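        // Worked example (hypothetical numbers): a 10 GiB collection with a
        // 64 MiB chunk size gives numChunks = 10240 / 64 + 1 = 161; with
        // 20 samples per chunk and 10M documents, numberOfSamples =
        // min(161 * 20, 10_000_000) = 3220.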

        TableId collectionId = splitContext.getCollectionId();

        MongoCollection<BsonDocument> collection =
                collectionFor(splitContext.getMongoClient(), collectionId, BsonDocument.class);

        List<Bson> pipeline = new ArrayList<>();
        // Skip the $sample stage when every document is sampled anyway.
        if (numberOfSamples != count) {
            pipeline.add(sample(numberOfSamples));
        }
        pipeline.add(bucketAuto("$" + ID_FIELD, numChunks));
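        // The pipeline is therefore roughly:
        //   [ { $sample: { size: numberOfSamples } },
        //     { $bucketAuto: { groupBy: "$_id", buckets: numChunks } } ]
        // Per the MongoDB docs, $bucketAuto emits one document per bucket of the
        // form { _id: { min: ..., max: ... }, count: ... }; those min/max values
        // become the split bounds below.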
        LOG.info(
                "Collection {} going to sample {} records into {} chunks",
                collectionId,
                numberOfSamples,
                numChunks);

        // allowDiskUse(true) lets the server spill the $sample / $bucketAuto
        // stages to disk if they exceed the aggregation memory limit.
        List<BsonDocument> chunks =
                collection.aggregate(pipeline).allowDiskUse(true).into(new ArrayList<>());
        LOG.info(
                "Collection {} got {} chunks by auto bucket and sample",
                collectionId,
                chunks.size());

        // Splits are bounded on the _id field only.
        RowType rowType = shardKeysToRowType(Collections.singleton(ID_FIELD));

        // One split per bucket plus the two unbounded end splits.
        List<SnapshotSplit> snapshotSplits = new ArrayList<>(chunks.size() + 2);

        Map<TableId, TableChanges.TableChange> schema = new HashMap<>();
        schema.put(collectionId, MongoDBDialect.collectionSchema(collectionId));

        // The first split covers everything below the lowest sampled _id.
        SnapshotSplit firstSplit =
                new SnapshotSplit(
                        collectionId,
                        0,
                        rowType,
                        ChunkUtils.minLowerBoundOfId(),
                        ChunkUtils.boundOfId(lowerBoundOfBucket(chunks.get(0))),
                        null,
                        schema);
        snapshotSplits.add(firstSplit);

        for (int i = 0; i < chunks.size(); i++) {
            BsonDocument bucket = chunks.get(i);
            snapshotSplits.add(
                    new SnapshotSplit(
                            collectionId,
                            i + 1,
                            rowType,
                            ChunkUtils.boundOfId(lowerBoundOfBucket(bucket)),
                            ChunkUtils.boundOfId(upperBoundOfBucket(bucket)),
                            null,
                            schema));
        }
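        // The middle splits tile the sampled _id range without gaps or overlaps,
        // because $bucketAuto buckets are contiguous: each bucket's max equals
        // the next bucket's min.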

        // The last split covers everything above the highest sampled _id.
        SnapshotSplit lastSplit =
                new SnapshotSplit(
                        collectionId,
                        chunks.size() + 1,
                        rowType,
                        ChunkUtils.boundOfId(upperBoundOfBucket(chunks.get(chunks.size() - 1))),
                        ChunkUtils.maxUpperBoundOfId(),
                        null,
                        schema);
        // Optionally assign the unbounded last split first; reading the
        // open-ended chunk early can reduce memory pressure at the end of the
        // snapshot phase.
        if (splitContext.isAssignUnboundedChunkFirst()) {
            snapshotSplits.add(0, lastSplit);
        } else {
            snapshotSplits.add(lastSplit);
        }

        return snapshotSplits;
    }
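
For illustration, here is a minimal standalone sketch (not part of the connector) of how a $bucketAuto result document carries the min/max bounds that lowerBoundOfBucket and upperBoundOfBucket read above. It assumes only the org.mongodb:bson artifact on the classpath, and the bucket values are hypothetical:

    import org.bson.BsonDocument;

    public class BucketBoundsDemo {
        public static void main(String[] args) {
            // Shape of one $bucketAuto output document; the values are made up.
            BsonDocument bucket =
                    BsonDocument.parse("{ \"_id\": { \"min\": 10, \"max\": 42 }, \"count\": 20 }");
            BsonDocument bounds = bucket.getDocument("_id");
            // The splitter uses these two values as a chunk's lower/upper bound.
            System.out.println("lower=" + bounds.get("min") + " upper=" + bounds.get("max"));
        }
    }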