in accord-core/src/main/java/accord/coordinate/Recover.java [243:430]
/**
 * Decides how to complete recovery of {@code txnId} from the collected {@link RecoverOk} replies.
 * <p>
 * May be invoked at most once: it requires {@code isBallotPromised} to be unset and then sets it. Unless
 * {@code Invariants.debug()} is enabled the {@code recoverOks} field is cleared; the replies are used via a
 * local reference from then on.
 * <p>
 * The decision proceeds in three stages:
 * <ol>
 *   <li>If {@code maxAccepted}/{@code maxAcceptedNotTruncated} select a reply whose status is not
 *       {@code Truncated}, recovery follows that reply: commit an invalidation ({@code Invalidated}),
 *       re-run invalidation ({@code AcceptedInvalidate}), persist and report the result
 *       ({@code Applied}/{@code PreApplied}), execute ({@code Stable}), stabilise
 *       ({@code PreCommitted}/{@code Committed}), or re-propose on the slow path with the merged proposal deps
 *       ({@code AcceptedSlow}/{@code AcceptedMedium}). If {@code committedExecuteAt} is already known, an
 *       accepted status is upgraded to {@code Committed}.</li>
 *   <li>Otherwise, if the selected reply is {@code Truncated}, the replies of each shard are inspected and the
 *       callback may be completed with {@code TRUNCATED_DURABLE_OR_INVALIDATED}.</li>
 *   <li>Otherwise the fast-path outcome is inferred: {@code Reject} invalidates, while {@code Accept} and
 *       {@code Unknown} first wait on any earlier/later transactions that could affect the decision, then
 *       propose at {@code txnId}, retry, or invalidate.</li>
 * </ol>
 */
private void recover()
{
    Invariants.require(!isBallotPromised);
    isBallotPromised = true;
    // Keep a local reference to the replies; the field itself is only retained when debugging invariants.
    SortedListMap<Id, RecoverOk> recoverOks = this.recoverOks;
    if (!Invariants.debug()) this.recoverOks = null;
    List<RecoverOk> recoverOkList = recoverOks.valuesAsNullableList();
    RecoverOk acceptOrCommit = maxAccepted(recoverOkList);
    // If the selected reply is Truncated, fall back to the best reply that is not Truncated (which may be null).
    RecoverOk acceptOrCommitNotTruncated = acceptOrCommit == null || acceptOrCommit.status != Status.Truncated
                                           ? acceptOrCommit : maxAcceptedNotTruncated(recoverOkList);
    if (acceptOrCommitNotTruncated != null)
    {
        Timestamp executeAt = acceptOrCommitNotTruncated.executeAt;
        // Computed in a nested scope so that `status` is effectively final and can be captured by the lambda below.
        Status status; {
            Status tmp = acceptOrCommitNotTruncated.status;
            if (committedExecuteAt != null)
            {
                Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted) < 0 || executeAt.equals(committedExecuteAt));
                // if we know from a prior Accept attempt that this is committed we can go straight to the commit phase
                if (tmp == AcceptedMedium || tmp == AcceptedSlow)
                    tmp = Status.Committed;
            }
            status = tmp;
        }
        // Outcomes resolved immediately, without merging deps or waiting on earlier transactions.
        // Any status not listed here falls through to the dependency-merging path below.
        switch (status)
        {
            case Truncated: throw illegalState("Truncate should be filtered");
            case Invalidated:
            {
                commitInvalidate(invalidateUntil(recoverOks));
                return;
            }
            case AcceptedInvalidate:
            {
                invalidate(recoverOks);
                return;
            }
            case NotDefined:
            case PreAccepted:
                throw illegalState("Should only be possible to have Accepted or later commands");
        }
        LatestDeps.Merge merge = mergeDeps(recoverOkList);
        // NOTE(review): presumably the participants with no accepted deps in the merge; we wait for earlier
        // transactions on these before acting on `status`. Confirm against LatestDeps.Merge.notAccepted.
        Participants<?> await = merge.notAccepted(route);
        awaitPartialEarlier(recoverOkList, await, () -> {
            switch (status)
            {
                default: throw new UnhandledEnum(status);
                case Applied:
                case PreApplied:
                {
                    // The outcome is known: persist it (persist callbacks go to node.agent().acceptAndWrap, not to
                    // this coordinator) and report the result directly rather than from the persist callback.
                    withStableDeps(merge, executeAt, (i, t) -> node.agent().acceptAndWrap(i, t), stableDeps -> {
                        adapter.persist(node, tracker.topologies(), route, txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes, acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
                    });
                    accept(acceptOrCommitNotTruncated.result, null);
                    return;
                }
                case Stable:
                {
                    withStableDeps(merge, executeAt, this, stableDeps -> {
                        adapter.execute(node, tracker.topologies(), route, RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps, this);
                    });
                    return;
                }
                case PreCommitted:
                case Committed:
                {
                    withCommittedDeps(merge, executeAt, this, committedDeps -> {
                        adapter.stabilise(node, tracker.topologies(), route, ballot, txnId, txn, executeAt, committedDeps, this);
                    });
                    return;
                }
                case AcceptedSlow:
                case AcceptedMedium:
                {
                    // TODO (desired): if we have a quorum of Accept with matching ballot or proposal we can go straight to Commit
                    // TODO (desired): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
                    //                 however, note that we may have taken the fast path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
                    //                 (otherwise recovery was attempted and did not invalidate, so it must have determined it needed to complete)
                    Deps proposeDeps = merge.mergeProposal();
                    propose(SLOW, acceptOrCommitNotTruncated.executeAt, proposeDeps);
                }
            }
        });
        return;
    }

    // Only Truncated replies were found among the Accepted-or-later responses.
    if (acceptOrCommit != null && acceptOrCommit != acceptOrCommitNotTruncated)
    {
        // TODO (required): match logic in Invalidate; we need to know which keys have been invalidated
        Topologies topologies = tracker.topologies();
        boolean allShardsTruncated = true;
        for (int topologyIndex = 0 ; topologyIndex < topologies.size() ; ++topologyIndex)
        {
            Topology topology = topologies.get(topologyIndex);
            for (Shard shard : topology.shards())
            {
                RecoverOk maxReply = maxAccepted(recoverOks.select(shard.nodes));
                // maxAccepted may return null (as handled for acceptOrCommit above), presumably when none of this
                // shard's replies qualify; such a shard is not known to be truncated, so do not dereference it.
                allShardsTruncated &= maxReply != null && maxReply.status == Status.Truncated;
            }
            // NOTE(review): allShardsTruncated accumulates across topologies and is tested inside this loop, so
            // only the shards of topologies.get(0) can trigger the early return below; confirm whether this check
            // was meant to follow the outer loop (i.e. require every topology to be truncated).
            if (allShardsTruncated)
            {
                // TODO (required, correctness): this is not a safe inference in the case of an ErasedOrInvalidOrVestigial response.
                //   We need to tighten up the inference and spread of truncation/invalid outcomes.
                //   In this case, at minimum this can lead to liveness violations as the home shard stops coordinating
                //   a command that it hasn't invalidated, but nor is it possible to recover. This happens because
                //   when the home shard shares all of its replicas with another shard that has autonomously invalidated
                //   the transaction, so that all received InvalidateReply show truncation (when in fact this is only partial).
                //   We could paper over this, but better to revisit and provide stronger invariants we can rely on.
                isDone = true;
                callback.accept(TRUNCATED_DURABLE_OR_INVALIDATED, null);
                return;
            }
        }
    }

    // No usable Accepted-or-later reply: infer whether the original coordinator could have taken the fast path.
    Invariants.require(committedExecuteAt == null || committedExecuteAt.equals(txnId));
    boolean coordinatorInRecoveryQuorum = recoverOks.get(txnId.node) != null;
    Participants<?> extraCoordVotes = extraCoordinatorVotes(txnId, coordinatorInRecoveryQuorum, recoverOkList);
    // laterCoordRejects from all replies, restricted to the route and to entries whose node is not a key of recoverOks.
    Participants<?> extraRejects = Deps.merge(recoverOkList, recoverOkList.size(), List::get, ok -> ok.laterCoordRejects)
                                       .intersecting(route, id -> !recoverOks.containsKey(id.node));
    InferredFastPath fastPath;
    if (txnId.hasPrivilegedCoordinator() && coordinatorInRecoveryQuorum) fastPath = Reject;
    else fastPath = merge(
        supersedingRejects(recoverOkList) ? Reject : Unknown,
        tracker.inferFastPathDecision(txnId, extraCoordVotes, extraRejects)
    );

    switch (fastPath)
    {
        case Reject:
        {
            invalidate(recoverOks);
            return;
        }
        case Accept:
        {
            // we still have to wait for earlier transactions to decide themselves so we don't accidentally include
            // a non-fastpath transaction in our dependencies and permit it to conclude it is safe to execute.
            // So, we fall-through to Unknown condition - though we don't in principle need to wait for any future transactions
        }
        case Unknown:
        {
            // should all be PreAccept
            Deps earlierWait = Deps.merge(recoverOkList, recoverOkList.size(), List::get, ok -> ok.earlierWait);
            Deps earlierNoWait = Deps.merge(recoverOkList, recoverOkList.size(), List::get, ok -> ok.earlierNoWait);
            earlierWait = earlierWait.without(earlierNoWait);
            Deps laterWitnessedCoordRejects = Deps.merge(recoverOks, recoverOks.domainSize(), (map, i) -> selectCoordinatorReplies(map.getKey(i), map.getValue(i)), Function.identity());
            if (!earlierWait.isEmpty() || !laterWitnessedCoordRejects.isEmpty())
            {
                // If there exist commands that were proposed a later execution time than us that have not witnessed us,
                // we have to be certain these commands have not successfully committed without witnessing us (thereby
                // ruling out a fast path decision for us and changing our recovery decision).
                // So, we wait for these commands to commit and recompute supersedingRejects for them.
                AsyncChainCombiner.reduce(awaitEarlier(node, earlierWait, HasCommittedDeps),
                                          awaitLater(node, laterWitnessedCoordRejects, CommittedOrNotFastPathCommit, extraCoordVotes),
                                          InferredFastPath::merge)
                                  .begin((inferred, failure) -> {
                                      if (failure != null) accept(null, failure);
                                      else
                                      {
                                          switch (inferred)
                                          {
                                              default: throw new UnhandledEnum(inferred);
                                              case Accept: propose(SLOW, txnId, recoverOkList); break;
                                              case Unknown: retry(committedExecuteAt); break;
                                              case Reject: invalidate(recoverOks); break;
                                          }
                                      }
                                  });
            }
            else
            {
                propose(SLOW, txnId, recoverOkList);
            }
        }
    }
}