private static Dataset write()

in cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java [129:174]


    /**
     * Generates {@code rowCount} sample rows and bulk-writes them to the {@code spark_test.test}
     * table through {@code org.apache.cassandra.spark.sparksql.CassandraDataSink} in append mode.
     *
     * <p>Two local flags (both currently disabled) control whether the generated rows carry
     * extra {@code ttl} / {@code timestamp} columns. When a flag is enabled, the matching writer
     * option is set to read that column through {@code TTLOption.perRow} /
     * {@code TimestampOption.perRow}. The column is then dropped from the Dataset returned to
     * the caller.
     *
     * @param rowCount  number of rows to generate and write
     * @param sparkConf Spark configuration; not referenced by this method
     * @param sql       SQL context used to turn the generated rows into a DataFrame
     * @param sc        Spark context; its default parallelism is passed to the row generator
     * @return the DataFrame that was written, minus any per-row TTL/timestamp columns.
     *         NOTE(review): the result is not cached here, so evaluating it again re-runs the
     *         generator. Confirm that genDataset is deterministic if callers compare it
     *         against data read back from Cassandra.
     */
    private static Dataset<Row> write(long rowCount, SparkConf sparkConf, SQLContext sql, SparkContext sc)
    {
        final boolean useTtlColumn = false;
        final boolean useTimestampColumn = false;

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
        JavaRDD<Row> generated = genDataset(jsc, rowCount, sc.defaultParallelism(), useTtlColumn, useTimestampColumn);
        Dataset<Row> written = sql.createDataFrame(generated, getWriteSchema(useTtlColumn, useTimestampColumn));

        // To apply a constant TTL and/or timestamp to every row instead, add options such as:
        //   .option(WriterOptions.TTL.name(), TTLOption.constant(20))
        //   .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
        DataFrameWriter<Row> writer = written.write()
                                             .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
                                             .option("sidecar_instances", "localhost,localhost2,localhost3")
                                             .option("keyspace", "spark_test")
                                             .option("table", "test")
                                             .option("local_dc", "datacenter1")
                                             .option("bulk_writer_cl", "LOCAL_QUORUM")
                                             .option("number_splits", "-1")
                                             .mode("append");

        // Columns that exist only to drive writer options; they are hidden from the caller.
        List<String> helperColumns = new ArrayList<>(2);

        if (useTtlColumn)
        {
            // Tell the sink to take each row's TTL from the "ttl" column.
            writer = writer.option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
            helperColumns.add("ttl");
        }

        if (useTimestampColumn)
        {
            // Tell the sink to take each row's write timestamp from the "timestamp" column.
            writer = writer.option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"));
            helperColumns.add("timestamp");
        }

        writer.save();

        return helperColumns.isEmpty()
               ? written
               : written.drop(helperColumns.toArray(new String[0]));
    }