in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala [590:646]
/**
 * Streams two batches of records through an "update" output-mode query into the "es" sink
 * and checks the resulting documents.
 *
 * The first batch writes three records (ids 1, 2, 3). The second batch re-sends the same ids,
 * with only the record for id 2 carrying a different name. Afterwards the index must contain
 * exactly those three documents; documents 1 and 3 are expected to still be at version 1,
 * while document 2 is expected at version 2 with the new name.
 */
def testUpdate(): Unit = {
  val target = wrapIndex(resource("test-update", "data", version))
  val docPath = wrapIndex(docEndpoint("test-update", "data", version))
  val test = new StreamingQueryTestHarness[Record](spark)

  // First batch: three records, with the "id" field configured as the mapping id.
  test.withInput(Record(1, "Spark"))
    .withInput(Record(2, "Hadoop"))
    .withInput(Record(3, "YARN"))
    .startTest {
      test.stream
        .writeStream
        .outputMode("update")
        .option("checkpointLocation", checkpoint(target))
        .option(ES_MAPPING_ID, "id")
        .format("es")
        .start(target)
    }
  test.waitForPartialCompletion()

  // Every record of the first batch must be addressable by its id and searchable by name.
  assertTrue(RestUtils.exists(target))
  assertTrue(RestUtils.exists(docPath + "/1"))
  assertTrue(RestUtils.exists(docPath + "/2"))
  assertTrue(RestUtils.exists(docPath + "/3"))
  val initialSearch = RestUtils.get(target + "/_search?")
  assertThat(initialSearch, containsString("Spark"))
  assertThat(initialSearch, containsString("Hadoop"))
  assertThat(initialSearch, containsString("YARN"))

  // Second batch: same ids, but only id 2 has a changed name.
  test.withInput(Record(1, "Spark"))
    .withInput(Record(2, "Hadoop2"))
    .withInput(Record(3, "YARN"))
  test.waitForCompletion()

  // Ask for document versions so that rewritten documents can be told apart from untouched ones.
  val versionedSearch = RestUtils.get(target + "/_search?version=true")
  val result: java.util.Map[String, Object] =
    new ObjectMapper().readValue(versionedSearch, classOf[java.util.Map[String, Object]])
  val hits = result.get("hits").asInstanceOf[java.util.Map[String, Object]]
    .get("hits").asInstanceOf[java.util.List[java.util.Map[String, Object]]]

  // Guard against a vacuous pass: without this, an empty or partial hit list would skip
  // some or all of the per-document assertions below and the test would still succeed.
  assertEquals("Expected exactly the three streamed documents in the search result", 3, hits.size())

  // Checks one search hit against the version and name expected for its id.
  def assertHit(hit: java.util.Map[String, Object], expectedVersion: Int, expectedName: String): Unit = {
    assertEquals(expectedVersion, hit.get("_version"))
    val value = hit.get("_source").asInstanceOf[java.util.Map[String, Object]].get("name").asInstanceOf[String]
    assertEquals(expectedName, value)
  }

  hits.forEach(hit => {
    hit.get("_id").asInstanceOf[String] match {
      case "1" => assertHit(hit, 1, "Spark")
      case "2" => assertHit(hit, 2, "Hadoop2") // The only one that should have been updated
      case "3" => assertHit(hit, 1, "YARN")
      case _ => throw new AssertionError("Unexpected result")
    }
  })
}