public WriteResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator)

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java [132:222]


    public WriteResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator)
    {
        TaskContext taskContext = taskContextSupplier.get();
        LOGGER.info("[{}]: Processing bulk writer partition", taskContext.partitionId());

        Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
        Preconditions.checkState(!taskTokenRange.isEmpty(),
                                 "Token range assigned to the partition is empty: %s",
                                 taskTokenRange);

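        // Snapshot the cluster topology up front; pending instances indicate an in-flight
        // resize (node additions or removals) that the final validation must account for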
        TokenRangeMapping<RingInstance> initialTokenRangeMapping = writerContext.cluster().getTokenRangeMapping(false);
        boolean isClusterBeingResized = !initialTokenRangeMapping.pendingInstances().isEmpty();
        LOGGER.info("[{}]: Fetched token range mapping for keyspace: {} with write instances: {} " +
                    "containing pending instances: {}",
                    taskContext.partitionId(),
                    writerContext.job().qualifiedTableName().keyspace(),
                    initialTokenRangeMapping.allInstances().size(),
                    initialTokenRangeMapping.pendingInstances().size());

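        // Fail fast before streaming any data: the requested consistency level must be
        // achievable with the current topology, and instance clocks must not be skewed too far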
        writeValidator.setPhase("Environment Validation");
        writeValidator.validateClOrFail(initialTokenRangeMapping);
        writeValidator.setPhase("UploadAndCommit");
        writerContext.cluster().validateTimeSkew(taskTokenRange);

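        // Wrap the source iterator so a Spark task kill/interruption is honored between rows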
        Iterator<Tuple2<DecoratedKey, Object[]>> dataIterator = new JavaInterruptibleIterator<>(taskContext, sourceIterator);
        int partitionId = taskContext.partitionId();
        JobInfo job = writerContext.job();
        Map<String, Object> valueMap = new HashMap<>();

        try
        {
            // preserve the order of ranges
            Set<Range<BigInteger>> newRanges = new LinkedHashSet<>(initialTokenRangeMapping.getRangeMap()
                                                                                           .asMapOfRanges()
                                                                                           .keySet());
            List<Range<BigInteger>> subRanges = newRanges.contains(taskTokenRange) ?
                                                Collections.singletonList(taskTokenRange) : // no overlaps
                                                getIntersectingSubRanges(newRanges, taskTokenRange); // has overlaps; split into sub-ranges

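            // The forward-only scan below assumes rows arrive sorted by token; a token belonging
            // to an earlier sub-range would exhaust the list and fail the range check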
            int currentRangeIndex = 0;
            Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
            while (dataIterator.hasNext())
            {
                if (streamSession != null)
                {
                    streamSession.throwIfLastStreamFailed();
                }
                Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
                BigInteger token = rowData._1().getToken();
                // Advance to the next sub-range that contains the token;
                // intermediate ranges that do not contain it are skipped.
                while (!currentRange.contains(token))
                {
                    currentRangeIndex++;
                    if (currentRangeIndex >= subRanges.size())
                    {
                        String errMsg = String.format("Received token %s outside the expected ranges %s", token, subRanges);
                        throw new IllegalStateException(errMsg);
                    }
                    currentRange = subRanges.get(currentRangeIndex);
                }
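                // Crossing into a new sub-range may require finalizing the current stream
                // session and opening one that targets the replicas of that range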
                maybeSwitchToNewStreamSession(taskContext, currentRange);
                writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
            }

            // Finalize SSTable for the last StreamSession
            if (streamSession != null)
            {
                flushAsync(partitionId);
            }

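            // Block until every stream session completes, validating the streamed ranges
            // against the topology captured at the start of the task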
            List<StreamResult> results = waitForStreamCompletionAndValidate(partitionId, initialTokenRangeMapping, taskTokenRange);
            return new WriteResult(results, isClusterBeingResized);
        }
        catch (Exception exception)
        {
            LOGGER.error("[{}] Failed to write job={}, taskStageAttemptNumber={}, taskAttemptNumber={}",
                         partitionId,
                         job.getId(),
                         taskContext.stageAttemptNumber(),
                         taskContext.attemptNumber(),
                         exception);

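            // Restore the interrupt flag so callers up the stack can still observe the interruption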
            if (exception instanceof InterruptedException)
            {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(exception);
        }
    }
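
For reference, getIntersectingSubRanges is defined elsewhere in RecordWriter and is not shown in this excerpt. Below is a minimal, hypothetical sketch of what it plausibly computes, inferred from the call site above and assuming Guava's Range semantics; it is an illustration, not the project's actual implementation.

    // Hypothetical sketch: intersect the task's token range with each overlapping topology
    // range, preserving the iteration order of the LinkedHashSet built by the caller.
    // Assumes com.google.common.collect.Range plus java.util ArrayList/List/Set imports.
    private static List<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges,
                                                                    Range<BigInteger> tokenRange)
    {
        List<Range<BigInteger>> subRanges = new ArrayList<>();
        for (Range<BigInteger> range : ranges)
        {
            if (range.isConnected(tokenRange))
            {
                // isConnected guarantees intersection() will not throw; the intersection can
                // still be empty when the two ranges merely touch at an endpoint
                Range<BigInteger> intersection = range.intersection(tokenRange);
                if (!intersection.isEmpty())
                {
                    subRanges.add(intersection);
                }
            }
        }
        return subRanges;
    }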