def testEsRDDWriteJoinField()

in spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala [310:403]


  /**
   * Integration test: documents carrying a join field can be written through `saveToEs`
   * with `ES_MAPPING_JOIN` and then read back through `esRDD`.
   *
   * The same parent/child data set is exercised against two fresh indices:
   *  - "separate": parents and children are saved in two consecutive jobs;
   *  - "combined": parents and children are saved together in a single job.
   *
   * Parents mix the short form (`"joiner" -> "company"`) and the long form
   * (`"joiner" -> Map("name" -> "company")`) of the join value. For each index the test
   * checks that:
   *  - child "10" is retrievable with `routing=1`;
   *  - the response reports `_routing` "1";
   *  - the join field read back for parent "1" is a map holding `name`;
   *  - the join field read back for child "10" holds both `name` and `parent`.
   */
  def testEsRDDWriteJoinField(): Unit = {
    // Join added in 6.0.
    // TODO: Available in 5.6, but we only track major version ids in the connector.
    EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "Join added in 6.0.")

    // test mix of short-form and long-form joiner values
    val company1 = Map("id" -> "1", "company" -> "Elastic", "joiner" -> "company")
    val company2 = Map("id" -> "2", "company" -> "Fringe Cafe", "joiner" -> Map("name" -> "company"))
    val company3 = Map("id" -> "3", "company" -> "WATIcorp", "joiner" -> Map("name" -> "company"))

    val employee1 = Map("id" -> "10", "name" -> "kimchy", "joiner" -> Map("name" -> "employee", "parent" -> "1"))
    val employee2 = Map("id" -> "20", "name" -> "April Ryan", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
    val employee3 = Map("id" -> "21", "name" -> "Charlie", "joiner" -> Map("name" -> "employee", "parent" -> "2"))
    val employee4 = Map("id" -> "30", "name" -> "Alvin Peats", "joiner" -> Map("name" -> "employee", "parent" -> "3"))

    val parents = Seq(company1, company2, company3)
    val children = Seq(employee1, employee2, employee3, employee4)
    val docs = parents ++ children

    // Document id comes from the "id" field; the join field is "joiner".
    val writeConf = Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner")

    // Creates a join-mapped index for `indexName`, lets `write` populate it (it receives
    // the write/read resource), then verifies routing and the join fields read back.
    def writeAndVerify(indexName: String)(write: String => Unit): Unit = {
      val index = wrapIndex(indexName)
      val typename = "join"
      val target = resource(index, typename, version)
      val getEndpoint = docEndpoint(index, typename, version)

      // The mapping file to install depends on whether this cluster version is typeless.
      val mappingFile =
        if (TestUtils.isTypelessVersion(version)) "data/join/mapping/typeless.json"
        else "data/join/mapping/typed.json"
      RestUtils.putMapping(index, typename, mappingFile)

      write(target)

      // Child "10" has parent "1", so it must be reachable (and reported) with routing "1".
      // Fetch it once and run both matchers against the same response.
      val child10 = RestUtils.get(getEndpoint + "/10?routing=1")
      assertThat(child10, containsString("kimchy"))
      assertThat(child10, containsString(""""_routing":"1""""))

      val data = sc.esRDD(target).collectAsMap()

      // Returns the join field of document `id` as a map, asserting that both the
      // document and its "joiner" field are present. `collection.Map` is used so the
      // check does not depend on whether the reader builds mutable or immutable maps.
      def joinerOf(id: String): collection.Map[String, AnyRef] = {
        assertTrue(data.contains(id))
        val record = data(id)
        assertTrue(record.contains("joiner"))
        record("joiner").asInstanceOf[collection.Map[String, AnyRef]]
      }

      // Parent "1" was submitted with the short-form join value ("company").
      val parentJoiner = joinerOf("1")
      assertTrue(parentJoiner.contains("name"))

      // Child "10" was submitted with the long-form join value naming its parent.
      val childJoiner = joinerOf("10")
      assertTrue(childJoiner.contains("name"))
      assertTrue(childJoiner.contains("parent"))
    }

    // Parents and children written by two separate jobs.
    writeAndVerify("spark-test-scala-write-join-separate") { target =>
      sc.makeRDD(parents).saveToEs(target, writeConf)
      sc.makeRDD(children).saveToEs(target, writeConf)
    }

    // Parents and children written together by a single job.
    writeAndVerify("spark-test-scala-write-join-combined") { target =>
      sc.makeRDD(docs).saveToEs(target, writeConf)
    }
  }