in src/java/org/apache/cassandra/service/paxos/Paxos.java [707:886]
/**
 * Performs a Paxos compare-and-set (CAS) against a single partition.
 * <p>
 * While holding the {@link PaxosOperationLock} obtained from {@code PaxosState.lock}, this loops:
 * <ol>
 *   <li>{@code begin} obtains a ballot and reads the current values of the partition;</li>
 *   <li>{@code request.appliesTo(current)} evaluates the CAS condition;</li>
 *   <li>if the condition is met and the ballot was promised, the client's updates (after triggers)
 *       are proposed; if the condition is not met, an empty update is proposed (unless the read was
 *       already linearizable, or the Paxos variant skips this step); if the condition is met but the
 *       ballot was not promised, the loop retries;</li>
 *   <li>a proposal superseded without side effects is retried after {@code waitForContention}.</li>
 * </ol>
 * Once a proposal succeeds, a non-empty one is committed and the commit is awaited until
 * {@code commitDeadline} after leaving the loop.
 * <p>
 * Contention and latency metrics are recorded in a {@code finally} block, so they are updated on
 * every exit path, including exceptional ones.
 *
 * @param partitionKey            the partition being conditionally updated
 * @param request                 supplies the read command, the condition ({@code appliesTo}) and the
 *                                updates ({@code makeUpdates})
 * @param consistencyForConsensus consistency level for the begin/propose phases; checked with
 *                                {@code validateForCas()}
 * @param consistencyForCommit    consistency level for the commit; checked with
 *                                {@code validateForCasCommit(...)}
 * @param clientState             passed through to {@code request.makeUpdates}
 * @param requestTime             request start time; used to derive the propose and commit deadlines and
 *                                the latency recorded in metrics
 * @return {@code RETRY_NEW_PROTOCOL} when {@code begin} or a superseded proposal reports that a consensus
 *         migration is needed; otherwise a {@code casResult} holding the current values when the condition
 *         was not met, or {@code casResult(null)} when the update was applied
 * @throws InvalidRequestException   presumably when a consistency level is not valid for CAS (TODO confirm
 *                                   against {@code validateForCas*}), or when triggers emit mutations for a
 *                                   different partition
 * @throws RequestTimeoutException   presumably via {@code markAndThrowAsTimeoutOrFailure} when a propose or
 *                                   commit outcome is uncertain, or contention back-off gives up
 * @throws RequestFailureException   presumably via {@code markAndThrowAsTimeoutOrFailure} (see above)
 * @throws UnavailableException      declared; not thrown directly here (presumably propagated from callees)
 * @throws IsBootstrappingException  declared; not thrown directly here (presumably propagated from callees)
 */
public static ConsensusAttemptResult cas(DecoratedKey partitionKey,
                                         CASRequest request,
                                         ConsistencyLevel consistencyForConsensus,
                                         ConsistencyLevel consistencyForCommit,
                                         ClientState clientState,
                                         Dispatcher.RequestTime requestTime)
        throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
{
    // Deadline shared by lock acquisition, begin, propose and contention back-off.
    final long proposeDeadline = requestTime.startedAtNanos() + getCasContentionTimeout(NANOSECONDS);
    // The commit may wait until the later of the propose deadline and the write RPC timeout.
    final long commitDeadline = Math.max(proposeDeadline, requestTime.startedAtNanos() + getWriteRpcTimeout(NANOSECONDS));
    SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
    TableMetadata metadata = readCommand.metadata();
    consistencyForConsensus.validateForCas();
    consistencyForCommit.validateForCasCommit(Keyspace.open(metadata.keyspace).getReplicationStrategy());
    // Declared outside the try so the finally block can report it on every exit path.
    int failedAttemptsDueToContention = 0;
    try (PaxosOperationLock lock = PaxosState.lock(partitionKey, metadata, proposeDeadline, consistencyForConsensus, true))
    {
        // Set only when a non-empty proposal succeeded; awaited after leaving the retry loop.
        Paxos.Async<PaxosCommit.Status> commit = null;
        done: while (true)
        {
            // read the current values and check they validate the conditions
            Tracing.trace("Reading existing values for CAS precondition");
            // NOTE(review): begin() receives request.requestTime() rather than this method's requestTime
            // parameter (which drives the deadlines and latency metrics); confirm both are meant to be the same.
            BeginResult begin = begin(proposeDeadline, readCommand, consistencyForConsensus,
                                      true, null, failedAttemptsDueToContention, request.requestTime());
            if (begin.retryWithNewConsenusProtocol)
            {
                casWriteMetrics.beginMigrationRejects.mark();
                return RETRY_NEW_PROTOCOL;
            }
            Ballot ballot = begin.ballot;
            Participants participants = begin.participants;
            // begin() reports the running contention count; adopt it so the finally block sees it.
            failedAttemptsDueToContention = begin.failedAttemptsDueToContention;
            FilteredPartition current;
            try (RowIterator iter = PartitionIterators.getOnlyElement(begin.readResponse, readCommand))
            {
                current = FilteredPartition.create(iter);
            }
            Proposal proposal;
            boolean conditionMet = request.appliesTo(current);
            if (!conditionMet)
            {
                if (getPaxosVariant() == v2_without_linearizable_reads_or_rejected_writes)
                {
                    // This variant skips serializing rejected writes: answer immediately.
                    Tracing.trace("CAS precondition rejected: {}", current);
                    casWriteMetrics.conditionNotMet.inc();
                    return casResult(current.rowIterator(false));
                }
                // Failing to meet the condition does not mean we can do nothing: unless we propose
                // something that is accepted by a quorum, our !conditionMet outcome may not be
                // serialized with respect to other operations.
                // If a later read encounters an "in progress" write that did not reach a majority,
                // but that would have made conditionMet true had it done so (and which we therefore
                // did not witness), that read will complete the in-progress proposal before continuing,
                // so that it and future reads perceive conditionMet with no intervening modification
                // since the moment we told the client that !conditionMet.
                // Our evaluation is therefore only serialized if we invalidate any in-progress operation
                // by proposing an empty update. See also CASSANDRA-12126.
                if (begin.isLinearizableRead)
                {
                    Tracing.trace("CAS precondition does not match current values {}; read is already linearizable; aborting", current);
                    return casResult(conditionNotMet(current));
                }
                Tracing.trace("CAS precondition does not match current values {}; proposing empty update", current);
                proposal = Proposal.empty(ballot, partitionKey, metadata);
            }
            else if (begin.isPromised)
            {
                // finish the paxos round w/ the desired updates
                // TODO (inherited): "turn null updates into delete?" -- original intent unclear; revisit.
                PartitionUpdate updates = request.makeUpdates(current, clientState, begin.ballot);
                // Update the metrics before triggers potentially add mutations.
                ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(updates);
                // Apply triggers to cas updates. A consideration here is that
                // triggers emit Mutations, and so a given trigger implementation
                // may generate mutations for partitions other than the one this
                // paxos round is scoped for. In this case, TriggerExecutor will
                // validate that the generated mutations are targeted at the same
                // partition as the initial updates and reject (via an
                // InvalidRequestException) any which aren't.
                updates = TriggerExecutor.instance.execute(updates);
                proposal = Proposal.of(ballot, updates);
                Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
            }
            else
            {
                // must retry, as only achieved read success in begin
                Tracing.trace("CAS precondition is met by current values {}, but ballot stale for proposal; retrying", current);
                continue;
            }
            PaxosPropose.Status propose = propose(proposal, participants, conditionMet, false).awaitUntil(proposeDeadline);
            switch (propose.outcome)
            {
                default: throw new IllegalStateException();
                case MAYBE_FAILURE:
                    throw propose.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
                case SUCCESS:
                {
                    // The empty proposal has been accepted, so the !conditionMet answer is now serialized.
                    if (!conditionMet)
                        return casResult(conditionNotMet(current));
                    // no need to commit a no-op; either it
                    // 1) reached a majority, in which case it was agreed, had no effect and we can do nothing; or
                    // 2) did not reach a majority, was not agreed, and was not user visible as a result so we can ignore it
                    if (!proposal.update.isEmpty())
                        commit = commit(proposal.agreed(), participants, consistencyForConsensus, consistencyForCommit, true);
                    break done;
                }
                case SUPERSEDED:
                {
                    Superseded superseded = propose.superseded();
                    switch (superseded.hadSideEffects)
                    {
                        default: throw new IllegalStateException();
                        case MAYBE:
                            // We don't know if our update has been applied, as the competing ballot may have completed
                            // our proposal. We yield our uncertainty to the caller via timeout exception.
                            // TODO: should return more useful result to client, and should also avoid this situation where possible
                            throw new MaybeFailure(false, participants.sizeOfPoll(), participants.sizeOfConsensusQuorum, 0, emptyMap())
                                  .markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
                        case NO:
                            // Shouldn't retry on this protocol
                            if (superseded.needsConsensusMigration)
                            {
                                casWriteMetrics.acceptMigrationRejects.mark();
                                return RETRY_NEW_PROTOCOL;
                            }
                            // We have been superseded without our proposal being accepted by anyone, so we can safely retry
                            Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
                            if (!waitForContention(proposeDeadline, ++failedAttemptsDueToContention, metadata, partitionKey, consistencyForConsensus, WRITE))
                                throw MaybeFailure.noResponses(participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
                    }
                }
            }
            // continue to retry
        }
        // Only reached via "break done" after a successful proposal; commit is null for an empty update.
        if (commit != null)
        {
            PaxosCommit.Status result = commit.awaitUntil(commitDeadline);
            if (!result.isSuccess())
                throw result.maybeFailure().markAndThrowAsTimeoutOrFailure(true, consistencyForCommit, failedAttemptsDueToContention);
        }
        Tracing.trace("CAS successful");
        return casResult((RowIterator)null);
    }
    finally
    {
        // Recorded on every exit path, including RETRY_NEW_PROTOCOL returns and thrown exceptions.
        final long latency = nanoTime() - requestTime.startedAtNanos();
        if (failedAttemptsDueToContention > 0)
        {
            casWriteMetrics.contention.update(failedAttemptsDueToContention);
            openAndGetStore(metadata).metric.topCasPartitionContention.addSample(partitionKey.getKey(), failedAttemptsDueToContention);
        }
        casWriteMetrics.addNano(latency);
        writeMetricsMap.get(consistencyForConsensus).addNano(latency);
    }
}