in phoenix5-spark3/src/it/java/org/apache/phoenix/spark/OrderByIT.java [285:348]
public void testCombinationOfOrAndFilters() throws Exception {
    // Checks that filters mixing AND, OR, <> and IS NULL return the expected
    // counts when a Phoenix table is queried through a Spark SQL temp view.
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.setAutoCommit(false);
        String tableName1 = generateUniqueName();
        String ddl = "CREATE TABLE " + tableName1 +
                " (ENTITY_INSTANCE_ID BIGINT NOT NULL, MEMBER_ID VARCHAR(50) NOT NULL, CASE_ID VARCHAR(250) NOT NULL," +
                " CANCELLATION_FLAG VARCHAR(1), CASE_MATCH_TYPE CHAR(1), CONSTRAINT PK1 PRIMARY KEY(ENTITY_INSTANCE_ID, " +
                "MEMBER_ID, CASE_ID))\n";
        createTestTable(getUrl(), ddl);

        // Expose the Phoenix table to Spark SQL under the same name.
        SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
        Dataset<Row> phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
                .option(PhoenixDataSource.TABLE, tableName1)
                .option(PhoenixDataSource.JDBC_URL, getUrl()).load();
        phoenixDataSet.createOrReplaceTempView(tableName1);

        String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?)";
        // try-with-resources so the statement is released even if an upsert fails.
        try (PreparedStatement stmt = conn.prepareStatement(dml)) {
            // Row 1: id 40, flag 'Y', type 'M'.
            stmt.setInt(1, 40);
            stmt.setString(2, "a");
            stmt.setString(3, "b");
            stmt.setString(4, "Y");
            stmt.setString(5, "M");
            stmt.execute();

            // Row 2: id 40, NULL flag, type 'M'.
            stmt.setInt(1, 40);
            stmt.setString(2, "c");
            stmt.setString(3, "d");
            stmt.setNull(4, Types.VARCHAR);
            stmt.setString(5, "M");
            stmt.execute();

            // Row 3: id 40, flag 'N', type 'M'.
            stmt.setInt(1, 40);
            stmt.setString(2, "e");
            stmt.setString(3, "f");
            stmt.setString(4, "N");
            stmt.setString(5, "M");
            stmt.execute();

            // Row 4: id 41, flag 'N', type 'C'.
            stmt.setInt(1, 41);
            stmt.setString(2, "f");
            stmt.setString(3, "g");
            stmt.setString(4, "N");
            stmt.setString(5, "C");
            stmt.execute();
        }
        conn.commit();

        // Parenthesised OR combined with two AND terms: only rows 2 (NULL flag)
        // and 3 (flag 'N') have id 40, type 'M' and a flag that is not 'Y'.
        String query =
                "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and " +
                "(CANCELLATION_FLAG <> 'Y' OR CANCELLATION_FLAG IS NULL ) and CASE_MATCH_TYPE='M'";
        Dataset<Row> dataset = sqlContext.sql(query);
        List<Row> rows = dataset.collectAsList();
        ResultSet rs = new SparkResultSet(rows, dataset.columns());
        assertTrue(rs.next());
        assertEquals(2, rs.getLong(1));
        assertFalse(rs.next());

        // Unparenthesised form, i.e. (id = 40 AND type = 'M') OR flag <> 'Y':
        // rows 1-3 satisfy the AND term and row 4 satisfies flag <> 'Y'.
        query =
                "select count(*) from " + tableName1 + " where ENTITY_INSTANCE_ID = 40 and CASE_MATCH_TYPE='M' " +
                " OR CANCELLATION_FLAG <> 'Y'";
        dataset = sqlContext.sql(query);
        rows = dataset.collectAsList();
        rs = new SparkResultSet(rows, dataset.columns());
        assertTrue(rs.next());
        assertEquals(4, rs.getLong(1));
        assertFalse(rs.next());
    }
}