in src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java [546:684]
private void permitted(Permitted permitted, InetAddressAndPort from)
{
if (permitted.outcome != PROMISE)
{
hasOnlyPromises = false;
if (supersededBy == null)
supersededBy = permitted.supersededBy;
}
if (permitted.lowBound > maxLowBound)
{
maxLowBound = permitted.lowBound;
if (!latestCommitted.isNone() && latestCommitted.ballot.uuidTimestamp() < maxLowBound)
{
latestCommitted = Committed.none(request.partitionKey, request.table);
haveReadResponseWithLatest = !readResponses.isEmpty();
if (needLatest != null)
{
withLatest.addAll(needLatest);
needLatest.clear();
}
}
}
if (!haveQuorumOfPermissions)
{
Committed newLatestCommitted = permitted.latestCommitted;
if (newLatestCommitted.ballot.uuidTimestamp() < maxLowBound) newLatestCommitted = Committed.none(request.partitionKey, request.table);
CompareResult compareLatest = newLatestCommitted.compareWith(latestCommitted);
switch (compareLatest)
{
default: throw new IllegalStateException();
case IS_REPROPOSAL:
latestCommitted = permitted.latestCommitted;
case WAS_REPROPOSED_BY:
case SAME:
withLatest.add(from);
haveReadResponseWithLatest |= permitted.readResponse != null;
break;
case BEFORE:
if (needLatest == null)
needLatest = new ArrayList<>(participants.sizeOfPoll() - withLatest.size());
needLatest.add(from);
break;
case AFTER:
// move with->need
if (!withLatest.isEmpty())
{
if (needLatest == null)
{
needLatest = withLatest;
withLatest = new ArrayList<>(Math.min(participants.sizeOfPoll() - needLatest.size(), participants.sizeOfConsensusQuorum));
}
else
{
needLatest.addAll(withLatest);
withLatest.clear();
}
}
withLatest.add(from);
haveReadResponseWithLatest = permitted.readResponse != null;
latestCommitted = permitted.latestCommitted;
}
if (isAfter(permitted.latestAcceptedButNotCommitted, latestAccepted))
latestAccepted = permitted.latestAcceptedButNotCommitted;
if (permitted.readResponse != null)
{
hasProposalStability &= permitted.hadProposalStability;
addReadResponse(permitted.readResponse, from);
}
}
else
{
switch (permitted.latestCommitted.compareWith(latestCommitted))
{
default: throw new IllegalStateException();
case SAME:
case IS_REPROPOSAL:
case WAS_REPROPOSED_BY:
withLatest.add(from);
break;
case AFTER:
if (maybeCheckForLinearizabilityViolation(permitted, from))
return;
// witnessing future commit doesn't imply have seen prior, so add to refresh list
case BEFORE:
if (needLatest == null)
needLatest = new ArrayList<>(participants.sizeOfPoll() - withLatest.size());
needLatest.add(from);
}
}
haveQuorumOfPermissions |= withLatest() + needLatest() >= participants.sizeOfConsensusQuorum;
if (haveQuorumOfPermissions)
{
if (request.read != null && readResponses.size() < participants.sizeOfReadQuorum)
throw new IllegalStateException("Insufficient read responses: " + readResponses + "; need " + participants.sizeOfReadQuorum);
if (!hasOnlyPromises && !hasProposalStability)
signalDone(SUPERSEDED);
// We must be certain to have witnessed a quorum of responses before completing any in-progress proposal
// else we may complete a stale proposal that did not reach a quorum (and may do so in preference
// to a different in progress proposal that did reach a quorum).
// We should also be sure to return any in progress proposal in preference to any incompletely committed
// earlier commits (since, while we should encounter it next round, any commit that is incomplete in the
// presence of an incomplete proposal can be ignored, as either the proposal is a re-proposal of the same
// commit or the commit has already reached a quorum
else if (hasInProgressProposal())
signalDone(hasOnlyPromises ? FOUND_INCOMPLETE_ACCEPTED : SUPERSEDED);
else if (withLatest() >= participants.sizeOfConsensusQuorum)
signalDone(hasOnlyPromises ? PROMISED : READ_PERMITTED);
// otherwise if we have any read response with the latest commit,
// try to simply ensure it has been persisted to a consensus group
else if (haveReadResponseWithLatest)
{
refreshStaleParticipants();
// if an optimistic read is possible, and we are performing a read,
// we can safely answer immediately without waiting for the refresh
if (hasProposalStability && acceptEarlyReadPermission)
signalDone(Outcome.READ_PERMITTED);
}
// otherwise we need to run our reads again anyway,
// and the chance of receiving another response with latest may be slim.
// so we just start again
else
signalDone(FOUND_INCOMPLETE_COMMITTED);
}
}