in cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java [129:174]
/**
 * Generates {@code rowCount} rows, bulk-writes them to Cassandra through the
 * {@code CassandraDataSink} data source, and returns the written DataFrame with
 * any writer-only helper columns (per-row TTL / timestamp) stripped out.
 *
 * @param rowCount  number of rows to generate
 * @param sparkConf Spark configuration (not consulted directly here)
 * @param sql       SQL context used to assemble the DataFrame
 * @param sc        Spark context supplying the default parallelism
 * @return the DataFrame that was written, minus TTL/timestamp helper columns
 */
private static Dataset<Row> write(long rowCount, SparkConf sparkConf, SQLContext sql, SparkContext sc)
{
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
    int numSlices = sc.defaultParallelism();
    // Sample toggles: flip these to exercise per-row TTL / timestamp writes.
    boolean withTtl = false;
    boolean withTimestamp = false;
    JavaRDD<Row> generated = genDataset(jsc, rowCount, numSlices, withTtl, withTimestamp);
    Dataset<Row> frame = sql.createDataFrame(generated, getWriteSchema(withTtl, withTimestamp));

    DataFrameWriter<Row> writer = frame.write()
                                       .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
                                       .option("sidecar_instances", "localhost,localhost2,localhost3")
                                       .option("keyspace", "spark_test")
                                       .option("table", "test")
                                       .option("local_dc", "datacenter1")
                                       .option("bulk_writer_cl", "LOCAL_QUORUM")
                                       .option("number_splits", "-1")
                                       // A constant timestamp and TTL can be used by setting the following options.
                                       // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
                                       // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
                                       .mode("append");

    // Remember which helper columns were added so they can be dropped before returning.
    List<String> helperColumns = new ArrayList<>();
    if (withTtl)
    {
        helperColumns.add("ttl");
        writer = writer.option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
    }
    if (withTimestamp)
    {
        helperColumns.add("timestamp");
        writer = writer.option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"));
    }
    writer.save();

    Dataset<Row> result = frame;
    if (!helperColumns.isEmpty())
    {
        result = result.drop(helperColumns.toArray(new String[0]));
    }
    return result;
}