public static Dataset&lt;Row&gt; readJdbc(String jdbcUrlWithPassword, String sql, SparkSession spark)

in src/main/java/com/uber/uberscriptquery/util/SparkUtils.java [70:104]


    /**
     * Executes a SQL query over JDBC and returns the result as a Spark {@code Dataset<Row>}.
     *
     * <p>The JDBC read is performed on an executor via a single-element RDD (rather than on
     * the driver), with exponential-backoff retry (3 attempts, 100ms base) around the query.
     * The RDD holding the fetched rows and schema is persisted so the query runs only once:
     * it is consumed twice — once to extract the schema, once to build the row RDD backing
     * the returned DataFrame.
     *
     * @param jdbcUrlWithPassword JDBC connection URL, credentials included
     * @param sql                 SQL query to execute
     * @param spark               active {@link SparkSession} used to create the DataFrame
     * @return a DataFrame containing the query result with the schema reported by JDBC
     */
    public static Dataset<Row> readJdbc(String jdbcUrlWithPassword, String sql, SparkSession spark) {
        // Single-element RDD pushes the JDBC work onto an executor; the retry policy
        // re-attempts transient connection/query failures with exponential backoff.
        JavaRDD<Tuple2<List<Row>, StructType>> rowsAndSchemaRdd =
                JavaSparkContext.fromSparkContext(spark.sparkContext())
                        .parallelize(Arrays.asList(0))
                        .map(ignored -> new ExponentialBackoffRetryPolicy<Tuple2<List<Row>, StructType>>(3, 100)
                                .attempt(() -> readJdbcAndReturnRowsAndSchema(jdbcUrlWithPassword, sql)));

        // Persist so the JDBC query is not re-executed for the schema lookup below
        // and again when the returned DataFrame is materialized.
        rowsAndSchemaRdd = rowsAndSchemaRdd.persist(StorageLevel.DISK_ONLY_2());

        // The RDD has exactly one element (parallelize of a one-item list), so first()
        // safely yields the schema without collecting the full payload to the driver.
        StructType structType = rowsAndSchemaRdd.map(Tuple2::_2).first();

        // Flatten the single (rows, schema) tuple into an RDD of rows.
        JavaRDD<Row> rowsRdd = rowsAndSchemaRdd.flatMap(tuple -> tuple._1().iterator());

        return spark.createDataFrame(rowsRdd, structType);
    }