in pekko-sample-cluster-java/src/main/java/sample/cluster/stats/AppOneMaster.java [30:69]
/**
 * Builds the root behavior for one node of the stats sample.
 *
 * <p>During setup it:
 * <ul>
 *   <li>initialises a cluster singleton named {@code "StatsService"} whose singleton settings
 *       are restricted to the {@code "compute"} role, and obtains a proxy reference to it;</li>
 *   <li>if this node has the {@code "compute"} role, spawns
 *       {@code stats-service.workers-per-node} {@code StatsWorker} children and registers each
 *       with the receptionist under {@code WORKER_SERVICE_KEY};</li>
 *   <li>if this node has the {@code "client"} role, spawns a {@code StatsClient} that talks to
 *       the singleton proxy.</li>
 * </ul>
 *
 * @return a behavior that performs the setup above and then handles no messages
 */
static Behavior<Void> create() {
  return Behaviors.setup(context -> {
    Cluster cluster = Cluster.get(context.getSystem());

    ClusterSingletonSettings singletonSettings =
        ClusterSingletonSettings.create(context.getSystem())
            .withRole("compute");

    SingletonActor<StatsService.Command> serviceSingleton =
        SingletonActor.of(Behaviors.<StatsService.Command>setup(singletonContext -> {
              // The worker has a per word cache, so send the same word to the same local worker child
              GroupRouter<StatsWorker.Process> workerGroupBehavior =
                  Routers.group(WORKER_SERVICE_KEY)
                      .withConsistentHashingRouting(1, process -> process.word);
              ActorRef<StatsWorker.Process> workersRouter =
                  singletonContext.spawn(workerGroupBehavior, "WorkersRouter");
              return StatsService.create(workersRouter);
            }),
            "StatsService")
            .withStopMessage(StatsService.Stop.INSTANCE)
            .withSettings(singletonSettings);

    // Every node initialises the singleton so that it holds a proxy reference to the service.
    ActorRef<StatsService.Command> serviceProxy =
        ClusterSingleton.get(context.getSystem()).init(serviceSingleton);

    if (cluster.selfMember().hasRole("compute")) {
      // on every compute node N local workers, which a cluster singleton stats service delegates work to
      final int numberOfWorkers =
          context.getSystem().settings().config().getInt("stats-service.workers-per-node");
      context.getLog().info("Starting {} workers", numberOfWorkers);
      // Spawn exactly the configured number of workers so the count matches the log line above.
      for (int i = 0; i < numberOfWorkers; i++) {
        ActorRef<StatsWorker.Command> worker =
            context.spawn(StatsWorker.create(), "StatsWorker" + i);
        // Register under the key the singleton's group router subscribes to.
        context.getSystem().receptionist()
            .tell(Receptionist.register(WORKER_SERVICE_KEY, worker.narrow()));
      }
    }

    if (cluster.selfMember().hasRole("client")) {
      context.spawn(StatsClient.create(serviceProxy.narrow()), "Client");
    }

    return Behaviors.empty();
  });
}