public void testBug2894()

in phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java [304:502]


    /**
     * Joins an event-count table with an event-latency table on (BUCKET, "TIMESTAMP") and checks
     * both the explain plan and the joined rows.
     *
     * <p>The event-count table declares "TIMESTAMP" as DESC in its primary key. The scenario is run
     * twice against two latency tables that differ only in the sort order of "TIMESTAMP" in their
     * primary key (DESC for the first, default order for the second), so the join is exercised
     * with both sides DESC as well as with one side ASC and the other DESC. In both runs the
     * expected plan is a sort-merge join and the expected rows are identical.
     *
     * <p>NOTE(review): the method name suggests this is the regression test for issue 2894
     * (presumably PHOENIX-2894) - confirm against the issue tracker.
     *
     * @throws Exception if any JDBC call fails
     */
    public void testBug2894() throws Exception {
        // Rows upserted into the event-count table (same order as the original inserts).
        final long[] eventCountTimestamps = {
            1462993520000000000L, 1462993515000000000L, 1462993510000000000L,
            1462993505000000000L, 1462993500000000000L, 1462993495000000000L,
            1462993490000000000L, 1462993485000000000L, 1462993480000000000L,
            1462993475000000000L, 1462993470000000000L, 1462993465000000000L,
            1462993460000000000L, 1462993455000000000L, 1462993450000000000L,
            1462993445000000000L, 1462993440000000000L, 1462993430000000000L
        };
        // Rows upserted into each event-latency table (same order as the original inserts).
        final long[] eventLatencyTimestamps = {
            1462993520000000000L, 1462993515000000000L, 1462993510000000000L,
            1462993505000000000L, 1462993490000000000L, 1462993485000000000L,
            1462993480000000000L, 1462993475000000000L, 1462993470000000000L,
            1462993430000000000L
        };
        // Timestamps the join query must return, in the order imposed by its ORDER BY clause.
        final long[] expectedJoinTimestamps = {
            1462993430000000000L, 1462993470000000000L, 1462993475000000000L,
            1462993480000000000L, 1462993485000000000L, 1462993490000000000L,
            1462993505000000000L, 1462993510000000000L, 1462993515000000000L,
            1462993520000000000L
        };

        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        // try-with-resources closes the connection even if setAutoCommit() itself fails.
        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
            conn.setAutoCommit(true);
            String eventCountTableName = generateUniqueName();

            // java.sql.Statement is fully qualified so this method does not depend on an import
            // that may not be present in the file.
            try (java.sql.Statement ddl = conn.createStatement()) {
                ddl.execute(
                        "CREATE TABLE IF NOT EXISTS " + eventCountTableName + " (\n" +
                        "        BUCKET VARCHAR,\n" +
                        "        TIMESTAMP_DATE TIMESTAMP,\n" +
                        "        \"TIMESTAMP\" UNSIGNED_LONG NOT NULL,\n" +
                        "        LOCATION VARCHAR,\n" +
                        "        A VARCHAR,\n" +
                        "        B VARCHAR,\n" +
                        "        C VARCHAR,\n" +
                        "        D UNSIGNED_LONG,\n" +
                        "        E FLOAT\n" +
                        "    CONSTRAINT pk PRIMARY KEY (BUCKET, \"TIMESTAMP\" DESC, LOCATION, A, B, C)\n" +
                        ") SALT_BUCKETS=2, COMPRESSION='GZ', TTL=31622400");
            }

            try (PreparedStatement upsert = conn.prepareStatement("UPSERT INTO " + eventCountTableName + "(BUCKET, \"TIMESTAMP\", LOCATION, A, B, C) VALUES(?,?,?,?,?,?)")) {
                upsert.setString(1, "5SEC");
                upsert.setString(3, "Tr/Bal");
                upsert.setString(4, "A1");
                upsert.setString(5, "B1");
                upsert.setString(6, "C1");
                // Only the timestamp changes between rows; the other parameters stay bound.
                for (long timestamp : eventCountTimestamps) {
                    upsert.setLong(2, timestamp);
                    upsert.execute();
                }
            }

            // The first latency table keeps "TIMESTAMP" DESC in its primary key (both join sides
            // DESC); the second uses the default order (one side ASC, the other DESC).
            String[] t = new String[] {"EVENT_LATENCY" + generateUniqueName(), "EVENT_LATENCY_2" + generateUniqueName()};
            for (int i = 0; i < t.length; i++) {
                try (java.sql.Statement ddl = conn.createStatement()) {
                    ddl.execute(
                            "CREATE TABLE IF NOT EXISTS " + t[i] + " (\n" +
                                    "        BUCKET VARCHAR,\n" +
                                    "        TIMESTAMP_DATE TIMESTAMP,\n" +
                                    "        \"TIMESTAMP\" UNSIGNED_LONG NOT NULL,\n" +
                                    "        SRC_LOCATION VARCHAR,\n" +
                                    "        DST_LOCATION VARCHAR,\n" +
                                    "        B VARCHAR,\n" +
                                    "        C VARCHAR,\n" +
                                    "        F UNSIGNED_LONG,\n" +
                                    "        G UNSIGNED_LONG,\n" +
                                    "        H UNSIGNED_LONG,\n" +
                                    "        I UNSIGNED_LONG\n" +
                                    "    CONSTRAINT pk PRIMARY KEY (BUCKET, \"TIMESTAMP\"" + (i == 0 ? " DESC" : "") + ", SRC_LOCATION, DST_LOCATION, B, C)\n" +
                            ") SALT_BUCKETS=2, COMPRESSION='GZ', TTL=31622400");
                }

                try (PreparedStatement upsert = conn.prepareStatement("UPSERT INTO " + t[i] + "(BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION, B, C) VALUES(?,?,?,?,?,?)")) {
                    upsert.setString(1, "5SEC");
                    upsert.setString(3, "Tr/Bal");
                    upsert.setString(4, "Tr/Bal");
                    upsert.setString(5, "B1");
                    upsert.setString(6, "C1");
                    for (long timestamp : eventLatencyTimestamps) {
                        upsert.setLong(2, timestamp);
                        upsert.execute();
                    }
                }

                // The trailing ORDER BY keeps the result order stable.
                String q =
                        "SELECT C.BUCKET, C.TIMESTAMP FROM (\n" +
                        "     SELECT E.BUCKET as BUCKET, L.BUCKET as LBUCKET, E.TIMESTAMP as TIMESTAMP, L.TIMESTAMP as LTIMESTAMP FROM\n" +
                        "        (SELECT BUCKET, TIMESTAMP FROM " + eventCountTableName + "\n" +
                        "             WHERE BUCKET = '5SEC' AND LOCATION = 'Tr/Bal'\n" +
                        "                 AND TIMESTAMP <= 1462993520000000000 AND TIMESTAMP > 1462993420000000000\n" +
                        "             GROUP BY BUCKET, TIMESTAMP, LOCATION\n" +
                        "        ) E\n" +
                        "        JOIN\n" +
                        "         (SELECT BUCKET, \"TIMESTAMP\" FROM "+ t[i] +"\n" +
                        "             WHERE BUCKET = '5SEC' AND SRC_LOCATION = 'Tr/Bal' AND SRC_LOCATION = DST_LOCATION\n" +
                        "                 AND \"TIMESTAMP\" <= 1462993520000000000 AND \"TIMESTAMP\" > 1462993420000000000\n" +
                        "             GROUP BY BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION\n" +
                        "         ) L\n" +
                        "     ON L.BUCKET = E.BUCKET AND L.TIMESTAMP = E.TIMESTAMP\n" +
                        " ) C\n" +
                        " GROUP BY C.BUCKET, C.TIMESTAMP ORDER BY C.BUCKET, C.TIMESTAMP";

                // The two expected plans differ only on the latency-table side: the DESC table
                // shows inverted (~) scan bounds plus a CLIENT SORTED BY step, while the
                // default-order table shows plain bounds and no extra client sort.
                String p = i == 0 ?
                        "SORT-MERGE-JOIN (INNER) TABLES\n" +
                        "    CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + eventCountTableName + " [X'00','5SEC',~1462993520000000000,'Tr/Bal'] - [X'01','5SEC',~1462993420000000000,'Tr/Bal']\n" +
                        "        SERVER FILTER BY FIRST KEY ONLY\n" +
                        "        SERVER DISTINCT PREFIX FILTER OVER [BUCKET, TIMESTAMP, LOCATION]\n" +
                        "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, TIMESTAMP, LOCATION]\n" +
                        "    CLIENT MERGE SORT\n" +
                        "    CLIENT SORTED BY [BUCKET, TIMESTAMP]\n" +
                        "AND (SKIP MERGE)\n" +
                        "    CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + t[i] + " [X'00','5SEC',~1462993520000000000,'Tr/Bal'] - [X'01','5SEC',~1462993420000000000,'Tr/Bal']\n" +
                        "        SERVER FILTER BY FIRST KEY ONLY AND SRC_LOCATION = DST_LOCATION\n" +
                        "        SERVER DISTINCT PREFIX FILTER OVER [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                        "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                        "    CLIENT MERGE SORT\n" +
                        "    CLIENT SORTED BY [BUCKET, \"TIMESTAMP\"]\n" +
                        "CLIENT AGGREGATE INTO ORDERED DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]"
                        :
                        "SORT-MERGE-JOIN (INNER) TABLES\n" +
                        "    CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + eventCountTableName + " [X'00','5SEC',~1462993520000000000,'Tr/Bal'] - [X'01','5SEC',~1462993420000000000,'Tr/Bal']\n" +
                        "        SERVER FILTER BY FIRST KEY ONLY\n" +
                        "        SERVER DISTINCT PREFIX FILTER OVER [BUCKET, TIMESTAMP, LOCATION]\n" +
                        "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, TIMESTAMP, LOCATION]\n" +
                        "    CLIENT MERGE SORT\n" +
                        "    CLIENT SORTED BY [BUCKET, TIMESTAMP]\n" +
                        "AND (SKIP MERGE)\n" +
                        "    CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + t[i] + " [X'00','5SEC',1462993420000000001,'Tr/Bal'] - [X'01','5SEC',1462993520000000000,'Tr/Bal']\n" +
                        "        SERVER FILTER BY FIRST KEY ONLY AND SRC_LOCATION = DST_LOCATION\n" +
                        "        SERVER DISTINCT PREFIX FILTER OVER [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                        "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                        "    CLIENT MERGE SORT\n" +
                        "CLIENT AGGREGATE INTO ORDERED DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]";

                // First verify the plan ...
                try (java.sql.Statement explainStmt = conn.createStatement();
                        ResultSet rs = explainStmt.executeQuery("explain " + q)) {
                    assertEquals(p, QueryUtil.getExplainPlan(rs));
                }

                // ... then verify the rows: exactly the expected timestamps, in order, no extras.
                try (java.sql.Statement queryStmt = conn.createStatement();
                        ResultSet rs = queryStmt.executeQuery(q)) {
                    for (long expectedTimestamp : expectedJoinTimestamps) {
                        assertTrue(rs.next());
                        assertEquals("5SEC", rs.getString(1));
                        assertEquals(expectedTimestamp, rs.getLong(2));
                    }
                    assertFalse(rs.next());
                }
            }
        }
    }