in accord-core/src/main/java/accord/coordinate/Recover.java [578:644]
AsyncResult<InferredFastPath> awaitLater(Node node, Deps waitOn, BlockedUntil blockedUntil, @Nullable Participants<?> selfCoordVotes)
{
if (waitOn.isEmpty())
return AsyncResults.success(InferredFastPath.Accept);
Participants<?> reliesOnAwaitIdCoordVote;
Topology topology = tracker.topologies().current();
switch (route.domain())
{
default: throw new UnhandledEnum(route.domain());
case Key:
try (BufferList<RoutingKey> tmp = new BufferList<>())
{
for (int j = 0 ; j < route.size() ; ++j)
{
RoutingKey key = (RoutingKey)route.get(j);
RecoveryTracker.RecoveryShardTracker shardTracker = tracker.get(0, topology.indexForKey(key));
if (shardTracker.fastPathReliesOnUnwitnessedCoordinatorVote(txnId, selfCoordVotes))
tmp.add(key);
}
reliesOnAwaitIdCoordVote = RoutingKeys.ofSortedUnique(tmp);
}
break;
case Range:
try (BufferList<Range> tmp = new BufferList<>())
{
for (int j = 0 ; j < route.size() ; ++j)
{
Range range = (Range)route.get(j);
for (int k = topology.indexForRange(range, CEIL), maxk = topology.indexForRange(range, FLOOR); k <= maxk ; k++)
{
RecoveryTracker.RecoveryShardTracker shardTracker = tracker.get(0, k);
if (shardTracker.fastPathReliesOnUnwitnessedCoordinatorVote(txnId, selfCoordVotes))
tmp.add(range.slice(shardTracker.shard.range));
}
}
reliesOnAwaitIdCoordVote = Ranges.ofSortedAndDeoverlapped(tmp.toArray(Range[]::new));
}
}
if (reliesOnAwaitIdCoordVote.isEmpty())
return AsyncResults.success(InferredFastPath.Accept);
long requireEpoch = waitOn.maxTxnId(txnId).epoch();
return node.withEpoch(requireEpoch, () -> {
TxnId recoverId = this.txnId;
List<AsyncResult<InferredFastPath>> requests = new ArrayList<>();
for (int i = 0 ; i < waitOn.txnIdCount() ; ++i)
{
TxnId awaitId = waitOn.txnId(i);
Invariants.require(awaitId.is(PrivilegedCoordinatorWithDeps));
Invariants.require(awaitId.compareTo(recoverId) > 0);
Participants<?> participants = waitOn.participants(awaitId)
.intersecting(reliesOnAwaitIdCoordVote, Minimal);
if (participants.isEmpty())
continue;
Topologies topologies;
if (tracker.topologies().containsEpoch(awaitId.epoch())) topologies = tracker.topologies().selectEpoch(participants, awaitId.epoch(), SHARE);
else topologies = node.topology().forEpoch(participants, awaitId.epoch(), SHARE);
requests.add(SynchronousRecoverAwait.awaitAny(node, topologies, awaitId, blockedUntil, true, participants, recoverId));
}
if (requests.isEmpty())
return AsyncResults.success(InferredFastPath.Accept);
return AsyncChainCombiner.reduce(requests, InferredFastPath::merge).beginAsResult();
}).beginAsResult();
}