in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java [132:222]
public WriteResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator)
{
    TaskContext taskContext = taskContextSupplier.get();
    LOGGER.info("[{}]: Processing bulk writer partition", taskContext.partitionId());
    Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
    Preconditions.checkState(!taskTokenRange.isEmpty(),
                             "Token range for the partition %s is empty",
                             taskTokenRange);
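    // Snapshot the token range mapping up front; pending instances indicate the cluster is being resized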
    TokenRangeMapping<RingInstance> initialTokenRangeMapping = writerContext.cluster().getTokenRangeMapping(false);
    boolean isClusterBeingResized = !initialTokenRangeMapping.pendingInstances().isEmpty();
    LOGGER.info("[{}]: Fetched token range mapping for keyspace: {} with write instances: {} " +
                "containing pending instances: {}",
                taskContext.partitionId(),
                writerContext.job().qualifiedTableName().keyspace(),
                initialTokenRangeMapping.allInstances().size(),
                initialTokenRangeMapping.pendingInstances().size());
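    // Validate the effective consistency level and time skew before any data is streamed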
writeValidator.setPhase("Environment Validation");
writeValidator.validateClOrFail(initialTokenRangeMapping);
writeValidator.setPhase("UploadAndCommit");
writerContext.cluster().validateTimeSkew(taskTokenRange);
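    // Wrap the source iterator so iteration stops promptly if the Spark task is interrupted or killed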
    Iterator<Tuple2<DecoratedKey, Object[]>> dataIterator = new JavaInterruptibleIterator<>(taskContext, sourceIterator);
    int partitionId = taskContext.partitionId();
    JobInfo job = writerContext.job();
    Map<String, Object> valueMap = new HashMap<>();
    try
    {
        // preserve the order of ranges
        Set<Range<BigInteger>> newRanges = new LinkedHashSet<>(initialTokenRangeMapping.getRangeMap()
                                                                                       .asMapOfRanges()
                                                                                       .keySet());
        Range<BigInteger> tokenRange = getTokenRange(taskContext);
        List<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
                                            Collections.singletonList(tokenRange) : // no overlaps
                                            getIntersectingSubRanges(newRanges, tokenRange); // has overlaps; split into sub-ranges
        int currentRangeIndex = 0;
        Range<BigInteger> currentRange = subRanges.get(currentRangeIndex);
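        // Write each row through the stream session owning the sub-range that contains its token,
        // advancing to later sub-ranges as the tokens progress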
        while (dataIterator.hasNext())
        {
            if (streamSession != null)
            {
                streamSession.throwIfLastStreamFailed();
            }
            Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
            BigInteger token = rowData._1().getToken();
            // Advance to the next range that contains the token.
            // The intermediate ranges that do not contain the token will be skipped
            while (!currentRange.contains(token))
            {
                currentRangeIndex++;
                if (currentRangeIndex >= subRanges.size())
                {
                    String errMsg = String.format("Received Token %s outside the expected ranges %s", token, subRanges);
                    throw new IllegalStateException(errMsg);
                }
                currentRange = subRanges.get(currentRangeIndex);
            }
            maybeSwitchToNewStreamSession(taskContext, currentRange);
            writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
        }
        // Finalize SSTable for the last StreamSession
        if (streamSession != null)
        {
            flushAsync(partitionId);
        }
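        // Wait for all outstanding stream sessions to finish and validate their results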
        List<StreamResult> results = waitForStreamCompletionAndValidate(partitionId, initialTokenRangeMapping, taskTokenRange);
        return new WriteResult(results, isClusterBeingResized);
    }
    catch (Exception exception)
    {
        LOGGER.error("[{}] Failed to write job={}, taskStageAttemptNumber={}, taskAttemptNumber={}",
                     partitionId,
                     job.getId(),
                     taskContext.stageAttemptNumber(),
                     taskContext.attemptNumber(),
                     exception);
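        // Restore the interrupt flag so callers further up the stack can observe the interruption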
        if (exception instanceof InterruptedException)
        {
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException(exception);
    }
}