def currentRoutees()

in cluster/src/multi-jvm/scala/org/apache/pekko/cluster/routing/ClusterRoundRobinSpec.scala [150:394]


  /**
   * Asks `router` for its current routees and blocks until the reply arrives,
   * waiting at most `timeout.duration`.
   *
   * The reply is cast to `Routees`; any other reply type fails with a ClassCastException.
   */
  def currentRoutees(router: ActorRef) = {
    val reply = Await.result(router ? GetRoutees, timeout.duration)
    reply.asInstanceOf[Routees].routees
  }

  "A cluster router with a RoundRobin router" must {
    // Brings up the initial cluster of `first` and `second` that the following tests build on.
    "start cluster with 2 nodes" in {
      awaitClusterUp(first, second)
      // barrier marking the end of this step
      enterBarrier("after-1")
    }

    // Checks that router1 spreads its pool routees over the two current members only.
    "deploy routees to the member nodes in the cluster" in {

      runOn(first) {
        router1.isInstanceOf[RoutedActorRef] should ===(true)

        // 2 nodes times max-nr-of-instances-per-node=2
        awaitAssert(currentRoutees(router1).size should ===(4))

        val hitCount = 10
        (0 until hitCount).foreach(_ => router1 ! "hit")

        val repliesPerNode = receiveReplies(PoolRoutee, hitCount)

        // members must have answered, nodes that have not joined yet must not
        repliesPerNode(first) should be > 0
        repliesPerNode(second) should be > 0
        repliesPerNode(third) should ===(0)
        repliesPerNode(fourth) should ===(0)
        // every message sent must be accounted for
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-2")
    }

    // Checks that the group router router4 resolves routees on the two current members only.
    "lookup routees on the member nodes in the cluster" in {

      // cluster consists of first and second

      // not wrapped in runOn: every node executing this test starts both service actors
      Seq("myserviceA", "myserviceB").foreach { serviceName =>
        system.actorOf(Props(classOf[SomeActor], GroupRoutee), serviceName)
      }
      enterBarrier("myservice-started")

      runOn(first) {
        // 2 nodes, 2 routees on each node
        within(10.seconds) {
          awaitAssert(currentRoutees(router4).size should ===(4))
        }

        val hitCount = 10
        (0 until hitCount).foreach(_ => router4 ! "hit")

        val repliesPerNode = receiveReplies(GroupRoutee, hitCount)

        // members must have answered, nodes that have not joined yet must not
        repliesPerNode(first) should be > 0
        repliesPerNode(second) should be > 0
        repliesPerNode(third) should ===(0)
        repliesPerNode(fourth) should ===(0)
        // every message sent must be accounted for
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-3")
    }

    // Checks that router1 grows its pool once third and fourth have joined.
    "deploy routees to new nodes in the cluster" in {

      // third and fourth join the existing members
      awaitClusterUp(first, second, third, fourth)

      runOn(first) {
        // 4 nodes times max-nr-of-instances-per-node=2
        awaitAssert(currentRoutees(router1).size should ===(8))

        val hitCount = 10
        (0 until hitCount).foreach(_ => router1 ! "hit")

        val repliesPerNode = receiveReplies(PoolRoutee, hitCount)

        // every reported node must have answered at least once
        repliesPerNode.values.foreach(count => count should be > 0)
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-4")
    }

    // Checks that the group router router4 picks up routees on the newly joined nodes.
    "lookup routees on new nodes in the cluster" in {

      // cluster consists of first, second, third and fourth

      runOn(first) {
        // 4 nodes, 2 routees on each node
        awaitAssert(currentRoutees(router4).size should ===(8))

        val hitCount = 10
        (0 until hitCount).foreach(_ => router4 ! "hit")

        val repliesPerNode = receiveReplies(GroupRoutee, hitCount)

        // every reported node must have answered at least once
        repliesPerNode.values.foreach(count => count should be > 0)
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-5")
    }

    // Checks that router3 never places a routee on the node that hosts the router.
    "deploy routees to only remote nodes when allow-local-routees = off" in {

      runOn(first) {
        // 3 nodes times max-nr-of-instances-per-node=1
        awaitAssert(currentRoutees(router3).size should ===(3))

        val hitCount = 10
        (0 until hitCount).foreach(_ => router3 ! "hit")

        val repliesPerNode = receiveReplies(PoolRoutee, hitCount)

        // the local node stays silent, every other node answers
        repliesPerNode(first) should ===(0)
        repliesPerNode(second) should be > 0
        repliesPerNode(third) should be > 0
        repliesPerNode(fourth) should be > 0
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-6")
    }

    // Checks that router5 only uses two routees, which answer from first and second.
    "deploy routees to specified node role" in {

      runOn(first) {
        awaitAssert(currentRoutees(router5).size should ===(2))

        val hitCount = 10
        (0 until hitCount).foreach(_ => router5 ! "hit")

        val repliesPerNode = receiveReplies(PoolRoutee, hitCount)

        // only first and second are expected to answer
        repliesPerNode(first) should be > 0
        repliesPerNode(second) should be > 0
        repliesPerNode(third) should ===(0)
        repliesPerNode(fourth) should ===(0)
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-7")
    }

    // Checks that router2 ends up with three routees placed on three different nodes
    // and that every message sent through it is answered.
    "deploy programatically defined routees to the member nodes in the cluster" in {

      runOn(first) {
        router2.isInstanceOf[RoutedActorRef] should ===(true)

        // totalInstances = 3, maxInstancesPerNode = 1
        awaitAssert(currentRoutees(router2).size should ===(3))

        val iterationCount = 10
        for (_ <- 0 until iterationCount) {
          router2 ! "hit"
        }

        val replies = receiveReplies(PoolRoutee, iterationCount)

        // note that router2 has totalInstances = 3, maxInstancesPerNode = 1, so the three
        // routees must live on three different nodes. Collecting the addresses into a Set
        // makes the size check below verify that, instead of merely repeating the routee
        // count that was already asserted above.
        val routees = currentRoutees(router2)
        val routeeAddresses = routees.collect { case ActorRefRoutee(ref) => fullAddress(ref) }.toSet

        routeeAddresses.size should ===(3)
        replies.values.sum should ===(iterationCount)
      }

      enterBarrier("after-8")
    }

    // Cuts the connection between first and second, expects router4 to drop the routees
    // on second, then restores the connection and expects them to come back.
    "remove routees for unreachable nodes, and add when reachable again" in within(30.seconds) {

      // myservice is already running

      // both are defs: every use sends a fresh query to router4
      def lookupRoutees = currentRoutees(router4)
      def lookupAddresses =
        lookupRoutees.collect { case ActorSelectionRoutee(selection) => fullAddress(selection.anchor) }.toSet

      runOn(first) {
        // 4 nodes, 2 routees on each node
        awaitAssert(lookupRoutees.size should ===(8))

        testConductor.blackhole(first, second, Direction.Both).await

        // the two routees on second are expected to disappear
        awaitAssert(lookupRoutees.size should ===(6))
        (lookupAddresses should not).contain(address(second))

        testConductor.passThrough(first, second, Direction.Both).await

        // and to be added again once traffic passes through
        awaitAssert(lookupRoutees.size should ===(8))
        lookupAddresses should contain(address(second))

      }

      enterBarrier("after-9")
    }

    // Downs one remote node hosting a router2 routee and expects the router to replace
    // that routee with one on the node that was not used before.
    "deploy programatically defined routees to other node when a node becomes down" in {
      muteMarkingAsUnreachable()

      runOn(first) {
        // both are defs: every use sends a fresh query to router2
        def poolRoutees = currentRoutees(router2)
        def poolAddresses = poolRoutees.collect { case ActorRefRoutee(ref) => fullAddress(ref) }.toSet

        // watch every routee actor so that its termination is reported to this test
        poolRoutees.collect { case ActorRefRoutee(ref) => ref }.foreach(ref => watch(ref))

        val allAddresses = roles.map(address).toSet
        val notUsedAddress = allAddresses.diff(poolAddresses).head
        // pick a routee node other than the local one as the node to take down
        val downAddress = poolAddresses.find(_ != address(first)).get
        val downRouteeRef = poolRoutees.collectFirst {
          case ActorRefRoutee(ref) if ref.path.address == downAddress => ref
        }.get

        cluster.down(downAddress)
        expectMsgType[Terminated](15.seconds).actor should ===(downRouteeRef)
        awaitAssert {
          poolAddresses should contain(notUsedAddress)
          (poolAddresses should not).contain(downAddress)
        }

        val hitCount = 10
        (0 until hitCount).foreach(_ => router2 ! "hit")

        val repliesPerNode = receiveReplies(PoolRoutee, hitCount)

        // still three distinct routee nodes, and every message answered
        poolAddresses.size should ===(3)
        repliesPerNode.values.sum should ===(hitCount)
      }

      enterBarrier("after-10")
    }

  }