in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java [136:161]
/**
 * Creates the runtime provider used to scan the configured MongoDB collection.
 *
 * <p>The produced data type drives both the deserialization schema (row type plus type
 * information obtained from the planner context) and the list of projected fields passed to
 * the source. Connection settings, read settings, the limit and the filter are taken from the
 * state already held by this table source.
 *
 * @param runtimeProviderContext planner-supplied context, used here to create the {@link
 *     TypeInformation} for the produced data type
 * @return a {@link SourceProvider} wrapping the fully configured {@link MongoSource}
 */
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    // Resolve the logical row layout first; the cast mirrors the produced type being a row.
    final RowType producedRowType = (RowType) producedDataType.getLogicalType();
    final TypeInformation<RowData> producedTypeInfo =
            runtimeProviderContext.createTypeInformation(producedDataType);

    // Schema that turns fetched documents into RowData matching the produced row type.
    final MongoDeserializationSchema<RowData> rowDataSchema =
            new MongoRowDataDeserializationSchema(producedRowType, producedTypeInfo);

    // Builder calls are kept in their original order so any argument validation performed by
    // the builder surfaces in the same sequence.
    return SourceProvider.of(
            MongoSource.<RowData>builder()
                    // Where to read from.
                    .setUri(connectionOptions.getUri())
                    .setDatabase(connectionOptions.getDatabase())
                    .setCollection(connectionOptions.getCollection())
                    // How to read and partition.
                    .setFetchSize(readOptions.getFetchSize())
                    .setNoCursorTimeout(readOptions.isNoCursorTimeout())
                    .setPartitionStrategy(readOptions.getPartitionStrategy())
                    .setPartitionSize(readOptions.getPartitionSize())
                    .setSamplesPerPartition(readOptions.getSamplesPerPartition())
                    // What to read: limit, projection and filter.
                    .setLimit(limit)
                    .setProjectedFields(DataType.getFieldNames(producedDataType))
                    .setFilter(filter)
                    .setDeserializationSchema(rowDataSchema)
                    .build());
}