def currentRoutees()

in cluster/src/multi-jvm/scala/org/apache/pekko/cluster/routing/UseRoleIgnoredSpec.scala [93:374]


  def currentRoutees(router: ActorRef) =
    Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees

  "A cluster" must {
    "start cluster" taggedAs LongRunningTest in {
      awaitClusterUp(first, second, third)
      runOn(first) { info("first, roles: " + cluster.selfRoles) }
      runOn(second) { info("second, roles: " + cluster.selfRoles) }
      runOn(third) { info("third, roles: " + cluster.selfRoles) }

      // routees for the group routers
      system.actorOf(Props(classOf[SomeActor], GroupRoutee), "foo")
      system.actorOf(Props(classOf[SomeActor], GroupRoutee), "bar")

      enterBarrier("after-1")
    }

    "pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("b")

        val router = system.actorOf(
          ClusterRouterPool(
            RoundRobinPool(nrOfInstances = 6),
            ClusterRouterPoolSettings(
              totalInstances = 6,
              maxInstancesPerNode = 2,
              allowLocalRoutees = false,
              useRoles = roles)).props(Props[SomeActor]()),
          "router-2")

        awaitAssert(currentRoutees(router).size should ===(4))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should ===(0) // should not be deployed locally, does not have required role
        replies(second) should be > 0
        replies(third) should be > 0
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-2")
    }

    "group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("b")

        val router = system.actorOf(
          ClusterRouterGroup(
            RoundRobinGroup(paths = Nil),
            ClusterRouterGroupSettings(
              totalInstances = 6,
              routeesPaths = List("/user/foo", "/user/bar"),
              allowLocalRoutees = false,
              useRoles = roles)).props(),
          "router-2b")

        awaitAssert(currentRoutees(router).size should ===(4))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(GroupRoutee, iterationCount)

        replies(first) should ===(0) // should not be deployed locally, does not have required role
        replies(second) should be > 0
        replies(third) should be > 0
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-2b")
    }

    "pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("b")

        val router = system.actorOf(
          ClusterRouterPool(
            RoundRobinPool(nrOfInstances = 6),
            ClusterRouterPoolSettings(
              totalInstances = 6,
              maxInstancesPerNode = 2,
              allowLocalRoutees = true,
              useRoles = roles)).props(Props[SomeActor]()),
          "router-3")

        awaitAssert(currentRoutees(router).size should ===(4))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should ===(0) // should not be deployed locally, does not have required role
        replies(second) should be > 0
        replies(third) should be > 0
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-3")
    }

    "group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("b")

        val router = system.actorOf(
          ClusterRouterGroup(
            RoundRobinGroup(paths = Nil),
            ClusterRouterGroupSettings(
              totalInstances = 6,
              routeesPaths = List("/user/foo", "/user/bar"),
              allowLocalRoutees = true,
              useRoles = roles)).props(),
          "router-3b")

        awaitAssert(currentRoutees(router).size should ===(4))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(GroupRoutee, iterationCount)

        replies(first) should ===(0) // should not be deployed locally, does not have required role
        replies(second) should be > 0
        replies(third) should be > 0
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-3b")
    }

    "pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("a")

        val router = system.actorOf(
          ClusterRouterPool(
            RoundRobinPool(nrOfInstances = 6),
            ClusterRouterPoolSettings(
              totalInstances = 6,
              maxInstancesPerNode = 2,
              allowLocalRoutees = true,
              useRoles = roles)).props(Props[SomeActor]()),
          "router-4")

        awaitAssert(currentRoutees(router).size should ===(2))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should be > 0
        replies(second) should ===(0)
        replies(third) should ===(0)
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-4")
    }

    "group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("a")

        val router = system.actorOf(
          ClusterRouterGroup(
            RoundRobinGroup(paths = Nil),
            ClusterRouterGroupSettings(
              totalInstances = 6,
              routeesPaths = List("/user/foo", "/user/bar"),
              allowLocalRoutees = true,
              useRoles = roles)).props(),
          "router-4b")

        awaitAssert(currentRoutees(router).size should ===(2))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(GroupRoutee, iterationCount)

        replies(first) should be > 0
        replies(second) should ===(0)
        replies(third) should ===(0)
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-4b")
    }

    "pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("c")

        val router = system.actorOf(
          ClusterRouterPool(
            RoundRobinPool(nrOfInstances = 6),
            ClusterRouterPoolSettings(
              totalInstances = 6,
              maxInstancesPerNode = 2,
              allowLocalRoutees = true,
              useRoles = roles)).props(Props[SomeActor]()),
          "router-5")

        awaitAssert(currentRoutees(router).size should ===(6))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        replies(first) should be > 0
        replies(second) should be > 0
        replies(third) should be > 0
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-5")
    }

    "group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {

      runOn(first) {
        val roles = Set("c")

        val router = system.actorOf(
          ClusterRouterGroup(
            RoundRobinGroup(paths = Nil),
            ClusterRouterGroupSettings(
              totalInstances = 6,
              routeesPaths = List("/user/foo", "/user/bar"),
              allowLocalRoutees = true,
              useRoles = roles)).props(),
          "router-5b")

        awaitAssert(currentRoutees(router).size should ===(6))

        val iterationCount = 10
        for (i <- 0 until iterationCount) {
          router ! s"hit-$i"
        }

        val replies = receiveReplies(GroupRoutee, iterationCount)

        replies(first) should be > 0
        replies(second) should be > 0
        replies(third) should be > 0
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-5b")
    }

  }