in src/main/java/com/uber/uberscriptquery/util/SparkUtils.java [70:104]
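    /**
     * Reads the result of a SQL query over JDBC into a Spark Dataset.
     * The JDBC call itself runs inside a Spark task (via a single-element RDD)
     * and is wrapped in an exponential-backoff retry.
     */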
    public static Dataset<Row> readJdbc(String jdbcUrlWithPassword, String sql, SparkSession spark) {
        // Run the JDBC read on an executor by mapping over a single-element RDD,
        // retrying via ExponentialBackoffRetryPolicy(3, 100) if the read fails.
        JavaRDD<Tuple2<List<Row>, StructType>> javaRdd1 = JavaSparkContext.fromSparkContext(spark.sparkContext())
                .parallelize(Arrays.asList(0))
                .map(new Function<Integer, Tuple2<List<Row>, StructType>>() {
                    @Override
                    public Tuple2<List<Row>, StructType> call(Integer v1) throws Exception {
                        Tuple2<List<Row>, StructType> tuple = new ExponentialBackoffRetryPolicy<Tuple2<List<Row>, StructType>>(3, 100)
                                .attempt(() -> readJdbcAndReturnRowsAndSchema(jdbcUrlWithPassword, sql));
                        return tuple;
                    }
                });

        // Persist the fetched rows and schema so the JDBC query is not re-executed
        // when the RDD is consumed more than once below.
        javaRdd1 = javaRdd1.persist(StorageLevel.DISK_ONLY_2());

        // First pass: collect the schema of the result set back to the driver.
        StructType structType = javaRdd1.map(new Function<Tuple2<List<Row>, StructType>, StructType>() {
            @Override
            public StructType call(Tuple2<List<Row>, StructType> v1) throws Exception {
                return v1._2();
            }
        }).collect().get(0);

        // Second pass: flatten the row list into an RDD of rows; this transformation
        // is evaluated lazily when the returned DataFrame is used.
        JavaRDD<Row> javaRdd2 = javaRdd1.flatMap(new FlatMapFunction<Tuple2<List<Row>, StructType>, Row>() {
            @Override
            public Iterator<Row> call(Tuple2<List<Row>, StructType> listStructTypeTuple2) throws Exception {
                return listStructTypeTuple2._1().iterator();
            }
        });

        return spark.createDataFrame(javaRdd2, structType);

        //SqlUtils.loadJdbcDriverClass(jdbcUrlWithPassword);
        //DriverRegistryWrapper.register(com.mysql.jdbc.Driver.class.getName());
        //ExponentialBackoffRetryPolicy<Dataset<Row>> retryPolicy = new ExponentialBackoffRetryPolicy<>(3, 100);
        //return retryPolicy.attempt(() -> readJdbcWithoutRetry(jdbcUrlWithPassword, sql, spark));
    }
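
A minimal usage sketch (the database URL, credentials, and query below are hypothetical; the method only needs a password-bearing JDBC URL, a SQL string, and an active SparkSession):

    SparkSession spark = SparkSession.builder()
            .appName("readJdbcExample")
            .master("local[*]")
            .getOrCreate();

    // Hypothetical MySQL connection string and query.
    String jdbcUrl = "jdbc:mysql://localhost:3306/mydb?user=reader&password=secret";
    Dataset<Row> users = SparkUtils.readJdbc(jdbcUrl, "select id, name from users", spark);
    users.show();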