in cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala [246:395]
/**
 * Runs one round of the randomized scenario. The round is identified by `c`, which is defined
 * outside this method and is used in the log messages and in every barrier name.
 *
 * Steps, in order:
 *  1. the first `scenario.numberOfNodes` roles join a cluster and sharding is checked to reply;
 *  2. `node1` draws a scenario from a `Random` seeded with the `test.random-seed` config value
 *     (either a clean split into two sides, or a single "flaky" node that is cut off from some
 *     other nodes), applies it via `blackhole`, and optionally heals it again via `passThrough`;
 *  3. every node polls until it is terminated or sees no unreachable members (bounded number of
 *     attempts), then asserts that condition and that sharding replies again on nodes that are
 *     not terminated.
 *
 * `join`, `awaitMemberUp`, `blackhole`, `passThrough`, `region`, `singletonProbe` and
 * `shardingProbe` are defined outside this view; comments about them below are hedged accordingly.
 */
def verify(): Unit = {
// Only the first `scenario.numberOfNodes` roles take part in this round.
val nodes = roles.take(scenario.numberOfNodes)
// Sends the integers 0 to 9 to `region` from a fresh TestProbe, on every participating node
// whose Cluster is not terminated. With expectReply = true each send is followed by waiting up
// to 3 seconds for that same integer to arrive at the probe; with false the sends are
// fire-and-forget.
def sendToSharding(expectReply: Boolean): Unit = {
runOn(nodes: _*) {
if (!Cluster(sys).isTerminated) {
val probe = TestProbe()(sys)
for (i <- 0 until 10) {
region.tell(i, probe.ref)
if (expectReply)
probe.expectMsg(3.seconds, i)
}
}
}
}
runOn(nodes: _*) {
log.info("Running {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
}
// The seed comes from configuration and is logged together with the generated scenario below.
// The generator is created wherever this method runs, but it is only drawn from inside the
// runOn(node1) block.
val randomSeed = sys.settings.config.getLong("test.random-seed")
val random = new Random(randomSeed)
enterBarrier(s"log-startup-$c")
// Everything up to and including the "verified" barrier is wrapped in a 3 minute `within`.
within(3.minutes) {
join(nodes.head, nodes.head, awaitUp = true) // oldest
join(nodes.last, nodes.head, awaitUp = true) // next oldest
// All remaining nodes (neither first nor last) join without awaitUp; awaitMemberUp() is then
// called on every participating node.
// NOTE(review): presumably awaitMemberUp() blocks until the member is Up - helper not visible here.
for (n <- nodes.tail.dropRight(1))
join(n, nodes.head, awaitUp = false)
runOn(nodes: _*) {
awaitMemberUp()
}
enterBarrier(s"all-up-$c")
// Both probes must stay silent. What is routed to them is set up outside this block.
// NOTE(review): presumably they receive notifications that would indicate a problem - confirm.
singletonProbe.expectNoMessage(1.second)
shardingProbe.expectNoMessage(10.millis)
// Baseline check before any fault is injected: sharding must reply.
sendToSharding(expectReply = true)
enterBarrier(s"initialized-$c")
runOn(nodes: _*) {
log.info("Initialized {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
}
// Scenario generation and fault injection happen on node1 only.
// The order of the draws from `random` below determines the scenario for a given seed.
runOn(node1) {
val cleanSplit = random.nextBoolean()
// Healing of the clean split is only considered when there is a clean split.
val healCleanSplit = cleanSplit && random.nextBoolean()
// side1 is the leading 1 to (nodes.size - 1) nodes and side2 is the rest, so neither side is
// empty. Both are computed (and consume a random draw) even when cleanSplit is false.
val side1 = nodes.take(1 + random.nextInt(nodes.size - 1))
val side2 = nodes.drop(side1.size)
// The test is limited to one flaky step, see issue #29185.
val numberOfFlaky = if (cleanSplit) 0 else 1
val healLastFlaky = numberOfFlaky > 0 && random.nextBoolean()
// Maps a step index to (from, to): `from` is a randomly picked node and `to` is the leading
// 1 to min(5, targets.size) entries of the other nodes, in `nodes` order.
val flaky: Map[Int, (RoleName, List[RoleName])] =
(0 until numberOfFlaky).map { i =>
val from = nodes(random.nextInt(nodes.size))
val targets = nodes.filterNot(_ == from)
val to = (0 to random.nextInt(math.min(5, targets.size))).map(j => targets(j)).toList
i -> (from -> to)
}.toMap
// Ten delays of 2 to 14 (seconds, see nextDelay) are all drawn up front, before any is used.
val delays = (0 until 10).map(_ => 2 + random.nextInt(13))
log.info(
s"Generated $scenario with random seed [$randomSeed] in round [$c]: " +
s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " +
(if (cleanSplit)
s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] "
else " ") +
s", flaky [${flaky.map { case (_, (from, to)) =>
from.name -> to.map(_.name).mkString("(", ", ", ")")
}.mkString("; ")}] " +
s", healLastFlaky [$healLastFlaky] " +
s", delays [${delays.mkString(", ")}]")
// Sleeps for the next pre-generated delay (value is in seconds, converted to milliseconds)
// and advances to the following one.
var delayIndex = 0
def nextDelay(): Unit = {
Thread.sleep(delays(delayIndex) * 1000)
delayIndex += 1
}
// Clean split: blackhole every (side1, side2) pair, then wait.
// NOTE(review): blackhole/passThrough are defined outside this view; by name they drop and
// restore traffic between the two roles - directionality is not visible here.
if (cleanSplit) {
for (n1 <- side1; n2 <- side2)
blackhole(n1, n2)
nextDelay()
}
// Flaky steps: cut `from` off from each node in `to`, then wait. Since numberOfFlaky is
// currently 0 or 1, the only possible key is 0 and the `i != 0` branch is never taken.
flaky.foreach {
case (i, (from, to)) =>
if (i != 0) {
// heal previous flakiness
val (prevFrom, prevTo) = flaky(i - 1)
for (n <- prevTo)
passThrough(prevFrom, n)
}
for (n <- to)
blackhole(from, n)
nextDelay()
}
// Optionally undo the last flaky step, followed by another delay.
if (healLastFlaky) {
val (prevFrom, prevTo) = flaky(flaky.size - 1)
for (n <- prevTo)
passThrough(prevFrom, n)
nextDelay()
}
// Optionally undo the clean split; no delay follows this.
if (healCleanSplit) {
for (n1 <- side1; n2 <- side2)
passThrough(n1, n2)
}
}
enterBarrier(s"scenario-done-$c")
// Settling phase on every participating node: keep sending (without expecting replies) and
// require the probes to stay silent, repeating up to 20 more times while this node is not
// terminated and still sees unreachable members.
// sendToSharding already wraps its body in runOn(nodes: _*), so it is nested in a second
// runOn here.
runOn(nodes: _*) {
sendToSharding(expectReply = false)
singletonProbe.expectNoMessage(10.seconds)
shardingProbe.expectNoMessage(10.millis)
var loopLimit = 20
while (loopLimit != 0 && !Cluster(sys).isTerminated && Cluster(sys).state.unreachable.nonEmpty) {
sendToSharding(expectReply = false)
singletonProbe.expectNoMessage(5.seconds)
shardingProbe.expectNoMessage(10.millis)
loopLimit -= 1
}
}
enterBarrier(s"terminated-or-unreachable-removed-$c")
// Final verification on every participating node.
runOn(nodes: _*) {
// Each node must either be terminated or no longer see any unreachable member.
(Cluster(sys).isTerminated || Cluster(sys).state.unreachable.isEmpty) should ===(true)
// Sharding must reply again; the check is retried via awaitAssert for up to 30 seconds.
// sendToSharding does nothing on a node whose Cluster is terminated.
within(30.seconds) {
awaitAssert {
sendToSharding(expectReply = true)
}
}
singletonProbe.expectNoMessage(5.seconds)
shardingProbe.expectNoMessage(10.millis)
if (!Cluster(sys).isTerminated)
log.info(s"Survived ${Cluster(sys).state.members.size} members in round $c")
}
enterBarrier(s"verified-$c")
}
// This barrier is outside the 3 minute `within` block.
enterBarrier(s"after-$c")
}