in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java [59:96]
public BaseRelation createRelation(@NotNull SQLContext sqlContext,
                                   @NotNull SaveMode saveMode,
                                   @NotNull Map<String, String> parameters,
                                   @NotNull Dataset<Row> data)
{
    switch (saveMode)
    {
        case Append:
            // Initialize the job group ID for later use if we need to cancel the job
            // TODO: Can we get a more descriptive "description" in here from the end user somehow?
            BulkWriterContext writerContext = createBulkWriterContext(
                    sqlContext.sparkContext(),
                    ScalaConversionUtils.<String, String>mapAsJavaMap(parameters),
                    data.schema());
            try
            {
                JobInfo jobInfo = writerContext.job();
                String description = "Cassandra Bulk Load for table " + jobInfo.getFullTableName();
                CassandraBulkSourceRelation relation = new CassandraBulkSourceRelation(writerContext, sqlContext);
                sqlContext.sparkContext().setJobGroup(jobInfo.getId().toString(), description, false);
                relation.insert(data, false);
                return relation;
            }
            catch (Exception exception)
            {
                throw new RuntimeException(exception);
            }
            finally
            {
                // Always release writer resources and clear the job group, whether the insert succeeded or failed
                writerContext.shutdown();
                sqlContext.sparkContext().clearJobGroup();
            }
        case Overwrite:
            throw new LoadNotSupportedException("SaveMode.Overwrite is not supported on Cassandra as it needs privileged TRUNCATE operation");
        default:
            throw new LoadNotSupportedException("SaveMode." + saveMode + " is not supported");
    }
}
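
The Append branch above is what a plain DataFrameWriter call ends up invoking. The sketch below is a hypothetical usage example, not part of the source file: it assumes the sink can be addressed by its fully qualified class name via format(...), and the option keys (keyspace, table) are illustrative placeholders for whatever parameters createBulkWriterContext actually expects. As the switch above shows, only SaveMode.Append can succeed; other modes raise LoadNotSupportedException.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public final class BulkWriteExample
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder()
                                         .appName("cassandra-bulk-write-example")
                                         .getOrCreate();

        // Placeholder input; any Dataset<Row> whose schema matches the target table would do
        Dataset<Row> data = spark.read().parquet("/path/to/input");

        data.write()
            .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") // assumed: FQCN used as the data source name
            .option("keyspace", "my_keyspace")                               // illustrative option keys, not verified
            .option("table", "my_table")
            .mode(SaveMode.Append)                                           // Overwrite and other modes are rejected above
            .save();

        spark.stop();
    }
}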