in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitters.java [45:75]
/**
 * Splits the given collection into scan source splits using the partition strategy configured
 * in the read options.
 *
 * <p>The collection statistics are fetched first via {@code MongoUtils.collStats} and bundled
 * into a {@link MongoSplitContext} that is handed to the selected splitter. For the {@code
 * DEFAULT} strategy (and any strategy without an explicit case) the sharded splitter is used
 * when the split context reports the collection as sharded, otherwise the split-vector
 * splitter is used.
 *
 * @param mongoClient client used to run the collStats command and passed on to the splitter
 * @param readOptions read options supplying the partition strategy
 * @param namespace namespace of the collection to split
 * @return the splits produced by the selected splitter
 * @throws FlinkRuntimeException if the collStats command fails with a {@link MongoException};
 *     the original exception is kept as the cause
 */
public static Collection<MongoScanSourceSplit> split(
        MongoClient mongoClient, MongoReadOptions readOptions, MongoNamespace namespace) {
    BsonDocument collStats;
    try {
        collStats = MongoUtils.collStats(mongoClient, namespace);
    } catch (MongoException e) {
        // Pass the exception as the trailing argument so the logger records the stack trace
        // in addition to the formatted message.
        LOG.error(
                "Execute collStats command failed, with error message: {}",
                e.getMessage(),
                e);
        // Attach the namespace so the failure is identifiable from the exception alone.
        throw new FlinkRuntimeException(
                "Failed to execute collStats command on namespace " + namespace, e);
    }

    MongoSplitContext splitContext =
            MongoSplitContext.of(readOptions, mongoClient, namespace, collStats);

    switch (readOptions.getPartitionStrategy()) {
        case SINGLE:
            return MongoSingleSplitter.split(splitContext);
        case SAMPLE:
            return MongoSampleSplitter.split(splitContext);
        case SPLIT_VECTOR:
            return MongoSplitVectorSplitter.split(splitContext);
        case SHARDED:
            return MongoShardedSplitter.split(splitContext);
        case PAGINATION:
            return MongoPaginationSplitter.split(splitContext);
        case DEFAULT:
        default:
            // No explicit strategy: choose based on whether the collection is sharded.
            return splitContext.isSharded()
                    ? MongoShardedSplitter.split(splitContext)
                    : MongoSplitVectorSplitter.split(splitContext);
    }
}