private MongoCursor<BsonDocument> getOrCreateCursor()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java [164:201]


    /**
     * Returns the cursor for the current split, opening it lazily on first use.
     *
     * <p>If no cursor is cached, a client is created from the configured URI and a find query
     * bounded by the split's min/max keys (with the split's index hint) is built. The split
     * offset, a pushed-down limit and the projected fields are applied when present, and the
     * resulting cursor is cached in {@code currentCursor}.
     *
     * @return the cached cursor, or a newly opened one if none existed yet
     */
    private MongoCursor<BsonDocument> getOrCreateCursor() {
        // Fast path: a cursor is already open for this split.
        if (currentCursor != null) {
            return currentCursor;
        }

        LOG.debug("Opened cursor for partitionId: {}", currentSplit);
        mongoClient = MongoClients.create(connectionOptions.getUri());

        FindIterable<BsonDocument> query =
                mongoClient
                        .getDatabase(connectionOptions.getDatabase())
                        .getCollection(connectionOptions.getCollection(), BsonDocument.class)
                        .find();

        // Restrict the scan to the split's index range via cursor.min() / cursor.max().
        // For the primary key index the resulting bound is (min <= _id < max); compound and
        // hashed index bounds can be expressed the same way.
        // See https://www.mongodb.com/docs/manual/reference/method/cursor.min/
        query =
                query.min(currentSplit.getMin())
                        .max(currentSplit.getMax())
                        .hint(currentSplit.getHint())
                        .noCursorTimeout(readOptions.isNoCursorTimeout());

        // A positive offset means the split was partially read before being restored from a
        // checkpoint, so skip the records that were already emitted.
        if (currentSplit.getOffset() > 0) {
            query = query.skip(currentSplit.getOffset());
        }

        // Limit push-down.
        if (readerContext.isLimitPushedDown()) {
            query = query.limit(readerContext.getLimit());
        }

        // Projection push-down.
        if (!CollectionUtil.isNullOrEmpty(projectedFields)) {
            query = query.projection(project(projectedFields));
        }

        currentCursor = query.cursor();
        return currentCursor;
    }