in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala [1669:1756]
def testJoinField(): Unit = {
  // Join added in 6.0.
  // TODO: Available in 5.6, but we only track major version ids in the connector.
  EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "Join added in 6.0.")

  // test mix of short-form and long-form joiner values
  val company1 = Map("id" -> "1", "company" -> "Elastic", "joiner" -> "company")
  val company2 = Map("id" -> "2", "company" -> "Fringe Cafe", "joiner" -> Map("name" -> "company"))
  val company3 = Map("id" -> "3", "company" -> "WATIcorp", "joiner" -> Map("name" -> "company"))

  val employee1 = Map("id" -> "10", "name" -> "kimchy", "joiner" -> Map("name" -> "employee", "parent" -> "1"))
  val employee2 = Map("id" -> "20", "name" -> "April Ryan", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
  val employee3 = Map("id" -> "21", "name" -> "Charlie", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
  val employee4 = Map("id" -> "30", "name" -> "Alvin Peats", "joiner" -> Map("name" -> "employee", "parent" -> "3"))

  val parents = Seq(company1, company2, company3)
  val children = Seq(employee1, employee2, employee3, employee4)
  val docs = parents ++ children
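
  // Scenario 1: parents and children are written with separate saveToEs calls.
  // ES_MAPPING_JOIN points the connector at the "joiner" field, so each child
  // document is routed to its parent id (checked by the routing assertions below).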
  {
    val index = wrapIndex("sparksql-test-scala-write-join-separate")
    val typename = "join"
    val (target, docPath) = makeTargets(index, typename)
    if (TestUtils.isTypelessVersion(version)) {
      RestUtils.putMapping(index, typename, "data/join/mapping/typeless.json")
    } else {
      RestUtils.putMapping(index, typename, "data/join/mapping/typed.json")
    }

    sc.makeRDD(parents).saveToEs(target, Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner"))
    sc.makeRDD(children).saveToEs(target, Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner"))

    assertThat(RestUtils.get(docPath + "/10?routing=1"), containsString("kimchy"))
    assertThat(RestUtils.get(docPath + "/10?routing=1"), containsString(""""_routing":"1""""))
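
    // Read the index back through SparkSQL: the join field surfaces as a struct
    // with a "name" sub-field and, for child documents, a "parent" sub-field.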
    val df = sqc.read.format("es").load(target)
    val data = df.where(df("id").equalTo("1").or(df("id").equalTo("10"))).sort(df("id")).collect()

    {
      val record1 = data(0)
      assertNotNull(record1.getStruct(record1.fieldIndex("joiner")))
      val joiner = record1.getStruct(record1.fieldIndex("joiner"))
      assertNotNull(joiner.getString(joiner.fieldIndex("name")))
    }

    {
      val record10 = data(1)
      assertNotNull(record10.getStruct(record10.fieldIndex("joiner")))
      val joiner = record10.getStruct(record10.fieldIndex("joiner"))
      assertNotNull(joiner.getString(joiner.fieldIndex("name")))
      assertNotNull(joiner.getString(joiner.fieldIndex("parent")))
    }
  }
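
  // Scenario 2: same mappings, but parents and children are written together
  // as one combined RDD; the results should match the separate writes above.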
  {
    val index = wrapIndex("sparksql-test-scala-write-join-combined")
    val typename = "join"
    val (target, docPath) = makeTargets(index, typename)
    if (TestUtils.isTypelessVersion(version)) {
      RestUtils.putMapping(index, typename, "data/join/mapping/typeless.json")
    } else {
      RestUtils.putMapping(index, typename, "data/join/mapping/typed.json")
    }

    sc.makeRDD(docs).saveToEs(target, Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner"))

    assertThat(RestUtils.get(docPath + "/10?routing=1"), containsString("kimchy"))
    assertThat(RestUtils.get(docPath + "/10?routing=1"), containsString(""""_routing":"1""""))

    val df = sqc.read.format("es").load(target)
    val data = df.where(df("id").equalTo("1").or(df("id").equalTo("10"))).sort(df("id")).collect()

    {
      val record1 = data(0)
      assertNotNull(record1.getStruct(record1.fieldIndex("joiner")))
      val joiner = record1.getStruct(record1.fieldIndex("joiner"))
      assertNotNull(joiner.getString(joiner.fieldIndex("name")))
    }

    {
      val record10 = data(1)
      assertNotNull(record10.getStruct(record10.fieldIndex("joiner")))
      val joiner = record10.getStruct(record10.fieldIndex("joiner"))
      assertNotNull(joiner.getString(joiner.fieldIndex("name")))
      assertNotNull(joiner.getString(joiner.fieldIndex("parent")))
    }
  }
}