protected void onDone()

in accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java [146:297]


    /**
     * Invoked once the status check has finished; decides how to continue recovery of {@code txnId}
     * based on what the merged replies reveal.
     *
     * <p>On failure the error is forwarded to {@code callback} and nothing else happens. Otherwise the
     * merged reply is finished against {@code route}, the resulting {@link Known} is computed, and the
     * method dispatches on {@code known.outcome()}:
     * <ul>
     *   <li>{@code Unknown}: recover if the definition is known; invalidate if it is not and never was
     *       known; otherwise just report the current progress token.</li>
     *   <li>{@code WasApply}/{@code Apply}: if the definition is not known, either abort (neither
     *       truncated nor invalidated) or try to send the outcome to the non-truncated part of the route
     *       and then propagate; if the definition is known, persist the outcome when the execution
     *       timestamp and an {@code Apply} outcome are known, else fall back to recovery.</li>
     *   <li>{@code Abort}: propagate and report {@code INVALIDATED}.</li>
     *   <li>{@code Erased}: propagate and report {@code TRUNCATED_DURABLE_OR_INVALIDATED}.</li>
     * </ul>
     *
     * <p>May throw the exception built by {@code illegalState(...)} when a previous invalidation attempt
     * witnessed a status that is inconsistent with the outcome found here, and {@link AssertionError}
     * for an outcome not handled by the switch.
     *
     * @param success result of the status check; only {@code success.withQuorum} is read here
     * @param failure non-null if the status check failed, in which case it is passed to the callback
     */
    protected void onDone(Success success, Throwable failure)
    {
        if (failure != null)
        {
            callback.accept(null, failure);
            return;
        }

        CheckStatusOkFull full = ((CheckStatusOkFull) this.merged).finish(route, route, route, success.withQuorum, previouslyKnownToBeInvalidIf);
        Known known = full.knownFor(txnId, route, route);

        // TODO (required): audit this logic, and centralise with e.g. FetchData inferences
        // TODO (expected): skip straight to ExecuteTxn if we have a Stable reply from each shard
        switch (known.outcome())
        {
            // include the offending value so an unexpected outcome is diagnosable from the stack trace alone
            default: throw new AssertionError("Unexpected outcome: " + known.outcome());
            case Unknown:
                if (known.definition().isKnown())
                {
                    // we have the definition, so can run ordinary recovery over the full route
                    Txn txn = full.partialTxn.reconstitute(route);
                    Recover.recover(node, txnId, txn, route, reportLowEpoch, reportHighEpoch, callback);
                }
                else if (!known.definition().isOrWasKnown())
                {
                    // the definition is not and never was known: attempt to invalidate, unless an earlier
                    // invalidation attempt witnessed a status later than PreAccepted
                    if (witnessedByInvalidation != null && witnessedByInvalidation.compareTo(Status.PreAccepted) > 0)
                        throw illegalState("We previously invalidated, finding a status that should be recoverable");
                    Invalidate.invalidate(node, txnId, route, witnessedByInvalidation != null, reportLowEpoch, reportHighEpoch, callback);
                }
                else
                {
                    // the definition was known at some point but is not any more; just report what we learned
                    callback.accept(full.toProgressToken(), null);
                }
                break;

            case WasApply:
            case Apply:
                if (!known.isDefinitionKnown())
                {
                    if (!known.isTruncated() && !known.isInvalidated())
                    {
                        // we must have raced with a successful apply, so should simply abort
                        callback.accept(ProgressToken.NONE, null);
                        return;
                    }

                    // TODO (expected): if we determine new durability, propagate it
                    // assigned exactly once on every path below, so that it may be captured by the callback lambda
                    CheckStatusOkFull propagate;
                    if (!full.map.hasFullyTruncated(route))
                    {
                        // we might have only part of the full transaction, and a shard may have truncated;
                        // in this case we want to skip straight to apply, but only for the shards that haven't truncated
                        Route<?> trySendTo = route.without(full.map.matchingRanges(minMax -> minMax.min.isTruncated()));
                        if (!trySendTo.isEmpty())
                        {
                            if (known.isInvalidated())
                            {
                                Commit.Invalidate.commitInvalidate(node, txnId, trySendTo, txnId);
                            }
                            else
                            {
                                // re-evaluate what is known, restricted to the part of the route we intend to contact
                                known = full.knownFor(txnId, trySendTo, trySendTo);
                                if (known.isDefinitionKnown() && known.is(ApplyAtKnown) && known.outcome() == Apply)
                                {
                                    if (!known.is(DepsKnown))
                                    {
                                        // deps are not known for all of trySendTo: take those reported for DepsOnly
                                        // ranges and let LatestDeps.withStable supply the rest before persisting
                                        Invariants.require(txnId.isSystemTxn() || full.partialTxn.covers(trySendTo));
                                        Participants<?> haveStable = full.map.knownFor(Known.DepsOnly, route);
                                        Route<?> haveUnstable = route.without(haveStable);
                                        Deps stable = full.stableDeps.reconstitutePartial(haveStable).asFullUnsafe();

                                        LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, stable, haveUnstable, trySendTo, SLICE, route, callback, deps -> {
                                            Deps stableDeps = deps.intersecting(trySendTo);
                                            node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, txnId, full.partialTxn, full.executeAt, stableDeps, full.writes, full.result, null);
                                        });
                                    }
                                    else
                                    {
                                        Invariants.require(full.stableDeps.covers(trySendTo));
                                        Invariants.require(txnId.isSystemTxn() || full.partialTxn.covers(trySendTo));
                                        node.coordinationAdapter(txnId, Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, txnId, full.partialTxn, full.executeAt, full.stableDeps, full.writes, full.result, null);
                                    }
                                }
                            }
                            propagate = full;
                        }
                        else
                        {
                            // nothing left to send to once the truncated ranges are removed
                            propagate = full.merge(Majority);
                        }
                    }
                    else
                    {
                        propagate = full;
                    }

                    Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, propagate, (s, f) -> callback.accept(f == null ? propagate.toProgressToken() : null, f));
                    break;
                }

                Txn txn = full.partialTxn.reconstitute(route);
                if (known.is(ApplyAtKnown) && known.outcome() == Apply)
                {
                    Deps deps;
                    Route<?> missingDeps;
                    if (known.is(DepsKnown))
                    {
                        deps = full.stableDeps.reconstitute(route);
                        // empty slice: no part of the route is missing deps
                        missingDeps = route.slice(0, 0);
                    }
                    else
                    {
                        Participants<?> hasDeps = full.map.knownFor(Known.DepsOnly, route);
                        missingDeps = route.without(hasDeps);
                        if (full.stableDeps == null)
                        {
                            Invariants.require(hasDeps.isEmpty());
                            deps = Deps.NONE;
                        }
                        else
                        {
                            // convert to plain Deps as when we merge with latest deps we may erroneously keep the
                            // PartialDeps if e.g. an empty range of deps is found
                            deps = new Deps(full.stableDeps.reconstitutePartial(hasDeps));
                        }
                    }
                    LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps, route, SHARE, route, callback, mergedDeps -> {
                        // persist is invoked via node.withEpoch for the execution epoch
                        node.withEpoch(full.executeAt.epoch(), node.agent(), t -> WrappableException.wrap(t), () -> {
                            node.coordinationAdapter(txnId, Recovery).persist(node, topologies, route, txnId, txn, full.executeAt, mergedDeps, full.writes, full.result, (s, f) -> {
                                callback.accept(f == null ? APPLIED : null, f);
                            });
                        });
                    });
                }
                else
                {
                    // pass the epoch reporters through, as every other coordination hand-off in this method does
                    Recover.recover(node, txnId, txn, route, reportLowEpoch, reportHighEpoch, callback);
                }
                break;

            case Abort:
                if (witnessedByInvalidation != null && witnessedByInvalidation.hasBeen(Status.PreCommitted))
                    throw illegalState("We previously invalidated, finding a status that should be recoverable");

                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, f));
                break;

            case Erased:
                // we should only be able to hit the Erased case if every participating shard has advanced past this TxnId, so we don't need to recover it
                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, null, full, (s, f) -> callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f));
                break;
        }
    }