def testUpdate()

in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala [590:646]


  def testUpdate(): Unit = {
    val target = wrapIndex(resource("test-update", "data", version))
    val docPath = wrapIndex(docEndpoint("test-update", "data", version))
    val test = new StreamingQueryTestHarness[Record](spark)

    test.withInput(Record(1, "Spark"))
      .withInput(Record(2, "Hadoop"))
      .withInput(Record(3, "YARN"))
      .startTest {
        test.stream
          .writeStream
          .outputMode("update")
          .option("checkpointLocation", checkpoint(target))
          .option(ES_MAPPING_ID, "id")
          .format("es")
          .start(target)
      }
    test.waitForPartialCompletion()

    assertTrue(RestUtils.exists(target))
    assertTrue(RestUtils.exists(docPath + "/1"))
    assertTrue(RestUtils.exists(docPath + "/2"))
    assertTrue(RestUtils.exists(docPath + "/3"))
    var searchResult = RestUtils.get(target + "/_search?")
    assertThat(searchResult, containsString("Spark"))
    assertThat(searchResult, containsString("Hadoop"))
    assertThat(searchResult, containsString("YARN"))

    test.withInput(Record(1, "Spark"))
      .withInput(Record(2, "Hadoop2"))
      .withInput(Record(3, "YARN"))
    test.waitForCompletion()
    searchResult = RestUtils.get(target + "/_search?version=true")
    val result: java.util.Map[String, Object] = new ObjectMapper().readValue(searchResult, classOf[java.util.Map[String, Object]])
    val hits = result.get("hits").asInstanceOf[java.util.Map[String, Object]].get("hits").asInstanceOf[java.util.List[java.util.Map[String,
      Object]]]
    hits.forEach(hit => {
      hit.get("_id").asInstanceOf[String] match {
          case "1" => {
            assertEquals(1, hit.get("_version"))
            val value = hit.get("_source").asInstanceOf[java.util.Map[String, Object]].get("name").asInstanceOf[String]
            assertEquals("Spark", value)
          }
          case "2" => {
            assertEquals(2, hit.get("_version")) // The only one that should have been updated
            val value = hit.get("_source").asInstanceOf[java.util.Map[String, Object]].get("name").asInstanceOf[String]
            assertEquals("Hadoop2", value)
          }
          case "3" => {
            assertEquals(1, hit.get("_version"))
            val value = hit.get("_source").asInstanceOf[java.util.Map[String, Object]].get("name").asInstanceOf[String]
            assertEquals("YARN", value)
          }
          case _ => throw new AssertionError("Unexpected result")
      }
    })
  }