private void persist()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java [124:169]


    private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
    {
        writeValidator.setPhase("Environment Validation");
        writeValidator.validateInitialEnvironment();
        writeValidator.setPhase("UploadAndCommit");

        try
        {
            sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
            writeValidator.failIfRingChanged();
        }
        catch (Throwable throwable)
        {
            LOGGER.error("Bulk Write Failed", throwable);
            throw new RuntimeException("Bulk Write to Cassandra has failed", throwable);
        }
        finally
        {
            writeValidator.close();  // Uses the MgrClient, so needs to stop first
            try
            {
                writerContext.shutdown();
                sqlContext().sparkContext().clearJobGroup();
            }
            catch (Exception ignored)
            {
                // We've made our best effort to close the Bulk Writer context
            }
            try
            {
                broadcastContext.unpersist(false);
            }
            catch (Throwable throwable)
            {
                if (NonFatal$.MODULE$.apply(throwable))
                {
                    LOGGER.error("Uncaught exception in thread {} attempting to unpersist broadcast variable",
                                 Thread.currentThread().getName(), throwable);
                }
                else
                {
                    throw throwable;
                }
            }
        }
    }