in accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java [123:190]
/**
 * Requests {@code ranges} from node {@code to} by sending it a {@code FetchRequest}, and tracks the
 * request in {@code inflight} under a {@code Key(to, ranges)} until the reply callback removes it.
 *
 * <p>The reply callback distinguishes these outcomes:
 * <ul>
 *   <li>a non-ok reply: reported through {@code fail}, the in-flight entry is cancelled, and every
 *       nack other than {@code Error} additionally raises an {@link AssertionError};</li>
 *   <li>an ok reply carrying {@code unavailable} ranges: these are reported through
 *       {@code unavailable}; with no accompanying data the in-flight entry is cancelled, otherwise
 *       only the remaining ranges count as received;</li>
 *   <li>an ok reply with data: the in-flight entry is marked started with {@code maxApplied} and the
 *       data is passed to {@code onReadOk};</li>
 *   <li>a send failure: the in-flight entry is cancelled and the failure is reported through
 *       {@code fail}.</li>
 * </ul>
 *
 * @param to     the node to contact
 * @param ranges the ranges to request; checked to be contained in {@code ownedRangesForNode(to)}
 */
public void contact(Node.Id to, Ranges ranges)
{
    // Register the request as in flight before anything else is done with it.
    Key requestKey = new Key(to, ranges);
    inflight.put(requestKey, starting(to, ranges));

    Ranges owned = ownedRangesForNode(to);
    Invariants.checkArgument(owned.containsAll(ranges), "Got a reply from %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, owned);
    PartialDeps deps = syncPoint.waitFor.slice(owned, ranges);

    node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, deps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
    {
        @Override
        public void onSuccess(Node.Id from, ReadData.ReadReply reply)
        {
            if (!reply.isOk())
            {
                // Report and cancel first, so the bookkeeping is done even when the switch below throws.
                fail(to, new RuntimeException(reply.toString()));
                inflight.remove(requestKey).cancel();
                switch ((ReadData.ReadNack) reply)
                {
                    case Invalid:
                    case Redundant:
                    case NotCommitted:
                        throw new AssertionError(String.format("Unexpected reply: %s", reply));
                    case Error:
                        // TODO (required): ensure errors are propagated to coordinators and can be logged
                        break;
                    default:
                        throw new AssertionError("Unhandled enum: " + reply);
                }
                return;
            }

            FetchResponse response = (FetchResponse) reply;
            // Everything requested counts as received unless the replica declares part of it unavailable.
            Ranges received = ranges;
            if (response.unavailable != null)
            {
                unavailable(to, response.unavailable);
                if (response.data == null)
                {
                    // Nothing usable came back: drop the in-flight entry and stop here.
                    inflight.remove(requestKey).cancel();
                    return;
                }
                received = ranges.difference(response.unavailable);
            }

            // TODO (now): make sure it works if invoked in either order
            inflight.remove(requestKey).started(response.maxApplied);
            onReadOk(to, commandStore, response.data, received);
            // received must be invoked after submitting the persistence future, as it triggers onDone
            // which creates a ReducingFuture over {@code persisting}
        }

        @Override
        public void onFailure(Node.Id from, Throwable failure)
        {
            // Cancel the in-flight entry before reporting the failure.
            inflight.remove(requestKey).cancel();
            fail(from, failure);
        }

        @Override
        public void onCallbackFailure(Node.Id from, Throwable failure)
        {
            // TODO (soon)
            logger.error("Fetch coordination failure from " + from, failure);
        }
    });
}