public Topologies withUnsyncedEpochs()

in accord-core/src/main/java/accord/topology/TopologyManager.java [355:401]


    /**
     * Builds the {@link Topologies} covering {@code select} for the epochs {@code [minEpoch, maxEpoch]},
     * extended towards older epochs for as long as the oldest epoch included so far does not report
     * sync as complete for {@code select}.
     *
     * <p>If the requested range is a single epoch whose state already reports sync complete for
     * {@code select}, a {@code Single} over that epoch's global topology is returned directly.
     * Otherwise a {@code Topologies.Multi} is assembled with one entry per selected epoch.
     *
     * @param select   the keys/ranges the returned topologies must cover
     * @param minEpoch lowest epoch requested; must not exceed {@code maxEpoch}
     * @param maxEpoch highest epoch requested; {@code Long.MAX_VALUE} is replaced by the current epoch of
     *                 the snapshot, any other value must not exceed that current epoch
     * @return the topologies for the selected epochs
     */
    public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long minEpoch, long maxEpoch)
    {
        Invariants.checkArgument(minEpoch <= maxEpoch, "min epoch %d > max %d", minEpoch, maxEpoch);
        // Work against one reference to the epochs for the whole call
        Epochs snapshot = epochs;

        // Long.MAX_VALUE is a sentinel for "up to the current epoch"; an explicit bound must already be known
        if (maxEpoch != Long.MAX_VALUE)
        {
            Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current epoch %d < max %d", snapshot.currentEpoch, maxEpoch);
        }
        else
        {
            maxEpoch = snapshot.currentEpoch;
        }

        // Fast path: a single, fully synced epoch needs no help from any earlier epoch
        EpochState newest = nonNull(snapshot.get(maxEpoch));
        boolean singleEpoch = minEpoch == maxEpoch;
        if (singleEpoch && newest.syncCompleteFor(select))
            return new Single(sorter, newest.global.forSelection(select, Topology.OnUnknown.REJECT));

        // snapshot.epochs is indexed by distance from the current epoch, so larger indexes are older epochs;
        // [first, end) initially spans maxEpoch down to minEpoch, clamped to the epochs we still hold
        int first = (int)(snapshot.currentEpoch - maxEpoch);
        int end = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length));

        // Keep pulling in the next older epoch until the oldest one we already include is synced for the selection
        for ( ; end < snapshot.epochs.length ; ++end)
        {
            if (snapshot.epochs[end - 1].syncCompleteFor(select))
                break;
        }

        // Every epoch's topology must carry ownership information for all nodes contacted in any epoch,
        // so gather the complete set of nodes first and only then select shards and nodes per epoch
        Set<Id> contacted = new LinkedHashSet<>();
        for (int idx = first; idx < end ; ++idx)
        {
            EpochState state = snapshot.epochs[idx];
            boolean olderThanMin = state.epoch() < minEpoch;
            if (!olderThanMin)
                state.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, contacted::add);
            else // epochs added only by the extension above contribute just their unsynced shards
                state.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, EpochState::shardIsUnsynced, state, contacted::add);
        }
        Invariants.checkState(!contacted.isEmpty(), "Unable to find an epoch that contained %s", select);

        Topologies.Multi result = new Topologies.Multi(sorter, end - first);
        for (int idx = first; idx < end ; ++idx)
        {
            EpochState state = snapshot.epochs[idx];
            boolean olderThanMin = state.epoch() < minEpoch;
            if (!olderThanMin)
                result.add(state.global.forSelection(select, Topology.OnUnknown.IGNORE, contacted, (unused, shardIdx) -> true, null));
            else
                result.add(state.global.forSelection(select, Topology.OnUnknown.IGNORE, contacted, EpochState::shardIsUnsynced, state));
        }
        Invariants.checkState(!result.isEmpty(), "Unable to find an epoch that contained %s", select);

        return result;
    }