in pekko-sample-cluster-scala/src/main/scala/sample/cluster/stats/AppOneMaster.scala [20:57]
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
val cluster = Cluster(ctx.system)
val singletonSettings = ClusterSingletonSettings(ctx.system)
.withRole("compute")
val serviceSingleton = SingletonActor(
Behaviors.setup[StatsService.Command] { ctx =>
// the service singleton accesses available workers through a group router
val workersRouter =
ctx.spawn(
Routers
.group(WorkerServiceKey)
// the worker has a per word cache, so send the same word to the same worker
.withConsistentHashingRouting(1, _.word),
"WorkersRouter")
StatsService(workersRouter)
},
"StatsService").withStopMessage(StatsService.Stop)
.withSettings(singletonSettings)
val serviceProxy = ClusterSingleton(ctx.system).init(serviceSingleton)
if (cluster.selfMember.hasRole("compute")) {
// on every compute node N local workers, which a cluster singleton stats service delegates work to
val numberOfWorkers =
ctx.system.settings.config.getInt("stats-service.workers-per-node")
ctx.log.info("Starting {} workers", numberOfWorkers)
(0 to numberOfWorkers).foreach { n =>
val worker = ctx.spawn(StatsWorker(), s"StatsWorker$n")
ctx.system.receptionist ! Receptionist
.Register(WorkerServiceKey, worker)
}
}
if (cluster.selfMember.hasRole("client")) {
ctx.spawn(StatsClient(serviceProxy), "Client")
}
Behaviors.empty
}