in pekko-sample-cluster-java/src/main/java/sample/cluster/stats/App.java [27:54]
static Behavior<Void> create() {
return Behaviors.setup(context -> {
Cluster cluster = Cluster.get(context.getSystem());
if (cluster.selfMember().hasRole("compute")) {
// on every compute node there is one service instance that delegates to N local workers
final int numberOfWorkers = context.getSystem().settings().config().getInt("stats-service.workers-per-node");
// The worker has a per word cache, so send the same word to the same local worker child
Behavior<StatsWorker.Process> workerPoolBehavior =
Routers.pool(numberOfWorkers, StatsWorker.create().<StatsWorker.Process>narrow())
.withConsistentHashingRouting(1, process -> process.word);
ActorRef<StatsWorker.Process> workers =
context.spawn(workerPoolBehavior, "WorkerRouter");
ActorRef<StatsService.Command> service =
context.spawn(StatsService.create(workers.narrow()), "StatsService");
// published through the receptionist to the other nodes in the cluster
context.getSystem().receptionist().tell(Receptionist.register(STATS_SERVICE_KEY, service.narrow()));
}
if (cluster.selfMember().hasRole("client")) {
ActorRef<StatsService.ProcessText> serviceRouter =
context.spawn(Routers.group(STATS_SERVICE_KEY), "ServiceRouter");
context.spawn(StatsClient.create(serviceRouter), "Client");
}
return Behaviors.empty();
});
}