in accord-core/src/main/java/accord/coordinate/Invalidate.java [160:279]
/**
 * Decides how to proceed once the prepare phase is complete, using the replies gathered in {@code replies}.
 *
 * Must be invoked at most once: it requires {@code isPrepareDone} to be false and then sets it.
 * Exactly one of the following outcomes is taken:
 * <ul>
 *   <li>If a non-truncated reply exists and its max knowledge status is {@code Invalidated}:
 *       mark done and call {@code commitInvalidate()}.</li>
 *   <li>If that status is {@code AcceptedMedium}, {@code AcceptedSlow}, {@code PreCommitted} .. {@code Applied},
 *       or {@code PreAccepted} when neither {@code tracker.isSafeToInvalidate()} nor
 *       {@code transitivelyInvokedByPriorInvalidation} holds: hand over to {@code RecoverWithRoute.recover}.</li>
 *   <li>If the max reply is not the max non-truncated reply (only possible when {@code max.maxStatus} is
 *       Truncated), or {@code max.maxStatus != max.maxKnowledgeStatus}: mark done and report
 *       {@code TRUNCATED_DURABLE_OR_INVALIDATED} to {@code callback}.</li>
 *   <li>Otherwise: propose invalidation to a key within the promised shard's range and, on success,
 *       call {@code commitInvalidate()}.</li>
 * </ul>
 * A {@code Truncated} knowledge status on the non-truncated reply throws via {@code illegalState()};
 * any unlisted status throws {@code UnhandledEnum}.
 *
 * NOTE(review): on the paths that call {@code commitInvalidate()} this method does not itself invoke
 * {@code callback} on success - presumably {@code commitInvalidate()} does so; confirm against its definition.
 */
private void invalidate()
{
// guard against running the post-prepare decision more than once
Invariants.require(!isPrepareDone);
isPrepareDone = true;
// fullRoute may be null (it is null-checked below); someRoute is the merge of the routes in the replies
FullRoute<?> fullRoute = InvalidateReply.findRoute(replies.unsafeValuesBackingArray());
Route<?> someRoute = InvalidateReply.mergeRoutes(replies.unsafeValuesBackingArray());
// first look to see if it has already been decided/invalidated
// check each shard independently - if we find any that can be invalidated, do so
InvalidateReply max = InvalidateReply.max(replies.unsafeValuesBackingArray());
// if the overall max is not truncated it doubles as maxNotTruncated; otherwise search separately (result may be null)
InvalidateReply maxNotTruncated = !max.maxStatus.is(Status.Truncated) ? max : InvalidateReply.maxNotTruncated(replies.unsafeValuesBackingArray());
if (maxNotTruncated != null)
{
switch (maxNotTruncated.maxKnowledgeStatus.status)
{
default: throw new UnhandledEnum(maxNotTruncated.maxKnowledgeStatus.status);
// a Truncated knowledge status is not expected on the reply selected as maxNotTruncated
case Truncated: throw illegalState();
case AcceptedInvalidate:
// latest accept also invalidating, so we're on the same page and should finish our invalidation
case NotDefined:
// leave the switch and continue with the checks/proposal below
break;
case PreAccepted:
if (tracker.isSafeToInvalidate() || transitivelyInvokedByPriorInvalidation)
break;
// intentional fall-through: not safe to invalidate, so treat like the decided states below and recover
case Applied:
case PreApplied:
case Stable:
case Committed:
case PreCommitted:
// for states other than PreAccepted: if we contacted the home key we must have been given the full route
Invariants.require(maxNotTruncated.maxKnowledgeStatus.status == PreAccepted || !invalidateWith.contains(someRoute.homeKey()) || fullRoute != null);
// intentional fall-through: the accepted states share the recovery hand-off (but skip the invariant above)
case AcceptedMedium:
case AcceptedSlow:
// TODO (desired, efficiency): if we see Committed or above, go straight to Execute if we have assembled enough information
Invariants.require(fullRoute != null, "Received a reply from a node that must have known some route, but that did not include it"); // we now require the FullRoute on all replicas to preaccept, commit or apply
// The data we see might have made it only to a minority in the event of PreAccept ONLY.
// We want to protect against infinite loops, so we inform the recovery of the state we have
// witnessed during our initial invalidation.
// However, if the state is not guaranteed to be recoverable (i.e. PreAccept/NotWitnessed),
// we do not relay this information unless we can guarantee that any shard recovery may contact
// has been prevented from reaching a _later_ fast-path decision by our promises.
// Which means checking we contacted every shard, since we only reach that point if we have promises
// from every shard we contacted.
// Note that there's lots of scope for variations in behaviour here, but lots of care is needed.
Status witnessedByInvalidation = maxNotTruncated.maxKnowledgeStatus.status;
if (!witnessedByInvalidation.hasBeen(AcceptedMedium))
{
Invariants.require(tracker.all(InvalidationShardTracker::isPromised));
// we did not contact every key/range of the full route, so withhold the witnessed status from recovery
if (!invalidateWith.containsAll(fullRoute))
witnessedByInvalidation = null;
}
// hand over to recovery; note isDone is not set on this path and callback is passed on to recover
RecoverWithRoute.recover(node, ballot, txnId, NotKnownToBeInvalid, fullRoute, witnessedByInvalidation, reportLowEpoch, reportHighEpoch, callback);
return;
case Invalidated:
// TODO (desired, API consistency): standardise semantics of whether local application of state prior is async or sync to callback
isDone = true;
commitInvalidate();
return;
}
}
// reached when there is no non-truncated reply, or the switch above chose to proceed with invalidation.
// max != maxNotTruncated can only hold when max.maxStatus is Truncated (otherwise they are the same reference)
if (max != maxNotTruncated || max.maxStatus != max.maxKnowledgeStatus)
{
// note: this may all be out of date given other evolutions (and it was previously buggy)
// TODO (required): revisit this logic some more
// this exists because if we have truncated a range we cannot seek votes to advance the state machine, so we may not be able to reach a quorum
// but, equally, we may not propagate the GC point to all replicas.
// the current logic has pitfalls when *some* replica has a GC point that others cannot reach.
// we should reconsider and at least
// 1) ensure GC points can be propagated between nodes;
// 2) broaden cases where we can derive that the command has not been decided and send invalidated/erased to everyone
// TODO (required): we have another edge case to consider here only when we don't have a FullRoute and
// we report that the outcome is durable - which may not be the case for all of the shards.
// The home shard will stop attempting to recover the transaction in this case.
// This is perhaps not even a problem, and requires that no healthy home shard even has the FullRoute.
// Other shards would be expected to coordinate the invalidation of this transaction themselves.
Invariants.require(maxNotTruncated == null || !maxNotTruncated.maxKnowledgeStatus.hasBeen(Status.PreCommitted));
isDone = true;
callback.accept(TRUNCATED_DURABLE_OR_INVALIDATED, null);
return;
}
// if we have witnessed the transaction, but are able to invalidate, do we want to proceed?
// Probably simplest to do so, but perhaps better for user if we don't.
Ranges ranges = Ranges.of(tracker.promisedShard().range);
// we look up by TxnId at the target node, so it's fine to pick a RoutingKey even if it's a range transaction
// NOTE(review): get(0) assumes invalidateWith intersects the promised shard's range - confirm this always holds
RoutingKey someKey = invalidateWith.slice(ranges).get(0).someIntersectingRoutingKey(ranges);
proposeInvalidate(node, ballot, txnId, someKey, (success, fail) -> {
/*
We're now inside our *exactly once* callback we registered with proposeInvalidate, and we need to
make sure we honour our own exactly once semantics with {@code callback}.
So we are responsible for all exception handling.
*/
isDone = true;
if (fail != null)
{
// the proposal failed: report the failure and do not commit
callback.accept(null, fail);
}
else
{
try
{
commitInvalidate();
}
catch (Throwable t)
{
// surface any failure from commitInvalidate through callback rather than letting it escape
callback.accept(null, t);
}
}
});
}