def testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString()

in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala [315:376]


  def testEsRDDWriteWithUpsertScriptUsingBothObjectAndRegularString(): Unit = {
    // Exercises scripted upserts fed with JSON input in two flavours: one script parameter is
    // bound to a JSON object field ("address"), the other to a plain string field ("note").
    // Each write addresses one pre-seeded document by its "id"; the closing assertions verify
    // that only the addressed document picked up the corresponding change.
    val indexName = "spark-streaming-test-contact"
    val typeName = "data"

    // Every REST payload in this test is sent as UTF-8 bytes.
    def utf8(text: String): Array[Byte] = text.getBytes(StringUtils.UTF_8)

    // "address" is mapped as a nested object so that the script can append entries to it.
    val contactMapping = wrapMapping(
      typeName,
      s"""{
         |    "properties": {
         |      "id": {
         |        "type": "$keyword"
         |      },
         |      "note": {
         |        "type": "$keyword"
         |      },
         |      "address": {
         |        "type": "nested",
         |        "properties": {
         |          "id":    { "type": "$keyword"  },
         |          "zipcode": { "type": "$keyword"  }
         |        }
         |      }
         |    }
         |}""".stripMargin)

    val writeResource = resource(indexName, typeName, version)
    val docBase = docEndpoint(indexName, typeName, version)
    val firstDocUrl = s"$docBase/1"
    val secondDocUrl = s"$docBase/2"

    // Create the index, install the mapping and seed two documents with empty address lists.
    RestUtils.touch(indexName)
    RestUtils.putMapping(indexName, typeName, utf8(contactMapping))
    RestUtils.postData(firstDocUrl, utf8("""{ "id" : "1", "note": "First", "address": [] }"""))
    RestUtils.postData(secondDocUrl, utf8("""{ "id" : "2", "note": "First", "address": [] }"""))

    // From 5.x onwards the scripts are written in painless; older versions get groovy.
    val usesPainless = version.onOrAfter(EsMajorVersion.V_5_X)

    def chooseScript(painless: String, groovy: String): String =
      if (usesPainless) painless else groovy

    // Settings shared by both writes; the script and its parameters are added per write.
    val baseSettings = Map(
      "es.write.operation" -> "upsert",
      "es.input.json" -> "true",
      "es.mapping.id" -> "id",
      "es.update.script.lang" -> chooseScript("painless", "groovy")
    )

    // Upsert a value that should only modify the first document: the script appends the
    // incoming "address" object to that document's address list.
    val addressUpdates = sc.makeRDD(List("""{"id":"1","address":{"zipcode":"12345","id":"1"}}"""))
    val addressSettings = baseSettings ++ Map(
      "es.update.script.params" -> "new_address:address",
      "es.update.script" -> chooseScript(
        "ctx._source.address.add(params.new_address)",
        "ctx._source.address+=new_address")
    )
    runStreamRecoverably(addressUpdates) { stream =>
      stream.saveToEs(writeResource, addressSettings)
    }

    // Upsert a value that should only modify the second document: the script overwrites the
    // "note" field with the incoming string.
    val noteUpdates = sc.makeRDD(List("""{"id":"2","note":"Second"}"""))
    val noteSettings = baseSettings ++ Map(
      "es.update.script.params" -> "new_note:note",
      "es.update.script" -> chooseScript(
        "ctx._source.note = params.new_note",
        "ctx._source.note=new_note")
    )
    runStream(noteUpdates) { stream =>
      stream.saveToEs(writeResource, noteSettings)
    }

    // Document 1 gained the address entry and kept its original note.
    assertTrue(RestUtils.exists(firstDocUrl))
    val firstDoc = RestUtils.get(firstDocUrl)
    assertThat(firstDoc, both(containsString(""""zipcode":"12345"""")).and(containsString(""""note":"First"""")))

    // Document 2 received the new note and no address entry.
    assertTrue(RestUtils.exists(secondDocUrl))
    val secondDoc = RestUtils.get(secondDocUrl)
    assertThat(secondDoc, both(not(containsString(""""zipcode":"12345""""))).and(containsString(""""note":"Second"""")))
  }