in accord-maelstrom/src/main/java/accord/maelstrom/Main.java [165:243]
public static void listen(TopologyFactory topologyFactory, Supplier<String> in, PrintStream out, PrintStream err) throws IOException
{
long start = System.nanoTime();
err.println("Starting...");
err.flush();
ThreadPoolScheduler scheduler = new ThreadPoolScheduler();
Node on;
Topology topology;
StdoutSink sink;
{
String line = in.get();
err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
err.flush();
Packet packet = Json.GSON.fromJson(line, Packet.class);
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
Journal journal = new Cluster.NoOpJournal();
TimeService time = TimeService.ofNonMonotonic(System::currentTimeMillis, TimeUnit.MILLISECONDS);
on = new Node(init.self, sink, new SimpleConfigService(topology),
time, new UniqueTimeService.AtomicUniqueTime(time),
MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER,
DefaultRemoteListeners::new, DefaultTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new,
InMemoryCommandStores.SingleThread::new, new CoordinationAdapter.DefaultFactory(),
DurableBefore.NOOP_PERSISTER, journal);
awaitUninterruptibly(on.unsafeStart());
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
}
try
{
while (true)
{
String line = in.get();
if (line == null)
{
err.println("Received EOF; terminating");
err.flush();
scheduler.stop();
err.println("Terminated");
err.flush();
return;
}
err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
err.flush();
Packet next = Packet.parse(line);
switch (next.body.type)
{
case txn:
on.receive((MaelstromRequest)next.body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id));
break;
default:
if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
{
Reply reply = (Reply)((Wrapper)next.body).body;
CallbackInfo callback = sink.callbacks.remove(next.body.in_reply_to);
if (callback != null)
scheduler.now(() -> {
try
{
callback.callback.onSuccess(next.src, reply);
}
catch (Throwable t)
{
callback.callback.onCallbackFailure(next.src, t);
}
});
}
else on.receive((Request)((Wrapper)next.body).body, next.src, MaelstromReplyContext.contextFor(next.body.msg_id));
}
}
}
finally
{
on.shutdown();
}
}