in pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala [60:110]
/** Suite-level set-up hook: delegates to `multiNodeSpecBeforeAll()`. */
override def beforeAll(): Unit = multiNodeSpecBeforeAll()
/** Suite-level tear-down hook: delegates to `multiNodeSpecAfterAll()`. */
override def afterAll(): Unit = multiNodeSpecAfterAll()
// Typed view of the classic test `system`, obtained through the `toTyped` adapter.
// The type is spelled out (fully qualified, so it cannot clash with a classic
// `ActorSystem` import) because implicit definitions should not rely on an inferred
// type; Scala 3 requires the annotation for non-local implicits.
implicit val typedSystem: org.apache.pekko.actor.typed.ActorSystem[Nothing] = system.toTyped
// Reference used to send commands to the StatsService singleton. It starts out
// uninitialized, is assigned in the cluster start-up test from the result of
// `ClusterSingleton(...).init(...)`, and is read by the following request test,
// so the tests must run in declaration order.
var singletonProxy: ActorRef[StatsService.Command] = _
"The stats sample with single master" must {
"illustrate how to startup cluster" in within(15.seconds) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
Cluster(system).join(firstAddress)
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
Cluster(system).unsubscribe(testActor)
val singletonSettings = ClusterSingletonSettings(typedSystem).withRole("compute")
singletonProxy = ClusterSingleton(typedSystem).init(
SingletonActor(
Behaviors.setup[StatsService.Command] { ctx =>
// just run some local workers for this test
val workersRouter = ctx.spawn(Routers.pool(2)(StatsWorker()), "WorkersRouter")
StatsService(workersRouter)
},
"StatsService").withSettings(singletonSettings))
testConductor.enter("all-up")
}
"show usage of the statsServiceProxy" in within(20.seconds) {
// eventually the service should be ok,
// service and worker nodes might not be up yet
awaitAssert {
system.log.info("Trying a request")
val probe = TestProbe[StatsService.Response]()
singletonProxy ! StatsService.ProcessText("this is the text that will be analyzed", probe.ref)
val response = probe.expectMessageType[StatsService.JobResult](3.seconds)
response.meanWordLength should be(3.875 +- 0.001)
}
testConductor.enter("done")
}
}