def verify()

in cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala [246:393]


    /**
     * Runs one randomized network-partition round against the first `scenario.numberOfNodes` roles.
     *
     * Phases, each separated by a barrier whose name is suffixed with the round counter `c`:
     *  1. form the cluster and check that sharding answers on every node,
     *  2. let `node1` generate a scenario from the `test.random-seed` config value and apply it
     *     (an optional clean split, at most one "flaky" set of blackholed links, optional healing),
     *  3. keep sending to sharding while the local node still sees unreachable members,
     *  4. assert that each node is either terminated or sees no unreachable members, and that
     *     sharding replies again on nodes that are still running.
     *
     * Phases 1 to 4 must complete inside a single 3 minute `within` block.
     */
    def verify(): Unit = {
      val nodes = roles.take(scenario.numberOfNodes)

      // Sends the integers 0 to 9 to `region` from every node whose cluster has not terminated.
      // With `expectReply = true` each value `i` must come back on the probe within 3 seconds
      // before the next one is sent; with `expectReply = false` the messages are fire-and-forget.
      def sendToSharding(expectReply: Boolean): Unit = {
        runOn(nodes: _*) {
          if (!Cluster(sys).isTerminated) {
            val probe = TestProbe()(sys)
            for (i <- 0 until 10) {
              region.tell(i, probe.ref)
              if (expectReply)
                probe.expectMsg(3.seconds, i)
            }
          }
        }
      }

      runOn(nodes: _*) {
        log.info("Running {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
      }
      // Seeded from config so that a scenario can be reproduced from the seed written to the log below.
      // Only the `runOn(node1)` block further down draws from this generator; the order of the draws
      // there determines the generated scenario.
      val randomSeed = sys.settings.config.getLong("test.random-seed")
      val random = new Random(randomSeed)
      enterBarrier(s"log-startup-$c")

      within(3.minutes) {

        // The first and last node join with awaitUp = true, one after the other; the remaining
        // nodes join with awaitUp = false, and every node then calls awaitMemberUp().
        join(nodes.head, nodes.head, awaitUp = true) // oldest
        join(nodes.last, nodes.head, awaitUp = true) // next oldest
        for (n <- nodes.tail.dropRight(1))
          join(n, nodes.head, awaitUp = false)
        runOn(nodes: _*) {
          awaitMemberUp()
        }
        enterBarrier(s"all-up-$c")

        // Neither probe may have received anything at this point.
        // NOTE(review): presumably the probes are notified of unexpected singleton/sharding events
        // (e.g. duplicates) — confirm where the probes are wired up.
        singletonProbe.expectNoMessage(1.second)
        shardingProbe.expectNoMessage(10.millis)

        sendToSharding(expectReply = true)

        enterBarrier(s"initialized-$c")
        runOn(nodes: _*) {
          log.info("Initialized {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
        }

        // node1 alone generates and applies the partition scenario; the other nodes wait at the
        // "scenario-done" barrier.
        runOn(node1) {
          val cleanSplit = random.nextBoolean()
          val healCleanSplit = cleanSplit && random.nextBoolean()
          // side1 gets between 1 and nodes.size - 1 nodes, so side2 is never empty.
          // NOTE: needs at least two nodes, since Random.nextInt rejects a non-positive bound.
          val side1 = nodes.take(1 + random.nextInt(nodes.size - 1))
          val side2 = nodes.drop(side1.size)

          // The test is limited to one flaky step, see issue #29185.
          val numberOfFlaky = if (cleanSplit) 0 else 1
          val healLastFlaky = numberOfFlaky > 0 && random.nextBoolean()
          // Maps step index -> (source node, target nodes). The targets are a prefix of all other
          // nodes containing between 1 and min(5, targets.size) elements.
          val flaky: Map[Int, (RoleName, List[RoleName])] =
            (0 until numberOfFlaky).map { i =>
              val from = nodes(random.nextInt(nodes.size))
              val targets = nodes.filterNot(_ == from)
              val to = (0 to random.nextInt(math.min(5, targets.size))).map(j => targets(j)).toList
              i -> (from -> to)
            }.toMap

          // Ten pre-generated delays, each between 2 and 14; nextDelay() treats them as seconds.
          val delays = (0 until 10).map(_ => 2 + random.nextInt(13))

          log.info(
            s"Generated $scenario with random seed [$randomSeed] in round [$c]: " +
            s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " +
            (if (cleanSplit)
               s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] "
             else " ") +
            s", flaky [${flaky.map { case (_, (from, to)) => from.name -> to.map(_.name).mkString("(", ", ", ")") }.mkString("; ")}] " +
            s", healLastFlaky [$healLastFlaky] " +
            s", delays [${delays.mkString(", ")}]")

          // Sleeps for the next pre-generated delay and advances the cursor. With at most one flaky
          // step it is called at most twice per round, so the ten delays are never exhausted.
          var delayIndex = 0
          def nextDelay(): Unit = {
            Thread.sleep(delays(delayIndex) * 1000)
            delayIndex += 1
          }

          if (cleanSplit) {
            // Blackhole every side1/side2 pair.
            for (n1 <- side1; n2 <- side2)
              blackhole(n1, n2)

            nextDelay()
          }

          flaky.foreach {
            case (i, (from, to)) =>
              // With numberOfFlaky limited to 1 the only index is 0, so this branch is currently
              // never taken; it only matters if more flaky steps are generated.
              if (i != 0) {
                // heal previous flakiness
                val (prevFrom, prevTo) = flaky(i - 1)
                for (n <- prevTo)
                  passThrough(prevFrom, n)
              }

              for (n <- to)
                blackhole(from, n)

              nextDelay()
          }

          if (healLastFlaky) {
            // healLastFlaky implies numberOfFlaky > 0, so the lookup of the last step is defined.
            val (prevFrom, prevTo) = flaky(flaky.size - 1)
            for (n <- prevTo)
              passThrough(prevFrom, n)

            nextDelay()
          }

          if (healCleanSplit) {
            // Undo the clean split by calling passThrough on every side1/side2 pair.
            for (n1 <- side1; n2 <- side2)
              passThrough(n1, n2)
          }
        }
        enterBarrier(s"scenario-done-$c")

        runOn(nodes: _*) {
          // Send without waiting for replies, then require 10 seconds of silence on the singleton probe.
          sendToSharding(expectReply = false)
          singletonProbe.expectNoMessage(10.seconds)
          shardingProbe.expectNoMessage(10.millis)

          // Repeat at most 20 times (about 5 seconds per iteration) while this node's cluster is
          // still running and still reports unreachable members.
          var loopLimit = 20
          while (loopLimit != 0 && !Cluster(sys).isTerminated && Cluster(sys).state.unreachable.nonEmpty) {
            sendToSharding(expectReply = false)
            singletonProbe.expectNoMessage(5.seconds)
            shardingProbe.expectNoMessage(10.millis)
            loopLimit -= 1
          }
        }
        enterBarrier(s"terminated-or-unreachable-removed-$c")

        runOn(nodes: _*) {
          // Each node must by now be terminated or see no unreachable members.
          (Cluster(sys).isTerminated || Cluster(sys).state.unreachable.isEmpty) should ===(true)
          // Sharding must answer again within 30 seconds; awaitAssert re-runs the block until it
          // passes or the time runs out. On a terminated node sendToSharding does nothing.
          within(30.seconds) {
            awaitAssert {
              sendToSharding(expectReply = true)
            }
          }
          singletonProbe.expectNoMessage(5.seconds)
          shardingProbe.expectNoMessage(10.millis)
          if (!Cluster(sys).isTerminated)
            log.info(s"Survived ${Cluster(sys).state.members.size} members in round $c")
        }

        enterBarrier(s"verified-$c")
      }
      enterBarrier(s"after-$c")
    }