public static void mutateAtomically()

in src/java/org/apache/cassandra/service/StorageProxy.java [1388:1549]


    /**
     * Applies {@code mutations} as a logged (atomic) batch, routing each mutation either to Accord or to the
     * normal write path.
     * <p>
     * When at least one mutation takes the normal path, the whole batch (including the Accord-routed mutations)
     * is written to the batchlog exactly once, before any data write is started, and the same batch id is reused
     * by every retry. A {@code BatchlogCleanup} waiting for one acknowledgement per normal mutation plus one for
     * the Accord transaction (if any) triggers asynchronous removal of that batchlog entry. If every mutation is
     * routed to Accord, the batchlog is skipped.
     * <p>
     * The Accord/normal split is recomputed against the current {@code ClusterMetadata} and the batch is retried when
     * the normal writes report {@code RetryOnDifferentSystemException} or {@code CoordinatorBehindException}, or when
     * Accord answers {@code retry_new_protocol} while the normal writes succeeded.
     * <p>
     * Write latency metrics are only recorded when normal mutations were involved (or the attempt failed before the
     * split completed), and exclude the time spent waiting for the Accord result where possible.
     *
     * @param mutations              the mutations making up the batch
     * @param consistencyLevel       consistency level used for the data writes
     * @param requireQuorumForRemove passed to {@code consistencyLevelForBatchLog} to choose the batchlog consistency level
     * @param requestTime            the request's timing information, passed through to every write
     * @throws AssertionError if any mutation targets a keyspace that uses transient replication
     */
    public static void mutateAtomically(List<Mutation> mutations,
                                        ConsistencyLevel consistencyLevel,
                                        boolean requireQuorumForRemove,
                                        Dispatcher.RequestTime requestTime)
    throws UnavailableException, OverloadedException, WriteTimeoutException
    {
        Tracing.trace("Determining replicas for atomic batch");
        long startTime = nanoTime();
        boolean attributeNonAccordLatency = true;
        // End of the non-Accord portion of the current attempt; -1 until that attempt's normal writes have finished
        long nonAccordEndTime = -1;

        if (mutations.stream().anyMatch(mutation -> Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy().hasTransientReplicas()))
            throw new AssertionError("Logged batches are unsupported with transient replication");

        try
        {
            ConsistencyLevel batchConsistencyLevel = consistencyLevelForBatchLog(consistencyLevel, requireQuorumForRemove);
            // This can't be updated for each iteration because cleanup has to go to the correct replicas which is where the batchlog is originally written
            ReplicaPlan.ForWrite batchlogReplicaPlan = ReplicaPlans.forBatchlogWrite(ClusterMetadata.current(), batchConsistencyLevel == ConsistencyLevel.ANY);
            final TimeUUID batchUUID = nextTimeUUID();
            boolean wroteToBatchLog = false;
            while (true)
            {
                ClusterMetadata cm = ClusterMetadata.current();
                // In case we hit an error before/during splitting
                attributeNonAccordLatency = true;
                // Don't let a previous attempt's end time leak into the latency recorded if this attempt fails early
                nonAccordEndTime = -1;
                List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
                List<Mutation> accordMutations = new ArrayList<>(mutations.size());
                BatchlogCleanup cleanup = new BatchlogCleanup(() -> asyncRemoveFromBatchlog(batchlogReplicaPlan, batchUUID, requestTime));

                // add a handler for each mutation that will not be written on Accord - includes checking availability, but doesn't initiate any writes, yet
                SplitConsumer<Mutation> splitConsumer = (accordMutation, normalMutation, originalMutations, mutationIndex) -> {
                    Mutation eitherMutation = normalMutation != null ? normalMutation : accordMutation;
                    Keyspace keyspace = Keyspace.open(eitherMutation.getKeyspaceName());
                    Token tk = eitherMutation.key().getToken();

                    if (accordMutation != null)
                        accordMutations.add(accordMutation);

                    if (normalMutation == null)
                        return;

                    // Always construct the replica plan to check availability
                    ReplicaPlan.ForWrite dataReplicaPlan = ReplicaPlans.forWrite(cm, keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);

                    if (dataReplicaPlan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null)
                        writeMetrics.localRequests.mark();
                    else
                        writeMetrics.remoteRequests.mark();

                    WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(normalMutation,
                                                                                   dataReplicaPlan,
                                                                                   batchConsistencyLevel,
                                                                                   WriteType.BATCH,
                                                                                   cleanup,
                                                                                   requestTime);
                    wrappers.add(wrapper);
                };
                splitMutationsIntoAccordAndNormal(cm, mutations, splitConsumer);
                attributeNonAccordLatency = !wrappers.isEmpty();
                // One ack per normal mutation plus a single ack for the whole Accord transaction
                cleanup.setMutationsWaitingFor(wrappers.size() + (accordMutations.isEmpty() ? 0 : 1));
                Tracing.trace("Split batch into Accord {} and normal {}", accordMutations, wrappers);

                // If the entire batch can execute on Accord then we can skip the batch log entirely
                // Write to the batch log first in case it fails so we don't end up with Accord applying
                // part of the batch independently
                if (!wrappers.isEmpty() && !wroteToBatchLog)
                {
                    // write to the batchlog, including writes that will be routed to Accord to preserve the behavior
                    // of the batch log where if part of a batch is visible then eventually the entire batch is visible.
                    // If the Accord routed mutations depend on the Accord txn succeeding then it is no longer consistent
                    // with the mutations delivered by the batch log since an unacknowledged Accord txn won't be retried
                    // unless those mutations are also written to the batch log
                    // Only write to the log once and reuse the batchUUID for every attempt to route the mutations correctly
                    doFallibleWriteWithMetricTracking(() -> syncWriteToBatchlog(mutations, batchlogReplicaPlan, batchUUID, requestTime), consistencyLevel);
                    Tracing.trace("Successfully wrote to batchlog");
                    wroteToBatchLog = true;
                }

                // Start Accord executing so it executes while the mutations are synchronously applied
                IAccordResult<TxnResult> accordResult = !accordMutations.isEmpty() ? mutateWithAccordAsync(cm, accordMutations, consistencyLevel, requestTime) : null;

                Throwable failure = null;
                try
                {
                    // now actually perform the writes and wait for them to complete
                    if (!wrappers.isEmpty())
                    {
                        doFallibleWriteWithMetricTracking(() -> syncWriteBatchedMutations(wrappers, Stage.MUTATION, requestTime), consistencyLevel);
                        Tracing.trace("Successfully wrote normal mutations");
                    }
                }
                catch (RetryOnDifferentSystemException e)
                {
                    writeMetrics.retryDifferentSystem.mark();
                    writeMetricsForLevel(consistencyLevel).retryDifferentSystem.mark();
                    logger.debug("Retrying batch txn on different system because some mutations were misrouted");
                    Tracing.trace("Got {} from normal mutations, will retry", e);
                    continue;
                }
                catch (CoordinatorBehindException e)
                {
                    writeMetrics.retryCoordinatorBehind.mark();
                    writeMetricsForLevel(consistencyLevel).retryCoordinatorBehind.mark();
                    mutations.forEach(IMutation::clearCachedSerializationsForRetry);
                    logger.debug("Retrying batch now that coordinator has caught up to cluster metadata");
                    Tracing.trace("Got {} from normal mutations, will retry", e);
                    continue;
                }
                catch (Exception e)
                {
                    failure = Throwables.merge(failure, e);
                }
                finally
                {
                    // Try to exclude most of the Accord time
                    nonAccordEndTime = nanoTime();
                }

                // Check if the Accord mutations succeeded asynchronously
                try
                {
                    // It's notable here that the Accord portion of the batch will not be hinted
                    // while the regular mutations are hinted on failure and also going to be replayed later from
                    // the batch log. It wouldn't be difficult to add hinting here, but it does seem redundant with
                    // the batch log.
                    if (accordResult != null)
                    {
                        TxnResult.Kind kind = accordResult.awaitAndGet().kind();
                        if (kind == retry_new_protocol)
                        {
                            // Accord asked for these mutations to be retried rather than reporting them written.
                            // Retry with a fresh split if the normal writes succeeded; otherwise rethrow their failure
                            // below WITHOUT acknowledging Accord, so the batchlog entry (which also holds the Accord
                            // mutations) cannot be cleaned up on the strength of an Accord write that did not complete.
                            if (failure == null)
                                continue;
                        }
                        else
                        {
                            Tracing.trace("Successfully wrote Accord mutations");
                            cleanup.ackMutation();
                        }
                    }
                }
                catch (Exception e)
                {
                    failure = Throwables.merge(failure, e);
                }
                if (failure != null)
                    throw unchecked(failure);
                break;
            }
        }
        catch (Exception t)
        {
            // Unexpected error so it would be helpful to have details
            Tracing.trace("{}", getStackTraceAsToString(t));
            throw t;
        }
        finally
        {
            if (attributeNonAccordLatency)
            {
                // If the latest attempt failed before its normal writes finished, nonAccordEndTime is still -1 and
                // the full elapsed time is attributed; otherwise the time spent waiting on Accord is excluded.
                // The subtraction must apply to both branches, so compute the end timestamp first.
                long endTime = nonAccordEndTime != -1 ? nonAccordEndTime : nanoTime();
                long latency = endTime - startTime;
                writeMetrics.addNano(latency);
                writeMetricsForLevel(consistencyLevel).addNano(latency);
                updateCoordinatorWriteLatencyTableMetric(mutations, latency);
            }
        }
    }