private void runInternal()

in accord-core/src/main/java/accord/impl/progresslog/WaitingState.java [392:481]


    private void runInternal(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner)
    {
        BlockedUntil blockedUntil = blockedUntil();
        Command command = safeCommand.current();
        Invariants.require(!owner.hasActive(Waiting, txnId));
        Invariants.require(command.saveStatus().compareTo(blockedUntil.unblockedFrom) < 0,
                           "Command has met desired criteria (%s) but progress log entry has not been cancelled: %s", blockedUntil.unblockedFrom, command);

        set(safeStore, owner, blockedUntil, Querying);
        TxnId txnId = safeCommand.txnId();
        // first make sure we have enough information to obtain the command locally
        Timestamp executeAt = command.executeAtIfKnown();
        Participants<?> fetchKeys = Invariants.nonNull(command.maxContactable());

        if (!Route.isRoute(fetchKeys))
        {
            long lowEpoch = updateLowEpoch(safeStore, txnId, command);
            long highEpoch = updateHighEpoch(safeStore, txnId, blockedUntil, command, executeAt);
            fetchRoute(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, fetchKeys);
            return;
        }

        Route<?> route = Route.castToRoute(fetchKeys);
        if (homeSatisfies().compareTo(blockedUntil) < 0)
        {
            // first wait until the homeKey has progressed to a point where it can answer our query; we don't expect our shards to know until then anyway
            awaitHomeKey(owner, blockedUntil, txnId, executeAt, route);
            return;
        }

        long prevLowEpoch = readLowEpoch(safeStore, txnId, route);
        long prevHighEpoch = readHighEpoch(safeStore, txnId, route);
        long lowEpoch = updateLowEpoch(safeStore, txnId, command);
        long highEpoch = updateHighEpoch(safeStore, txnId, blockedUntil, command, executeAt);
        // TODO (expected): split into txn and deps sources
        Route<?> slicedRoute = slicedRoute(safeStore, txnId, route, lowEpoch, highEpoch);
        Invariants.require(!slicedRoute.isEmpty());
        if (!command.hasBeen(Status.PreCommitted))
        {
            // we know it has been decided one way or the other by the home shard at least, so we attempt a fetch
            // including the home shard to get us to at least PreCommitted where we can safely wait on individual shards
            fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, slicedRoute.withHomeKey(), route);
            return;
        }

        // the awaitRoute may be only the home shard, if that is sufficient to indicate the fetchRoute will be able to answer our query;
        // the fetchRoute may also be only the home shard, if that is sufficient to answer our query (e.g. for executeAt)
        Route<?> awaitRoute = awaitRoute(slicedRoute, blockedUntil);
        Route<?> fetchRoute = fetchRoute(slicedRoute, awaitRoute, blockedUntil, safeStore, lowEpoch, txnId, highEpoch, route);

        if (awaitRoute.isHomeKeyOnlyRoute())
        {
            // at this point we can switch to polling as we know someone has the relevant state
            fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, fetchRoute, route);
            return;
        }

        int roundSize = awaitRoundSize(awaitRoute);
        if (hasNewLowEpoch(safeStore, txnId, prevLowEpoch, lowEpoch) || hasNewHighEpoch(safeStore, txnId, prevHighEpoch, highEpoch))
        {
            // update round counters because we have changed the epochs involved
            Route<?> prevSlicedRoute = slicedRoute(safeStore, txnId, route, prevLowEpoch, prevHighEpoch);
            Route<?> prevAwaitRoute = awaitRoute(prevSlicedRoute, blockedUntil);
            int prevRoundSize = awaitRoundSize(prevAwaitRoute);
            int prevRoundIndex = awaitRoundIndex(prevRoundSize);
            int prevRoundStart = prevRoundIndex * prevRoundSize;
            int newRoundIndex = -1;
            if (prevRoundStart < prevAwaitRoute.size())
                newRoundIndex = (int)awaitRoute.findNextSameKindIntersection(0, (Unseekables)prevAwaitRoute, prevRoundStart + prevRoundIndex);
            if (newRoundIndex < 0)
                newRoundIndex = awaitRoute.size();
            updateAwaitRound(newRoundIndex, roundSize);
        }

        int roundIndex = awaitRoundIndex(roundSize);
        int roundStart = roundIndex * roundSize;
        if (roundStart >= awaitRoute.size())
        {
            // all of the shards we are awaiting have been processed and found at least one replica that has the state needed to answer our query
            // at this point we can switch to polling as we know someone has the relevant state
            fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, slicedRoute, fetchRoute, route);
            return;
        }

        int roundEnd = Math.min(roundStart + roundSize, awaitRoute.size());
        awaitRoute = awaitRoute.slice(roundStart, roundEnd);
        // TODO (desired): use some mechanism (e.g. random chance or another counter)
        //   to either periodically fetch the whole remaining route or gradually increase the slice length
        awaitSlice(owner, blockedUntil, txnId, executeAt, awaitRoute, (roundIndex << 1) | 1);
    }