in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java [92:135]
public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator)
{
    TaskContext taskContext = taskContextSupplier.get();
    LOGGER.info("[{}]: Processing Bulk Writer partition", taskContext.partitionId());
    scala.collection.Iterator<scala.Tuple2<DecoratedKey, Object[]>> dataIterator =
        new InterruptibleIterator<>(taskContext, asScalaIterator(sourceIterator));
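    // Open the stream session for this task and fail fast if clock skew against the replicas is unacceptable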
    StreamSession streamSession = createStreamSession(taskContext);
    validateAcceptableTimeSkewOrThrow(streamSession.replicas);
    int partitionId = taskContext.partitionId();
    Range<BigInteger> range = getTokenRange(taskContext);
    JobInfo job = writerContext.job();
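    // Stage SSTables in a temp directory unique to this job, stage attempt, task attempt and partition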
    Path baseDir = Paths.get(System.getProperty("java.io.tmpdir"),
                             job.getId().toString(),
                             Integer.toString(taskContext.stageAttemptNumber()),
                             Integer.toString(taskContext.attemptNumber()),
                             Integer.toString(partitionId));
    Map<String, Object> valueMap = new HashMap<>();
    try
    {
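        // Drain the partition: create the SSTable writer on demand, write each row, and check the batch size as we go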
        while (dataIterator.hasNext())
        {
            maybeCreateTableWriter(partitionId, baseDir);
            writeRow(valueMap, dataIterator, partitionId, range);
            checkBatchSize(streamSession, partitionId, job);
        }
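        // Finalize the last SSTable batch, if a writer is still open, and hand it to the stream session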
        if (sstableWriter != null)
        {
            finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
        }
        LOGGER.info("[{}] Done with all writers and waiting for stream to complete", partitionId);
        return streamSession.close();
    }
    catch (Exception exception)
    {
        LOGGER.error("[{}] Failed to write job={}, taskStageAttemptNumber={}, taskAttemptNumber={}",
                     partitionId,
                     job.getId().toString(),
                     taskContext.stageAttemptNumber(),
                     taskContext.attemptNumber());
        throw new RuntimeException(exception);
    }
}