public static Collection split()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java [63:112]


    /**
     * Splits a non-sharded collection into scan splits along the boundaries returned by the
     * {@code splitVector} command.
     *
     * <p>The command is issued with an ascending key pattern on {@code ID_FIELD} and a chunk size
     * taken from the configured partition size in MiB (never less than 1). When the command
     * reports no split keys, the work is delegated to {@link MongoSingleSplitter}. Otherwise one
     * split is created per boundary: the first split starts at {@code BSON_MIN_KEY}, each
     * following split starts where the previous one ended, and the last split ends at
     * {@code BSON_MAX_BOUNDARY}. Split ids have the form {@code <namespace>_<index>}.
     *
     * @param splitContext context providing the client, namespace, read options and sharding
     *     state of the collection to split
     * @return the scan splits covering the collection
     * @throws FlinkRuntimeException if the collection is sharded, or if executing
     *     {@code splitVector} fails with a {@link MongoException}
     */
    public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
        if (splitContext.isSharded()) {
            throw new FlinkRuntimeException("splitVector does not apply to sharded collections.");
        }

        MongoClient client = splitContext.getMongoClient();
        MongoNamespace ns = splitContext.getMongoNamespace();

        // Partition sizes below 1 MiB are raised to 1 so the chunk size handed on is never zero.
        int chunkSizeMB =
                Math.max(splitContext.getReadOptions().getPartitionSize().getMebiBytes(), 1);
        BsonDocument idKeyPattern = new BsonDocument(ID_FIELD, new BsonInt32(1));

        BsonDocument commandResult;
        try {
            commandResult = splitVector(client, ns, idKeyPattern, chunkSizeMB);
        } catch (MongoException e) {
            LOG.error("Execute splitVector command failed : {}", e.getMessage());
            throw new FlinkRuntimeException(e);
        }

        BsonArray boundaries = commandResult.getArray(SPLIT_KEYS_FIELD);
        if (CollectionUtils.isEmpty(boundaries)) {
            // No split keys were reported: handle the whole collection as a single chunk.
            return MongoSingleSplitter.split(splitContext);
        }

        // Close the final range by appending the max boundary after the last reported key.
        boundaries.add(BSON_MAX_BOUNDARY);

        List<MongoScanSourceSplit> splits = new ArrayList<>(boundaries.size());
        BsonValue lowerBound = BSON_MIN_KEY;
        int splitIndex = 0;
        for (BsonValue boundary : boundaries) {
            BsonValue upperBound = boundary.asDocument().get(ID_FIELD);
            String splitId = String.format("%s_%d", ns, splitIndex++);
            splits.add(
                    new MongoScanSourceSplit(
                            splitId,
                            ns.getDatabaseName(),
                            ns.getCollectionName(),
                            new BsonDocument(ID_FIELD, lowerBound),
                            new BsonDocument(ID_FIELD, upperBound),
                            ID_HINT));
            // The upper bound of this split is the lower bound of the next one.
            lowerBound = upperBound;
        }

        return splits;
    }