in phoenix5-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java [410:518]
/**
 * Verifies that columns qualified with a column family (e.g. {@code CF1.A}) are readable
 * and sortable through the Spark SQL connector. Creates a table with two explicit column
 * families plus default-family columns, upserts three rows, then checks row ordering for
 * (1) a multi-column ORDER BY on family-qualified columns and (2) an ORDER BY on a
 * default-family column. Column-family-qualified names must be backtick-escaped in
 * Spark SQL because they contain a dot.
 *
 * @throws Exception on any JDBC or Spark failure — surfaces as a test failure
 */
public void testColumnFamily() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String jdbcUrl = getUrl();
    try (Connection conn = DriverManager.getConnection(jdbcUrl, props)) {
        // Batch the three upserts into a single explicit commit.
        conn.setAutoCommit(false);
        String tableName = generateUniqueName();
        String ddl = "CREATE TABLE " + tableName +
            " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer, col2 integer" +
            " CONSTRAINT pk PRIMARY KEY (a_string))\n";
        createTestTable(jdbcUrl, ddl);
        String dml = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?)";
        // try-with-resources so the statement is always closed, even on assertion failure.
        try (PreparedStatement stmt = conn.prepareStatement(dml)) {
            // Row "a": CF1.A=40, ties with row "b" on that column.
            stmt.setString(1, "a");
            stmt.setInt(2, 40);
            stmt.setString(3, "aa");
            stmt.setInt(4, 10);
            stmt.setString(5, "bb");
            stmt.setInt(6, 20);
            stmt.setInt(7, 1);
            stmt.execute();
            // Row "c": smallest CF1.A (30) -> sorts first under ORDER BY CF1.A.
            stmt.setString(1, "c");
            stmt.setInt(2, 30);
            stmt.setString(3, "cc");
            stmt.setInt(4, 50);
            stmt.setString(5, "dd");
            stmt.setInt(6, 60);
            stmt.setInt(7, 3);
            stmt.execute();
            // Row "b": ties row "a" on CF1.A=40; CF2.C ("aa" < "bb") breaks the tie.
            stmt.setString(1, "b");
            stmt.setInt(2, 40);
            stmt.setString(3, "bb");
            stmt.setInt(4, 5);
            stmt.setString(5, "aa");
            stmt.setInt(6, 80);
            stmt.setInt(7, 2);
            stmt.execute();
            conn.commit();
        }
        SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
        // Use the typed Dataset<Row> rather than the raw Dataset type.
        Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
            .option(DataSourceOptions.TABLE_KEY, tableName)
            .option(PhoenixDataSource.JDBC_URL, jdbcUrl).load();
        phoenixDataSet.createOrReplaceTempView(tableName);
        // Query 1: ORDER BY family-qualified columns CF1.A, then CF2.C.
        // Expected order: c (A=30), b (A=40, C=aa), a (A=40, C=bb).
        Dataset<Row> dataset =
            sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
                + tableName + " ORDER BY `CF1.A`,`CF2.C`");
        List<Row> rows = dataset.collectAsList();
        ResultSet rs = new SparkResultSet(rows, dataset.columns());
        assertTrue(rs.next());
        assertEquals("c", rs.getString(1));
        assertEquals(30, rs.getInt(2));
        assertEquals("cc", rs.getString(3));
        assertEquals(50, rs.getInt(4));
        assertEquals("dd", rs.getString(5));
        assertEquals(60, rs.getInt(6));
        assertEquals(3, rs.getInt(7));
        assertTrue(rs.next());
        assertEquals("b", rs.getString(1));
        assertEquals(40, rs.getInt(2));
        assertEquals("bb", rs.getString(3));
        assertEquals(5, rs.getInt(4));
        assertEquals("aa", rs.getString(5));
        assertEquals(80, rs.getInt(6));
        assertEquals(2, rs.getInt(7));
        assertTrue(rs.next());
        assertEquals("a", rs.getString(1));
        assertEquals(40, rs.getInt(2));
        assertEquals("aa", rs.getString(3));
        assertEquals(10, rs.getInt(4));
        assertEquals("bb", rs.getString(5));
        assertEquals(20, rs.getInt(6));
        assertEquals(1, rs.getInt(7));
        assertFalse(rs.next());
        // Query 2: ORDER BY COL2 (default column family).
        // COL2 values 1, 2, 3 map to rows a, b, c respectively.
        dataset =
            sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from "
                + tableName + " ORDER BY COL2");
        rows = dataset.collectAsList();
        rs = new SparkResultSet(rows, dataset.columns());
        assertTrue(rs.next());
        assertEquals("a", rs.getString(1));
        assertEquals(40, rs.getInt(2));
        assertEquals("aa", rs.getString(3));
        assertEquals(10, rs.getInt(4));
        assertEquals("bb", rs.getString(5));
        assertEquals(20, rs.getInt(6));
        assertEquals(1, rs.getInt(7));
        assertTrue(rs.next());
        assertEquals("b", rs.getString(1));
        assertEquals(40, rs.getInt(2));
        assertEquals("bb", rs.getString(3));
        assertEquals(5, rs.getInt(4));
        assertEquals("aa", rs.getString(5));
        assertEquals(80, rs.getInt(6));
        assertEquals(2, rs.getInt(7));
        assertTrue(rs.next());
        assertEquals("c", rs.getString(1));
        assertEquals(30, rs.getInt(2));
        assertEquals("cc", rs.getString(3));
        assertEquals(50, rs.getInt(4));
        assertEquals("dd", rs.getString(5));
        assertEquals(60, rs.getInt(6));
        assertEquals(3, rs.getInt(7));
        assertFalse(rs.next());
    }
}