in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java [174:241]
private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
{
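    // Set the write phase for the configured transport; for cloud storage, also notify the extension that transport has started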
    onDirectTransport(ctx -> writeValidator.setPhase("UploadAndCommit"));
    onCloudStorageTransport(ctx -> {
        writeValidator.setPhase("UploadToCloudStorage");
        ctx.transportExtensionImplementation().onTransportStart(elapsedTimeMillis());
    });
    try
    {
        // Pass the broadcast context as an input so the lambda captures a local variable and avoids a serialization error.
        // Without this, the SerializedLambda captures the enclosing CassandraBulkSourceRelation object, which is not
        // serializable (as Spark requires), as a captured argument, causing a "Task not serializable" error.
        List<WriteResult> writeResults = sortedRDD
                                         .mapPartitions(writeRowsInPartition(broadcastContext, columnNames))
                                         .collect();
        // Unpersist broadcast context to free up executors while driver waits for the import to complete
        unpersist();
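        // Flatten the per-partition write results into a single list of stream results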
        List<StreamResult> streamResults = writeResults.stream()
                                                       .map(WriteResult::streamResults)
                                                       .flatMap(Collection::stream)
                                                       .collect(Collectors.toList());
        long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
        long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
        boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected);
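        // On cloud-storage transport, wait for the import of the uploaded data to complete before declaring success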
        onCloudStorageTransport(context -> waitForImportCompletion(context, rowCount, totalBytesWritten, hasClusterTopologyChanged, streamResults));
        LOGGER.info("Bulk writer job complete. rowCount={} totalBytes={} hasClusterTopologyChanged={}",
                    rowCount, totalBytesWritten, hasClusterTopologyChanged);
        publishSuccessfulJobStats(rowCount, totalBytesWritten, hasClusterTopologyChanged);
    }
    catch (Throwable throwable)
    {
        publishFailureJobStats(throwable.getMessage());
        LOGGER.error("Bulk Write Failed.", throwable);
        RuntimeException failure = new RuntimeException("Bulk Write to Cassandra has failed", throwable);
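        // Best-effort attempt to abort the restore job on cloud-storage transport; any failure during the abort is suppressed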
        try
        {
            onCloudStorageTransport(ctx -> abortRestoreJob(ctx, throwable));
        }
        catch (Exception rte)
        {
            failure.addSuppressed(rte);
        }
        throw failure;
    }
    finally
    {
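        // Best-effort cleanup: stop the task scheduler, shut down the writer context and clear the Spark job group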
        try
        {
            simpleTaskScheduler.close();
            writerContext.shutdown();
            sqlContext().sparkContext().clearJobGroup();
        }
        catch (Exception ignored)
        {
            // We've made our best effort to close the Bulk Writer context
            LOGGER.warn("Ignored exception during spark job shutdown.", ignored);
        }
        unpersist();
    }
}