public void start()

in cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/AbstractCassandraJob.java [82:136]


    /**
     * Runs the sample Spark job end to end.
     *
     * <p>Builds a {@link SparkConf} pinned to {@code local[8]}, applies the bulk-writer and Kryo settings,
     * obtains a {@link SparkSession}, and stores the result of {@code configureJob} in {@code configuration}.
     * Depending on that configuration the job writes, reads, or does both; when both happen the written and
     * read datasets are compared with {@code checkSmallDataFrameEquality}.
     *
     * <p>Any {@link Throwable} raised by the write/read/compare phase is logged and not rethrown. A failure in
     * {@code configureJob} propagates to the caller. In either case the {@link SparkContext} is stopped exactly
     * once before this method returns, and a failure while stopping is logged rather than silently dropped.
     *
     * @param args command-line arguments; this method only logs them
     */
    public void start(String[] args)
    {
        logger.info("Starting Spark job with args={}", Arrays.toString(args));

        SparkConf sparkConf = new SparkConf().setAppName("Sample Spark Cassandra Bulk Reader Job")
                                             .set("spark.master", "local[8]");

        // Add SBW-specific settings
        // TODO: simplify setting up spark conf
        BulkSparkConf.setupSparkConf(sparkConf, true);
        KryoRegister.setup(sparkConf);

        SparkSession spark = SparkSession
                             .builder()
                             .config(sparkConf)
                             .getOrCreate();
        SparkContext sc = spark.sparkContext();
        SQLContext sql = spark.sqlContext();
        logger.info("Spark Conf: {}", sparkConf.toDebugString());

        // Everything after the context exists runs under a single finally so the context
        // is always stopped, including when job configuration itself fails.
        try
        {
            configuration = configureJob(sc, sparkConf);
            long rowCount = configuration.rowCount;

            try
            {
                Dataset<Row> written = null;
                if (configuration.shouldWrite())
                {
                    written = write(rowCount, sql, sc);
                }
                Dataset<Row> read = null;
                if (configuration.shouldRead())
                {
                    read = read(rowCount, sql);
                }

                if (configuration.shouldWrite() && configuration.shouldRead())
                {
                    checkSmallDataFrameEquality(written, read);
                }
                logger.info("Finished Spark job, shutting down...");
            }
            catch (Throwable throwable)
            {
                // Job failures are reported through the log only; callers of start() do not see them
                logger.error("Unexpected exception executing Spark job", throwable);
            }
        }
        finally
        {
            try
            {
                sc.stop();
            }
            catch (Throwable stopFailure)
            {
                // Shutdown is best-effort, but a failure here should still be visible in the logs
                logger.warn("Failed to stop Spark context cleanly", stopFailure);
            }
        }
    }