public void contact()

in accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java [123:190]


    /**
     * Sends a {@link FetchRequest} for {@code ranges} to the node {@code to} and tracks it in
     * {@code inflight} until a reply or a failure arrives.
     *
     * <p>The request carries the sync point's source epoch and id, the dependencies from
     * {@code syncPoint.waitFor} sliced to the requested ranges, and the read transaction produced by
     * {@code rangeReadTxn(ranges)}.
     *
     * <p>Reply handling:
     * <ul>
     *   <li>non-OK reply: the attempt is reported via {@code fail} and the in-flight entry is cancelled;
     *       {@code Invalid}, {@code Redundant} and {@code NotCommitted} additionally raise an
     *       {@link AssertionError} as they are not expected here;</li>
     *   <li>OK reply with {@code unavailable} ranges: those are reported via {@code unavailable}; if no data
     *       accompanies them the in-flight entry is cancelled, otherwise only the remaining ranges are
     *       treated as received;</li>
     *   <li>OK reply with data: the in-flight entry is marked started with {@code maxApplied} and the data
     *       is handed to {@code onReadOk}.</li>
     * </ul>
     *
     * @param to     the node to fetch from; {@code ownedRangesForNode(to)} must contain all of {@code ranges}
     * @param ranges the ranges to request from {@code to}
     */
    public void contact(Node.Id to, Ranges ranges)
    {
        // Validate before registering anything in inflight: if the check rejects the arguments we must not
        // leave behind an entry that no callback will ever complete or cancel.
        Ranges ownedRanges = ownedRangesForNode(to);
        Invariants.checkArgument(ownedRanges.containsAll(ranges), "Asked to contact %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, ownedRanges);

        Key key = new Key(to, ranges);
        inflight.put(key, starting(to, ranges));
        PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges);
        node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>()
        {
            @Override
            public void onSuccess(Node.Id from, ReadData.ReadReply reply)
            {
                if (!reply.isOk())
                {
                    // Report the failure and release the in-flight entry before classifying the nack,
                    // so the bookkeeping is done even when the classification below throws.
                    fail(to, new RuntimeException(reply.toString()));
                    inflight.remove(key).cancel();
                    switch ((ReadData.ReadNack) reply)
                    {
                        case Invalid:
                        case Redundant:
                        case NotCommitted:
                            throw new AssertionError(String.format("Unexpected reply: %s", reply));
                        case Error:
                            // TODO (required): ensure errors are propagated to coordinators and can be logged
                            break;
                        default:
                            throw new AssertionError("Unhandled enum: " + reply);
                    }
                    return;
                }

                FetchResponse ok = (FetchResponse) reply;
                Ranges received;
                if (ok.unavailable != null)
                {
                    unavailable(to, ok.unavailable);
                    if (ok.data == null)
                    {
                        // Nothing was returned at all, so this attempt will never start.
                        inflight.remove(key).cancel();
                        return;
                    }
                    // Partial answer: only the ranges not reported unavailable were actually received.
                    received = ranges.difference(ok.unavailable);
                }
                else
                {
                    received = ranges;
                }

                // TODO (now): make sure it works if invoked in either order
                inflight.remove(key).started(ok.maxApplied);
                onReadOk(to, commandStore, ok.data, received);
                // received must be invoked after submitting the persistence future, as it triggers onDone
                // which creates a ReducingFuture over {@code persisting}
            }

            @Override
            public void onFailure(Node.Id from, Throwable failure)
            {
                inflight.remove(key).cancel();
                fail(from, failure);
            }

            @Override
            public void onCallbackFailure(Node.Id from, Throwable failure)
            {
                // TODO (soon)
                // NOTE(review): only logs; the in-flight entry is not touched here because it may already
                // have been removed by the callback that threw - confirm the intended recovery.
                logger.error("Fetch coordination failure from " + from, failure);
            }
        });
    }