in spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java [514:598]
/**
 * Verifies that upsert scripts work when the script parameter is an object
 * (a nested {@code address} map appended to a list) as well as a plain string
 * (replacing the {@code note} field). Two streaming batches are written with
 * different script configurations, then both documents are checked over REST.
 *
 * @throws Exception on REST, streaming, or interruption failures
 */
public void testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString() throws Exception {
    // Pre-5.x clusters use "string" instead of "keyword" for exact-match text fields.
    String keyword = "keyword";
    if (version.onOrBefore(EsMajorVersion.V_2_X)) {
        keyword = "string";
    }
    String mapping = "{\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"note\":{\"type\":\""+keyword+"\"},\"address\":{\"type\":\"nested\",\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"zipcode\":{\"type\":\""+keyword+"\"}}}}}";
    // Typed versions require the mapping to be wrapped under the type name.
    if (!TestUtils.isTypelessVersion(version)) {
        mapping = "{\"data\":"+mapping+"}";
    }
    String index = wrapIndex("spark-streaming-test-contact");
    String type = "data";
    String target = resource(index, type, version);
    String docEndpoint = docEndpoint(index, type, version);
    RestUtils.touch(index);
    RestUtils.putMapping(index, type, mapping.getBytes());
    // Seed two documents; both have empty address lists so the upserts hit the update path.
    RestUtils.postData(docEndpoint+"/1", "{\"id\":\"1\",\"note\":\"First\",\"address\":[]}".getBytes());
    RestUtils.postData(docEndpoint+"/2", "{\"id\":\"2\",\"note\":\"First\",\"address\":[]}".getBytes());
    // Painless replaced groovy as the default script language in 5.x.
    String lang = "painless";
    if (version.onOrBefore(EsMajorVersion.V_2_X)) {
        lang = "groovy";
    }
    Map<String, String> props = new HashMap<>();
    props.put("es.write.operation", "upsert");
    props.put("es.input.json", "true");
    props.put("es.mapping.id", "id");
    props.put("es.update.script.lang", lang);

    // Batch 1: pass an OBJECT script param — append a nested address to doc 1's address list.
    String doc1 = "{\"id\":\"1\",\"address\":{\"zipcode\":\"12345\",\"id\":\"1\"}}";
    List<String> docs1 = new ArrayList<>();
    docs1.add(doc1);
    String upParams = "new_address:address";
    String upScript;
    if (version.onOrAfter(EsMajorVersion.V_5_X)) {
        upScript = "ctx._source.address.add(params.new_address)";
    } else {
        upScript = "ctx._source.address+=new_address";
    }
    Map<String, String> localConf1 = new HashMap<>(props);
    localConf1.put("es.update.script.params", upParams);
    localConf1.put("es.update.script", upScript);
    runStreamingUpsertBatch(docs1, target, localConf1);

    // The streaming context was stopped by the first run; recreate it for the second batch.
    ssc = new JavaStreamingContext(sc, Seconds.apply(1));

    // Batch 2: pass a plain STRING script param — overwrite doc 2's note field.
    String doc2 = "{\"id\":\"2\",\"note\":\"Second\"}";
    List<String> docs2 = new ArrayList<>();
    docs2.add(doc2);
    String noteUpParams = "new_note:note";
    String noteUpScript;
    if (version.onOrAfter(EsMajorVersion.V_5_X)) {
        noteUpScript = "ctx._source.note = params.new_note";
    } else {
        noteUpScript = "ctx._source.note=new_note";
    }
    Map<String, String> localConf2 = new HashMap<>(props);
    localConf2.put("es.update.script.params", noteUpParams);
    localConf2.put("es.update.script", noteUpScript);
    runStreamingUpsertBatch(docs2, target, localConf2);

    // Doc 1: address appended by the object-param script; note untouched.
    assertTrue(RestUtils.exists(docEndpoint + "/1"));
    assertThat(RestUtils.get(docEndpoint + "/1"), both(containsString("\"zipcode\":\"12345\"")).and(containsString("\"note\":\"First\"")));
    // Doc 2: note replaced by the string-param script; address untouched.
    assertTrue(RestUtils.exists(docEndpoint + "/2"));
    assertThat(RestUtils.get(docEndpoint + "/2"), both(not(containsString("\"zipcode\":\"12345\""))).and(containsString("\"note\":\"Second\"")));
}

/**
 * Pushes a single batch of JSON documents through the current streaming context
 * as an upsert against {@code target}, waits briefly for processing, then stops
 * the context (keeping the underlying SparkContext alive).
 *
 * @param docs   JSON documents to write as one queued micro-batch
 * @param target Elasticsearch write resource
 * @param conf   per-batch elasticsearch-hadoop configuration (script, params, etc.)
 * @throws Exception if interrupted while waiting for the batch to complete
 */
private void runStreamingUpsertBatch(List<String> docs, String target, Map<String, String> conf) throws Exception {
    JavaRDD<String> batch = sc.parallelize(docs);
    Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
    rddQueue.add(batch);
    JavaDStream<String> dstream = ssc.queueStream(rddQueue);
    JavaEsSparkStreaming.saveJsonToEs(dstream, target, conf);
    ssc.start();
    // NOTE(review): sleep-based synchronization — assumes the micro-batch completes
    // within 2 seconds; flaky on slow CI but matches the existing pattern in this suite.
    TimeUnit.SECONDS.sleep(2);
    ssc.stop(false, true);
}