in src/java/org/apache/cassandra/service/paxos/Paxos.java [1052:1187]
/**
 * Run the prepare phase of a Paxos round for the partition of {@code query}, looping until either a promise
 * or (for reads, while early read permissions are acceptable) a read permission is obtained from a quorum.
 * Each iteration awaits the outstanding prepare and then acts on its outcome:
 * <ul>
 *   <li>FOUND_INCOMPLETE_COMMITTED: re-commits the found commit to the replicas that missed it and re-prepares;</li>
 *   <li>FOUND_INCOMPLETE_ACCEPTED: re-proposes the in-progress proposal under its original ballot; on success
 *       commits it and re-prepares, and if superseded, handles it like SUPERSEDED;</li>
 *   <li>SUPERSEDED: backs off via {@code waitForContention} and re-prepares with at least {@code prepare.retryWithAtLeast()};</li>
 *   <li>PROMISED / READ_PERMITTED: resolves the prepare responses into a read result and returns it, unless a short
 *       read was detected without a promise, in which case it retries demanding a promise;</li>
 *   <li>ELECTORATE_MISMATCH: recomputes the participants and re-prepares;</li>
 *   <li>MAYBE_FAILURE: throws.</li>
 * </ul>
 * If, after any prepare that did not end in MAYBE_FAILURE, the key is found to be in a range migrating (or migrated)
 * away from this protocol, {@link BeginResult#retryOnNewProtocol()} is returned instead.
 *
 * @param deadline deadline passed to every {@code awaitUntil} and {@code waitForContention} call
 *                 (NOTE(review): presumably a nanoTime-based instant — confirm against callers)
 * @param query the single-partition read whose partition key identifies the Paxos instance; its prepare
 *              responses are resolved into the returned read result
 * @param consistencyForConsensus consistency level used to select participants and to report failures
 * @param isWrite true when beginning a CAS write, false for a linearizable read; selects the metrics updated
 *                and whether an early read permission is initially acceptable
 * @param minimumBallot ballot passed to the first prepare (presumably a lower bound for the ballot chosen — TODO confirm)
 * @param failedAttemptsDueToContention number of earlier contention failures; incremented before each contention
 *                                      wait and carried into the returned {@link BeginResult}
 * @param requestTime request timing passed to the {@link DataResolver}
 * @return the ballot, participants, contention count and resolved read result of a successful prepare, or
 *         {@link BeginResult#retryOnNewProtocol()} if the key must be handled by another consensus protocol
 * @throws WriteTimeoutException if a prepare/propose may have failed or contention backoff exhausted the deadline
 * @throws WriteFailureException see above
 * @throws ReadTimeoutException see above
 * @throws ReadFailureException see above
 */
private static BeginResult begin(long deadline,
SinglePartitionReadCommand query,
ConsistencyLevel consistencyForConsensus,
final boolean isWrite,
Ballot minimumBallot,
int failedAttemptsDueToContention,
Dispatcher.RequestTime requestTime)
throws WriteTimeoutException, WriteFailureException, ReadTimeoutException, ReadFailureException
{
boolean acceptEarlyReadPermission = !isWrite; // if we're reading, begin by assuming a read permission is sufficient
Participants initialParticipants = Participants.get(query.metadata(), query.partitionKey(), consistencyForConsensus);
initialParticipants.assureSufficientLiveNodes(isWrite);
PaxosPrepare preparing = prepare(minimumBallot, initialParticipants, query, isWrite, acceptEarlyReadPermission);
while (true)
{
// prepare
// `retry` is the next in-flight prepare; every case below either assigns it, returns, or throws,
// except the short-read branch of READ_PERMITTED, which leaves it null (handled after the switch)
PaxosPrepare retry = null;
PaxosPrepare.Status prepare = preparing.awaitUntil(deadline);
// After performing the prepare phase we may discover that we can't propose
// our own transaction on this protocol by discovering a new CM Epoch
if (ConsensusRequestRouter.instance.isKeyInMigratingOrMigratedRangeDuringPaxosBegin(query.metadata().id, query.partitionKey()) && prepare.outcome != MAYBE_FAILURE)
{
return BeginResult.retryOnNewProtocol();
}
boolean isPromised = false;
// the switch is labeled so that the nested switch below can `break retry` out of the outer switch
retry: switch (prepare.outcome)
{
default: throw new IllegalStateException();
case FOUND_INCOMPLETE_COMMITTED:
{
FoundIncompleteCommitted incomplete = prepare.incompleteCommitted();
Tracing.trace("Repairing replicas that missed the most recent commit");
retry = commitAndPrepare(incomplete.committed, incomplete.participants, query, isWrite, acceptEarlyReadPermission);
break;
}
case FOUND_INCOMPLETE_ACCEPTED:
{
FoundIncompleteAccepted inProgress = prepare.incompleteAccepted();
Tracing.trace("Finishing incomplete paxos round {}", inProgress.accepted);
if (isWrite)
casWriteMetrics.unfinishedCommit.inc();
else
casReadMetrics.unfinishedCommit.inc();
// we DO NOT need to change the timestamp of this commit - either we or somebody else will finish it
// and the original timestamp is correctly linearised. By not updating the timestamp we leave enough
// information for nodes to avoid competing re-proposing the same proposal; if an in progress accept
// is equal to the latest commit (even if the ballots aren't) we're done and can abort earlier,
// and in fact it's possible for a CAS to sometimes determine if side effects occurred by reading
// the underlying data and not witnessing the timestamp of its ballot (or any newer for the relevant data).
Proposal repropose = new Proposal(inProgress.ballot, inProgress.accepted.update);
PaxosPropose.Status proposeResult = propose(repropose, inProgress.participants, false, true).awaitUntil(deadline);
switch (proposeResult.outcome)
{
default: throw new IllegalStateException();
case MAYBE_FAILURE:
throw proposeResult.maybeFailure().markAndThrowAsTimeoutOrFailure(isWrite, consistencyForConsensus, failedAttemptsDueToContention);
case SUCCESS:
retry = commitAndPrepare(repropose.agreed(), inProgress.participants, query, isWrite, acceptEarlyReadPermission);
// exits the outer (labeled) switch, skipping the SUPERSEDED handling below
break retry;
case SUPERSEDED:
checkState(!proposeResult.superseded().needsConsensusMigration, "Should not receive needsConsensusMigration rejects from begin");
// since we are proposing a previous value that was maybe superseded by us before completion
// we don't need to test the side effects, as we just want to start again, and fall through
// to the superseded section below
prepare = new PaxosPrepare.Superseded(proposeResult.superseded().by, inProgress.participants);
}
}
// intentional fall-through: only the inner SUPERSEDED branch reaches here, with `prepare` replaced above
case SUPERSEDED:
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
// sleep a random amount to give the other proposer a chance to finish
if (!waitForContention(deadline, ++failedAttemptsDueToContention, query.metadata(), query.partitionKey(), consistencyForConsensus, isWrite ? WRITE : READ))
// NOTE(review): passes `true` (write) here regardless of isWrite, while the other throw sites pass isWrite — confirm intended
throw MaybeFailure.noResponses(prepare.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
retry = prepare(prepare.retryWithAtLeast(), prepare.participants, query, isWrite, acceptEarlyReadPermission);
break;
}
// intentional fall-through: a promise is handled like a read permission, but remembered in isPromised
case PROMISED: isPromised = true;
case READ_PERMITTED:
{
// We have received a quorum of promises (or read permissions) that have all witnessed the commit of the prior paxos
// round's proposal (if any).
PaxosPrepare.Success success = prepare.success();
Supplier<Participants> plan = () -> success.participants;
DataResolver<?, ?> resolver = new DataResolver<>(ReadCoordinator.DEFAULT, query, plan, NoopReadRepair.instance, requestTime);
for (int i = 0 ; i < success.responses.size() ; ++i)
resolver.preprocess(success.responses.get(i));
// records whether the resolver invoked its short-read callback
class WasRun implements Runnable { boolean v; public void run() { v = true; } }
WasRun hadShortRead = new WasRun();
PartitionIterator result = resolver.resolve(hadShortRead);
if (!isPromised && hadShortRead.v)
{
// we need to propose an empty update to linearize our short read, but only had read success
// since we may continue to perform short reads, we ask our prepare not to accept an early
// read permission, when a promise may yet be obtained
// TODO: increase read size each time this happens?
// NOTE(review): `result` is dropped here without close() — confirm whether this PartitionIterator needs closing
acceptEarlyReadPermission = false;
// leaves `retry` null, so the loop re-prepares via the fallback after this switch
break;
}
return new BeginResult(success.ballot, success.participants, failedAttemptsDueToContention, result, !hadShortRead.v && success.isReadSafe, isPromised, success.supersededBy, false);
}
case MAYBE_FAILURE:
throw prepare.maybeFailure().markAndThrowAsTimeoutOrFailure(isWrite, consistencyForConsensus, failedAttemptsDueToContention);
case ELECTORATE_MISMATCH:
// recompute the participants for this key and start a fresh prepare with them
Participants participants = Participants.get(query.metadata(), query.partitionKey(), consistencyForConsensus);
participants.assureSufficientLiveNodes(isWrite);
retry = prepare(participants, query, isWrite, acceptEarlyReadPermission);
break;
}
// NOTE(review): every case above either assigns `retry`, returns, or throws, except the short-read branch of
// READ_PERMITTED; so this fallback is only reached after a short read without a promise, even though the trace
// text below speaks of a higher ballot and the attempt is counted as contention — confirm intended
if (retry == null)
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; retrying");
// sleep a random amount to give the other proposer a chance to finish
if (!waitForContention(deadline, ++failedAttemptsDueToContention, query.metadata(), query.partitionKey(), consistencyForConsensus, isWrite ? WRITE : READ))
// NOTE(review): as in the SUPERSEDED case, `true` is passed here regardless of isWrite — confirm intended
throw MaybeFailure.noResponses(prepare.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
retry = prepare(prepare.retryWithAtLeast(), prepare.participants, query, isWrite, acceptEarlyReadPermission);
}
preparing = retry;
}
}