in accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java [146:297]
protected void onDone(Success success, Throwable failure)
{
if (failure != null)
{
callback.accept(null, failure);
return;
}
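// merge the status replies into a single summary of what the cluster collectively
// knows about this transaction, then derive the Known covering the whole route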
CheckStatusOkFull full = ((CheckStatusOkFull) this.merged).finish(route, route, route, success.withQuorum, previouslyKnownToBeInvalidIf);
Known known = full.knownFor(txnId, route, route);
// TODO (required): audit this logic, and centralise with e.g. FetchData inferences
// TODO (expected): skip straight to ExecuteTxn if we have a Stable reply from each shard
switch (known.outcome())
{
default: throw new AssertionError();
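// no outcome is known anywhere: re-run recovery if we have the definition,
// invalidate if the definition was never known, else just report progress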
case Unknown:
if (known.definition().isKnown())
{
Txn txn = full.partialTxn.reconstitute(route);
Recover.recover(node, txnId, txn, route, reportLowEpoch, reportHighEpoch, callback);
}
else if (!known.definition().isOrWasKnown())
{
if (witnessedByInvalidation != null && witnessedByInvalidation.compareTo(Status.PreAccepted) > 0)
throw illegalState("We previously invalidated, finding a status that should be recoverable");
Invalidate.invalidate(node, txnId, route, witnessedByInvalidation != null, reportLowEpoch, reportHighEpoch, callback);
}
else
{
callback.accept(full.toProgressToken(), null);
}
break;
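// an outcome has been (or was at some point) applied somewhere; depending on what
// survives, propagate what we learned, re-apply to lagging shards, or fall back to recovery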
case WasApply:
case Apply:
if (!known.isDefinitionKnown())
{
if (!known.isTruncated() && !known.isInvalidated())
{
// we must have raced with a successful apply, so we should simply abort
callback.accept(ProgressToken.NONE, null);
return;
}
// TODO (expected): if we determine new durability, propagate it
CheckStatusOkFull propagate;
if (!full.map.hasFullyTruncated(route))
{
// we may hold only part of the full transaction, and some shard may have truncated;
// in that case we want to skip straight to apply, but only on the shards that haven't truncated
Route<?> trySendTo = route.without(full.map.matchingRanges(minMax -> minMax.min.isTruncated()));
if (!trySendTo.isEmpty())
{
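// some shards have not yet truncated; for those we can either commit the
// invalidation or re-apply the outcome directly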
if (known.isInvalidated())
{
Commit.Invalidate.commitInvalidate(node, txnId, trySendTo, txnId);
}
else
{
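// recompute what is known when restricted to the shards we can still reach,
// as the truncated shards may have limited what we could infer route-wide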
known = full.knownFor(txnId, trySendTo, trySendTo);
if (known.isDefinitionKnown() && known.is(ApplyAtKnown) && known.outcome() == Apply)
{
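// we know the definition, executeAt and outcome for these shards; if the stable
// deps are incomplete, recover them via LatestDeps before persisting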
if (!known.is(DepsKnown))
{
Invariants.require(txnId.isSystemTxn() || full.partialTxn.covers(trySendTo));
Participants<?> haveStable = full.map.knownFor(Known.DepsOnly, route);
Route<?> haveUnstable = route.without(haveStable);
Deps stable = full.stableDeps.reconstitutePartial(haveStable).asFullUnsafe();
LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, stable, haveUnstable, trySendTo, SLICE, route, callback, deps -> {
Deps stableDeps = deps.intersecting(trySendTo);
node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, txnId, full.partialTxn, full.executeAt, stableDeps, full.writes, full.result, null);
});
}
else
{
Invariants.require(full.stableDeps.covers(trySendTo));
Invariants.require(txnId.isSystemTxn() || full.partialTxn.covers(trySendTo));
node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, txnId, full.partialTxn, full.executeAt, full.stableDeps, full.writes, full.result, null);
}
}
}
propagate = full;
}
else
{
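// every shard has fully truncated, which should imply the outcome is durable
// at (at least) a majority; fold that durability in before propagating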
propagate = full.merge(Majority);
}
}
else
{
propagate = full;
}
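// record locally whatever we have learned, then report the resulting progress token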
Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, propagate, (s, f) -> callback.accept(f == null ? propagate.toProgressToken() : null, f));
break;
}
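// the definition is known for the whole route: reconstitute the full transaction
// so we can either finish applying it or re-run recovery with it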
Txn txn = full.partialTxn.reconstitute(route);
if (known.is(ApplyAtKnown) && known.outcome() == Apply)
{
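// the outcome and executeAt are known: rebuild the stable deps, tracking any
// portion of the route for which they are still missing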
Deps deps;
Route<?> missingDeps;
if (known.is(DepsKnown))
{
deps = full.stableDeps.reconstitute(route);
missingDeps = route.slice(0, 0); // empty: no shards are missing deps
}
else
{
Participants<?> hasDeps = full.map.knownFor(Known.DepsOnly, route);
missingDeps = route.without(hasDeps);
if (full.stableDeps == null)
{
Invariants.require(hasDeps.isEmpty());
deps = Deps.NONE;
}
else
{
// convert to a plain Deps: when merging with the latest deps we might otherwise
// erroneously retain the PartialDeps, e.g. if an empty range of deps is found
deps = new Deps(full.stableDeps.reconstitutePartial(hasDeps));
}
}
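// fill in deps for any shards still missing them, then persist the outcome at the
// execution epoch and report APPLIED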
LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps, route, SHARE, route, callback, mergedDeps -> {
node.withEpoch(full.executeAt.epoch(), node.agent(), WrappableException::wrap, () -> {
node.coordinationAdapter(txnId, Recovery).persist(node, topologies, route, txnId, txn, full.executeAt, mergedDeps, full.writes, full.result, (s, f) -> {
callback.accept(f == null ? APPLIED : null, f);
});
});
});
}
else
{
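// the outcome or executeAt is not known everywhere: fall back to ordinary recovery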
Recover.recover(node, txnId, txn, route, callback);
}
break;
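// the transaction was invalidated; sanity-check against any prior invalidation
// witness, then propagate the invalidation and report INVALIDATED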
case Abort:
if (witnessedByInvalidation != null && witnessedByInvalidation.hasBeen(Status.PreCommitted))
throw illegalState("We previously invalidated, finding a status that should be recoverable");
Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, f));
break;
case Erased:
// we should only be able to hit the Erased case if every participating shard has advanced past this TxnId, so we don't need to recover it
Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, full, (s, f) -> callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f));
break;
}
}