in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala [1359:1425]
// Verifies SaveMode.Overwrite against an index whose mapping contains a join field:
//   1. index parent documents plus an initial set of child documents,
//   2. save a second DataFrame (same parents, different children) with SaveMode.Overwrite,
//   3. assert that a child from the first batch no longer exists and that a child from the
//      second batch is present and carries its parent's id as its routing value.
def testEsDataFrame52OverwriteExistingDataSourceWithJoinField(): Unit = {
  // Join added in 6.0.
  EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "Join added in 6.0.")

  // using long-form joiner values: the join field is a struct of (relation name, parent id)
  val schema = StructType(Seq(
    StructField("id", StringType, nullable = false),
    StructField("company", StringType, nullable = true),
    StructField("name", StringType, nullable = true),
    StructField("joiner", StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("parent", StringType, nullable = true)
    )))
  ))

  // Parent rows: "company" relation with no parent reference.
  val parents = Seq(
    Row("1", "Elastic", null, Row("company", null)),
    Row("2", "Fringe Cafe", null, Row("company", null)),
    Row("3", "WATIcorp", null, Row("company", null))
  )

  // Child rows of the first write: "employee" relation pointing at a parent id.
  val firstChildren = Seq(
    Row("10", null, "kimchy", Row("employee", "1")),
    Row("20", null, "April Ryan", Row("employee", "2")),
    Row("21", null, "Charlie", Row("employee", "2")),
    Row("30", null, "Alvin Peats", Row("employee", "3"))
  )

  val index = wrapIndex("sparksql-test-scala-overwrite-join")
  val typename = "join"
  val (target, docPath) = makeTargets(index, typename)

  // Start from a fresh index and install the join mapping that matches the cluster version.
  RestUtils.delete(index)
  RestUtils.touch(index)
  val mappingFile =
    if (TestUtils.isTypelessVersion(version)) "data/join/mapping/typeless.json"
    else "data/join/mapping/typed.json"
  RestUtils.putMapping(index, typename, mappingFile)

  // Options shared by both writes: document id column and join field column.
  val joinWriteOptions = Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner")

  // NOTE(review): unlike the overwrite below, this initial write does not merge in `cfg`;
  // kept as-is -- confirm that the difference is intentional.
  sqc.createDataFrame(sc.makeRDD(parents ++ firstChildren), schema)
    .write
    .format("es")
    .options(joinWriteOptions)
    .save(target)

  // Fetch the child once and check both its content and its routing value.
  val firstChildDoc = RestUtils.get(docPath + "/10?routing=1")
  assertThat(firstChildDoc, containsString("kimchy"))
  assertThat(firstChildDoc, containsString(""""_routing":"1""""))

  // Overwrite the data using a new dataset:
  val newChildren = Seq(
    Row("110", null, "costinl", Row("employee", "1")),
    Row("111", null, "jbaiera", Row("employee", "1")),
    Row("121", null, "Charlie", Row("employee", "2")),
    Row("130", null, "Damien", Row("employee", "3"))
  )

  sqc.createDataFrame(sc.makeRDD(parents ++ newChildren), schema)
    .write
    .format("es")
    .options(cfg ++ joinWriteOptions)
    .mode(SaveMode.Overwrite)
    .save(target)

  // The child from the first batch must be gone after the overwrite...
  assertFalse(RestUtils.exists(docPath + "/10?routing=1"))

  // ...and a child from the second batch must exist, routed to its parent.
  val newChildDoc = RestUtils.get(docPath + "/110?routing=1")
  assertThat(newChildDoc, containsString("costinl"))
  assertThat(newChildDoc, containsString(""""_routing":"1""""))
}