in accord-core/src/main/java/accord/topology/TopologyManager.java [355:401]
/**
 * Builds the {@link Topologies} for {@code select} over the epochs {@code [minEpoch, maxEpoch]}, extended to
 * older entries of the epoch snapshot for as long as the oldest entry included so far reports that its sync is
 * not complete for {@code select}.
 *
 * <p>Passing {@code Long.MAX_VALUE} as {@code maxEpoch} selects the current epoch of the snapshot read on entry.
 * Any other {@code maxEpoch} must not exceed that current epoch.
 *
 * <p>Entries whose epoch is at or above {@code minEpoch} are included without a shard filter; entries reached
 * only through the extension (epoch below {@code minEpoch}) are filtered with {@code EpochState::shardIsUnsynced}.
 *
 * @param select   the selection to build topologies for
 * @param minEpoch the lowest epoch explicitly requested; must be {@code <= maxEpoch}
 * @param maxEpoch the highest epoch requested, or {@code Long.MAX_VALUE} for the snapshot's current epoch
 * @return a {@code Single} when {@code minEpoch} equals the resolved max epoch and that epoch's sync is complete
 *         for {@code select}; otherwise a {@code Topologies.Multi} to which the selection from each visited
 *         epoch has been added
 */
public Topologies withUnsyncedEpochs(Unseekables<?, ?> select, long minEpoch, long maxEpoch)
{
    Invariants.checkArgument(minEpoch <= maxEpoch, "min epoch %d > max %d", minEpoch, maxEpoch);

    // Read the field once; everything below works against this one Epochs instance
    Epochs view = epochs;

    final long newestEpoch;
    if (maxEpoch != Long.MAX_VALUE)
    {
        Invariants.checkState(view.currentEpoch >= maxEpoch, "current epoch %d < max %d", view.currentEpoch, maxEpoch);
        newestEpoch = maxEpoch;
    }
    else
    {
        newestEpoch = view.currentEpoch;
    }

    EpochState newestState = nonNull(view.get(newestEpoch));
    if (minEpoch == newestEpoch && newestState.syncCompleteFor(select))
        return new Single(sorter, newestState.global.forSelection(select, Topology.OnUnknown.REJECT));

    // view.epochs is indexed by distance from the current epoch: [first, end) initially spans newestEpoch down
    // to minEpoch, clamped to the number of entries held
    int first = (int)(view.currentEpoch - newestEpoch);
    int end = (int)(Math.min(1 + view.currentEpoch - minEpoch, view.epochs.length));

    // Keep widening the window by one older entry while the oldest entry already inside it is not sync complete
    while (end < view.epochs.length && !view.epochs[end - 1].syncCompleteFor(select))
        ++end;
    int capacity = end - first;

    // Every epoch's topology must carry ownership information for all nodes contacted in any epoch,
    // so the first pass gathers the full node set and the second pass builds the per-epoch selections with it
    Set<Id> nodes = new LinkedHashSet<>();
    for (int idx = first; idx < end; ++idx)
    {
        EpochState state = view.epochs[idx];
        if (state.epoch() >= minEpoch)
            state.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, nodes::add);
        else
            state.global.visitNodeForKeysOnceOrMore(select, Topology.OnUnknown.IGNORE, EpochState::shardIsUnsynced, state, nodes::add);
    }
    Invariants.checkState(!nodes.isEmpty(), "Unable to find an epoch that contained %s", select);

    Topologies.Multi result = new Topologies.Multi(sorter, capacity);
    for (int idx = first; idx < end; ++idx)
    {
        EpochState state = view.epochs[idx];
        if (state.epoch() >= minEpoch)
            result.add(state.global.forSelection(select, Topology.OnUnknown.IGNORE, nodes, (ignore, idx2) -> true, null));
        else
            result.add(state.global.forSelection(select, Topology.OnUnknown.IGNORE, nodes, EpochState::shardIsUnsynced, state));
    }
    Invariants.checkState(!result.isEmpty(), "Unable to find an epoch that contained %s", select);

    return result;
}