public void testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString()

in spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java [514:598]


    public void testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString() throws Exception {
        // Scenario: two documents are seeded over REST, then two separate streaming
        // batches run scripted upserts against them. Batch 1 passes a JSON *object*
        // (the "address" field) as the script parameter; batch 2 passes a plain
        // *string* (the "note" field). The assertions at the end verify that each
        // script touched only its own document.

        // JSON request bodies are encoded explicitly as UTF-8 rather than with the
        // platform-default charset, so the bytes sent do not depend on the JVM's
        // file.encoding. Fully-qualified names are used to avoid needing a new import.
        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

        // Versions up to and including 2.x get "string" as the field type here
        // instead of "keyword".
        String keyword = "keyword";
        if (version.onOrBefore(EsMajorVersion.V_2_X)) {
            keyword = "string";
        }

        String mapping = "{\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"note\":{\"type\":\""+keyword+"\"},\"address\":{\"type\":\"nested\",\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"zipcode\":{\"type\":\""+keyword+"\"}}}}}";
        // Versions that still use mapping types need the mapping wrapped in the type name.
        if (!TestUtils.isTypelessVersion(version)) {
            mapping = "{\"data\":"+mapping+"}";
        }
        String index = wrapIndex("spark-streaming-test-contact");
        String type = "data";
        String target = resource(index, type, version);
        String docEndpoint = docEndpoint(index, type, version);

        // Seed the index: both documents start with note "First" and an empty address list.
        RestUtils.touch(index);
        RestUtils.putMapping(index, type, mapping.getBytes(utf8));
        RestUtils.postData(docEndpoint+"/1", "{\"id\":\"1\",\"note\":\"First\",\"address\":[]}".getBytes(utf8));
        RestUtils.postData(docEndpoint+"/2", "{\"id\":\"2\",\"note\":\"First\",\"address\":[]}".getBytes(utf8));

        // Script language: "groovy" up to and including 2.x, "painless" otherwise.
        String lang = "painless";
        if (version.onOrBefore(EsMajorVersion.V_2_X)) {
            lang = "groovy";
        }

        // Settings shared by both batches: JSON input, upsert keyed on the "id" field.
        Map<String, String> props = new HashMap<>();
        props.put("es.write.operation", "upsert");
        props.put("es.input.json", "true");
        props.put("es.mapping.id", "id");
        props.put("es.update.script.lang", lang);

        // ---- Batch 1: object-valued script parameter (appends an address to doc 1) ----
        String doc1 = "{\"id\":\"1\",\"address\":{\"zipcode\":\"12345\",\"id\":\"1\"}}";
        List<String> docs1 = new ArrayList<>();
        docs1.add(doc1);
        // es-hadoop "name:field" syntax: script param "new_address" is taken from
        // the incoming document's "address" field.
        String upParams = "new_address:address";
        String upScript;
        // From 5.x on, the script reads its parameters through "params.";
        // earlier versions reference the parameter name directly.
        if (version.onOrAfter(EsMajorVersion.V_5_X)) {
            upScript = "ctx._source.address.add(params.new_address)";
        } else {
            upScript = "ctx._source.address+=new_address";
        }

        // Copy the shared settings so batch-specific script options do not leak between batches.
        Map<String, String> localConf1 = new HashMap<>(props);
        localConf1.put("es.update.script.params", upParams);
        localConf1.put("es.update.script", upScript);

        JavaRDD<String> batch1 = sc.parallelize(docs1);
        Queue<JavaRDD<String>> rddQueue1 = new LinkedList<>();
        rddQueue1.add(batch1);
        JavaDStream<String> dstream1 = ssc.queueStream(rddQueue1);
        JavaEsSparkStreaming.saveJsonToEs(dstream1, target, localConf1);
        ssc.start();
        // Give the streaming job time to pick up and write the queued batch.
        TimeUnit.SECONDS.sleep(2);
        // stop(stopSparkContext=false, stopGracefully=true): the SparkContext is
        // kept because it is reused for the second streaming context below.
        ssc.stop(false, true);
        // A fresh streaming context is created for the second batch.
        ssc = new JavaStreamingContext(sc, Seconds.apply(1));

        // ---- Batch 2: string-valued script parameter (overwrites the note of doc 2) ----
        String doc2 = "{\"id\":\"2\",\"note\":\"Second\"}";
        List<String> docs2 = new ArrayList<>();
        docs2.add(doc2);
        String noteUpParams = "new_note:note";
        String noteUpScript;
        if (version.onOrAfter(EsMajorVersion.V_5_X)) {
            noteUpScript = "ctx._source.note = params.new_note";
        } else {
            noteUpScript = "ctx._source.note=new_note";
        }

        Map<String, String> localConf2 = new HashMap<>(props);
        localConf2.put("es.update.script.params", noteUpParams);
        localConf2.put("es.update.script", noteUpScript);

        JavaRDD<String> batch2 = sc.parallelize(docs2);
        Queue<JavaRDD<String>> rddQueue2 = new LinkedList<>();
        rddQueue2.add(batch2);
        JavaDStream<String> dstream2 = ssc.queueStream(rddQueue2);
        JavaEsSparkStreaming.saveJsonToEs(dstream2, target, localConf2);
        ssc.start();
        TimeUnit.SECONDS.sleep(2);
        ssc.stop(false, true);

        // Doc 1: the address from batch 1 was appended and the seeded note is untouched.
        assertTrue(RestUtils.exists(docEndpoint + "/1"));
        assertThat(RestUtils.get(docEndpoint + "/1"), both(containsString("\"zipcode\":\"12345\"")).and(containsString("\"note\":\"First\"")));

        // Doc 2: the note was replaced by batch 2 and no address was added.
        assertTrue(RestUtils.exists(docEndpoint + "/2"));
        assertThat(RestUtils.get(docEndpoint + "/2"), both(not(containsString("\"zipcode\":\"12345\""))).and(containsString("\"note\":\"Second\"")));
    }