in accord-core/src/main/java/accord/coordinate/Recover.java [201:319]
private void recover()
{
Invariants.checkState(!isBallotPromised);
isBallotPromised = true;
// first look for the most recent Accept (or later); if present, go straight to proposing it again
RecoverOk acceptOrCommit = maxAcceptedOrLater(recoverOks);
if (acceptOrCommit != null)
{
Timestamp executeAt = acceptOrCommit.executeAt;
switch (acceptOrCommit.status)
{
default: throw new IllegalStateException();
case Invalidated:
{
commitInvalidate();
return;
}
case Applied:
case PreApplied:
{
Deps committedDeps = tryMergeCommittedDeps();
node.withEpoch(executeAt.epoch(), () -> {
// TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
if (committedDeps != null) Persist.persistAndCommitMaximal(node, txnId, route, txn, executeAt, committedDeps, acceptOrCommit.writes, acceptOrCommit.result);
else CollectDeps.withDeps(node, txnId, route, txn.keys(), executeAt, (deps, fail) -> {
if (fail != null) accept(null, fail);
else Persist.persistAndCommitMaximal(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
});
});
accept(acceptOrCommit.result, null);
return;
}
case ReadyToExecute:
case PreCommitted:
case Committed:
{
Deps committedDeps = tryMergeCommittedDeps();
node.withEpoch(executeAt.epoch(), () -> {
if (committedDeps != null)
{
Execute.execute(node, txnId, txn, route, executeAt, committedDeps, this);
}
else
{
CollectDeps.withDeps(node, txnId, route, txn.keys(), executeAt, (deps, fail) -> {
if (fail != null) accept(null, fail);
else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
});
}
});
return;
}
case Accepted:
{
// TODO (required): this is broken given feedback from Gotsman, Sutra and Ryabinin. Every transaction proposes deps.
// Today we send Deps for all Accept rounds, however their contract is different for SyncPoint
// and regular transactions. The former require that their deps be durably agreed before they
// proceed, but this is impossible for a regular transaction that may agree a later executeAt
// than proposed. Deps for standard transactions are only used for recovery.
// While we could safely behave identically here for them, we expect to stop sending deps in the
// Accept round for these transactions as it is not necessary for correctness with some modifications
// to recovery.
Deps deps;
if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();
else deps = mergeDeps();
if (deps != null)
{
// TODO (desired, behaviour): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
propose(acceptOrCommit.executeAt, deps);
return;
}
// if we propose deps, and cannot assemble a complete set, then we never reached consensus and can be invalidated
}
case AcceptedInvalidate:
{
invalidate();
return;
}
case NotWitnessed:
case PreAccepted:
throw new IllegalStateException("Should only be possible to have Accepted or later commands");
}
}
if (txnId.rw().proposesDeps() || tracker.rejectsFastPath() || recoverOks.stream().anyMatch(ok -> ok.rejectsFastPath))
{
invalidate();
return;
}
// should all be PreAccept
Deps deps = mergeDeps();
Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok -> ok.earlierAcceptedNoWitness);
Deps earlierCommittedWitness = Deps.merge(recoverOks, ok -> ok.earlierCommittedWitness);
earlierAcceptedNoWitness = earlierAcceptedNoWitness.without(earlierCommittedWitness::contains);
if (!earlierAcceptedNoWitness.isEmpty())
{
// If there exist commands that were proposed an earlier execution time than us that have not witnessed us,
// we have to be certain these commands have not successfully committed without witnessing us (thereby
// ruling out a fast path decision for us and changing our recovery decision).
// So, we wait for these commands to finish committing before retrying recovery.
// TODO (required): check paper: do we assume that witnessing in PreAccept implies witnessing in Accept? Not guaranteed.
// See whitepaper for more details
awaitCommits(node, earlierAcceptedNoWitness).addCallback((success, failure) -> {
if (failure != null) accept(null, failure);
else retry();
});
return;
}
propose(txnId, deps);
}