def verify()

in cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sbr/SplitBrainResolverIntegrationSpec.scala [241:386]


    /**
     * Runs one split-brain round for `scenario` and verifies the outcome named by `scenario.expected`.
     *
     * The round proceeds in phases separated by barriers whose names are suffixed with the round
     * counter `c`:
     *
     *  1. The first `scenario.side1Size` roles form side 1, the next `scenario.side2Size` roles form
     *     side 2, and all of them join through `side1.head`.
     *  1. `node1` expects the singleton `Register` message, and every participating node round-trips
     *     ten messages through `region`.
     *  1. `node1` blackholes every (side 1, side 2) pair of nodes.
     *  1. The expected decision is checked: nodes that should be downed must report
     *     `Cluster(sys).isTerminated`, nodes that should survive must still get replies from `region`
     *     and see exactly the surviving addresses as cluster members.
     *
     * `KeepLeader` is first translated into `KeepSide1` / `KeepSide2` depending on which side holds
     * the smallest address according to `Member.addressOrdering`.
     */
    def verify(): Unit = {
      val side1 = roles.take(scenario.side1Size)
      val side2 = roles.drop(scenario.side1Size).take(scenario.side2Size)
      // Every node that takes part in this round.
      val participants = side1 ++ side2

      def singletonRegisterKey(node: RoleName): String =
        "/user/singletonRegistry/singleton-" + scenario.dcDecider(node)

      // Sends 0..9 to the shard region and expects each value back, allowing `timeoutSeconds` per reply.
      def expectRegionEchoes(timeoutSeconds: Int): Unit = {
        val probe = TestProbe()(sys)
        for (i <- 0 until 10) {
          region.tell(i, probe.ref)
          probe.expectMsg(timeoutSeconds.seconds, i)
        }
      }

      // Blocks until the local cluster extension reports that it has terminated.
      def awaitClusterTerminated(): Unit =
        awaitCond(Cluster(sys).isTerminated, max = 30.seconds)

      // Retries until the region answers and the local membership view is exactly `survivors`.
      def awaitSurvivingCluster(survivors: Seq[RoleName]): Unit = {
        val expectedAddresses = survivors.map(sysAddress).toSet
        // `remaining` is read when this helper is called, i.e. inside the enclosing `within` block;
        // the 3 seconds subtracted are presumably headroom for the steps that follow.
        within(remaining - 3.seconds) {
          awaitAssert {
            expectRegionEchoes(timeoutSeconds = 2)

            Cluster(sys).state.members.map(_.address) should be(expectedAddresses)
          }
        }
      }

      runOn(participants: _*) {
        log.info("Running {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
      }
      enterBarrier(s"log-startup-$c")

      within(90.seconds) {

        join(side1.head, side1.head, awaitUp = true) // oldest
        join(side2.head, side1.head, awaitUp = true) // next oldest
        for (n <- side1.tail ++ side2.tail)
          join(n, side1.head, awaitUp = false)
        runOn(participants: _*) {
          awaitAllMembersUp(participants: _*)
        }
        enterBarrier(s"all-up-$c")

        runOn(node1) {
          singletonProbe.within(25.seconds) {
            singletonProbe.expectMsg(Register(singletonRegisterKey(node1), sysAddress(node1)))
          }
          shardingProbe.expectNoMessage(100.millis)
        }

        runOn(participants: _*) {
          expectRegionEchoes(timeoutSeconds = 5)
        }

        enterBarrier(s"initialized-$c")
        runOn(participants: _*) {
          log.info("Initialized {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
        }

        // Cut every link between the two sides to create the partition.
        runOn(node1) {
          for (n1 <- side1; n2 <- side2)
            blackhole(n1, n2)
        }
        enterBarrier(s"blackhole-$c")

        // KeepLeader depends on the actual addresses: the side holding the smallest address wins.
        val resolvedExpected = scenario.expected match {
          case KeepLeader =>
            import Member.addressOrdering
            val address = participants.map(sysAddress).min
            if (side1.exists(sysAddress(_) == address)) KeepSide1
            else if (side2.exists(sysAddress(_) == address)) KeepSide2
            else ShutdownBoth
          case other => other
        }

        resolvedExpected match {
          case ShutdownBoth =>
            runOn(participants: _*) {
              awaitClusterTerminated()
            }
            enterBarrier(s"sys-terminated-$c")
            runOn(node1) {
              singletonProbe.within(20.seconds) {
                singletonProbe.expectMsg(Unregister(singletonRegisterKey(side1.head), sysAddress(side1.head)))
              }
              shardingProbe.expectNoMessage(100.millis)
            }

          case KeepSide1 =>
            runOn(side1: _*) {
              awaitSurvivingCluster(side1)
            }
            runOn(side2: _*) {
              awaitClusterTerminated()
            }
            enterBarrier(s"cluster-shutdown-verified-$c")
            // Not wrapped in runOn: these checks execute on every node running this method.
            singletonProbe.expectNoMessage(1.second)
            shardingProbe.expectNoMessage(100.millis)

          case KeepSide2 =>
            runOn(side1: _*) {
              awaitClusterTerminated()
            }
            enterBarrier(s"sys-terminated-$c")
            runOn(node1) {
              // The singleton is expected to leave side1.head and be registered again from side2.head.
              singletonProbe.within(30.seconds) {
                singletonProbe.expectMsg(Unregister(singletonRegisterKey(side1.head), sysAddress(side1.head)))
                singletonProbe.expectMsg(Register(singletonRegisterKey(side2.head), sysAddress(side2.head)))
              }
              shardingProbe.expectNoMessage(100.millis)
            }
            runOn(side2: _*) {
              awaitSurvivingCluster(side2)
            }

          case KeepAll =>
            runOn(participants: _*) {
              awaitSurvivingCluster(participants)
              Cluster(sys).isTerminated should be(false)
            }
            enterBarrier(s"cluster-intact-verified-$c")

          case KeepLeader =>
            // already resolved to other case
            throw new IllegalStateException("KeepLeader should already have been resolved to another expectation")
        }

        enterBarrier(s"verified-$c")
      }
      enterBarrier(s"after-$c")
    }