public void basicWriteAndReadBackTest()

in phoenix5-spark3/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java [75:172]


    public void basicWriteAndReadBackTest() throws SQLException {
        // Round-trip check for the "phoenix" Spark data source:
        //   1. create a two-column table (id INTEGER PRIMARY KEY, v1 VARCHAR) over JDBC,
        //   2. append one row per connection-configuration style (ZOOKEEPER_URL option,
        //      JDBC_URL option, no URL option at all),
        //   3. verify the three rows through a plain JDBC query,
        //   4. read the table back through Spark three times and check each row count.
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // All work that follows the context creation lives inside this try so that
        // jsc.stop() in the finally block also runs when the table creation fails.
        try {
            SQLContext sqlContext = new SQLContext(jsc);
            String tableName = generateUniqueName();

            try (Connection conn = DriverManager.getConnection(getUrl());
                    Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(
                    "CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)");
            }

            try (SparkSession spark = sqlContext.sparkSession()) {

                // Shared schema for the single-row DataFrames written below.
                StructType schema =
                        new StructType(new StructField[] {
                                new StructField("id", DataTypes.IntegerType, false,
                                        Metadata.empty()),
                                new StructField("v1", DataTypes.StringType, false,
                                        Metadata.empty()) });

                // Use old zkUrl
                Dataset<Row> df1 =
                        spark.createDataFrame(
                            Arrays.asList(RowFactory.create(1, "x")),
                            schema);

                df1.write().format("phoenix").mode(SaveMode.Append)
                    .option(PhoenixDataSource.TABLE, tableName)
                    .option(ZOOKEEPER_URL, getUrl())
                    .save();

                // Use jdbcUrl
                // In Phoenix 5.2+ getUrl() returns a JDBC URL, in earlier versions it returns
                // a ZK quorum, so prepend the JDBC protocol prefix when it is missing.
                String jdbcUrl = getUrl();
                if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) {
                    jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl;
                }

                Dataset<Row> df2 =
                        spark.createDataFrame(
                            Arrays.asList(RowFactory.create(2, "x")),
                            schema);

                df2.write().format("phoenix").mode(SaveMode.Append)
                    .option(PhoenixDataSource.TABLE, tableName)
                    .option(JDBC_URL, jdbcUrl)
                    .save();

                // Use default from hbase-site.xml
                Dataset<Row> df3 =
                        spark.createDataFrame(
                            Arrays.asList(RowFactory.create(3, "x")),
                            schema);

                df3.write().format("phoenix").mode(SaveMode.Append)
                    .option(PhoenixDataSource.TABLE, tableName)
                    .save();

                // Verify through plain JDBC that exactly the three written rows are present.
                // The ResultSet is declared as a resource so it is closed explicitly.
                try (Connection conn = DriverManager.getConnection(getUrl());
                        Statement stmt = conn.createStatement();
                        ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
                    assertTrue(rs.next());
                    assertEquals(1, rs.getInt(1));
                    assertEquals("x", rs.getString(2));
                    assertTrue(rs.next());
                    assertEquals(2, rs.getInt(1));
                    assertEquals("x", rs.getString(2));
                    assertTrue(rs.next());
                    assertEquals(3, rs.getInt(1));
                    assertEquals("x", rs.getString(2));
                    assertFalse(rs.next());
                }

                // Read back passing the unmodified getUrl() value as the JDBC_URL option.
                // NOTE(review): the matching write above uses ZOOKEEPER_URL; confirm whether
                // this read was intended to exercise that option as well.
                Dataset<Row> df1Read = spark.read().format("phoenix")
                        .option(PhoenixDataSource.TABLE, tableName)
                        .option(PhoenixDataSource.JDBC_URL, getUrl()).load();

                assertEquals(3L, df1Read.count());

                // Use jdbcUrl
                Dataset<Row> df2Read = spark.read().format("phoenix")
                        .option(PhoenixDataSource.TABLE, tableName)
                        .option(PhoenixDataSource.JDBC_URL, jdbcUrl)
                        .load();

                assertEquals(3L, df2Read.count());

                // Use default
                Dataset<Row> df3Read = spark.read().format("phoenix")
                        .option(PhoenixDataSource.TABLE, tableName)
                        .load();

                assertEquals(3L, df3Read.count());

            }
        } finally {
            jsc.stop();
        }
    }