in accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java [170:222]
/**
 * Polls one entry from {@code pending} and handles it.
 *
 * <p>A polled {@code Packet} is dispatched according to its body type; any other
 * entry is cast to {@link Runnable} and run.
 *
 * @return {@code false} if {@code pending.size()} equals {@code recurring} or the poll
 *         yields {@code null}; {@code true} once an entry has been handled (including
 *         the case where a packet is dropped)
 * @throws IllegalStateException if the polled packet has body type {@code init}
 */
public boolean processPending()
{
    // NOTE(review): presumably this means only recurring tasks are left queued - confirm
    if (pending.size() == recurring)
        return false;

    Object item = pending.poll();
    if (item == null)
        return false;

    // Anything that is not a packet is treated as a task to run
    if (!(item instanceof Packet))
    {
        ((Runnable) item).run();
        return true;
    }

    Packet packet = (Packet) item;
    Node target = lookup.apply(packet.dest);
    switch (packet.body.type)
    {
        case init:
            throw new IllegalStateException();

        case txn:
            // txn packets are delivered without consulting the partition set
            err.println(clock++ + " RECV " + packet);
            err.flush();
            target.receive((MaelstromRequest) packet.body, packet.src, packet);
            break;

        default:
            // A message crosses the partition when exactly one endpoint is in partitionSet
            boolean srcInPartition = partitionSet.contains(packet.src);
            boolean destInPartition = partitionSet.contains(packet.dest);
            if (srcInPartition != destInPartition)
            {
                err.println(clock++ + " DROP " + packet);
                err.flush();
            }
            else
            {
                err.println(clock++ + " RECV " + packet);
                err.flush();
                Object payload = ((Wrapper) packet.body).body;
                // InformOfTxnReply carries in_reply_to == Body.SENTINEL_MSG_ID, so the
                // in_reply_to comparison alone cannot identify every reply; the instanceof
                // check covers that case
                boolean isReply = packet.body.in_reply_to > Body.SENTINEL_MSG_ID || payload instanceof Reply;
                if (isReply)
                {
                    Reply response = (Reply) payload;
                    SafeCallback handler = sinks.get(packet.dest).callbacks.remove(packet.body.in_reply_to);
                    // no callback registered under this id: nothing is invoked
                    if (handler != null)
                        handler.success(packet.src, response);
                }
                else
                {
                    target.receive((Request) payload, packet.src, packet);
                }
            }
            break;
    }
    return true;
}