def upMembers = cluster.state.members.filter()

in cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sharding/RollingUpdateShardAllocationSpec.scala [102:232]


  def upMembers = cluster.state.members.filter(_.status == Up)

  "Cluster sharding" must {

    "form cluster" in {
      awaitClusterUp(first, second)
      enterBarrier("cluster-started")
    }
    lazy val shardRegion = startSharding(
      system,
      typeName = typeName,
      entityProps = Props[GiveMeYourHome](),
      extractEntityId = GiveMeYourHome.extractEntityId,
      extractShardId = GiveMeYourHome.extractShardId)

    "start cluster sharding on first" in {
      runOn(first, second) {

        // make sure both regions have completed registration before triggering entity allocation
        // so the folloing allocations end up as one on each node
        awaitAssert {
          shardRegion ! ShardRegion.GetCurrentRegions
          expectMsgType[ShardRegion.CurrentRegions].regions should have size 2
        }

        shardRegion ! GiveMeYourHome.Get("id1")
        // started on either of the nodes
        val address1 = expectMsgType[GiveMeYourHome.Home].address

        shardRegion ! GiveMeYourHome.Get("id2")
        // started on the other of the nodes (because least
        val address2 = expectMsgType[GiveMeYourHome.Home].address

        // one on each node
        Set(address1, address2) should have size 2
      }
      enterBarrier("first-version-started")
    }
    "start a rolling upgrade" in {
      join(third, first)

      runOn(first, second, third) {
        shardRegion

        // new shards should now go on third since that is the highest version,
        // however there is a race where the shard has not yet completed registration
        // with the coordinator and shards will be allocated on the old nodes, so we need
        // to make sure the third region has completed registration before trying
        // if we didn't the strategy will default it back to the old nodes
        awaitAssert {
          shardRegion ! ShardRegion.GetCurrentRegions
          expectMsgType[ShardRegion.CurrentRegions].regions should have size 3
        }
      }
      enterBarrier("third-region-registered")
      runOn(first, second) {
        shardRegion ! GiveMeYourHome.Get("id3")
        expectMsgType[GiveMeYourHome.Home]
      }
      runOn(third) {
        // now third region should be only option as the other two are old versions
        // but first new allocated shard would anyway go there because of balance, so we
        // need to do more than one
        (3 to 5).foreach { n =>
          shardRegion ! GiveMeYourHome.Get(s"id$n")
          expectMsgType[GiveMeYourHome.Home].address should ===(Cluster(system).selfAddress)
        }
      }
      enterBarrier("rolling-upgrade-in-progress")
    }
    "complete a rolling upgrade" in {
      join(fourth, first)

      runOn(first) {
        val cluster = Cluster(system)
        cluster.leave(cluster.selfAddress)
      }
      runOn(second, third, fourth) {
        awaitAssert(upMembers.size should ===(3))
      }
      enterBarrier("first-left")

      runOn(second, third, fourth) {
        awaitAssert({
            shardRegion ! ShardRegion.GetCurrentRegions
            expectMsgType[ShardRegion.CurrentRegions].regions should have size 3
          }, 30.seconds)
      }
      enterBarrier("sharding-handed-off")

      // trigger allocation (no verification because we don't know which id was on node 1)
      runOn(second, third, fourth) {
        awaitAssert {
          shardRegion ! GiveMeYourHome.Get("id1")
          expectMsgType[GiveMeYourHome.Home]

          shardRegion ! GiveMeYourHome.Get("id2")
          expectMsgType[GiveMeYourHome.Home]
        }
      }
      enterBarrier("first-allocated")

      runOn(second) {
        val cluster = Cluster(system)
        cluster.leave(cluster.selfAddress)
      }
      runOn(third, fourth) {
        // make sure coordinator has noticed there are only two regions
        awaitAssert({
            shardRegion ! ShardRegion.GetCurrentRegions
            expectMsgType[ShardRegion.CurrentRegions].regions should have size 2
          }, 30.seconds)
      }
      enterBarrier("second-left")

      // trigger allocation and verify where each was started
      runOn(third, fourth) {
        awaitAssert {
          shardRegion ! GiveMeYourHome.Get("id1")
          val address1 = expectMsgType[GiveMeYourHome.Home].address
          upMembers.map(_.address) should contain(address1)

          shardRegion ! GiveMeYourHome.Get("id2")
          val address2 = expectMsgType[GiveMeYourHome.Home].address
          upMembers.map(_.address) should contain(address2)
        }
      }
      enterBarrier("completo")
    }

  }