public boolean processPending()

in accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java [170:222]


    public boolean processPending()
    {
        if (pending.size() == recurring)
            return false;

        Object next = pending.poll();
        if (next == null)
            return false;

        if (next instanceof Packet)
        {
            Packet deliver = (Packet) next;
            Node on = lookup.apply(deliver.dest);
            switch (deliver.body.type)
            {
                case init:
                    throw new IllegalStateException();
                case txn:
                    err.println(clock++ + " RECV " + deliver);
                    err.flush();
                    on.receive((MaelstromRequest)deliver.body, deliver.src, deliver);
                    break;
                default:
                    // Drop the message if it goes across the partition
                    boolean drop = !(partitionSet.contains(deliver.src) && partitionSet.contains(deliver.dest)
                                    || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dest));
                    if (drop)
                    {
                        err.println(clock++ + " DROP " + deliver);
                        err.flush();
                        break;
                    }
                    err.println(clock++ + " RECV " + deliver);
                    err.flush();
                    Object body = ((Wrapper)deliver.body).body;
                    // for some reason InformOfTxnReply has deliver.body.in_reply_to == Body.SENTINEL_MSG_ID, so is unique
                    // for all reply types
                    if (deliver.body.in_reply_to > Body.SENTINEL_MSG_ID || body instanceof Reply)
                    {
                        Reply reply = (Reply) body;
                        SafeCallback callback = sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to);
                        if (callback != null)
                            callback.success(deliver.src, reply);
                    }
                    else on.receive((Request) body, deliver.src, deliver);
            }
        }
        else
        {
            ((Runnable) next).run();
        }
        return true;
    }