public StreamResult write()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java [92:135]


    /**
     * Writes every row supplied by {@code sourceIterator} for the current task's partition and
     * returns the result of closing the stream session.
     *
     * <p>Rows are consumed one at a time: a table writer is created on demand, the row is written,
     * and the batch size is checked after each row. Once the iterator is exhausted, any SSTable
     * writer that is still open is finalized before the stream session is closed.
     *
     * <p>Working files are placed under
     * {@code <java.io.tmpdir>/<jobId>/<stageAttemptNumber>/<attemptNumber>/<partitionId>}.
     *
     * @param sourceIterator the rows to write, as (key, column values) pairs
     * @return the {@link StreamResult} returned by closing the stream session
     * @throws RuntimeException wrapping any exception raised while writing rows, finalizing the
     *                          SSTable or closing the stream session; failures that occur during
     *                          the setup steps before writing starts propagate unwrapped
     */
    public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator)
    {
        TaskContext taskContext = taskContextSupplier.get();
        // Read the partition id once and reuse it for logging, paths and the writer calls
        int partitionId = taskContext.partitionId();
        LOGGER.info("[{}]: Processing Bulk Writer partition", partitionId);

        // Wrap the source in an InterruptibleIterator bound to this task context
        scala.collection.Iterator<scala.Tuple2<DecoratedKey, Object[]>> dataIterator =
        new InterruptibleIterator<>(taskContext, asScalaIterator(sourceIterator));

        StreamSession streamSession = createStreamSession(taskContext);
        validateAcceptableTimeSkewOrThrow(streamSession.replicas);
        Range<BigInteger> range = getTokenRange(taskContext);
        JobInfo job = writerContext.job();

        // Include job id, stage attempt, task attempt and partition in the directory path
        Path baseDir = Paths.get(System.getProperty("java.io.tmpdir"),
                                 job.getId().toString(),
                                 Integer.toString(taskContext.stageAttemptNumber()),
                                 Integer.toString(taskContext.attemptNumber()),
                                 Integer.toString(partitionId));

        // A single map instance is handed to every writeRow call
        Map<String, Object> valueMap = new HashMap<>();
        try
        {
            while (dataIterator.hasNext())
            {
                maybeCreateTableWriter(partitionId, baseDir);
                writeRow(valueMap, dataIterator, partitionId, range);
                checkBatchSize(streamSession, partitionId, job);
            }

            // Flush whatever is left in the writer that is still open after the last row
            if (sstableWriter != null)
            {
                finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
            }

            LOGGER.info("[{}] Done with all writers and waiting for stream to complete", partitionId);
            return streamSession.close();
        }
        catch (Exception exception)
        {
            // Pass the exception as the trailing argument so the logger records the stack trace
            // alongside the job/attempt context instead of dropping the cause from this log entry
            LOGGER.error("[{}] Failed to write job={}, taskStageAttemptNumber={}, taskAttemptNumber={}",
                         partitionId,
                         job.getId(),
                         taskContext.stageAttemptNumber(),
                         taskContext.attemptNumber(),
                         exception);
            throw new RuntimeException(exception);
        }
    }