in phoenix5-spark3/src/it/java/org/apache/phoenix/spark/DataSourceApiIT.java [75:172]
public void basicWriteAndReadBackTest() throws SQLException {
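// Spin up a local Spark context for the test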
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(jsc);
String tableName = generateUniqueName();
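// Create the target table up front through a plain JDBC connection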
try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
stmt.executeUpdate(
"CREATE TABLE " + tableName + " (id INTEGER PRIMARY KEY, v1 VARCHAR)");
}
try (SparkSession spark = sqlContext.sparkSession()) {
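// Spark schema mirroring the table definition (id INTEGER, v1 VARCHAR)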
StructType schema =
new StructType(new StructField[] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.StringType, false, Metadata.empty()) });
// Use the legacy zkUrl option
Dataset<Row> df1 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(1, "x")),
schema);
df1.write().format("phoenix").mode(SaveMode.Append)
.option(PhoenixDataSource.TABLE, tableName)
.option(PhoenixDataSource.ZOOKEEPER_URL, getUrl())
.save();
// Use jdbcUrl. In Phoenix 5.2+ getUrl() returns a full JDBC URL; in
// earlier versions it returns a bare ZK quorum, so prepend the JDBC
// protocol prefix when it is missing.
String jdbcUrl = getUrl();
if (!jdbcUrl.startsWith(JDBC_PROTOCOL)) {
jdbcUrl = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + jdbcUrl;
}
Dataset<Row> df2 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(2, "x")),
schema);
df2.write().format("phoenix").mode(SaveMode.Append)
.option(PhoenixDataSource.TABLE, tableName)
.option(PhoenixDataSource.JDBC_URL, jdbcUrl)
.save();
// Use the connection defaults from hbase-site.xml (no URL option)
Dataset<Row> df3 =
spark.createDataFrame(
Arrays.asList(RowFactory.create(3, "x")),
schema);
df3.write().format("phoenix").mode(SaveMode.Append)
.option(PhoenixDataSource.TABLE, tableName)
.save();
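// Verify over plain JDBC that all three writes landed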
try (Connection conn = DriverManager.getConnection(getUrl());
Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery(
"SELECT * FROM " + tableName + " ORDER BY id")) {
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals("x", rs.getString(2));
assertFalse(rs.next());
}
}
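// Read back through the connector, passing getUrl() directly as the JDBC URL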
Dataset<Row> df1Read = spark.read().format("phoenix")
.option(PhoenixDataSource.TABLE, tableName)
.option(PhoenixDataSource.JDBC_URL, getUrl())
.load();
assertEquals(3L, df1Read.count());
// Use jdbcUrl
Dataset<Row> df2Read = spark.read().format("phoenix")
.option(PhoenixDataSource.TABLE, tableName)
.option(PhoenixDataSource.JDBC_URL, jdbcUrl)
.load();
assertEquals(3L, df2Read.count());
// Use default
Dataset<Row> df3Read = spark.read().format("phoenix")
.option(PhoenixDataSource.TABLE, tableName)
.load();
assertEquals(3L, df3Read.count());
} finally {
jsc.stop();
}
}