public static Collection split()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java [63:114]


    public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
        MongoNamespace namespace = splitContext.getMongoNamespace();
        MongoClient mongoClient = splitContext.getMongoClient();

        List<BsonDocument> chunks;
        Optional<BsonDocument> collectionMetadata;
        try {
            collectionMetadata = readCollectionMetadata(mongoClient, namespace);
            if (!collectionMetadata.isPresent()) {
                LOG.error(
                        "Do sharded split failed, collection {} does not appear to be sharded.",
                        namespace);
                throw new FlinkRuntimeException(
                        String.format(
                                "Do sharded split failed, %s is not a sharded collection.",
                                namespace));
            }

            if (isShardedCollectionDropped(collectionMetadata.get())) {
                LOG.error("Do sharded split failed, collection {} was dropped.", namespace);
                throw new FlinkRuntimeException(
                        String.format("Do sharded split failed, %s was dropped.", namespace));
            }

            chunks = readChunks(mongoClient, collectionMetadata.get());
            if (chunks.isEmpty()) {
                LOG.error("Do sharded split failed, chunks of {} is empty.", namespace);
                throw new FlinkRuntimeException(
                        String.format(
                                "Do sharded split failed, chunks of %s is empty.", namespace));
            }
        } catch (MongoException e) {
            LOG.error(
                    "Read chunks from {} failed with error message: {}", namespace, e.getMessage());
            throw new FlinkRuntimeException(e);
        }

        List<MongoScanSourceSplit> sourceSplits = new ArrayList<>(chunks.size());
        for (int i = 0; i < chunks.size(); i++) {
            BsonDocument chunk = chunks.get(i);
            sourceSplits.add(
                    new MongoScanSourceSplit(
                            String.format("%s_%d", namespace, i),
                            namespace.getDatabaseName(),
                            namespace.getCollectionName(),
                            chunk.getDocument(MIN_FIELD),
                            chunk.getDocument(MAX_FIELD),
                            collectionMetadata.get().getDocument(KEY_FIELD)));
        }

        return sourceSplits;
    }