public RecordsWithSplitIds fetch()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java [83:130]


    public RecordsWithSplitIds<BsonDocument> fetch() throws IOException {
        if (closed) {
            throw new IllegalStateException("Cannot fetch records from a closed split reader");
        }

        RecordsBySplits.Builder<BsonDocument> builder = new RecordsBySplits.Builder<>();

        // Return when no split registered to this reader.
        if (currentSplit == null) {
            return builder.build();
        }

        // Return when current read count is over limit.
        if (readerContext.isOverLimit()) {
            builder.addFinishedSplit(currentSplit.splitId());
            currentSplit = null;
            finished = true;
            return builder.build();
        }

        currentCursor = getOrCreateCursor();
        int fetchSize = readOptions.getFetchSize();

        try {
            for (int recordNum = 0; recordNum < fetchSize; recordNum++) {
                if (currentCursor.hasNext()) {
                    builder.add(currentSplit, currentCursor.next());
                    readerContext.getReadCount().incrementAndGet();
                    if (readerContext.isOverLimit()) {
                        builder.addFinishedSplit(currentSplit.splitId());
                        finished = true;
                        break;
                    }
                } else {
                    builder.addFinishedSplit(currentSplit.splitId());
                    finished = true;
                    break;
                }
            }
            return builder.build();
        } catch (MongoException e) {
            throw new IOException("Scan records form MongoDB failed", e);
        } finally {
            if (finished) {
                closeCursor();
            }
        }
    }