in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/source/reader/split/MongoScanSourceSplitReader.java [83:130]
public RecordsWithSplitIds<BsonDocument> fetch() throws IOException {
if (closed) {
throw new IllegalStateException("Cannot fetch records from a closed split reader");
}
RecordsBySplits.Builder<BsonDocument> builder = new RecordsBySplits.Builder<>();
// Return when no split registered to this reader.
if (currentSplit == null) {
return builder.build();
}
// Return when current read count is over limit.
if (readerContext.isOverLimit()) {
builder.addFinishedSplit(currentSplit.splitId());
currentSplit = null;
finished = true;
return builder.build();
}
currentCursor = getOrCreateCursor();
int fetchSize = readOptions.getFetchSize();
try {
for (int recordNum = 0; recordNum < fetchSize; recordNum++) {
if (currentCursor.hasNext()) {
builder.add(currentSplit, currentCursor.next());
readerContext.getReadCount().incrementAndGet();
if (readerContext.isOverLimit()) {
builder.addFinishedSplit(currentSplit.splitId());
finished = true;
break;
}
} else {
builder.addFinishedSplit(currentSplit.splitId());
finished = true;
break;
}
}
return builder.build();
} catch (MongoException e) {
throw new IOException("Scan records form MongoDB failed", e);
} finally {
if (finished) {
closeCursor();
}
}
}