public void test3WriteWithUpsertScript()

in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java [437:533]


    /**
     * Exercises scripted upserts through the structured streaming "es" sink.
     *
     * <p>Two documents (ids 1 and 2) are indexed up front with {@code note} set to
     * "First" and an empty {@code address} array. Two streaming runs then write to
     * the same target with {@code es.write.operation=upsert}:
     * <ol>
     *   <li>Run 1 sends a bean with id 1 and uses an update script that adds the
     *       bean's address to {@code ctx._source.address}.</li>
     *   <li>Run 2 sends a bean with id 2 and uses an update script that assigns the
     *       bean's note to {@code ctx._source.note}.</li>
     * </ol>
     * The test finally asserts that document 1 contains the zipcode and still has
     * note "First", and that document 2 has note "Second" and no zipcode.
     *
     * @throws Exception if any of the REST calls or streaming runs fail
     */
    public void test3WriteWithUpsertScript() throws Exception {
        // Request bodies are encoded explicitly as UTF-8 so that the bytes sent to
        // Elasticsearch do not depend on the JVM's platform default charset.
        // Fully qualified because this file's import block is not touched here.
        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

        // BWC: on 2.x and earlier the mapping uses the "string" field type and
        // update scripts are written in groovy rather than painless.
        String keyword = "keyword";
        String lang = "painless";
        if (version.onOrBefore(EsMajorVersion.V_2_X)) {
            keyword = "string";
            lang = "groovy";
        }

        // Init
        String mapping = "{\"data\":{\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"note\":{\"type\":\""+keyword+"\"},\"address\":{\"type\":\"nested\",\"properties\":{\"id\":{\"type\":\""+keyword+"\"},\"zipcode\":{\"type\":\""+keyword+"\"}}}}}}";
        String index = wrapIndex("test-script-upsert");
        String type = "data";
        String target = resource(index, type);
        String docPath = docPath(index, type);

        // Create the index, install the mapping, then seed the two documents the
        // update scripts below will operate on.
        RestUtils.touch(index);
        RestUtils.putMapping(index, type, mapping.getBytes(utf8));
        RestUtils.postData(docPath+"/1", "{\"id\":\"1\",\"note\":\"First\",\"address\":[]}".getBytes(utf8));
        RestUtils.postData(docPath+"/2", "{\"id\":\"2\",\"note\":\"First\",\"address\":[]}".getBytes(utf8));

        // Common configurations shared by both streaming runs
        Map<String, String> updateProperties = new HashMap<>();
        updateProperties.put("es.write.operation", "upsert");
        updateProperties.put("es.mapping.id", "id");
        updateProperties.put("es.update.script.lang", lang);

        // Run 1: add an address to document 1
        ContactBean doc1;
        {
            AddressBean address = new AddressBean();
            address.setId("1");
            address.setZipcode("12345");
            doc1 = new ContactBean();
            doc1.setId("1");
            doc1.setAddress(address);
        }

        // From 5.x on the script reads its parameter through "params."; older
        // versions reference the parameter name directly.
        String script1;
        if (version.onOrAfter(EsMajorVersion.V_5_X)) {
            script1 = "ctx._source.address.add(params.new_address)";
        } else {
            script1 = "ctx._source.address+=new_address";
        }

        JavaStreamingQueryTestHarness<ContactBean> test1 = new JavaStreamingQueryTestHarness<>(spark, Encoders.bean(ContactBean.class));

        test1
            .withInput(doc1)
            .run(
                test1.stream()
                    .writeStream()
                    .option("checkpointLocation", checkpoint(target))
                    .options(updateProperties)
                    // Bind script parameter "new_address" to the bean's "address" field.
                    .option("es.update.script.params", "new_address:address")
                    .option("es.update.script", script1)
                    .format("es"),
                target
            );

        // Run 2: overwrite the note of document 2
        ContactBean doc2;
        {
            doc2 = new ContactBean();
            doc2.setId("2");
            doc2.setNote("Second");
        }

        // Same version split as script1: "params." prefix from 5.x on.
        String script2;
        if (version.onOrAfter(EsMajorVersion.V_5_X)) {
            script2 = "ctx._source.note = params.new_note";
        } else {
            script2 = "ctx._source.note=new_note";
        }

        JavaStreamingQueryTestHarness<ContactBean> test2 = new JavaStreamingQueryTestHarness<>(spark, Encoders.bean(ContactBean.class));

        test2
            .withInput(doc2)
            .run(
                test2.stream()
                    .writeStream()
                    .option("checkpointLocation", checkpoint(target))
                    .options(updateProperties)
                    // Bind script parameter "new_note" to the bean's "note" field.
                    .option("es.update.script.params", "new_note:note")
                    .option("es.update.script", script2)
                    .format("es"),
                target
            );

        // Validate: document 1 gained the address but kept its original note.
        assertTrue(RestUtils.exists(docPath + "/1"));
        assertThat(RestUtils.get(docPath + "/1"), both(containsString("\"zipcode\":\"12345\"")).and(containsString("\"note\":\"First\"")));

        // Validate: document 2 got the new note and no address was added to it.
        assertTrue(RestUtils.exists(docPath + "/2"));
        assertThat(RestUtils.get(docPath + "/2"), both(not(containsString("\"zipcode\":\"12345\""))).and(containsString("\"note\":\"Second\"")));
    }