in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java [124:169]
private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
{
writeValidator.setPhase("Environment Validation");
writeValidator.validateInitialEnvironment();
writeValidator.setPhase("UploadAndCommit");
try
{
sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
writeValidator.failIfRingChanged();
}
catch (Throwable throwable)
{
LOGGER.error("Bulk Write Failed", throwable);
throw new RuntimeException("Bulk Write to Cassandra has failed", throwable);
}
finally
{
writeValidator.close(); // Uses the MgrClient, so needs to stop first
try
{
writerContext.shutdown();
sqlContext().sparkContext().clearJobGroup();
}
catch (Exception ignored)
{
// We've made our best effort to close the Bulk Writer context
}
try
{
broadcastContext.unpersist(false);
}
catch (Throwable throwable)
{
if (NonFatal$.MODULE$.apply(throwable))
{
LOGGER.error("Uncaught exception in thread {} attempting to unpersist broadcast variable",
Thread.currentThread().getName(), throwable);
}
else
{
throw throwable;
}
}
}
}