in src/java/org/apache/cassandra/service/StorageProxy.java [1388:1549]
/**
 * Applies {@code mutations} as a logged (atomic) batch, routing each mutation either to Accord or to the
 * normal write path based on the current {@link ClusterMetadata}.
 * <p>
 * If any mutation is routed to the normal write path, the whole batch (including the Accord-routed mutations)
 * is written to the batchlog before any data write is started. This happens at most once; every retry reuses
 * the same batch UUID and batchlog replica plan so that cleanup targets the replicas the entry was originally
 * written to. If the entire batch is routed to Accord, the batchlog is skipped.
 * <p>
 * Accord mutations are started asynchronously so they execute while the normal mutations are applied
 * synchronously. The attempt is retried in these cases:
 * <ul>
 *   <li>the normal path throws {@link RetryOnDifferentSystemException} or {@link CoordinatorBehindException};</li>
 *   <li>Accord reports {@code retry_new_protocol} and the normal writes did not fail.</li>
 * </ul>
 * <p>
 * Coordinator write latency is recorded unless the most recent attempt was routed entirely to Accord. The
 * recorded latency tries to exclude the time spent waiting for the Accord result.
 *
 * @param mutations              the mutations making up the batch
 * @param consistencyLevel       the consistency level requested for the data writes
 * @param requireQuorumForRemove passed to {@code consistencyLevelForBatchLog} to choose the batchlog consistency level
 * @param requestTime            the request's timing information, passed through to the batchlog and write paths
 * @throws UnavailableException  if propagated from the batchlog write, the normal writes, or the Accord result
 * @throws OverloadedException   if propagated from the batchlog write, the normal writes, or the Accord result
 * @throws WriteTimeoutException if propagated from the batchlog write, the normal writes, or the Accord result
 * @throws AssertionError        if any mutation targets a keyspace that uses transient replication
 */
public static void mutateAtomically(List<Mutation> mutations,
                                    ConsistencyLevel consistencyLevel,
                                    boolean requireQuorumForRemove,
                                    Dispatcher.RequestTime requestTime)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
    Tracing.trace("Determining replicas for atomic batch");
    long startTime = nanoTime();
    boolean attributeNonAccordLatency = true;
    // End of the normal (non-Accord) writes for the current attempt, or -1 if they have not finished
    long nonAccordEndTime = -1;
    if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
        throw new AssertionError("Logged batches are unsupported with transient replication");
    try
    {
        ConsistencyLevel batchConsistencyLevel = consistencyLevelForBatchLog(consistencyLevel, requireQuorumForRemove);
        // This can't be updated for each iteration because cleanup has to go to the correct replicas which is where the batchlog is originally written
        ReplicaPlan.ForWrite batchlogReplicaPlan = ReplicaPlans.forBatchlogWrite(ClusterMetadata.current(), batchConsistencyLevel == ConsistencyLevel.ANY);
        final TimeUUID batchUUID = nextTimeUUID();
        boolean wroteToBatchLog = false;
        while (true)
        {
            ClusterMetadata cm = ClusterMetadata.current();
            // In case we hit an error in before/during splitting
            attributeNonAccordLatency = true;
            // Reset per attempt so a failure in this attempt is not measured against a previous attempt's end time
            nonAccordEndTime = -1;
            List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
            List<Mutation> accordMutations = new ArrayList<>(mutations.size());
            BatchlogCleanup cleanup = new BatchlogCleanup(() -> asyncRemoveFromBatchlog(batchlogReplicaPlan, batchUUID, requestTime));
            // add a handler for each mutation that will not be written on Accord - includes checking availability, but doesn't initiate any writes, yet
            SplitConsumer<Mutation> splitConsumer = (accordMutation, normalMutation, originalMutations, mutationIndex) -> {
                Mutation eitherMutation = normalMutation != null ? normalMutation : accordMutation;
                Keyspace keyspace = Keyspace.open(eitherMutation.getKeyspaceName());
                Token tk = eitherMutation.key().getToken();
                if (accordMutation != null)
                    accordMutations.add(accordMutation);
                if (normalMutation == null)
                    return;
                // Always construct the replica plan to check availability
                ReplicaPlan.ForWrite dataReplicaPlan = ReplicaPlans.forWrite(cm, keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
                if (dataReplicaPlan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null)
                    writeMetrics.localRequests.mark();
                else
                    writeMetrics.remoteRequests.mark();
                WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(normalMutation,
                                                                               dataReplicaPlan,
                                                                               batchConsistencyLevel,
                                                                               WriteType.BATCH,
                                                                               cleanup,
                                                                               requestTime);
                wrappers.add(wrapper);
            };
            splitMutationsIntoAccordAndNormal(cm, mutations, splitConsumer);
            attributeNonAccordLatency = !wrappers.isEmpty();
            // One ack per normal mutation, plus a single ack for the whole Accord portion (if any)
            cleanup.setMutationsWaitingFor(wrappers.size() + (accordMutations.isEmpty() ? 0 : 1));
            Tracing.trace("Split batch into Accord {} and normal {}", accordMutations, wrappers);
            // If the entire batch can execute on Accord then we can skip the batch log entirely
            // Write to the batch log first in case it fails so we don't end up with Accord applying
            // part of the batch independently
            if (!wrappers.isEmpty() && !wroteToBatchLog)
            {
                // write to the batchlog, including writes that will be routed to Accord to preserve the behavior
                // of the batch log where if part of a batch is visible then eventually the entire batch is visible.
                // If the Accord routed mutations depend on the Accord txn succeeding then it is no longer consistent
                // with the mutations delivered by the batch log since an unacknowledged Accord txn won't be retried
                // unless those mutations are also written to the batch log
                // Only write to the log once and reuse the batchUUID for every attempt to route the mutations correctly
                doFallibleWriteWithMetricTracking(() -> syncWriteToBatchlog(mutations, batchlogReplicaPlan, batchUUID, requestTime), consistencyLevel);
                Tracing.trace("Successfully wrote to batchlog");
                wroteToBatchLog = true;
            }
            // Start Accord executing so it executes while the mutations are synchronously applied
            IAccordResult<TxnResult> accordResult = !accordMutations.isEmpty() ? mutateWithAccordAsync(cm, accordMutations, consistencyLevel, requestTime) : null;
            Throwable failure = null;
            try
            {
                // now actually perform the writes and wait for them to complete
                if (!wrappers.isEmpty())
                {
                    doFallibleWriteWithMetricTracking(() -> syncWriteBatchedMutations(wrappers, Stage.MUTATION, requestTime), consistencyLevel);
                    Tracing.trace("Successfully wrote normal mutations");
                }
            }
            catch (RetryOnDifferentSystemException e)
            {
                writeMetrics.retryDifferentSystem.mark();
                writeMetricsForLevel(consistencyLevel).retryDifferentSystem.mark();
                logger.debug("Retrying batch txn on different system because some mutations were misrouted");
                Tracing.trace("Got {} from normal mutations, will retry", e);
                continue;
            }
            catch (CoordinatorBehindException e)
            {
                writeMetrics.retryCoordinatorBehind.mark();
                writeMetricsForLevel(consistencyLevel).retryCoordinatorBehind.mark();
                mutations.forEach(IMutation::clearCachedSerializationsForRetry);
                logger.debug("Retrying batch now that coordinator has caught up to cluster metadata");
                Tracing.trace("Got {} from normal mutations, will retry", e);
                continue;
            }
            catch (Exception e)
            {
                failure = Throwables.merge(failure, e);
            }
            finally
            {
                // Try to exclude most of the Accord time
                nonAccordEndTime = nanoTime();
            }
            // Check if the Accord mutations succeeded asynchronously
            try
            {
                // It's notable here that the Accord portion of the batch will not be hinted
                // while the regular mutations are hinted on failure and also going to be replayed later from
                // the batch log. It wouldn't be difficult to add hinting here, but it does seem redundant with
                // the batch log.
                if (accordResult != null)
                {
                    TxnResult.Kind kind = accordResult.awaitAndGet().kind();
                    if (kind == retry_new_protocol)
                    {
                        // The Accord portion has to be retried, so it must not be acknowledged to the
                        // batchlog cleanup. Retry only when the normal writes did not fail; otherwise the
                        // failure is thrown below and the batchlog entry is left in place.
                        if (failure == null)
                            continue;
                    }
                    else
                    {
                        Tracing.trace("Successfully wrote Accord mutations");
                        cleanup.ackMutation();
                    }
                }
            }
            catch (Exception e)
            {
                failure = Throwables.merge(failure, e);
            }
            if (failure != null)
                throw unchecked(failure);
            break;
        }
    }
    catch (Exception t)
    {
        // Unexpected error so it would be helpful to have details
        Tracing.trace("{}", getStackTraceAsToString(t));
        throw t;
    }
    finally
    {
        if (attributeNonAccordLatency)
        {
            // nonAccordEndTime is -1 if the current attempt failed before its normal writes finished, in which
            // case fall back to now. Choose the end time first and only then subtract startTime, so that an
            // absolute timestamp is never recorded as a latency.
            long endTime = nonAccordEndTime != -1 ? nonAccordEndTime : nanoTime();
            long latency = endTime - startTime;
            writeMetrics.addNano(latency);
            writeMetricsForLevel(consistencyLevel).addNano(latency);
            updateCoordinatorWriteLatencyTableMetric(mutations, latency);
        }
    }
}