def testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString()

in spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala [586:652]


  /**
   * Checks that scripted upserts issued through `saveToEs` work both when the script
   * parameter is taken from a JSON object field (`address`) and when it is taken from
   * a plain string field (`note`).
   *
   * Two documents are seeded with an empty `address` array and `"note": "First"`.
   * The first write targets document 1 and appends an address object; the second
   * write targets document 2 and overwrites its note. Each document is then asserted
   * to contain only the change that was aimed at it.
   */
  def testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString(): Unit = {
    val mapping = wrapMapping("data", s"""{
                    |    "properties": {
                    |      "id": {
                    |        "type": "$keyword"
                    |      },
                    |      "note": {
                    |        "type": "$keyword"
                    |      },
                    |      "address": {
                    |        "type": "nested",
                    |        "properties": {
                    |          "id":    { "type": "$keyword"  },
                    |          "zipcode": { "type": "$keyword"  }
                    |        }
                    |      }
                    |    }
                    |}""".stripMargin)

    val index = wrapIndex("spark-test-contact")
    val typename = "data"
    val target = resource(index, typename, version)
    val docPath = docEndpoint(index, typename, version)

    RestUtils.touch(index)

    // Encode every payload explicitly as UTF-8 so the test does not depend on the
    // JVM's platform default charset.
    RestUtils.putMapping(index, typename, mapping.getBytes(StringUtils.UTF_8))
    RestUtils.postData(s"$docPath/1", """{ "id" : "1", "note": "First", "address": [] }""".getBytes(StringUtils.UTF_8))
    RestUtils.postData(s"$docPath/2", """{ "id" : "2", "note": "First", "address": [] }""".getBytes(StringUtils.UTF_8))

    // The script language and the script syntax both depend on the same version check:
    // on 5.x and later the scripts are written for "painless" and read their arguments
    // through `params.`, otherwise they are written for "groovy" and use bare names.
    val usePainless = version.onOrAfter(EsMajorVersion.V_5_X)
    val lang = if (usePainless) "painless" else "groovy"
    val props = Map(
      "es.write.operation" -> "upsert",
      "es.input.json" -> "true",
      "es.mapping.id" -> "id",
      "es.update.script.lang" -> lang
    )

    // Upsert a value that should only modify the first document. Modification will add an address entry.
    val lines = sc.makeRDD(List("""{"id":"1","address":{"zipcode":"12345","id":"1"}}"""))
    val addressParams = "new_address:address"
    val addressScript =
      if (usePainless) "ctx._source.address.add(params.new_address)"
      else "ctx._source.address+=new_address"
    lines.saveToEs(target, props + ("es.update.script.params" -> addressParams) + ("es.update.script" -> addressScript))

    // Upsert a value that should only modify the second document. Modification will update the "note" field.
    val notes = sc.makeRDD(List("""{"id":"2","note":"Second"}"""))
    val noteParams = "new_note:note"
    val noteScript =
      if (usePainless) "ctx._source.note = params.new_note"
      else "ctx._source.note=new_note"
    notes.saveToEs(target, props + ("es.update.script.params" -> noteParams) + ("es.update.script" -> noteScript))

    // Document 1 gained the address entry and kept its original note.
    assertTrue(RestUtils.exists(s"$docPath/1"))
    assertThat(RestUtils.get(s"$docPath/1"), both(containsString(""""zipcode":"12345"""")).and(containsString(""""note":"First"""")))

    // Document 2 received the new note and no address entry.
    assertTrue(RestUtils.exists(s"$docPath/2"))
    assertThat(RestUtils.get(s"$docPath/2"), both(not(containsString(""""zipcode":"12345""""))).and(containsString(""""note":"Second"""")))
  }