in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala [315:376]
def testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString(): Unit = {
  // Scenario: two documents are seeded, then two scripted upserts are streamed as raw JSON.
  // The first input carries an object under "address", the second a plain string under "note".
  // Each upsert is expected to change only the document whose id it names.
  val mapping = wrapMapping("data", s"""{
| "properties": {
| "id": {
| "type": "$keyword"
| },
| "note": {
| "type": "$keyword"
| },
| "address": {
| "type": "nested",
| "properties": {
| "id": { "type": "$keyword" },
| "zipcode": { "type": "$keyword" }
| }
| }
| }
|}""".stripMargin)

  val indexName = "spark-streaming-test-contact"
  val typeName = "data"
  val writeResource = resource(indexName, typeName, version)
  val docUrl = docEndpoint(indexName, typeName, version)
  val firstDoc = s"$docUrl/1"
  val secondDoc = s"$docUrl/2"

  // Request bodies are sent as UTF-8 encoded bytes.
  def utf8(text: String): Array[Byte] = text.getBytes(StringUtils.UTF_8)

  // Create the index with its mapping and seed both documents with an empty address list.
  RestUtils.touch(indexName)
  RestUtils.putMapping(indexName, typeName, utf8(mapping))
  RestUtils.postData(firstDoc, utf8("""{ "id" : "1", "note": "First", "address": [] }"""))
  RestUtils.postData(secondDoc, utf8("""{ "id" : "2", "note": "First", "address": [] }"""))

  // The script language and the script syntax both depend on whether the cluster is 5.x or newer,
  // so the check is made once and all three values are selected together.
  val onV5OrLater = version.onOrAfter(EsMajorVersion.V_5_X)
  val (scriptLang, addAddressScript, replaceNoteScript) =
    if (onV5OrLater) {
      ("painless", "ctx._source.address.add(params.new_address)", "ctx._source.note = params.new_note")
    } else {
      ("groovy", "ctx._source.address+=new_address", "ctx._source.note=new_note")
    }

  // Settings shared by both upserts.
  val baseSettings = Map(
    "es.write.operation" -> "upsert",
    "es.input.json" -> "true",
    "es.mapping.id" -> "id",
    "es.update.script.lang" -> scriptLang
  )

  // Upsert a value that should only modify the first document: the script appends an address entry.
  val addressUpdates = sc.makeRDD(List("""{"id":"1","address":{"zipcode":"12345","id":"1"}}"""))
  val addressSettings = baseSettings ++ Map(
    "es.update.script.params" -> "new_address:address",
    "es.update.script" -> addAddressScript
  )
  runStreamRecoverably(addressUpdates)(stream => stream.saveToEs(writeResource, addressSettings))

  // Upsert a value that should only modify the second document: the script overwrites "note".
  val noteUpdates = sc.makeRDD(List("""{"id":"2","note":"Second"}"""))
  val noteSettings = baseSettings ++ Map(
    "es.update.script.params" -> "new_note:note",
    "es.update.script" -> replaceNoteScript
  )
  runStream(noteUpdates)(stream => stream.saveToEs(writeResource, noteSettings))

  // Document 1 gained the address and kept its original note.
  assertTrue(RestUtils.exists(firstDoc))
  val firstSource = RestUtils.get(firstDoc)
  assertThat(firstSource, both(containsString(""""zipcode":"12345"""")).and(containsString(""""note":"First"""")))

  // Document 2 has the new note and did not receive the address.
  assertTrue(RestUtils.exists(secondDoc))
  val secondSource = RestUtils.get(secondDoc)
  assertThat(secondSource, both(not(containsString(""""zipcode":"12345""""))).and(containsString(""""note":"Second"""")))
}