in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoSplitVectorSplitter.java [63:112]
public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
    if (splitContext.isSharded()) {
        throw new FlinkRuntimeException("splitVector does not apply to sharded collections.");
    }
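
    // splitVector can only be run against a standalone server or a replica set;
    // sharded collections are split from the config server's chunk metadata
    // instead (see MongoShardedSplitter in this package).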

    MongoClient mongoClient = splitContext.getMongoClient();
    MongoNamespace namespace = splitContext.getMongoNamespace();
    MongoReadOptions readOptions = splitContext.getReadOptions();

    MemorySize chunkSize = readOptions.getPartitionSize();
    // If the configured partition size is less than 1 MiB, use 1 MiB as the chunk
    // size; getMebiBytes() rounds down, so smaller values would otherwise become 0.
    int maxChunkSizeMB = Math.max(chunkSize.getMebiBytes(), 1);
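    // Illustrative example (not from the source): for a 512 KiB partition size,
    // chunkSize.getMebiBytes() truncates to 0, which Math.max lifts back to 1.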

    BsonDocument keyPattern = new BsonDocument(ID_FIELD, new BsonInt32(1));

    BsonDocument splitResult;
    try {
        splitResult = splitVector(mongoClient, namespace, keyPattern, maxChunkSizeMB);
    } catch (MongoException e) {
        LOG.error("Executing splitVector command failed: {}", e.getMessage());
        throw new FlinkRuntimeException(e);
    }
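
    // For reference, a minimal sketch of the command the splitVector(...) helper is
    // assumed to issue; field names follow MongoDB's splitVector admin command, and
    // the actual helper implementation lives elsewhere in this connector:
    //
    //     BsonDocument command =
    //             new BsonDocument("splitVector", new BsonString(namespace.getFullName()))
    //                     .append("keyPattern", keyPattern)
    //                     .append("maxChunkSize", new BsonInt32(maxChunkSizeMB));
    //     splitResult = mongoClient
    //             .getDatabase(namespace.getDatabaseName())
    //             .runCommand(command, BsonDocument.class);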

    BsonArray splitKeys = splitResult.getArray(SPLIT_KEYS_FIELD);
    if (CollectionUtils.isEmpty(splitKeys)) {
        // The collection's data size is smaller than one chunk; treat the entire
        // collection as a single split.
        return MongoSingleSplitter.split(splitContext);
    }

    // Complete the right bound: the last range is (lastKey, maxKey).
    splitKeys.add(BSON_MAX_BOUNDARY);
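
    // Illustration: if splitVector returned keys k1 and k2, appending the max
    // boundary yields three _id ranges: [minKey, k1), [k1, k2), [k2, maxKey).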

    List<MongoScanSourceSplit> sourceSplits = new ArrayList<>(splitKeys.size());
    BsonValue lowerValue = BSON_MIN_KEY;
    for (int i = 0; i < splitKeys.size(); i++) {
        BsonValue splitKeyValue = splitKeys.get(i).asDocument().get(ID_FIELD);
        sourceSplits.add(
                new MongoScanSourceSplit(
                        String.format("%s_%d", namespace, i),
                        namespace.getDatabaseName(),
                        namespace.getCollectionName(),
                        new BsonDocument(ID_FIELD, lowerValue),
                        new BsonDocument(ID_FIELD, splitKeyValue),
                        ID_HINT));
        lowerValue = splitKeyValue;
    }

    return sourceSplits;
}
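
// Hypothetical usage sketch (in the connector itself, this splitter is invoked
// through MongoSplitters according to the configured partition strategy):
//
//     Collection<MongoScanSourceSplit> splits =
//             MongoSplitVectorSplitter.split(splitContext);
//     // Split IDs follow "<namespace>_<index>", e.g. "mydb.users_0".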