def testEsDataFrame52OverwriteExistingDataSourceWithJoinField()

in spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala [1359:1425]


  def testEsDataFrame52OverwriteExistingDataSourceWithJoinField(): Unit = {
    // Verifies that writing with SaveMode.Overwrite to an index that uses a join field replaces the
    // previously indexed parent/child documents: every child from the first write must be gone
    // afterwards, and children from the second write must be stored with their parent's routing.
    //
    // Join added in 6.0.
    EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "Join added in 6.0.")

    // using long-form joiner values: joiner = (relation name, parent id)
    val schema = StructType(Seq(
      StructField("id", StringType, nullable = false),
      StructField("company", StringType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("joiner", StructType(Seq(
        StructField("name", StringType, nullable = false),
        StructField("parent", StringType, nullable = true)
      )))
    ))

    val parents = Seq(
      Row("1", "Elastic", null, Row("company", null)),
      Row("2", "Fringe Cafe", null, Row("company", null)),
      Row("3", "WATIcorp", null, Row("company", null))
    )

    val firstChildren = Seq(
      Row("10", null, "kimchy", Row("employee", "1")),
      Row("20", null, "April Ryan", Row("employee", "2")),
      Row("21", null, "Charlie", Row("employee", "2")),
      Row("30", null, "Alvin Peats", Row("employee", "3"))
    )

    val index = wrapIndex("sparksql-test-scala-overwrite-join")
    val typename = "join"
    val (target, docPath) = makeTargets(index, typename)
    RestUtils.delete(index)
    RestUtils.touch(index)
    val mapping =
      if (TestUtils.isTypelessVersion(version)) "data/join/mapping/typeless.json"
      else "data/join/mapping/typed.json"
    RestUtils.putMapping(index, typename, mapping)

    // Child documents are routed by their parent id, so they are addressed with ?routing=<parent id>.
    def childPath(id: String, routing: String): String = s"$docPath/$id?routing=$routing"

    // Both writes share the parameterized test settings (`cfg`) so each parameterization exercises
    // the same configuration for the initial write and for the overwrite.
    val writeOptions = cfg ++ Map(ES_MAPPING_ID -> "id", ES_MAPPING_JOIN -> "joiner")

    sqc.createDataFrame(sc.makeRDD(parents ++ firstChildren), schema)
      .write
      .format("es")
      .options(writeOptions)
      .save(target)

    val kimchy = RestUtils.get(childPath("10", "1"))
    assertThat(kimchy, containsString("kimchy"))
    assertThat(kimchy, containsString(""""_routing":"1""""))

    // Overwrite the data using a new dataset:
    val newChildren = Seq(
      Row("110", null, "costinl", Row("employee", "1")),
      Row("111", null, "jbaiera", Row("employee", "1")),
      Row("121", null, "Charlie", Row("employee", "2")),
      Row("130", null, "Damien", Row("employee", "3"))
    )

    sqc.createDataFrame(sc.makeRDD(parents ++ newChildren), schema)
      .write
      .format("es")
      .options(writeOptions)
      .mode(SaveMode.Overwrite)
      .save(target)

    // The overwrite must remove every child from the first dataset, not just one of them.
    // Id and routing are taken from the test rows: column 0 is the id, joiner.parent is the routing.
    firstChildren.foreach { child =>
      val id = child.getString(0)
      val parentId = child.getStruct(3).getString(1)
      assertFalse(s"child $id from the first write should have been removed by the overwrite",
        RestUtils.exists(childPath(id, parentId)))
    }

    val costinl = RestUtils.get(childPath("110", "1"))
    assertThat(costinl, containsString("costinl"))
    assertThat(costinl, containsString(""""_routing":"1""""))
  }