in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/enumerator/splitter/MongoShardedSplitter.java [63:114]
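    /**
     * Splits a sharded collection into one {@code MongoScanSourceSplit} per chunk, using each
     * chunk's min and max shard-key bounds from the cluster's sharding metadata.
     *
     * @throws FlinkRuntimeException if the collection is not sharded, has been dropped, has no
     *     chunk metadata, or the metadata cannot be read
     */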
public static Collection<MongoScanSourceSplit> split(MongoSplitContext splitContext) {
MongoNamespace namespace = splitContext.getMongoNamespace();
MongoClient mongoClient = splitContext.getMongoClient();
List<BsonDocument> chunks;
Optional<BsonDocument> collectionMetadata;
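        // Resolve the collection's sharding metadata; an empty Optional means the collection
        // is not sharded and cannot be split by chunk.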
try {
collectionMetadata = readCollectionMetadata(mongoClient, namespace);
if (!collectionMetadata.isPresent()) {
                LOG.error(
                        "Sharded split failed: collection {} does not appear to be sharded.",
                        namespace);
                throw new FlinkRuntimeException(
                        String.format(
                                "Sharded split failed: %s is not a sharded collection.",
                                namespace));
}
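            // Metadata may linger for a collection that has since been dropped; refuse to
            // split against stale chunk information.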
if (isShardedCollectionDropped(collectionMetadata.get())) {
LOG.error("Do sharded split failed, collection {} was dropped.", namespace);
throw new FlinkRuntimeException(
String.format("Do sharded split failed, %s was dropped.", namespace));
}
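            // Read the chunk ranges that partition this collection across the shards; each
            // chunk becomes one source split below.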
chunks = readChunks(mongoClient, collectionMetadata.get());
if (chunks.isEmpty()) {
LOG.error("Do sharded split failed, chunks of {} is empty.", namespace);
throw new FlinkRuntimeException(
String.format(
"Do sharded split failed, chunks of %s is empty.", namespace));
}
        } catch (MongoException e) {
            LOG.error("Reading sharding metadata for {} failed: {}", namespace, e.getMessage());
            throw new FlinkRuntimeException(
                    String.format("Reading sharding metadata for %s failed.", namespace), e);
        }
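        // Build one scan split per chunk; the chunk's min/max bounds and the collection's
        // shard key document define the per-split query range.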
List<MongoScanSourceSplit> sourceSplits = new ArrayList<>(chunks.size());
for (int i = 0; i < chunks.size(); i++) {
BsonDocument chunk = chunks.get(i);
sourceSplits.add(
new MongoScanSourceSplit(
String.format("%s_%d", namespace, i),
namespace.getDatabaseName(),
namespace.getCollectionName(),
chunk.getDocument(MIN_FIELD),
chunk.getDocument(MAX_FIELD),
collectionMetadata.get().getDocument(KEY_FIELD)));
}
return sourceSplits;
}
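
    // Usage sketch (illustrative): the caller-side setup below is assumed, not part of this
    // file; only MongoShardedSplitter.split(splitContext) is taken from the code above.
    //
    //     MongoSplitContext splitContext = ...; // carries the MongoClient and MongoNamespace
    //     Collection<MongoScanSourceSplit> splits = MongoShardedSplitter.split(splitContext);
    //     splits.forEach(split -> LOG.debug("Discovered split {}", split.splitId()));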