in phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java [621:1060]
public void testBsonOpsWithDocumentConditionsUpdateSuccess() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String tableName = generateUniqueName();
String cdcName = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+ " CONSTRAINT pk PRIMARY KEY(PK1))";
String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(ddl);
conn.createStatement().execute(cdcDdl);
IndexToolIT.runIndexTool(false, "", tableName,
"\"" + CDCUtil.getCDCIndexName(cdcName) + "\"");
Timestamp ts1 = new Timestamp(System.currentTimeMillis());
Thread.sleep(100);
String sample1 = getJsonString("json/sample_01.json");
String sample2 = getJsonString("json/sample_02.json");
String sample3 = getJsonString("json/sample_03.json");
BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
PreparedStatement stmt =
conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)");
stmt.setString(1, "pk0001");
stmt.setString(2, "0002");
stmt.setObject(3, bsonDocument1);
stmt.executeUpdate();
stmt.setString(1, "pk1010");
stmt.setString(2, "1010");
stmt.setObject(3, bsonDocument2);
stmt.executeUpdate();
stmt.setString(1, "pk1011");
stmt.setString(2, "1011");
stmt.setObject(3, bsonDocument3);
stmt.executeUpdate();
conn.commit();
Thread.sleep(100);
Timestamp ts2 = new Timestamp(System.currentTimeMillis());
testCDCAfterFirstUpsert(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3);
ts1 = new Timestamp(System.currentTimeMillis());
Thread.sleep(100);
//{
// "$and": [
// {
// "press": {
// "$eq": "beat"
// }
// },
// {
// "track[0].shot[2][0].city.standard[50]": {
// "$eq": "softly"
// }
// }
// ]
//}
BsonDocument conditionDoc = new BsonDocument();
BsonArray andList1 = new BsonArray();
andList1.add(new BsonDocument()
.append("press", new BsonDocument()
.append("$eq", new BsonString("beat"))));
andList1.add(new BsonDocument()
.append("track[0].shot[2][0].city.standard[50]", new BsonDocument()
.append("$eq", new BsonString("softly"))));
conditionDoc.put("$and", andList1);
String query = "SELECT * FROM " + tableName +
" WHERE PK1 = 'pk0001' AND C1 = '0002' AND NOT BSON_CONDITION_EXPRESSION(COL, '"
+ conditionDoc.toJson() + "')";
ResultSet rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("pk0001", rs.getString(1));
assertEquals("0002", rs.getString(2));
BsonDocument document1 = (BsonDocument) rs.getObject(3);
assertEquals(bsonDocument1, document1);
assertFalse(rs.next());
//{
// "$and": [
// {
// "press": {
// "$eq": "beat"
// }
// },
// {
// "track[0].shot[2][0].city.standard[5]": {
// "$eq": "softly"
// }
// }
// ]
//}
conditionDoc = new BsonDocument();
andList1 = new BsonArray();
andList1.add(new BsonDocument()
.append("press", new BsonDocument()
.append("$eq", new BsonString("beat"))));
andList1.add(new BsonDocument()
.append("track[0].shot[2][0].city.standard[5]", new BsonDocument()
.append("$eq", new BsonString("softly"))));
conditionDoc.put("$and", andList1);
query = "SELECT * FROM " + tableName +
" WHERE PK1 = 'pk0001' AND C1 = '0002' AND BSON_CONDITION_EXPRESSION(COL, '"
+ conditionDoc.toJson() + "')";
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("pk0001", rs.getString(1));
assertEquals("0002", rs.getString(2));
document1 = (BsonDocument) rs.getObject(3);
assertEquals(bsonDocument1, document1);
assertFalse(rs.next());
//{
// "$SET": {
// "browserling": {
// "$binary": {
// "base64": "PkHjukNzgcg=",
// "subType": "00"
// }
// },
// "track[0].shot[2][0].city.standard[5]": "soft",
// "track[0].shot[2][0].city.problem[2]": "track[0].shot[2][0].city.problem[2] + 529.435"
// },
// "$UNSET": {
// "track[0].shot[2][0].city.flame": null
// }
//}
BsonDocument updateExp = new BsonDocument()
.append("$SET", new BsonDocument()
.append("browserling",
new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
.append("track[0].shot[2][0].city.standard[5]", new BsonString("soft"))
.append("track[0].shot[2][0].city.problem[2]",
new BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
.append("$UNSET", new BsonDocument()
.append("track[0].shot[2][0].city.flame", new BsonNull()));
stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
+ " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END,"
+ " C1 = ?");
stmt.setString(1, "pk0001");
stmt.setString(2, "0003");
stmt.executeUpdate();
//{
// "$ADD": {
// "new_samples": {
// "$set": [
// {
// "$binary": {
// "base64": "U2FtcGxlMTA=",
// "subType": "00"
// }
// },
// {
// "$binary": {
// "base64": "U2FtcGxlMTI=",
// "subType": "00"
// }
// },
// {
// "$binary": {
// "base64": "U2FtcGxlMTM=",
// "subType": "00"
// }
// },
// {
// "$binary": {
// "base64": "U2FtcGxlMTQ=",
// "subType": "00"
// }
// }
// ]
// }
// },
// "$DELETE_FROM_SET": {
// "new_samples": {
// "$set": [
// {
// "$binary": {
// "base64": "U2FtcGxlMDI=",
// "subType": "00"
// }
// },
// {
// "$binary": {
// "base64": "U2FtcGxlMDM=",
// "subType": "00"
// }
// }
// ]
// }
// },
// "$SET": {
// "newrecord": {
// "speed": "sun",
// "shot": [
// "fun",
// true,
// [
// {
// "character": 413968576,
// "earth": "helpful",
// "money": false,
// "city": {
// "softly": "service",
// "standard": [
// "pour",
// false,
// true,
// true,
// true,
// "softly",
// "happened"
// ],
// "problem": [
// -687102682.7731872,
// "tightly",
// 1527061470.2690287,
// {
// "condition": "else",
// "higher": 1462910924.088698,
// "scene": false,
// "safety": 240784722.66658115,
// "catch": false,
// "behavior": true,
// "protection": true
// },
// "torn",
// false,
// "eat"
// ],
// "flame": 1066643931,
// "rest": -1053428849,
// "additional": -442539394.7937908,
// "brought": "rock"
// },
// "upward": -1306729583.8727202,
// "sky": "act",
// "height": true
// },
// -2042805074.4290242,
// "settlers",
// 1455555511.4875226,
// -1448763321,
// false,
// 379701747
// ],
// false,
// 1241794365,
// "capital",
// false
// ],
// "hidden": false,
// "truth": "south",
// "other": true,
// "disease": "disease"
// }
// },
// "$UNSET": {
// "rather[3].outline.halfway.so[2][2]": null
// }
//}
updateExp = new BsonDocument()
.append("$ADD", new BsonDocument()
.append("new_samples",
new BsonDocument().append("$set",
new BsonArray(Arrays.asList(
new BsonBinary(Bytes.toBytes("Sample10")),
new BsonBinary(Bytes.toBytes("Sample12")),
new BsonBinary(Bytes.toBytes("Sample13")),
new BsonBinary(Bytes.toBytes("Sample14"))
)))))
.append("$DELETE_FROM_SET", new BsonDocument()
.append("new_samples",
new BsonDocument().append("$set",
new BsonArray(Arrays.asList(
new BsonBinary(Bytes.toBytes("Sample02")),
new BsonBinary(Bytes.toBytes("Sample03"))
)))))
.append("$SET", new BsonDocument()
.append("newrecord", ((BsonArray) (document1.get("track"))).get(0)))
.append("$UNSET", new BsonDocument()
.append("rather[3].outline.halfway.so[2][2]", new BsonNull()));
//{
// "$and": [
// {
// "press": {
// "$exists": false
// }
// },
// {
// "rather[3].outline.halfway.so[2][2]": {
// "$exists": true
// }
// }
// ]
//}
conditionDoc = new BsonDocument();
andList1 = new BsonArray();
andList1.add(new BsonDocument()
.append("press", new BsonDocument()
.append("$exists", new BsonBoolean(false))));
andList1.add(new BsonDocument()
.append("rather[3].outline.halfway.so[2][2]", new BsonDocument()
.append("$exists", new BsonBoolean(true))));
conditionDoc.put("$and", andList1);
stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
+ " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END");
stmt.setString(1, "pk1010");
stmt.executeUpdate();
//{
// "$SET": {
// "result[1].location.state": "AK"
// },
// "$UNSET": {
// "result[4].emails[1]": null
// }
//}
updateExp = new BsonDocument()
.append("$SET", new BsonDocument()
.append("result[1].location.state", new BsonString("AK")))
.append("$UNSET", new BsonDocument()
.append("result[4].emails[1]", new BsonNull()));
//{
// "$or": [
// {
// "result[2].location.coordinates.latitude": {
// "$gt": 0
// }
// },
// {
// "$and": [
// {
// "result[1].location": {
// "$exists": true
// }
// },
// {
// "result[1].location.state": {
// "$ne": "AK"
// }
// },
// {
// "result[4].emails[1]": {
// "$exists": true
// }
// }
// ]
// }
// ]
//}
conditionDoc = new BsonDocument();
BsonArray orList1 = new BsonArray();
andList1 = new BsonArray();
BsonDocument andDoc1 = new BsonDocument();
andList1.add(new BsonDocument()
.append("result[1].location", new BsonDocument()
.append("$exists", new BsonBoolean(true))));
andList1.add(new BsonDocument()
.append("result[1].location.state", new BsonDocument()
.append("$ne", new BsonString("AK"))));
andList1.add(new BsonDocument()
.append("result[4].emails[1]", new BsonDocument()
.append("$exists", new BsonBoolean(true))));
andDoc1.put("$and", andList1);
orList1.add(new BsonDocument()
.append("result[2].location.coordinates.latitude",
new BsonDocument("$gt", new BsonDouble(0))));
orList1.add(andDoc1);
conditionDoc.put("$or", orList1);
stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')"
+ " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END");
stmt.setString(1, "pk1011");
stmt.executeUpdate();
conn.commit();
Thread.sleep(100);
ts2 = new Timestamp(System.currentTimeMillis());
testCDCPostUpdate(conn, cdcName, ts1, ts2, bsonDocument1, bsonDocument2, bsonDocument3);
query = "SELECT * FROM " + tableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("pk0001", rs.getString(1));
assertEquals("0003", rs.getString(2));
document1 = (BsonDocument) rs.getObject(3);
String updatedJson = getJsonString("json/sample_updated_01.json");
assertEquals(RawBsonDocument.parse(updatedJson), document1);
assertTrue(rs.next());
assertEquals("pk1010", rs.getString(1));
assertEquals("1010", rs.getString(2));
BsonDocument document2 = (BsonDocument) rs.getObject(3);
updatedJson = getJsonString("json/sample_updated_02.json");
assertEquals(RawBsonDocument.parse(updatedJson), document2);
assertTrue(rs.next());
assertEquals("pk1011", rs.getString(1));
assertEquals("1011", rs.getString(2));
BsonDocument document3 = (BsonDocument) rs.getObject(3);
updatedJson = getJsonString("json/sample_updated_03.json");
assertEquals(RawBsonDocument.parse(updatedJson), document3);
assertFalse(rs.next());
}
}