private void persist()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java [174:241]


    private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
    {
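        // Record the current write phase for validation; for the cloud-storage
        // transport, also notify the transport extension that the upload has started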
        onDirectTransport(ctx -> writeValidator.setPhase("UploadAndCommit"));
        onCloudStorageTransport(ctx -> {
            writeValidator.setPhase("UploadToCloudStorage");
            ctx.transportExtensionImplementation().onTransportStart(elapsedTimeMillis());
        });

        try
        {
            // Pass the broadcast context as a method argument so the lambda closes over a local variable.
            // Without this, SerializedLambda captures the enclosing CassandraBulkSourceRelation object, which is
            // not serializable (as Spark requires), as a captured argument, causing a "Task not serializable" error.
            List<WriteResult> writeResults = sortedRDD
                                             .mapPartitions(writeRowsInPartition(broadcastContext, columnNames))
                                             .collect();

            // Unpersist the broadcast context to free executor memory while the driver
            // waits for the import to complete
            unpersist();

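            // Flatten the per-partition write results into a single list of stream results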
            List<StreamResult> streamResults = writeResults.stream()
                                                           .map(WriteResult::streamResults)
                                                           .flatMap(Collection::stream)
                                                           .collect(Collectors.toList());

            long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
            long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
            boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected);

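            // Cloud-storage transport: block until the uploaded data has been imported on the cluster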
            onCloudStorageTransport(context -> waitForImportCompletion(context, rowCount, totalBytesWritten, hasClusterTopologyChanged, streamResults));

            LOGGER.info("Bulk writer job complete. rowCount={} totalBytes={} hasClusterTopologyChanged={}",
                        rowCount, totalBytesWritten, hasClusterTopologyChanged);
            publishSuccessfulJobStats(rowCount, totalBytesWritten, hasClusterTopologyChanged);
        }
        catch (Throwable throwable)
        {
            publishFailureJobStats(throwable.getMessage());
            LOGGER.error("Bulk Write Failed.", throwable);
            RuntimeException failure = new RuntimeException("Bulk Write to Cassandra has failed", throwable);
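            // Best-effort abort of the restore job on the cloud-storage transport; if the
            // abort itself fails, attach that failure as a suppressed exception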
            try
            {
                onCloudStorageTransport(ctx -> abortRestoreJob(ctx, throwable));
            }
            catch (Exception abortFailure)
            {
                failure.addSuppressed(abortFailure);
            }

            throw failure;
        }
        finally
        {
            try
            {
                simpleTaskScheduler.close();
                writerContext.shutdown();
                sqlContext().sparkContext().clearJobGroup();
            }
            catch (Exception exception)
            {
                // We've made our best effort to close the Bulk Writer context; log and continue
                LOGGER.warn("Ignored exception during Spark job shutdown.", exception);
            }
            unpersist();
        }
    }
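
The broadcast-context comment above is the one subtle Spark detail in this method. Below is a minimal, self-contained sketch of that pattern, assuming hypothetical stand-in types (String for both the broadcast payload and the per-partition result); the real writeRowsInPartition operates on DecoratedKey rows and returns WriteResult objects.

import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

final class PartitionWriterSketch
{
    // A static factory method: the lambda it returns closes over its parameters
    // (a serializable Broadcast handle and the column names) rather than `this`.
    // An equivalent instance method on the relation would make SerializedLambda
    // capture the non-serializable enclosing object, and Spark would fail task
    // serialization at job submission with "Task not serializable".
    static <K, V> FlatMapFunction<Iterator<Tuple2<K, V>>, String>
    writeRowsInPartition(Broadcast<String> broadcastContext, String[] columnNames)
    {
        return rows -> {
            String context = broadcastContext.value(); // resolved on the executor
            while (rows.hasNext())
            {
                Tuple2<K, V> row = rows.next();
                // ... write `row` using `context` and `columnNames` ...
            }
            return Collections.emptyIterator(); // per-partition results would be returned here
        };
    }
}

The driver side then mirrors the call in the method above: sortedRDD.mapPartitions(writeRowsInPartition(broadcastContext, columnNames)).collect().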