public void testBsonOpsWithDocumentConditionsUpdateSuccess()

in phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java [621:1060]


  public void testBsonOpsWithDocumentConditionsUpdateSuccess() throws Exception {
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    String tableName = generateUniqueName();
    String cdcName = generateUniqueName();
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
      String ddl = "CREATE TABLE " + tableName
              + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
              + " CONSTRAINT pk PRIMARY KEY(PK1))";
      String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
      conn.createStatement().execute(ddl);
      conn.createStatement().execute(cdcDdl);
      IndexToolIT.runIndexTool(false, "", tableName,
              "\"" + CDCUtil.getCDCIndexName(cdcName) + "\"");
      Timestamp ts1 = new Timestamp(System.currentTimeMillis());
      Thread.sleep(100);

      String sample1 = getJsonString("json/sample_01.json");
      String sample2 = getJsonString("json/sample_02.json");
      String sample3 = getJsonString("json/sample_03.json");
      BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
      BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
      BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);

      PreparedStatement stmt =
              conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
      stmt.setString(1, "pk0001");
      stmt.setString(2, "0002");
      stmt.setObject(3, bsonDocument1);
      stmt.executeUpdate();

      stmt.setString(1, "pk1010");
      stmt.setString(2, "1010");
      stmt.setObject(3, bsonDocument2);
      stmt.executeUpdate();

      stmt.setString(1, "pk1011");
      stmt.setString(2, "1011");
      stmt.setObject(3, bsonDocument3);
      stmt.executeUpdate();

      conn.commit();

      Thread.sleep(100);
      Timestamp ts2 = new Timestamp(System.currentTimeMillis());

      testCDCAfterFirstUpsert(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3);

      ts1 = new Timestamp(System.currentTimeMillis());
      Thread.sleep(100);

      //{
      //  "$and": [
      //    {
      //      "press": {
      //        "$eq": "beat"
      //      }
      //    },
      //    {
      //      "track[0].shot[2][0].city.standard[50]": {
      //        "$eq": "softly"
      //      }
      //    }
      //  ]
      //}
      BsonDocument conditionDoc = new BsonDocument();
      BsonArray andList1 = new BsonArray();
      andList1.add(new BsonDocument()
              .append("press", new BsonDocument()
                      .append("$eq", new BsonString("beat"))));
      andList1.add(new BsonDocument()
              .append("track[0].shot[2][0].city.standard[50]", new BsonDocument()
                      .append("$eq", new BsonString("softly"))));
      conditionDoc.put("$and", andList1);

      String query = "SELECT * FROM " + tableName +
              " WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT BSON_CONDITION_EXPRESSION(COL, '"
              + conditionDoc.toJson() + "')";
      ResultSet rs = conn.createStatement().executeQuery(query);

      assertTrue(rs.next());
      assertEquals("pk0001", rs.getString(1));
      assertEquals("0002", rs.getString(2));
      BsonDocument document1 = (BsonDocument) rs.getObject(3);
      assertEquals(bsonDocument1, document1);

      assertFalse(rs.next());

      //{
      //  "$and": [
      //    {
      //      "press": {
      //        "$eq": "beat"
      //      }
      //    },
      //    {
      //      "track[0].shot[2][0].city.standard[5]": {
      //        "$eq": "softly"
      //      }
      //    }
      //  ]
      //}
      conditionDoc = new BsonDocument();
      andList1 = new BsonArray();
      andList1.add(new BsonDocument()
              .append("press", new BsonDocument()
                      .append("$eq", new BsonString("beat"))));
      andList1.add(new BsonDocument()
              .append("track[0].shot[2][0].city.standard[5]", new BsonDocument()
                      .append("$eq", new BsonString("softly"))));
      conditionDoc.put("$and", andList1);

      query = "SELECT * FROM " + tableName +
              " WHERE PK1 = 'pk0001' AND C1 = '0002' AND BSON_CONDITION_EXPRESSION(COL, '"
              + conditionDoc.toJson() + "')";
      rs = conn.createStatement().executeQuery(query);

      assertTrue(rs.next());
      assertEquals("pk0001", rs.getString(1));
      assertEquals("0002", rs.getString(2));
      document1 = (BsonDocument) rs.getObject(3);
      assertEquals(bsonDocument1, document1);

      assertFalse(rs.next());

      //{
      //  "$SET": {
      //    "browserling": {
      //      "$binary": {
      //        "base64": "PkHjukNzgcg=",
      //        "subType": "00"
      //      }
      //    },
      //    "track[0].shot[2][0].city.standard[5]": "soft",
      //    "track[0].shot[2][0].city.problem[2]": "track[0].shot[2][0].city.problem[2] + 529.435"
      //  },
      //  "$UNSET": {
      //    "track[0].shot[2][0].city.flame": null
      //  }
      //}
      BsonDocument updateExp = new BsonDocument()
              .append("$SET", new BsonDocument()
                      .append("browserling",
                              new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
                      .append("track[0].shot[2][0].city.standard[5]", new BsonString("soft"))
                      .append("track[0].shot[2][0].city.problem[2]",
                              new BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
              .append("$UNSET", new BsonDocument()
                      .append("track[0].shot[2][0].city.flame", new BsonNull()));

      stmt = conn.prepareStatement("UPSERT INTO " + tableName
              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END,"
              + " C1 = ?");
      stmt.setString(1, "pk0001");
      stmt.setString(2, "0003");
      stmt.executeUpdate();

      //{
      //  "$ADD": {
      //    "new_samples": {
      //      "$set": [
      //        {
      //          "$binary": {
      //            "base64": "U2FtcGxlMTA=",
      //            "subType": "00"
      //          }
      //        },
      //        {
      //          "$binary": {
      //            "base64": "U2FtcGxlMTI=",
      //            "subType": "00"
      //          }
      //        },
      //        {
      //          "$binary": {
      //            "base64": "U2FtcGxlMTM=",
      //            "subType": "00"
      //          }
      //        },
      //        {
      //          "$binary": {
      //            "base64": "U2FtcGxlMTQ=",
      //            "subType": "00"
      //          }
      //        }
      //      ]
      //    }
      //  },
      //  "$DELETE_FROM_SET": {
      //    "new_samples": {
      //      "$set": [
      //        {
      //          "$binary": {
      //            "base64": "U2FtcGxlMDI=",
      //            "subType": "00"
      //          }
      //        },
      //        {
      //          "$binary": {
      //            "base64": "U2FtcGxlMDM=",
      //            "subType": "00"
      //          }
      //        }
      //      ]
      //    }
      //  },
      //  "$SET": {
      //    "newrecord": {
      //      "speed": "sun",
      //      "shot": [
      //        "fun",
      //        true,
      //        [
      //          {
      //            "character": 413968576,
      //            "earth": "helpful",
      //            "money": false,
      //            "city": {
      //              "softly": "service",
      //              "standard": [
      //                "pour",
      //                false,
      //                true,
      //                true,
      //                true,
      //                "softly",
      //                "happened"
      //              ],
      //              "problem": [
      //                -687102682.7731872,
      //                "tightly",
      //                1527061470.2690287,
      //                {
      //                  "condition": "else",
      //                  "higher": 1462910924.088698,
      //                  "scene": false,
      //                  "safety": 240784722.66658115,
      //                  "catch": false,
      //                  "behavior": true,
      //                  "protection": true
      //                },
      //                "torn",
      //                false,
      //                "eat"
      //              ],
      //              "flame": 1066643931,
      //              "rest": -1053428849,
      //              "additional": -442539394.7937908,
      //              "brought": "rock"
      //            },
      //            "upward": -1306729583.8727202,
      //            "sky": "act",
      //            "height": true
      //          },
      //          -2042805074.4290242,
      //          "settlers",
      //          1455555511.4875226,
      //          -1448763321,
      //          false,
      //          379701747
      //        ],
      //        false,
      //        1241794365,
      //        "capital",
      //        false
      //      ],
      //      "hidden": false,
      //      "truth": "south",
      //      "other": true,
      //      "disease": "disease"
      //    }
      //  },
      //  "$UNSET": {
      //    "rather[3].outline.halfway.so[2][2]": null
      //  }
      //}
      updateExp = new BsonDocument()
              .append("$ADD", new BsonDocument()
                      .append("new_samples",
                              new BsonDocument().append("$set",
                                      new BsonArray(Arrays.asList(
                                              new BsonBinary(Bytes.toBytes("Sample10")),
                                              new BsonBinary(Bytes.toBytes("Sample12")),
                                              new BsonBinary(Bytes.toBytes("Sample13")),
                                              new BsonBinary(Bytes.toBytes("Sample14"))
                                      )))))
              .append("$DELETE_FROM_SET", new BsonDocument()
                      .append("new_samples",
                              new BsonDocument().append("$set",
                                      new BsonArray(Arrays.asList(
                                              new BsonBinary(Bytes.toBytes("Sample02")),
                                              new BsonBinary(Bytes.toBytes("Sample03"))
                                      )))))
              .append("$SET", new BsonDocument()
                      .append("newrecord", ((BsonArray) (document1.get("track"))).get(0)))
              .append("$UNSET", new BsonDocument()
                      .append("rather[3].outline.halfway.so[2][2]", new BsonNull()));

      //{
      //  "$and": [
      //    {
      //      "press": {
      //        "$exists": false
      //      }
      //    },
      //    {
      //      "rather[3].outline.halfway.so[2][2]": {
      //        "$exists": true
      //      }
      //    }
      //  ]
      //}
      conditionDoc = new BsonDocument();
      andList1 = new BsonArray();
      andList1.add(new BsonDocument()
              .append("press", new BsonDocument()
                      .append("$exists", new BsonBoolean(false))));
      andList1.add(new BsonDocument()
              .append("rather[3].outline.halfway.so[2][2]", new BsonDocument()
                      .append("$exists", new BsonBoolean(true))));
      conditionDoc.put("$and", andList1);

      stmt = conn.prepareStatement("UPSERT INTO " + tableName
              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END");

      stmt.setString(1, "pk1010");
      stmt.executeUpdate();

      //{
      //  "$SET": {
      //    "result[1].location.state": "AK"
      //  },
      //  "$UNSET": {
      //    "result[4].emails[1]": null
      //  }
      //}
      updateExp = new BsonDocument()
              .append("$SET", new BsonDocument()
                      .append("result[1].location.state", new BsonString("AK")))
              .append("$UNSET", new BsonDocument()
                      .append("result[4].emails[1]", new BsonNull()));

      //{
      //  "$or": [
      //    {
      //      "result[2].location.coordinates.latitude": {
      //        "$gt": 0
      //      }
      //    },
      //    {
      //      "$and": [
      //        {
      //          "result[1].location": {
      //            "$exists": true
      //          }
      //        },
      //        {
      //          "result[1].location.state": {
      //            "$ne": "AK"
      //          }
      //        },
      //        {
      //          "result[4].emails[1]": {
      //            "$exists": true
      //          }
      //        }
      //      ]
      //    }
      //  ]
      //}
      conditionDoc = new BsonDocument();
      BsonArray orList1 = new BsonArray();
      andList1 = new BsonArray();
      BsonDocument andDoc1 = new BsonDocument();
      andList1.add(new BsonDocument()
              .append("result[1].location", new BsonDocument()
                      .append("$exists", new BsonBoolean(true))));
      andList1.add(new BsonDocument()
              .append("result[1].location.state", new BsonDocument()
                      .append("$ne", new BsonString("AK"))));
      andList1.add(new BsonDocument()
              .append("result[4].emails[1]", new BsonDocument()
                      .append("$exists", new BsonBoolean(true))));
      andDoc1.put("$and", andList1);

      orList1.add(new BsonDocument()
              .append("result[2].location.coordinates.latitude",
                      new BsonDocument("$gt", new BsonDouble(0))));
      orList1.add(andDoc1);

      conditionDoc.put("$or", orList1);

      stmt = conn.prepareStatement("UPSERT INTO " + tableName
              + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
              + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
              + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END");

      stmt.setString(1, "pk1011");
      stmt.executeUpdate();

      conn.commit();

      Thread.sleep(100);
      ts2 = new Timestamp(System.currentTimeMillis());

      testCDCPostUpdate(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3);

      query = "SELECT * FROM " + tableName;
      rs = conn.createStatement().executeQuery(query);

      assertTrue(rs.next());
      assertEquals("pk0001", rs.getString(1));
      assertEquals("0003", rs.getString(2));
      document1 = (BsonDocument) rs.getObject(3);

      String updatedJson = getJsonString("json/sample_updated_01.json");
      assertEquals(RawBsonDocument.parse(updatedJson), document1);

      assertTrue(rs.next());
      assertEquals("pk1010", rs.getString(1));
      assertEquals("1010", rs.getString(2));
      BsonDocument document2 = (BsonDocument) rs.getObject(3);

      updatedJson = getJsonString("json/sample_updated_02.json");
      assertEquals(RawBsonDocument.parse(updatedJson), document2);

      assertTrue(rs.next());
      assertEquals("pk1011", rs.getString(1));
      assertEquals("1011", rs.getString(2));
      BsonDocument document3 = (BsonDocument) rs.getObject(3);

      updatedJson = getJsonString("json/sample_updated_03.json");
      assertEquals(RawBsonDocument.parse(updatedJson), document3);

      assertFalse(rs.next());
    }
  }