public static ConsensusAttemptResult cas()

in src/java/org/apache/cassandra/service/paxos/Paxos.java [707:886]


    /**
     * Performs a Paxos compare-and-set (CAS) against a single partition.
     * <p>
     * Each attempt of the retry loop:
     * <ol>
     *   <li>reads the current partition contents via {@code begin(...)};</li>
     *   <li>evaluates {@code request.appliesTo(current)};</li>
     *   <li>proposes one of two things:
     *     <ul>
     *       <li>the client's updates, when the condition is met and {@code begin} obtained a promise;</li>
     *       <li>an empty update, when the condition is not met, so that the rejection is serialized with
     *           respect to in-progress operations (see CASSANDRA-12126).</li>
     *     </ul>
     *   </li>
     * </ol>
     * A successful non-empty proposal is then committed. The loop retries in two cases:
     * <ul>
     *   <li>the promise was not obtained;</li>
     *   <li>the proposal was superseded without side effects, as long as {@code waitForContention} permits
     *       another attempt.</li>
     * </ul>
     * The whole operation runs inside the try-with-resources scope of the lock returned by
     * {@code PaxosState.lock(...)}.
     *
     * @param partitionKey            key of the partition the CAS applies to
     * @param request                 supplies the read command, the precondition check and the updates to apply
     * @param consistencyForConsensus consistency level for the read/propose phases; validated with
     *                                {@code validateForCas()}
     * @param consistencyForCommit    consistency level for the commit phase; validated against the keyspace's
     *                                replication strategy with {@code validateForCasCommit(...)}
     * @param clientState             passed to {@code request.makeUpdates(...)} when building the proposal
     * @param requestTime             request start time. The propose deadline is this plus the CAS contention
     *                                timeout. The commit deadline is the later of the propose deadline and this
     *                                plus the write RPC timeout.
     * @return one of three results:
     *         <ul>
     *           <li>{@code RETRY_NEW_PROTOCOL} if {@code begin} or the proposal indicates the operation must be
     *               retried under a different consensus protocol;</li>
     *           <li>a result built from the current partition contents if the precondition was not met;</li>
     *           <li>otherwise {@code casResult((RowIterator) null)}, after the proposal and any commit
     *               succeeded.</li>
     *         </ul>
     * @throws InvalidRequestException if the triggers applied to the updates produce mutations for other
     *                                 partitions (per the comment on the trigger call below)
     * @throws RequestTimeoutException if propose or commit fails or its outcome is uncertain. The exception is
     *                                 the one produced by {@code markAndThrowAsTimeoutOrFailure}, presumably a
     *                                 timeout or failure exception.
     * @throws RequestFailureException as for {@code RequestTimeoutException}
     * @throws UnavailableException    declared; presumably raised by the lock/begin phases (not visible here)
     * @throws IsBootstrappingException declared; presumably raised by the lock/begin phases (not visible here)
     */
    public static ConsensusAttemptResult cas(DecoratedKey partitionKey,
                                             CASRequest request,
                                             ConsistencyLevel consistencyForConsensus,
                                             ConsistencyLevel consistencyForCommit,
                                             ClientState clientState,
                                             Dispatcher.RequestTime requestTime)
            throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
    {
        // The propose phase is bounded by the CAS contention timeout.
        // The commit phase may run until the later of that deadline and the write RPC timeout.
        final long proposeDeadline = requestTime.startedAtNanos() + getCasContentionTimeout(NANOSECONDS);
        final long commitDeadline = Math.max(proposeDeadline, requestTime.startedAtNanos() + getWriteRpcTimeout(NANOSECONDS));
        SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
        TableMetadata metadata = readCommand.metadata();

        consistencyForConsensus.validateForCas();
        consistencyForCommit.validateForCasCommit(Keyspace.open(metadata.keyspace).getReplicationStrategy());

        // Declared outside the try so the finally block can report contention metrics.
        int failedAttemptsDueToContention = 0;
        try (PaxosOperationLock lock = PaxosState.lock(partitionKey, metadata, proposeDeadline, consistencyForConsensus, true))
        {
            // Set only when a non-empty proposal succeeded; awaited after the retry loop exits.
            Paxos.Async<PaxosCommit.Status> commit = null;
            done: while (true)
            {
                // read the current values and check they validate the conditions
                Tracing.trace("Reading existing values for CAS precondition");

                // NOTE(review): begin() receives request.requestTime() rather than the requestTime parameter
                // used for the deadlines above; confirm these are always the same instance.
                BeginResult begin = begin(proposeDeadline, readCommand, consistencyForConsensus,
                        true, null, failedAttemptsDueToContention, request.requestTime());

                if (begin.retryWithNewConsenusProtocol)
                {
                    casWriteMetrics.beginMigrationRejects.mark();
                    return RETRY_NEW_PROTOCOL;
                }

                Ballot ballot = begin.ballot;
                Participants participants = begin.participants;
                // begin() reports the cumulative contention count, including any retries it performed itself.
                failedAttemptsDueToContention = begin.failedAttemptsDueToContention;

                FilteredPartition current;
                try (RowIterator iter = PartitionIterators.getOnlyElement(begin.readResponse, readCommand))
                {
                    current = FilteredPartition.create(iter);
                }

                Proposal proposal;
                boolean conditionMet = request.appliesTo(current);
                if (!conditionMet)
                {
                    if (getPaxosVariant() == v2_without_linearizable_reads_or_rejected_writes)
                    {
                        // This variant opts out of serializing rejected writes, so no empty proposal is made.
                        Tracing.trace("CAS precondition rejected; current values {}", current);
                        casWriteMetrics.conditionNotMet.inc();
                        return casResult(current.rowIterator(false));
                    }

                    // If we failed to meet our condition, it does not mean we can do nothing: if we do not propose
                    // anything that is accepted by a quorum, it is possible for our !conditionMet state
                    // to not be serialized wrt other operations.
                    // If a later read encounters an "in progress" write that did not reach a majority,
                    // but that would have permitted conditionMet had it done so (and hence we evidently did not witness),
                    // that operation will complete the in-progress proposal before continuing, so that this and future
                    // reads will perceive conditionMet without any intervening modification from the time at which we
                    // assured a conditional write that !conditionMet.
                    // So our evaluation is only serialized if we invalidate any in progress operations by proposing an empty update
                    // See also CASSANDRA-12126
                    if (begin.isLinearizableRead)
                    {
                        Tracing.trace("CAS precondition does not match current values {}; read is already linearizable; aborting", current);
                        return casResult(conditionNotMet(current));
                    }

                    Tracing.trace("CAS precondition does not match current values {}; proposing empty update", current);
                    proposal = Proposal.empty(ballot, partitionKey, metadata);
                }
                else if (begin.isPromised)
                {
                    // finish the paxos round w/ the desired updates
                    // TODO "turn null updates into delete?" - what does this TODO even mean?
                    PartitionUpdate updates = request.makeUpdates(current, clientState, begin.ballot);

                    // Update the metrics before triggers potentially add mutations.
                    ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(updates);

                    // Apply triggers to cas updates. A consideration here is that
                    // triggers emit Mutations, and so a given trigger implementation
                    // may generate mutations for partitions other than the one this
                    // paxos round is scoped for. In this case, TriggerExecutor will
                    // validate that the generated mutations are targetted at the same
                    // partition as the initial updates and reject (via an
                    // InvalidRequestException) any which aren't.
                    updates = TriggerExecutor.instance.execute(updates);

                    proposal = Proposal.of(ballot, updates);
                    Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
                }
                else
                {
                    // must retry, as only achieved read success in begin
                    Tracing.trace("CAS precondition is met, but ballot stale for proposal; retrying");
                    continue;
                }

                PaxosPropose.Status propose = propose(proposal, participants, conditionMet, false).awaitUntil(proposeDeadline);
                switch (propose.outcome)
                {
                    default: throw new IllegalStateException();

                    case MAYBE_FAILURE:
                        throw propose.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);

                    case SUCCESS:
                    {
                        // The empty proposal only served to serialize the rejection; report the current values.
                        if (!conditionMet)
                            return casResult(conditionNotMet(current));

                        // no need to commit a no-op; either it
                        //   1) reached a majority, in which case it was agreed, had no effect and we can do nothing; or
                        //   2) did not reach a majority, was not agreed, and was not user visible as a result so we can ignore it
                        if (!proposal.update.isEmpty())
                            commit = commit(proposal.agreed(), participants, consistencyForConsensus, consistencyForCommit, true);

                        break done;
                    }

                    case SUPERSEDED:
                    {
                        Superseded superseded = propose.superseded();
                        switch (superseded.hadSideEffects)
                        {
                            default: throw new IllegalStateException();

                            case MAYBE:
                                // We don't know if our update has been applied, as the competing ballot may have completed
                                // our proposal.  We yield our uncertainty to the caller via timeout exception.
                                // TODO: should return more useful result to client, and should also avoid this situation where possible
                                throw new MaybeFailure(false, participants.sizeOfPoll(), participants.sizeOfConsensusQuorum, 0, emptyMap())
                                        .markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);

                            case NO:
                                // Shouldn't retry on this protocol
                                if (superseded.needsConsensusMigration)
                                {
                                    casWriteMetrics.acceptMigrationRejects.mark();
                                    return RETRY_NEW_PROTOCOL;
                                }
                                // We have been superseded without our proposal being accepted by anyone, so we can safely retry
                                Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
                                if (!waitForContention(proposeDeadline, ++failedAttemptsDueToContention, metadata, partitionKey, consistencyForConsensus, WRITE))
                                    throw MaybeFailure.noResponses(participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
                                // otherwise fall out of the switch and start a new attempt
                        }
                    }
                }
                // continue to retry
            }

            // Only reached via "break done": wait for the commit (if one was started) up to the commit deadline.
            if (commit != null)
            {
                PaxosCommit.Status result = commit.awaitUntil(commitDeadline);
                if (!result.isSuccess())
                    throw result.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForCommit, failedAttemptsDueToContention);
            }
            Tracing.trace("CAS successful");
            return casResult((RowIterator)null);

        }
        finally
        {
            // Metrics are recorded on every exit path: success, rejection, retry-on-new-protocol and exceptions.
            final long latency = nanoTime() - requestTime.startedAtNanos();

            if (failedAttemptsDueToContention > 0)
            {
                casWriteMetrics.contention.update(failedAttemptsDueToContention);
                openAndGetStore(metadata).metric.topCasPartitionContention.addSample(partitionKey.getKey(), failedAttemptsDueToContention);
            }


            casWriteMetrics.addNano(latency);
            writeMetricsMap.get(consistencyForConsensus).addNano(latency);
        }
    }