in accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java [138:199]
/**
 * Registers an in-flight fetch for {@code ranges} against {@code to} and sends the fetch request.
 *
 * The in-flight entry is keyed by the {@code (to, ranges)} pair and is created before the request
 * is sent. Before sending, the ranges owned by {@code to} are checked (via
 * {@code Invariants.requireArgument}) to contain every requested range. The request carries only
 * the sync point's {@code waitFor} dependencies that intersect {@code ranges}.
 *
 * @param to     the node to send the fetch request to
 * @param ranges the ranges to request from {@code to}
 */
public void contact(Node.Id to, Ranges ranges)
{
    Key inflightKey = new Key(to, ranges);
    inflight.put(inflightKey, starting(to, ranges));

    Ranges owned = ownedRangesForNode(to);
    Invariants.requireArgument(owned.containsAll(ranges), "Got a reply from %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, owned);

    PartialDeps deps = syncPoint.waitFor.intersecting(ranges);
    node.send(to, newFetchRequest(syncPoint.syncId.epoch(), syncPoint.syncId, ranges, deps, rangeReadTxn(ranges)), new Callback<ReadReply>()
    {
        /**
         * Handles a reply: either a nack (not ok) or a {@code FetchResponse}.
         */
        @Override
        public void onSuccess(Node.Id from, ReadReply reply)
        {
            if (!reply.isOk())
            {
                if (reply == Insufficient)
                {
                    // The in-flight entry is deliberately left registered on this path.
                    // NOTE(review): presumably the replica answers again once the sync point is applied - confirm
                    CoordinateSyncPoint.sendApply(node, from, syncPoint);
                    return;
                }

                // Any other nack: report the failure first, then drop and cancel the in-flight entry.
                fail(to, new RuntimeException(reply.toString()));
                inflight.remove(inflightKey).cancel();

                if (reply == Redundant)
                {
                    // too late, sync point has been erased
                    // TODO (desired): stop fetch sync points from garbage collecting too quickly
                    return;
                }
                throw new UnhandledEnum((CommitOrReadNack)reply);
            }

            FetchResponse response = (FetchResponse) reply;
            final Ranges received;
            if (response.unavailable == null)
            {
                // Nothing was reported unavailable, so everything requested counts as received.
                received = ranges;
            }
            else
            {
                unavailable(to, response.unavailable);
                if (response.data == null)
                {
                    // No data accompanied the unavailable ranges: cancel this request.
                    inflight.remove(inflightKey).cancel();
                    return;
                }
                received = ranges.without(response.unavailable);
            }

            // TODO (expected): make sure it works if invoked in either order
            inflight.remove(inflightKey).started(response.safeToReadAfter);
            onReadOk(to, commandStore, response.data, received);
            // received must be invoked after submitting the persistence future, as it triggers onDone
            // which creates a ReducingFuture over {@code persisting}
        }

        /**
         * Handles a failed request: the in-flight entry is removed and cancelled before the failure is reported.
         */
        @Override
        public void onFailure(Node.Id from, Throwable failure)
        {
            inflight.remove(inflightKey).cancel();
            fail(from, failure);
        }
    });
}