def testJoinField()

in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala [1669:1756]


  def testJoinField(): Unit = {
    // Writes company (parent) and employee (child) documents that carry a join field, then checks
    // that a child document is stored with the expected routing and that the join field can be
    // read back through Spark SQL as a struct. The same checks run for two write strategies:
    // parents and children saved by separate jobs, and all documents saved by a single job.

    // Join added in 6.0.
    // TODO: Available in 5.6, but we only track major version ids in the connector.
    EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "Join added in 6.0.")

    // test mix of short-form and long-form joiner values
    val company1 = Map("id" -> "1", "company" -> "Elastic", "joiner" -> "company")
    val company2 = Map("id" -> "2", "company" -> "Fringe Cafe", "joiner" -> Map("name" -> "company"))
    val company3 = Map("id" -> "3", "company" -> "WATIcorp", "joiner" -> Map("name" -> "company"))

    val employee1 = Map("id" -> "10", "name" -> "kimchy", "joiner" -> Map("name" -> "employee", "parent" -> "1"))
    val employee2 = Map("id" -> "20", "name" -> "April Ryan", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
    val employee3 = Map("id" -> "21", "name" -> "Charlie", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
    val employee4 = Map("id" -> "30", "name" -> "Alvin Peats", "joiner" -> Map("name" -> "employee", "parent" -> "3"))

    val parents = Seq(company1, company2, company3)
    val children = Seq(employee1, employee2, employee3, employee4)
    val docs = parents ++ children

    // Write settings shared by every save: the "id" field supplies the document id and the
    // "joiner" field is declared as the join field.
    val joinWriteCfg = Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner")

    // Runs one scenario end to end: installs the join mapping on the index derived from
    // `indexName`, lets `write` save documents to the resulting target, then verifies the stored
    // child document and the rows read back through Spark SQL.
    def verifyJoinRoundTrip(indexName: String)(write: String => Unit): Unit = {
      val index = wrapIndex(indexName)
      val typename = "join"
      val (target, docPath) = makeTargets(index, typename)

      // The mapping resource depends on whether the cluster version is typeless.
      val mappingResource =
        if (TestUtils.isTypelessVersion(version)) "data/join/mapping/typeless.json"
        else "data/join/mapping/typed.json"
      RestUtils.putMapping(index, typename, mappingResource)

      write(target)

      // Employee 10 declares parent "1", so it is looked up with routing=1 and the stored
      // document is expected to report that routing value. Fetched once; both assertions
      // inspect the same response.
      val storedChild = RestUtils.get(docPath + "/10?routing=1")
      assertThat(storedChild, containsString("kimchy"))
      assertThat(storedChild, containsString(""""_routing":"1""""))

      val df = sqc.read.format("es").load(target)
      val data = df.where(df("id").equalTo("1").or(df("id").equalTo("10"))).sort(df("id")).collect()

      // Rows are sorted by id: position 0 is expected to be company "1" (join name only) and
      // position 1 employee "10" (join name plus parent).
      Seq(0 -> Seq("name"), 1 -> Seq("name", "parent")).foreach { case (position, joinerFields) =>
        val record = data(position)
        val joiner = record.getStruct(record.fieldIndex("joiner"))
        assertNotNull(joiner)
        joinerFields.foreach(field => assertNotNull(joiner.getString(joiner.fieldIndex(field))))
      }
    }

    // Parents and children written by two separate save jobs.
    verifyJoinRoundTrip("sparksql-test-scala-write-join-separate") { target =>
      sc.makeRDD(parents).saveToEs(target, joinWriteCfg)
      sc.makeRDD(children).saveToEs(target, joinWriteCfg)
    }

    // Parents and children written together by a single save job.
    verifyJoinRoundTrip("sparksql-test-scala-write-join-combined") { target =>
      sc.makeRDD(docs).saveToEs(target, joinWriteCfg)
    }
  }