in cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/AbstractCassandraJob.java [82:136]
public void start(String[] args)
{
    logger.info("Starting Spark job with args={}", Arrays.toString(args));
    SparkConf sparkConf = new SparkConf().setAppName("Sample Spark Cassandra Bulk Reader Job")
                                         .set("spark.master", "local[8]");

    // Add Spark Bulk Writer (SBW)-specific settings and register Kryo serializers
    // TODO: simplify setting up spark conf
    BulkSparkConf.setupSparkConf(sparkConf, true);
    KryoRegister.setup(sparkConf);

    SparkSession spark = SparkSession
                         .builder()
                         .config(sparkConf)
                         .getOrCreate();
    SparkContext sc = spark.sparkContext();
    SQLContext sql = spark.sqlContext();
    logger.info("Spark Conf: {}", sparkConf.toDebugString());
    configuration = configureJob(sc, sparkConf);
    long rowCount = configuration.rowCount;

    try
    {
        Dataset<Row> written = null;
        if (configuration.shouldWrite())
        {
            written = write(rowCount, sql, sc);
        }
        Dataset<Row> read = null;
        if (configuration.shouldRead())
        {
            read = read(rowCount, sql);
        }
        // When both phases run, verify that the rows read back match the rows written
        if (configuration.shouldWrite() && configuration.shouldRead())
        {
            checkSmallDataFrameEquality(written, read);
        }
        logger.info("Finished Spark job, shutting down...");
        sc.stop();
    }
    catch (Throwable throwable)
    {
        logger.error("Unexpected exception executing Spark job", throwable);
        // Best-effort shutdown on the failure path; ignore secondary errors from stop()
        try
        {
            sc.stop();
        }
        catch (Throwable ignored)
        {
        }
    }
}
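
For orientation, a minimal sketch of how a concrete job could plug into this template method. The subclass name SampleDirectJob, the JobConfiguration constructor arguments, and the main entry point are illustrative assumptions; the excerpt above only shows that configureJob(sc, sparkConf) returns an object exposing rowCount, shouldWrite(), and shouldRead().

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

// Hypothetical subclass: assumes AbstractCassandraJob declares an overridable
// configureJob(SparkContext, SparkConf); only the members used in start()
// above are relied upon here.
public class SampleDirectJob extends AbstractCassandraJob
{
    public static void main(String[] args)
    {
        new SampleDirectJob().start(args); // runs write, read, then verification
    }

    @Override
    public JobConfiguration configureJob(SparkContext sc, SparkConf sparkConf)
    {
        // Assumed JobConfiguration shape: row count plus write/read toggles,
        // mirroring configuration.rowCount, shouldWrite() and shouldRead() above.
        return new JobConfiguration(10_000L, true, true);
    }
}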