public static void listen()

in accord-maelstrom/src/main/java/accord/maelstrom/Main.java [165:243]


    public static void listen(TopologyFactory topologyFactory, Supplier<String> in, PrintStream out, PrintStream err) throws IOException
    {
        long start = System.nanoTime();
        err.println("Starting...");
        err.flush();
        ThreadPoolScheduler scheduler = new ThreadPoolScheduler();
        Node on;
        Topology topology;
        StdoutSink sink;
        {
            String line = in.get();
            err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
            err.flush();
            Packet packet = Json.GSON.fromJson(line, Packet.class);
            MaelstromInit init = (MaelstromInit) packet.body;
            topology = topologyFactory.toTopology(init.cluster);
            sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
            Journal journal = new Cluster.NoOpJournal();
            TimeService time = TimeService.ofNonMonotonic(System::currentTimeMillis, TimeUnit.MILLISECONDS);
            on = new Node(init.self, sink, new SimpleConfigService(topology),
                          time, new UniqueTimeService.AtomicUniqueTime(time),
                          MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                          MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
                          DefaultRemoteListeners::new, DefaultTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new,
                          InMemoryCommandStores.SingleThread::new, new CoordinationAdapter.DefaultFactory(),
                          DurableBefore.NOOP_PERSISTER, journal);
            awaitUninterruptibly(on.unsafeStart());
            err.println("Initialized node " + init.self);
            err.flush();
            sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
        }
        try
        {
            while (true)
            {
                String line = in.get();
                if (line == null)
                {
                    err.println("Received EOF; terminating");
                    err.flush();
                    scheduler.stop();
                    err.println("Terminated");
                    err.flush();
                    return;
                }
                err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
                err.flush();
                Packet next = Packet.parse(line);
                switch (next.body.type)
                {
                    case txn:
                        on.receive((MaelstromRequest)next.body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id));
                        break;
                    default:
                        if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
                        {
                            Reply reply = (Reply)((Wrapper)next.body).body;
                            CallbackInfo callback = sink.callbacks.remove(next.body.in_reply_to);
                            if (callback != null)
                                scheduler.now(() -> {
                                    try
                                    {
                                        callback.callback.onSuccess(next.src, reply);
                                    }
                                    catch (Throwable t)
                                    {
                                        callback.callback.onCallbackFailure(next.src, t);
                                    }
                                });
                        }
                        else on.receive((Request)((Wrapper)next.body).body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id));
                }
            }
        }
        finally
        {
            on.shutdown();
        }
    }