in pekko-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala [56:115]
/** Suite-level setup hook; delegates to `multiNodeSpecBeforeAll()`. */
override def beforeAll(): Unit = multiNodeSpecBeforeAll()
/** Suite-level teardown hook; delegates to `multiNodeSpecAfterAll()`. */
override def afterAll(): Unit = multiNodeSpecAfterAll()
/**
 * Typed view of the classic `system`, obtained through `toTyped`.
 *
 * It is used explicitly for receptionist access in this spec and is implicit so
 * that helpers needing a typed actor system can pick it up (presumably the
 * `TestProbe[AnyRef]()` created in `assertServiceOk` -- confirm against its signature).
 * The type is spelled out because implicit definitions should not rely on inference
 * (Scala 3 requires the annotation).
 */
implicit val typedSystem: org.apache.pekko.actor.typed.ActorSystem[Nothing] = system.toTyped
"The stats sample" must {
"illustrate how to startup cluster" in within(15.seconds) {
  // Subscribe before joining so the MemberUp events are delivered to testActor;
  // the first message expected after subscribing is a CurrentClusterState.
  val cluster = Cluster(system)
  cluster.subscribe(testActor, classOf[MemberUp])
  expectMsgClass(classOf[CurrentClusterState])

  // Resolve the addresses of all three roles, then join through the first node's address.
  val seedAddress = node(first).address
  val expectedMembers = Set(seedAddress, node(second).address, node(third).address)
  cluster.join(seedAddress)

  // Wait for three messages; the MemberUp addresses among them must cover every node.
  val upAddresses = receiveN(3).collect { case MemberUp(member) => member.address }.toSet
  upAddresses should be(expectedMembers)

  cluster.unsubscribe(testActor)
  testConductor.enter("all-up")
}
"show usage of the statsService from one node" in within(15.seconds) {
  // The first and second nodes each spawn a worker plus a service wrapping it,
  // and register that service with the receptionist under App.StatsServiceKey.
  runOn(first, second) {
    val statsWorker = system.spawn(StatsWorker(), "StatsWorker")
    val statsService = system.spawn(StatsService(statsWorker), "StatsService")
    typedSystem.receptionist ! Receptionist.Register(App.StatsServiceKey, statsService)
  }

  // Only the third node runs the service check in this test case.
  runOn(third) { assertServiceOk() }

  testConductor.enter("done-2")
}
/**
 * Looks up a stats service through the receptionist, sends it a fixed sentence and
 * checks the reported mean word length.
 *
 * The whole exchange is wrapped in `awaitAssert`, so it is retried: early attempts
 * may fail because the worker actors have not been started yet.
 */
def assertServiceOk(): Unit = {
  // 8 words, 31 letters in total => expected mean word length of 3.875.
  val sampleText = "this is the text that will be analyzed"

  awaitAssert {
    // A new probe per attempt receives both the listing and the job result.
    val replyProbe = TestProbe[AnyRef]()
    typedSystem.receptionist ! Receptionist.Find(App.StatsServiceKey, replyProbe.ref)

    val App.StatsServiceKey.Listing(services) = replyProbe.expectMessageType[Receptionist.Listing]
    services should not be empty

    services.head ! StatsService.ProcessText(sampleText, replyProbe.ref)
    val jobResult = replyProbe.expectMessageType[StatsService.JobResult]
    jobResult.meanWordLength should be(3.875 +- 0.001)
  }
}
// Unlike the "from one node" case, the check is not wrapped in runOn here,
// so it is executed by every node running this spec.
"show usage of the statsService from all nodes" in within(15.seconds) {
assertServiceOk()
// Enter the "done-3" barrier once this node's check has passed.
testConductor.enter("done-3")
}
}