AsyncResult awaitLater()

in accord-core/src/main/java/accord/coordinate/Recover.java [578:644]


    AsyncResult<InferredFastPath> awaitLater(Node node, Deps waitOn, BlockedUntil blockedUntil, @Nullable Participants<?> selfCoordVotes)
    {
        if (waitOn.isEmpty())
            return AsyncResults.success(InferredFastPath.Accept);

        Participants<?> reliesOnAwaitIdCoordVote;
        Topology topology = tracker.topologies().current();
        switch (route.domain())
        {
            default: throw new UnhandledEnum(route.domain());
            case Key:
                try (BufferList<RoutingKey> tmp = new BufferList<>())
                {
                    for (int j = 0 ; j < route.size() ; ++j)
                    {
                        RoutingKey key = (RoutingKey)route.get(j);
                        RecoveryTracker.RecoveryShardTracker shardTracker = tracker.get(0, topology.indexForKey(key));
                        if (shardTracker.fastPathReliesOnUnwitnessedCoordinatorVote(txnId, selfCoordVotes))
                            tmp.add(key);
                    }
                    reliesOnAwaitIdCoordVote = RoutingKeys.ofSortedUnique(tmp);
                }
                break;
            case Range:
                try (BufferList<Range> tmp = new BufferList<>())
                {
                    for (int j = 0 ; j < route.size() ; ++j)
                    {
                        Range range = (Range)route.get(j);
                        for (int k = topology.indexForRange(range, CEIL), maxk = topology.indexForRange(range, FLOOR); k <= maxk ; k++)
                        {
                            RecoveryTracker.RecoveryShardTracker shardTracker = tracker.get(0, k);
                            if (shardTracker.fastPathReliesOnUnwitnessedCoordinatorVote(txnId, selfCoordVotes))
                                tmp.add(range.slice(shardTracker.shard.range));
                        }
                    }
                    reliesOnAwaitIdCoordVote = Ranges.ofSortedAndDeoverlapped(tmp.toArray(Range[]::new));
                }
        }

        if (reliesOnAwaitIdCoordVote.isEmpty())
            return AsyncResults.success(InferredFastPath.Accept);

        long requireEpoch = waitOn.maxTxnId(txnId).epoch();
        return node.withEpoch(requireEpoch, () -> {
            TxnId recoverId = this.txnId;
            List<AsyncResult<InferredFastPath>> requests = new ArrayList<>();
            for (int i = 0 ; i < waitOn.txnIdCount() ; ++i)
            {
                TxnId awaitId = waitOn.txnId(i);
                Invariants.require(awaitId.is(PrivilegedCoordinatorWithDeps));
                Invariants.require(awaitId.compareTo(recoverId) > 0);
                Participants<?> participants = waitOn.participants(awaitId)
                                                     .intersecting(reliesOnAwaitIdCoordVote, Minimal);
                if (participants.isEmpty())
                    continue;

                Topologies topologies;
                if (tracker.topologies().containsEpoch(awaitId.epoch())) topologies = tracker.topologies().selectEpoch(participants, awaitId.epoch(), SHARE);
                else topologies = node.topology().forEpoch(participants, awaitId.epoch(), SHARE);
                requests.add(SynchronousRecoverAwait.awaitAny(node, topologies, awaitId, blockedUntil, true, participants, recoverId));
            }
            if (requests.isEmpty())
                return AsyncResults.success(InferredFastPath.Accept);
            return AsyncChainCombiner.reduce(requests, InferredFastPath::merge).beginAsResult();
        }).beginAsResult();
    }