private static BeginResult begin()

in src/java/org/apache/cassandra/service/paxos/Paxos.java [1052:1187]


    /**
     * Drives the prepare phase of a Paxos operation for the partition read by {@code query}, looping until it
     * can either return a {@link BeginResult} to the caller or has to throw.
     *
     * Each loop iteration waits on the in-flight {@code PaxosPrepare} and dispatches on its outcome:
     * <ul>
     *   <li>{@code FOUND_INCOMPLETE_COMMITTED}: calls {@code commitAndPrepare} with the incomplete commit and loops</li>
     *   <li>{@code FOUND_INCOMPLETE_ACCEPTED}: re-proposes the accepted value under the in-progress ballot; on
     *       success calls {@code commitAndPrepare} and loops, on supersession falls into the {@code SUPERSEDED} handling</li>
     *   <li>{@code SUPERSEDED}: waits for contention to subside, then prepares again with
     *       {@code prepare.retryWithAtLeast()}</li>
     *   <li>{@code PROMISED} / {@code READ_PERMITTED}: resolves the responses collected by the prepare and returns,
     *       unless only a read permission was obtained and the short-read callback fired, in which case it retries
     *       with {@code acceptEarlyReadPermission} disabled</li>
     *   <li>{@code ELECTORATE_MISMATCH}: re-fetches the participants and prepares again</li>
     *   <li>{@code MAYBE_FAILURE}: throws via {@code markAndThrowAsTimeoutOrFailure}</li>
     * </ul>
     * Before the dispatch, if the key is reported as being in a migrating/migrated range (and the prepare outcome
     * is not {@code MAYBE_FAILURE}) the method returns {@code BeginResult.retryOnNewProtocol()} instead.
     *
     * @param deadline passed unchanged to {@code awaitUntil} and {@code waitForContention}
     *                 (units/clock are not visible here - presumably an absolute nanosecond deadline; TODO confirm)
     * @param query the single-partition read whose metadata and partition key select the participants, and whose
     *              results are resolved on success
     * @param consistencyForConsensus consistency level used to select participants and to report failures
     * @param isWrite whether this is on behalf of a write; reads start out accepting an early read permission
     * @param minimumBallot ballot handed to the initial {@code prepare} call
     * @param failedAttemptsDueToContention running count of contention back-offs; incremented before every
     *                                      {@code waitForContention} call and carried into the returned result
     * @param requestTime handed to the {@code DataResolver} used to resolve the prepare responses
     * @return the result of a successful prepare, or {@code BeginResult.retryOnNewProtocol()}
     * @throws WriteTimeoutException see {@code markAndThrowAsTimeoutOrFailure}; raised when a prepare or re-propose
     *         reports {@code MAYBE_FAILURE}, or when {@code waitForContention} returns false
     * @throws WriteFailureException as above
     * @throws ReadTimeoutException as above
     * @throws ReadFailureException as above
     */
    private static BeginResult begin(long deadline,
                                     SinglePartitionReadCommand query,
                                     ConsistencyLevel consistencyForConsensus,
                                     final boolean isWrite,
                                     Ballot minimumBallot,
                                     int failedAttemptsDueToContention,
                                     Dispatcher.RequestTime requestTime)
            throws WriteTimeoutException, WriteFailureException, ReadTimeoutException, ReadFailureException
    {
        boolean acceptEarlyReadPermission = !isWrite; // if we're reading, begin by assuming a read permission is sufficient
        Participants initialParticipants = Participants.get(query.metadata(), query.partitionKey(), consistencyForConsensus);
        initialParticipants.assureSufficientLiveNodes(isWrite);
        PaxosPrepare preparing = prepare(minimumBallot, initialParticipants, query, isWrite, acceptEarlyReadPermission);
        while (true)
        {
            // prepare
            // retry is left null only by a case that breaks without scheduling the next prepare itself;
            // that situation is handled by the (retry == null) block after the switch
            PaxosPrepare retry = null;
            PaxosPrepare.Status prepare = preparing.awaitUntil(deadline);

            // After performing the prepare phase we may discover that we can't propose
            // our own transaction on this protocol by discovering a new CM Epoch
            // (MAYBE_FAILURE is excluded here so that it is still reported by the switch below)
            if (ConsensusRequestRouter.instance.isKeyInMigratingOrMigratedRangeDuringPaxosBegin(query.metadata().id, query.partitionKey()) && prepare.outcome != MAYBE_FAILURE)
            {
                return BeginResult.retryOnNewProtocol();
            }

            boolean isPromised = false;
            // the switch is labelled so that the nested switch on the propose outcome can leave it with "break retry"
            retry: switch (prepare.outcome)
            {
                default: throw new IllegalStateException();

                case FOUND_INCOMPLETE_COMMITTED:
                {
                    FoundIncompleteCommitted incomplete = prepare.incompleteCommitted();
                    Tracing.trace("Repairing replicas that missed the most recent commit");
                    retry = commitAndPrepare(incomplete.committed, incomplete.participants, query, isWrite, acceptEarlyReadPermission);
                    break;
                }
                case FOUND_INCOMPLETE_ACCEPTED:
                {
                    FoundIncompleteAccepted inProgress = prepare.incompleteAccepted();
                    Tracing.trace("Finishing incomplete paxos round {}", inProgress.accepted);
                    if (isWrite)
                        casWriteMetrics.unfinishedCommit.inc();
                    else
                        casReadMetrics.unfinishedCommit.inc();

                    // we DO NOT need to change the timestamp of this commit - either we or somebody else will finish it
                    // and the original timestamp is correctly linearised. By not updating the timestamp we leave enough
                    // information for nodes to avoid competing re-proposing the same proposal; if an in progress accept
                    // is equal to the latest commit (even if the ballots aren't) we're done and can abort earlier,
                    // and in fact it's possible for a CAS to sometimes determine if side effects occurred by reading
                    // the underlying data and not witnessing the timestamp of its ballot (or any newer for the relevant data).
                    Proposal repropose = new Proposal(inProgress.ballot, inProgress.accepted.update);
                    PaxosPropose.Status proposeResult = propose(repropose, inProgress.participants, false, true).awaitUntil(deadline);
                    switch (proposeResult.outcome)
                    {
                        default: throw new IllegalStateException();

                        case MAYBE_FAILURE:
                            throw proposeResult.maybeFailure().markAndThrowAsTimeoutOrFailure(isWrite, consistencyForConsensus, failedAttemptsDueToContention);

                        case SUCCESS:
                            retry = commitAndPrepare(repropose.agreed(), inProgress.participants, query, isWrite, acceptEarlyReadPermission);
                            // leaves the OUTER switch, skipping the SUPERSEDED handling below
                            break retry;

                        case SUPERSEDED:
                            checkState(!proposeResult.superseded().needsConsensusMigration, "Should not receive needsConsensusMigration rejects from begin");
                            // since we are proposing a previous value that was maybe superseded by us before completion
                            // we don't need to test the side effects, as we just want to start again, and fall through
                            // to the superseded section below
                            prepare = new PaxosPrepare.Superseded(proposeResult.superseded().by, inProgress.participants);
                    }
                    // intentional fall through: only the inner SUPERSEDED case reaches this point (the other
                    // inner cases throw or "break retry"), with prepare replaced by a synthetic Superseded status
                }

                case SUPERSEDED:
                {
                    Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
                    // sleep a random amount to give the other proposer a chance to finish
                    // NOTE(review): the throw below passes a literal true rather than isWrite, unlike the
                    // MAYBE_FAILURE paths - confirm this is intentional for contention timeouts on reads
                    if (!waitForContention(deadline, ++failedAttemptsDueToContention, query.metadata(), query.partitionKey(), consistencyForConsensus, isWrite ? WRITE : READ))
                        throw MaybeFailure.noResponses(prepare.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
                    retry = prepare(prepare.retryWithAtLeast(), prepare.participants, query, isWrite, acceptEarlyReadPermission);
                    break;
                }
                // PROMISED records that a full promise was obtained, then deliberately falls through to share
                // the response handling with READ_PERMITTED
                case PROMISED: isPromised = true;
                case READ_PERMITTED:
                {
                    // We have received a quorum of promises (or read permissions) that have all witnessed the commit of the prior paxos
                    // round's proposal (if any).
                    PaxosPrepare.Success success = prepare.success();

                    Supplier<Participants> plan = () -> success.participants;
                    DataResolver<?, ?> resolver = new DataResolver<>(ReadCoordinator.DEFAULT, query, plan, NoopReadRepair.instance, requestTime);
                    for (int i = 0 ; i < success.responses.size() ; ++i)
                        resolver.preprocess(success.responses.get(i));

                    // callback that simply records whether it was invoked; handed to resolve() below
                    // (presumably the resolver runs it when it detects a short read - see DataResolver to confirm)
                    class WasRun implements Runnable { boolean v; public void run() { v = true; } }
                    WasRun hadShortRead = new WasRun();
                    PartitionIterator result = resolver.resolve(hadShortRead);

                    if (!isPromised && hadShortRead.v)
                    {
                        // we need to propose an empty update to linearize our short read, but only had read success
                        // since we may continue to perform short reads, we ask our prepare not to accept an early
                        // read permission, when a promise may yet be obtained
                        // TODO: increase read size each time this happens?
                        acceptEarlyReadPermission = false;
                        // retry is still null here, so the block after the switch backs off and prepares again
                        break;
                    }

                    return new BeginResult(success.ballot, success.participants, failedAttemptsDueToContention, result, !hadShortRead.v && success.isReadSafe, isPromised, success.supersededBy, false);
                }

                case MAYBE_FAILURE:
                    throw prepare.maybeFailure().markAndThrowAsTimeoutOrFailure(isWrite, consistencyForConsensus, failedAttemptsDueToContention);

                case ELECTORATE_MISMATCH:
                    // look the participants up afresh, re-check liveness, and start a new prepare with them
                    Participants participants = Participants.get(query.metadata(), query.partitionKey(), consistencyForConsensus);
                    participants.assureSufficientLiveNodes(isWrite);
                    retry = prepare(participants, query, isWrite, acceptEarlyReadPermission);
                    break;

            }

            // In the code visible here, the only path that arrives with retry unassigned is the short-read
            // break in the READ_PERMITTED handling above; it is treated like contention: back off, then
            // prepare again (now with acceptEarlyReadPermission == false).
            // NOTE(review): as in the SUPERSEDED case, the throw passes a literal true rather than isWrite - confirm
            if (retry == null)
            {
                Tracing.trace("Some replicas have already promised a higher ballot than ours; retrying");
                // sleep a random amount to give the other proposer a chance to finish
                if (!waitForContention(deadline, ++failedAttemptsDueToContention, query.metadata(), query.partitionKey(), consistencyForConsensus, isWrite ? WRITE : READ))
                    throw MaybeFailure.noResponses(prepare.participants).markAndThrowAsTimeoutOrFailure(true, consistencyForConsensus, failedAttemptsDueToContention);
                retry = prepare(prepare.retryWithAtLeast(), prepare.participants, query, isWrite, acceptEarlyReadPermission);
            }

            // the next loop iteration awaits whichever prepare was scheduled above
            preparing = retry;
        }
    }