in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java [164:201]
private MongoCursor<BsonDocument> getOrCreateCursor() {
if (currentCursor == null) {
LOG.debug("Opened cursor for partitionId: {}", currentSplit);
mongoClient = MongoClients.create(connectionOptions.getUri());
// Using MongoDB's cursor.min() and cursor.max() to limit an index bound.
// When the index range is the primary key, the bound is (min <= _id < max).
// Compound indexes and hash indexes bounds can also be supported in this way.
// Please refer to https://www.mongodb.com/docs/manual/reference/method/cursor.min/
FindIterable<BsonDocument> findIterable =
mongoClient
.getDatabase(connectionOptions.getDatabase())
.getCollection(connectionOptions.getCollection(), BsonDocument.class)
.find()
.min(currentSplit.getMin())
.max(currentSplit.getMax())
.hint(currentSplit.getHint())
.noCursorTimeout(readOptions.isNoCursorTimeout());
// Current split was partially read and recovered from checkpoint
if (currentSplit.getOffset() > 0) {
findIterable.skip(currentSplit.getOffset());
}
// Push limit down
if (readerContext.isLimitPushedDown()) {
findIterable.limit(readerContext.getLimit());
}
// Push projection down
if (!CollectionUtil.isNullOrEmpty(projectedFields)) {
findIterable.projection(project(projectedFields));
}
currentCursor = findIterable.cursor();
}
return currentCursor;
}