in cluster/src/multi-jvm/scala/org/apache/pekko/cluster/routing/UseRoleIgnoredSpec.scala [93:374]
/**
 * Asks `router` for its current routees and blocks until the reply arrives
 * (or `timeout.duration` elapses, in which case `Await.result` throws).
 *
 * The reply is cast to `Routees`; any other reply type fails with a
 * `ClassCastException`.
 *
 * @param router the router actor to query with `GetRoutees`
 * @return the `routees` carried by the `Routees` reply
 */
def currentRoutees(router: ActorRef) = {
  val pendingReply = router ? GetRoutees
  val reply = Await.result(pendingReply, timeout.duration)
  reply.asInstanceOf[Routees].routees
}
"A cluster" must {
"start cluster" taggedAs LongRunningTest in {
  // Bring the cluster up with all three nodes before any router is created.
  awaitClusterUp(first, second, third)

  // Each node reports its own roles; the label is the node's name in the test output.
  Seq((first, "first"), (second, "second"), (third, "third")).foreach {
    case (member, label) =>
      runOn(member) { info(s"$label, roles: ${cluster.selfRoles}") }
  }

  // Routees for the group routers: created on every node (outside any runOn),
  // so the "/user/foo" and "/user/bar" paths exist cluster-wide.
  Seq("foo", "bar").foreach { routeeName =>
    system.actorOf(Props(classOf[SomeActor], GroupRoutee), routeeName)
  }

  enterBarrier("after-1")
}
"pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
  // Pool router created on `first` with local routees disabled.
  // NOTE(review): the title says "roles: off", yet the router is restricted to
  // role "b" below - confirm which of the two is intended.
  runOn(first) {
    val requiredRoles = Set("b")
    val poolSettings = ClusterRouterPoolSettings(
      totalInstances = 6,
      maxInstancesPerNode = 2,
      allowLocalRoutees = false,
      useRoles = requiredRoles)
    val poolProps =
      ClusterRouterPool(RoundRobinPool(nrOfInstances = 6), poolSettings).props(Props[SomeActor]())
    val router = system.actorOf(poolProps, "router-2")

    // 4 routees in total; per the title: none on first, two each on second and third.
    awaitAssert(currentRoutees(router).size should ===(4))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(PoolRoutee, messageCount)

    // Nothing may be deployed on first: local routees are disabled and the node
    // does not have the required role.
    hitsPerNode(first) should ===(0)
    hitsPerNode(second) should be > 0
    hitsPerNode(third) should be > 0
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-2")
}
"group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
  // Group router created on `first` with local routees disabled, routing to the
  // existing "/user/foo" and "/user/bar" actors.
  // NOTE(review): the title says "roles: off", yet the router is restricted to
  // role "b" below - confirm which of the two is intended.
  runOn(first) {
    val requiredRoles = Set("b")
    val groupSettings = ClusterRouterGroupSettings(
      totalInstances = 6,
      routeesPaths = List("/user/foo", "/user/bar"),
      allowLocalRoutees = false,
      useRoles = requiredRoles)
    val groupProps = ClusterRouterGroup(RoundRobinGroup(paths = Nil), groupSettings).props()
    val router = system.actorOf(groupProps, "router-2b")

    // 4 routees in total; per the title: none on first, two each on second and third.
    awaitAssert(currentRoutees(router).size should ===(4))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(GroupRoutee, messageCount)

    // The routees on first must not be used: local routees are disabled and the
    // node does not have the required role.
    hitsPerNode(first) should ===(0)
    hitsPerNode(second) should be > 0
    hitsPerNode(third) should be > 0
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-2b")
}
"pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
  // Pool router created on `first`; local routees are allowed, but only nodes
  // with role "b" may host routees.
  runOn(first) {
    val requiredRoles = Set("b")
    val poolSettings = ClusterRouterPoolSettings(
      totalInstances = 6,
      maxInstancesPerNode = 2,
      allowLocalRoutees = true,
      useRoles = requiredRoles)
    val poolProps =
      ClusterRouterPool(RoundRobinPool(nrOfInstances = 6), poolSettings).props(Props[SomeActor]())
    val router = system.actorOf(poolProps, "router-3")

    // 4 routees in total; per the title: none on first, two each on second and third.
    awaitAssert(currentRoutees(router).size should ===(4))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(PoolRoutee, messageCount)

    // Even with local routees allowed, nothing may be deployed on first because
    // it does not have the required role.
    hitsPerNode(first) should ===(0)
    hitsPerNode(second) should be > 0
    hitsPerNode(third) should be > 0
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-3")
}
"group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
  // Group router created on `first`; local routees are allowed, but only nodes
  // with role "b" may contribute their "/user/foo" and "/user/bar" actors.
  runOn(first) {
    val requiredRoles = Set("b")
    val groupSettings = ClusterRouterGroupSettings(
      totalInstances = 6,
      routeesPaths = List("/user/foo", "/user/bar"),
      allowLocalRoutees = true,
      useRoles = requiredRoles)
    val groupProps = ClusterRouterGroup(RoundRobinGroup(paths = Nil), groupSettings).props()
    val router = system.actorOf(groupProps, "router-3b")

    // 4 routees in total; per the title: none on first, two each on second and third.
    awaitAssert(currentRoutees(router).size should ===(4))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(GroupRoutee, messageCount)

    // Even with local routees allowed, the routees on first must not be used
    // because the node does not have the required role.
    hitsPerNode(first) should ===(0)
    hitsPerNode(second) should be > 0
    hitsPerNode(third) should be > 0
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-3b")
}
"pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
  // Pool router created on `first`; local routees are allowed and only nodes
  // with role "a" may host routees.
  runOn(first) {
    val requiredRoles = Set("a")
    val poolSettings = ClusterRouterPoolSettings(
      totalInstances = 6,
      maxInstancesPerNode = 2,
      allowLocalRoutees = true,
      useRoles = requiredRoles)
    val poolProps =
      ClusterRouterPool(RoundRobinPool(nrOfInstances = 6), poolSettings).props(Props[SomeActor]())
    val router = system.actorOf(poolProps, "router-4")

    // 2 routees in total; per the title: both on first, none on second or third.
    awaitAssert(currentRoutees(router).size should ===(2))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(PoolRoutee, messageCount)

    // All traffic must stay on first; the other nodes must see nothing.
    hitsPerNode(first) should be > 0
    hitsPerNode(second) should ===(0)
    hitsPerNode(third) should ===(0)
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-4")
}
"group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
  // Group router created on `first`; local routees are allowed and only nodes
  // with role "a" may contribute their "/user/foo" and "/user/bar" actors.
  runOn(first) {
    val requiredRoles = Set("a")
    val groupSettings = ClusterRouterGroupSettings(
      totalInstances = 6,
      routeesPaths = List("/user/foo", "/user/bar"),
      allowLocalRoutees = true,
      useRoles = requiredRoles)
    val groupProps = ClusterRouterGroup(RoundRobinGroup(paths = Nil), groupSettings).props()
    val router = system.actorOf(groupProps, "router-4b")

    // 2 routees in total; per the title: both on first, none on second or third.
    awaitAssert(currentRoutees(router).size should ===(2))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(GroupRoutee, messageCount)

    // All traffic must stay on first; the other nodes must see nothing.
    hitsPerNode(first) should be > 0
    hitsPerNode(second) should ===(0)
    hitsPerNode(third) should ===(0)
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-4b")
}
"pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
  // Pool router created on `first`; local routees are allowed and nodes with
  // role "c" may host routees.
  runOn(first) {
    val requiredRoles = Set("c")
    val poolSettings = ClusterRouterPoolSettings(
      totalInstances = 6,
      maxInstancesPerNode = 2,
      allowLocalRoutees = true,
      useRoles = requiredRoles)
    val poolProps =
      ClusterRouterPool(RoundRobinPool(nrOfInstances = 6), poolSettings).props(Props[SomeActor]())
    val router = system.actorOf(poolProps, "router-5")

    // 6 routees in total; per the title: two on each of the three nodes.
    awaitAssert(currentRoutees(router).size should ===(6))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(PoolRoutee, messageCount)

    // Every node, including the local one, must have handled some messages.
    hitsPerNode(first) should be > 0
    hitsPerNode(second) should be > 0
    hitsPerNode(third) should be > 0
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-5")
}
"group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
  // Group router created on `first`; local routees are allowed and nodes with
  // role "c" may contribute their "/user/foo" and "/user/bar" actors.
  runOn(first) {
    val requiredRoles = Set("c")
    val groupSettings = ClusterRouterGroupSettings(
      totalInstances = 6,
      routeesPaths = List("/user/foo", "/user/bar"),
      allowLocalRoutees = true,
      useRoles = requiredRoles)
    val groupProps = ClusterRouterGroup(RoundRobinGroup(paths = Nil), groupSettings).props()
    val router = system.actorOf(groupProps, "router-5b")

    // 6 routees in total; per the title: two on each of the three nodes.
    awaitAssert(currentRoutees(router).size should ===(6))

    val messageCount = 10
    (0 until messageCount).foreach(n => router ! s"hit-$n")

    val hitsPerNode = receiveReplies(GroupRoutee, messageCount)

    // Every node, including the local one, must have handled some messages.
    hitsPerNode(first) should be > 0
    hitsPerNode(second) should be > 0
    hitsPerNode(third) should be > 0
    // Every message sent must be accounted for by exactly one reply.
    hitsPerNode.values.sum should ===(messageCount)
  }

  enterBarrier("after-5b")
}
}