private void recover()

in accord-core/src/main/java/accord/coordinate/Recover.java [243:430]


    /**
     * Decides how to complete (or invalidate) the transaction once our recovery ballot has been promised by a
     * recovery quorum, using the replies accumulated in {@code recoverOks}. May only be invoked once, which is
     * enforced via {@code isBallotPromised}.
     * <p>
     * The decision proceeds in three stages:
     * <ol>
     *   <li>If some reply selected by {@code maxAccepted} is not Truncated (falling back to
     *       {@code maxAcceptedNotTruncated} when the best reply is Truncated), recovery continues from that
     *       reply's status: commit an invalidation, re-run invalidation, persist, execute, stabilise or
     *       re-propose. For the non-invalidation cases this happens only after {@code awaitPartialEarlier}
     *       completes for the participants reported by {@code merge.notAccepted(route)}.</li>
     *   <li>Otherwise, if the best reply is Truncated and every shard of the first topology examined reports
     *       Truncated, the callback is completed with {@code TRUNCATED_DURABLE_OR_INVALIDATED}.</li>
     *   <li>Otherwise the fast-path decision of the original coordinator is inferred. {@code Reject} leads to
     *       invalidation. {@code Accept}/{@code Unknown} lead to a slow-path proposal at {@code txnId}, possibly
     *       after waiting on earlier/later conflicting transactions, which may in turn retry or invalidate.</li>
     * </ol>
     */
    private void recover()
    {
        Invariants.require(!isBallotPromised);
        isBallotPromised = true;

        // Work from a local reference; outside of debug mode the field is cleared.
        SortedListMap<Id, RecoverOk> recoverOks = this.recoverOks;
        if (!Invariants.debug()) this.recoverOks = null;
        List<RecoverOk> recoverOkList = recoverOks.valuesAsNullableList();
        // maxAccepted may return null (presumably when no reply has reached Accepted or later).
        RecoverOk acceptOrCommit = maxAccepted(recoverOkList);
        // Truncated replies must not drive the state-based recovery below, so if the best reply is Truncated
        // fall back to the best reply that is not.
        RecoverOk acceptOrCommitNotTruncated = acceptOrCommit == null || acceptOrCommit.status != Status.Truncated
                                               ? acceptOrCommit : maxAcceptedNotTruncated(recoverOkList);

        if (acceptOrCommitNotTruncated != null)
        {
            Timestamp executeAt = acceptOrCommitNotTruncated.executeAt;
            Status status; {
                Status tmp = acceptOrCommitNotTruncated.status;
                if (committedExecuteAt != null)
                {
                    // A commit learned from a prior Accept round must agree with any PreCommitted-or-later reply.
                    Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted) < 0 || executeAt.equals(committedExecuteAt));
                    // if we know from a prior Accept attempt that this is committed we can go straight to the commit phase
                    if (tmp == AcceptedMedium || tmp == AcceptedSlow)
                        tmp = Status.Committed;
                }
                status = tmp;
            }

            // Outcomes that do not need merged dependencies are handled immediately.
            switch (status)
            {
                case Truncated: throw illegalState("Truncate should be filtered");
                case Invalidated:
                {
                    commitInvalidate(invalidateUntil(recoverOks));
                    return;
                }

                case AcceptedInvalidate:
                {
                    invalidate(recoverOks);
                    return;
                }

                case NotDefined:
                case PreAccepted:
                    throw illegalState("Should only be possible to have Accepted or later commands");
            }

            LatestDeps.Merge merge = mergeDeps(recoverOkList);
            Participants<?> await = merge.notAccepted(route);
            awaitPartialEarlier(recoverOkList, await, () -> {
                switch (status)
                {
                    default: throw new UnhandledEnum(status);
                    case Applied:
                    case PreApplied:
                    {
                        withStableDeps(merge, executeAt, (i, t) -> node.agent().acceptAndWrap(i, t), stableDeps -> {
                            adapter.persist(node, tracker.topologies(), route, txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes, acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
                        });
                        // The outcome is already known, so report it without waiting for the persist above.
                        accept(acceptOrCommitNotTruncated.result, null);
                        return;
                    }

                    case Stable:
                    {
                        withStableDeps(merge, executeAt, this, stableDeps -> {
                            adapter.execute(node, tracker.topologies(), route, RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps, this);
                        });
                        return;
                    }

                    case PreCommitted:
                    case Committed:
                    {
                        withCommittedDeps(merge, executeAt, this, committedDeps -> {
                            adapter.stabilise(node, tracker.topologies(), route, ballot, txnId, txn, executeAt, committedDeps, this);
                        });
                        return;
                    }

                    case AcceptedSlow:
                    case AcceptedMedium:
                    {
                        // TODO (desired): if we have a quorum of Accept with matching ballot or proposal we can go straight to Commit
                        // TODO (desired): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
                        //     however, note that we may have taken the fast path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
                        //     (otherwise recovery was attempted and did not invalidate, so it must have determined it needed to complete)
                        Deps proposeDeps = merge.mergeProposal();
                        propose(SLOW, acceptOrCommitNotTruncated.executeAt, proposeDeps);
                    }
                }
            });
            return;
        }

        // The best reply was Truncated and no non-truncated Accepted-or-later reply exists.
        if (acceptOrCommit != null && acceptOrCommit != acceptOrCommitNotTruncated)
        {
            // TODO (required): match logic in Invalidate; we need to know which keys have been invalidated
            Topologies topologies = tracker.topologies();
            boolean allShardsTruncated = true;
            for (int topologyIndex = 0 ; topologyIndex < topologies.size() ; ++topologyIndex)
            {
                Topology topology = topologies.get(topologyIndex);
                for (Shard shard : topology.shards())
                {
                    RecoverOk maxReply = maxAccepted(recoverOks.select(shard.nodes));
                    // maxAccepted returns null for a shard with no Accepted-or-later reply. Such a shard has not
                    // reported truncation, so count it as not truncated instead of dereferencing null.
                    allShardsTruncated &= maxReply != null && maxReply.status == Status.Truncated;
                }
                // NOTE(review): allShardsTruncated is not reset per topology, so this can only succeed for the first
                // topology examined (index 0); later topologies never change the outcome. Confirm whether the intent
                // is "every topology truncated" (check after the loop) or "any topology truncated" (reset per loop).
                if (allShardsTruncated)
                {
                    // TODO (required, correctness): this is not a safe inference in the case of an ErasedOrInvalidOrVestigial response.
                    //   We need to tighten up the inference and spread of truncation/invalid outcomes.
                    //   In this case, at minimum this can lead to liveness violations as the home shard stops coordinating
                    //   a command that it hasn't invalidated, but nor is it possible to recover. This happens because
                    //   when the home shard shares all of its replicas with another shard that has autonomously invalidated
                    //   the transaction, so that all received InvalidateReply show truncation (when in fact this is only partial).
                    //   We could paper over this, but better to revisit and provide stronger invariants we can rely on.
                    isDone = true;
                    callback.accept(TRUNCATED_DURABLE_OR_INVALIDATED, null);
                    return;
                }
            }
        }

        // No usable Accepted-or-later state: any previously learned commit must have been at txnId itself.
        Invariants.require(committedExecuteAt == null || committedExecuteAt.equals(txnId));

        // Infer whether the original coordinator may have taken the fast path.
        boolean coordinatorInRecoveryQuorum = recoverOks.get(txnId.node) != null;
        Participants<?> extraCoordVotes = extraCoordinatorVotes(txnId, coordinatorInRecoveryQuorum, recoverOkList);
        Participants<?> extraRejects = Deps.merge(recoverOkList, recoverOkList.size(), List::get, ok -> ok.laterCoordRejects)
                                           .intersecting(route, id -> !recoverOks.containsKey(id.node));
        InferredFastPath fastPath;
        if (txnId.hasPrivilegedCoordinator() && coordinatorInRecoveryQuorum) fastPath = Reject;
        else fastPath = merge(
            supersedingRejects(recoverOkList) ? Reject : Unknown,
            tracker.inferFastPathDecision(txnId, extraCoordVotes, extraRejects)
        );

        switch (fastPath)
        {
            case Reject:
            {
                invalidate(recoverOks);
                return;
            }
            case Accept:
            {
                // we still have to wait for earlier transactions to decide themselves so we don't accidentally include
                // a non-fastpath transaction in our dependencies and permit it to conclude it is safe to execute.
                // So, we fall-through to Unknown condition - though we don't in principle need to wait for any future transactions
            }
            case Unknown:
            {
                // should all be PreAccept
                Deps earlierWait = Deps.merge(recoverOkList, recoverOkList.size(), List::get, ok -> ok.earlierWait);
                Deps earlierNoWait = Deps.merge(recoverOkList, recoverOkList.size(), List::get, ok -> ok.earlierNoWait);
                earlierWait = earlierWait.without(earlierNoWait);
                Deps laterWitnessedCoordRejects = Deps.merge(recoverOks, recoverOks.domainSize(), (map, i) -> selectCoordinatorReplies(map.getKey(i), map.getValue(i)), Function.identity());

                if (!earlierWait.isEmpty() || !laterWitnessedCoordRejects.isEmpty())
                {
                    // If there exist commands that were proposed a later execution time than us that have not witnessed us,
                    // we have to be certain these commands have not successfully committed without witnessing us (thereby
                    // ruling out a fast path decision for us and changing our recovery decision).
                    // So, we wait for these commands to commit and recompute supersedingRejects for them.
                    AsyncChainCombiner.reduce(awaitEarlier(node, earlierWait, HasCommittedDeps),
                                              awaitLater(node, laterWitnessedCoordRejects, CommittedOrNotFastPathCommit, extraCoordVotes),
                                              InferredFastPath::merge)
                                      .begin((inferred, failure) -> {
                                          if (failure != null) accept(null, failure);
                                          else
                                          {
                                              switch (inferred)
                                              {
                                                  default: throw new UnhandledEnum(inferred);
                                                  case Accept: propose(SLOW, txnId, recoverOkList); break;
                                                  case Unknown: retry(committedExecuteAt); break;
                                                  case Reject: invalidate(recoverOks); break;
                                              }
                                          }
                                      });
                }
                else
                {
                    propose(SLOW, txnId, recoverOkList);
                }
            }
        }
    }