def dataFrame()

in connector/src/it/scala/com/datastax/spark/connector/sql/CassandraDataFrameDateBehaviors.scala [37:96]


  def dataFrame(timeZone: TimeZone): Unit = skipIfProtocolVersionLT(DefaultProtocolVersion.V4){

    TimeZone.setDefault(timeZone)

    val readTable = s"date_test_${timeZone.getID.toLowerCase}_read"
    val writeTable = s"date_test_${timeZone.getID.toLowerCase}_write"

    conn.withSessionDo { session =>
      createKeyspace(session)
      session.execute(s"create table $ks.$readTable (key int primary key, dd date)")
      session.execute(s"insert into $ks.$readTable (key, dd) values (1, '1930-05-31')")
      session.execute(s"create table $ks.$writeTable (key int primary key, d0 date)")
    }

    it should s"read C* LocalDate columns in ${timeZone.getID} timezone" in {
      val df = spark
        .read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> readTable, "keyspace" -> ks, "cluster" -> "ClusterOne"))
        .load

      df.count should be(1)

      val foundDate = df.first.getDate(1)
      val foundLocalDate = foundDate.toLocalDate
      val foundTuple = (foundLocalDate.getYear, foundLocalDate.getMonthValue, foundLocalDate.getDayOfMonth)

      val expectedTuple = (1930, 5, 31)

      foundTuple should be(expectedTuple)
    }

    it should s"write java.sql.date to C* date columns in ${timeZone.getID} timezone" in {
      val schema = StructType(Seq(
        StructField("key", DataTypes.IntegerType),
        StructField("d0", DataTypes.DateType)
      ))

      val rows = sc.parallelize(Seq(
        Row(0, Date.valueOf("1986-01-02")),
        Row(1, Date.valueOf("1987-01-02"))
      ))

      val dataFrame = spark.createDataFrame(rows, schema)

      dataFrame.write
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> writeTable, "keyspace" -> ks, "cluster" -> "ClusterOne"))
        .mode("append")
        .save

      conn.withSessionDo { session =>
        val count = session.execute(s"select count(1) from $ks.$writeTable").one().getLong(0)
        count should be(2)

        val date = session.execute(s"select d0 from $ks.$writeTable where key = 0").one().getLocalDate(0)
        date should be(LocalDate.of(1986, 1, 2))
      }
    }
  }