in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java [437:533]
/**
 * Exercises streaming writes that use the {@code upsert} write operation together with an update script.
 *
 * <p>Two documents (ids {@code 1} and {@code 2}) are seeded with {@code note = "First"} and an empty
 * {@code address} array. Two streaming queries are then run against the same target:
 * <ol>
 *   <li>Run 1 sends a contact with id {@code 1} and an address, using a script that adds the
 *       address to the document's {@code address} field.</li>
 *   <li>Run 2 sends a contact with id {@code 2} and note {@code "Second"}, using a script that
 *       assigns the document's {@code note} field.</li>
 * </ol>
 * Afterwards document 1 must contain zipcode {@code 12345} and still carry note {@code "First"},
 * while document 2 must carry note {@code "Second"} and no zipcode {@code 12345}.
 *
 * @throws Exception if any of the REST calls or streaming runs fail
 */
public void test3WriteWithUpsertScript() throws Exception {
    // BWC: 2.x and earlier use the "string" field type and the "groovy" script language.
    String keyword = "keyword";
    String lang = "painless";
    if (version.onOrBefore(EsMajorVersion.V_2_X)) {
        keyword = "string";
        lang = "groovy";
    }

    // Init: create the index, install the mapping and seed the two documents the scripts will update.
    String mapping = "{\"data\":{\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"note\":{\"type\":\""+keyword+"\"},\"address\":{\"type\":\"nested\",\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"zipcode\":{\"type\":\""+keyword+"\"}}}}}}";
    String index = wrapIndex("test-script-upsert");
    String type = "data";
    String target = resource(index, type);
    String docPath = docPath(index, type);

    RestUtils.touch(index);
    // Encode JSON payloads explicitly as UTF-8 instead of relying on the platform default charset.
    RestUtils.putMapping(index, type, mapping.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    RestUtils.postData(docPath+"/1", "{\"id\":\"1\",\"note\":\"First\",\"address\":[]}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    RestUtils.postData(docPath+"/2", "{\"id\":\"2\",\"note\":\"First\",\"address\":[]}".getBytes(java.nio.charset.StandardCharsets.UTF_8));

    // Common configurations shared by both streaming runs.
    Map<String, String> updateProperties = new HashMap<>();
    updateProperties.put("es.write.operation", "upsert");
    updateProperties.put("es.mapping.id", "id");
    updateProperties.put("es.update.script.lang", lang);

    // Run 1: add an address to document 1 through the update script.
    AddressBean address = new AddressBean();
    address.setId("1");
    address.setZipcode("12345");
    ContactBean doc1 = new ContactBean();
    doc1.setId("1");
    doc1.setAddress(address);

    // The script text differs by version: 5.x and later read the parameter through "params".
    String script1 = version.onOrAfter(EsMajorVersion.V_5_X)
            ? "ctx._source.address.add(params.new_address)"
            : "ctx._source.address+=new_address";

    JavaStreamingQueryTestHarness<ContactBean> test1 = new JavaStreamingQueryTestHarness<>(spark, Encoders.bean(ContactBean.class));
    test1
            .withInput(doc1)
            .run(
                    test1.stream()
                            .writeStream()
                            .option("checkpointLocation", checkpoint(target))
                            .options(updateProperties)
                            .option("es.update.script.params", "new_address:address")
                            .option("es.update.script", script1)
                            .format("es"),
                    target
            );

    // Run 2: overwrite the note of document 2 through the update script.
    ContactBean doc2 = new ContactBean();
    doc2.setId("2");
    doc2.setNote("Second");

    String script2 = version.onOrAfter(EsMajorVersion.V_5_X)
            ? "ctx._source.note = params.new_note"
            : "ctx._source.note=new_note";

    JavaStreamingQueryTestHarness<ContactBean> test2 = new JavaStreamingQueryTestHarness<>(spark, Encoders.bean(ContactBean.class));
    test2
            .withInput(doc2)
            .run(
                    test2.stream()
                            .writeStream()
                            .option("checkpointLocation", checkpoint(target))
                            .options(updateProperties)
                            .option("es.update.script.params", "new_note:note")
                            .option("es.update.script", script2)
                            .format("es"),
                    target
            );

    // Validate: document 1 gained the address but kept its note; document 2 only had its note replaced.
    assertTrue(RestUtils.exists(docPath + "/1"));
    assertThat(RestUtils.get(docPath + "/1"), both(containsString("\"zipcode\":\"12345\"")).and(containsString("\"note\":\"First\"")));

    assertTrue(RestUtils.exists(docPath + "/2"));
    assertThat(RestUtils.get(docPath + "/2"), both(not(containsString("\"zipcode\":\"12345\""))).and(containsString("\"note\":\"Second\"")));
}