in cluster/src/multi-jvm/scala/org/apache/pekko/cluster/routing/ClusterRoundRobinSpec.scala [149:393]
def currentRoutees(router: ActorRef) =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees
"A cluster router with a RoundRobin router" must {
"start cluster with 2 nodes" in {
awaitClusterUp(first, second)
enterBarrier("after-1")
}
"deploy routees to the member nodes in the cluster" in {
runOn(first) {
router1.isInstanceOf[RoutedActorRef] should ===(true)
// max-nr-of-instances-per-node=2 times 2 nodes
awaitAssert(currentRoutees(router1).size should ===(4))
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router1 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be > 0
replies(second) should be > 0
replies(third) should ===(0)
replies(fourth) should ===(0)
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-2")
}
"lookup routees on the member nodes in the cluster" in {
// cluster consists of first and second
system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceA")
system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceB")
enterBarrier("myservice-started")
runOn(first) {
// 2 nodes, 2 routees on each node
within(10.seconds) {
awaitAssert(currentRoutees(router4).size should ===(4))
}
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router4 ! "hit"
}
val replies = receiveReplies(GroupRoutee, iterationCount)
replies(first) should be > 0
replies(second) should be > 0
replies(third) should ===(0)
replies(fourth) should ===(0)
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-3")
}
"deploy routees to new nodes in the cluster" in {
// add third and fourth
awaitClusterUp(first, second, third, fourth)
runOn(first) {
// max-nr-of-instances-per-node=2 times 4 nodes
awaitAssert(currentRoutees(router1).size should ===(8))
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router1 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies.values.foreach { _ should be > 0 }
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-4")
}
"lookup routees on new nodes in the cluster" in {
// cluster consists of first, second, third and fourth
runOn(first) {
// 4 nodes, 2 routee on each node
awaitAssert(currentRoutees(router4).size should ===(8))
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router4 ! "hit"
}
val replies = receiveReplies(GroupRoutee, iterationCount)
replies.values.foreach { _ should be > 0 }
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-5")
}
"deploy routees to only remote nodes when allow-local-routees = off" in {
runOn(first) {
// max-nr-of-instances-per-node=1 times 3 nodes
awaitAssert(currentRoutees(router3).size should ===(3))
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router3 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should ===(0)
replies(second) should be > 0
replies(third) should be > 0
replies(fourth) should be > 0
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-6")
}
"deploy routees to specified node role" in {
runOn(first) {
awaitAssert(currentRoutees(router5).size should ===(2))
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router5 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be > 0
replies(second) should be > 0
replies(third) should ===(0)
replies(fourth) should ===(0)
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-7")
}
"deploy programatically defined routees to the member nodes in the cluster" in {
runOn(first) {
router2.isInstanceOf[RoutedActorRef] should ===(true)
// totalInstances = 3, maxInstancesPerNode = 1
awaitAssert(currentRoutees(router2).size should ===(3))
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router2 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
// note that router2 has totalInstances = 3, maxInstancesPerNode = 1
val routees = currentRoutees(router2)
val routeeAddresses = routees.collect { case ActorRefRoutee(ref) => fullAddress(ref) }
routeeAddresses.size should ===(3)
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-8")
}
"remove routees for unreachable nodes, and add when reachable again" in within(30.seconds) {
// myservice is already running
def routees = currentRoutees(router4)
def routeeAddresses = routees.collect { case ActorSelectionRoutee(sel) => fullAddress(sel.anchor) }.toSet
runOn(first) {
// 4 nodes, 2 routees on each node
awaitAssert(currentRoutees(router4).size should ===(8))
testConductor.blackhole(first, second, Direction.Both).await
awaitAssert(routees.size should ===(6))
routeeAddresses should not contain address(second)
testConductor.passThrough(first, second, Direction.Both).await
awaitAssert(routees.size should ===(8))
routeeAddresses should contain(address(second))
}
enterBarrier("after-9")
}
"deploy programatically defined routees to other node when a node becomes down" in {
muteMarkingAsUnreachable()
runOn(first) {
def routees = currentRoutees(router2)
def routeeAddresses = routees.collect { case ActorRefRoutee(ref) => fullAddress(ref) }.toSet
routees.foreach {
case ActorRefRoutee(ref) => watch(ref)
case _ =>
}
val notUsedAddress = roles.map(address).toSet.diff(routeeAddresses).head
val downAddress = routeeAddresses.find(_ != address(first)).get
val downRouteeRef = routees.collectFirst {
case ActorRefRoutee(ref) if ref.path.address == downAddress => ref
}.get
cluster.down(downAddress)
expectMsgType[Terminated](15.seconds).actor should ===(downRouteeRef)
awaitAssert {
routeeAddresses should contain(notUsedAddress)
routeeAddresses should not contain downAddress
}
val iterationCount = 10
for (_ <- 0 until iterationCount) {
router2 ! "hit"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
routeeAddresses.size should ===(3)
replies.values.sum should ===(iterationCount)
}
enterBarrier("after-10")
}
}