in spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala [310:403]
def testEsRDDWriteJoinField(): Unit = {
  /*
   * Writes parent ("company") and child ("employee") documents that carry a join field
   * and checks that:
   *   - a child document can be fetched with routing equal to its parent id and reports
   *     that routing value in the response, and
   *   - documents read back through esRDD still expose the join field with its
   *     "name" entry (and "parent" entry for children).
   *
   * Two scenarios run against separate indices: parents and children saved by two
   * separate jobs, and all documents saved by a single job.
   *
   * NOTE: the scenarios are expressed as calls to a local helper rather than bare
   * `{ ... }` scoping blocks. A bare block that follows an expression after a single
   * newline is parsed by Scala as an argument to that expression, so such blocks only
   * work when a blank line happens to separate them.
   */

  // Join added in 6.0.
  // TODO: Available in 5.6, but we only track major version ids in the connector.
  EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "Join added in 6.0.")

  // test mix of short-form and long-form joiner values
  val company1 = Map("id" -> "1", "company" -> "Elastic", "joiner" -> "company")
  val company2 = Map("id" -> "2", "company" -> "Fringe Cafe", "joiner" -> Map("name" -> "company"))
  val company3 = Map("id" -> "3", "company" -> "WATIcorp", "joiner" -> Map("name" -> "company"))
  val employee1 = Map("id" -> "10", "name" -> "kimchy", "joiner" -> Map("name" -> "employee", "parent" -> "1"))
  val employee2 = Map("id" -> "20", "name" -> "April Ryan", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
  val employee3 = Map("id" -> "21", "name" -> "Charlie", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
  val employee4 = Map("id" -> "30", "name" -> "Alvin Peats", "joiner" -> Map("name" -> "employee", "parent" -> "3"))

  val parents = Seq(company1, company2, company3)
  val children = Seq(employee1, employee2, employee3, employee4)
  val docs = parents ++ children

  // Settings shared by every write: document id comes from "id", join info from "joiner".
  val writeSettings = Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner")

  // Prepares the named index with the join mapping, lets `write` populate the target
  // resource, then verifies routing of a child document and the join data read back.
  def verifyJoinRoundTrip(indexName: String)(write: String => Unit): Unit = {
    val index = wrapIndex(indexName)
    val typename = "join"
    val target = resource(index, typename, version)
    val getEndpoint = docEndpoint(index, typename, version)

    val mappingFile =
      if (TestUtils.isTypelessVersion(version)) "data/join/mapping/typeless.json"
      else "data/join/mapping/typed.json"
    RestUtils.putMapping(index, typename, mappingFile)

    write(target)

    // Child "10" has parent "1": fetch it once using the parent id as routing and
    // check both the document content and the reported routing value.
    val childResponse = RestUtils.get(getEndpoint + "/10?routing=1")
    assertThat(childResponse, containsString("kimchy"))
    assertThat(childResponse, containsString(""""_routing":"1""""))

    val data = sc.esRDD(target).collectAsMap()

    // Asserts that document `id` was read back and that its "joiner" field is a map
    // containing every key in `expectedKeys`.
    def assertJoiner(id: String, expectedKeys: String*): Unit = {
      assertTrue(data.contains(id))
      val record = data(id)
      assertTrue(record.contains("joiner"))
      val joiner = record("joiner").asInstanceOf[collection.mutable.Map[String, Object]]
      expectedKeys.foreach(key => assertTrue(joiner.contains(key)))
    }

    assertJoiner("1", "name")
    assertJoiner("10", "name", "parent")
  }

  // Scenario 1: parents and children are written by two separate jobs.
  verifyJoinRoundTrip("spark-test-scala-write-join-separate") { target =>
    sc.makeRDD(parents).saveToEs(target, writeSettings)
    sc.makeRDD(children).saveToEs(target, writeSettings)
  }

  // Scenario 2: parents and children are written together by a single job.
  verifyJoinRoundTrip("spark-test-scala-write-join-combined") { target =>
    sc.makeRDD(docs).saveToEs(target, writeSettings)
  }
}