in cluster-sharding/src/multi-jvm/scala/org/apache/pekko/cluster/sbr/SplitBrainResolverIntegrationSpec.scala [241:386]
/**
 * Runs one round of the split-brain scenario and asserts its outcome.
 *
 * The first `scenario.side1Size` roles form side 1 and the following `scenario.side2Size` roles form
 * side 2. The round joins all of them into one cluster, checks that the singleton registration and
 * the shard region respond, blackholes every connection between the two sides and finally verifies
 * the result selected by `scenario.expected`.
 *
 * All barrier names are suffixed with the round id `c`. Everything between the `log-startup` and the
 * `after` barrier runs inside a single 90 second `within` block.
 */
def verify(): Unit = {
  val side1 = roles.take(scenario.side1Size)
  val side2 = roles.drop(scenario.side1Size).take(scenario.side2Size)
  // Every node that takes part in this round.
  val allNodes = side1 ++ side2

  // Key under which the singleton of the given node is expected to be registered.
  def singletonRegisterKey(node: RoleName): String =
    s"/user/singletonRegistry/singleton-${scenario.dcDecider(node)}"

  // Waits (up to 30 seconds) until the cluster extension of the local system reports termination.
  def awaitClusterTerminated(): Unit =
    awaitCond(Cluster(sys).isTerminated, max = 30.seconds)

  // Retries until the values 0..9 sent to `region` are each answered with the same value within
  // 2 seconds and the member addresses seen by this node equal the addresses of `survivors`.
  def awaitShardingAndMembership(survivors: Seq[RoleName]): Unit = {
    val expectedAddresses = survivors.map(sysAddress).toSet
    // `remaining` is evaluated when the helper is called, i.e. presumably against the deadline of
    // the enclosing `within(90.seconds)`; 3 seconds of that budget are deliberately left unused.
    within(remaining - 3.seconds) {
      awaitAssert {
        val probe = TestProbe()(sys)
        for (i <- 0 until 10) {
          region.tell(i, probe.ref)
          probe.expectMsg(2.seconds, i)
        }
        Cluster(sys).state.members.map(_.address) should be(expectedAddresses)
      }
    }
  }

  runOn(allNodes: _*) {
    log.info("Running {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
  }
  enterBarrier(s"log-startup-$c")

  within(90.seconds) {
    // Join order matters: the first two joins wait for the member to be up before continuing.
    join(side1.head, side1.head, awaitUp = true) // oldest
    join(side2.head, side1.head, awaitUp = true) // next oldest
    for (n <- side1.tail ++ side2.tail)
      join(n, side1.head, awaitUp = false)
    runOn(allNodes: _*) {
      awaitAllMembersUp(allNodes: _*)
    }
    enterBarrier(s"all-up-$c")

    // node1 checks that its singleton got registered and that nothing arrived on the sharding probe.
    runOn(node1) {
      singletonProbe.within(25.seconds) {
        singletonProbe.expectMsg(Register(singletonRegisterKey(node1), sysAddress(node1)))
      }
      shardingProbe.expectNoMessage(100.millis)
    }

    // Every node verifies that the shard region answers before the partition is introduced.
    runOn(allNodes: _*) {
      val probe = TestProbe()(sys)
      for (i <- 0 until 10) {
        region.tell(i, probe.ref)
        probe.expectMsg(5.seconds, i)
      }
    }
    enterBarrier(s"initialized-$c")

    runOn(allNodes: _*) {
      log.info("Initialized {} {} in round {}", myself.name, Cluster(sys).selfUniqueAddress, c)
    }

    // Create the split: node1 blackholes every (side1, side2) pair.
    runOn(node1) {
      for (n1 <- side1; n2 <- side2)
        blackhole(n1, n2)
    }
    enterBarrier(s"blackhole-$c")

    // KeepLeader is not checked directly; it is translated into the side that contains the node
    // with the smallest address according to Member.addressOrdering.
    val resolvedExpected = scenario.expected match {
      case KeepLeader =>
        import Member.addressOrdering
        val address = allNodes.map(sysAddress).min
        if (side1.exists(sysAddress(_) == address)) KeepSide1
        else if (side2.exists(sysAddress(_) == address)) KeepSide2
        else ShutdownBoth
      case other => other
    }

    resolvedExpected match {
      case ShutdownBoth =>
        runOn(allNodes: _*) {
          awaitClusterTerminated()
        }
        enterBarrier(s"sys-terminated-$c")
        runOn(node1) {
          singletonProbe.within(20.seconds) {
            singletonProbe.expectMsg(Unregister(singletonRegisterKey(side1.head), sysAddress(side1.head)))
          }
          shardingProbe.expectNoMessage(100.millis)
        }

      case KeepSide1 =>
        runOn(side1: _*) {
          awaitShardingAndMembership(side1)
        }
        runOn(side2: _*) {
          awaitClusterTerminated()
        }
        enterBarrier(s"cluster-shutdown-verified-$c")
        // Not wrapped in runOn: these two checks are executed by every node that runs verify().
        singletonProbe.expectNoMessage(1.second)
        shardingProbe.expectNoMessage(100.millis)

      case KeepSide2 =>
        runOn(side1: _*) {
          awaitClusterTerminated()
        }
        enterBarrier(s"sys-terminated-$c")
        // The singleton is expected to leave side1.head and to be registered again for side2.head.
        runOn(node1) {
          singletonProbe.within(30.seconds) {
            singletonProbe.expectMsg(Unregister(singletonRegisterKey(side1.head), sysAddress(side1.head)))
            singletonProbe.expectMsg(Register(singletonRegisterKey(side2.head), sysAddress(side2.head)))
          }
          shardingProbe.expectNoMessage(100.millis)
        }
        runOn(side2: _*) {
          awaitShardingAndMembership(side2)
        }

      case KeepAll =>
        runOn(allNodes: _*) {
          awaitShardingAndMembership(allNodes)
          Cluster(sys).isTerminated should be(false)
        }
        enterBarrier(s"cluster-intact-verified-$c")

      case KeepLeader =>
        // Unreachable: KeepLeader was already resolved to a concrete outcome above.
        throw new IllegalStateException("KeepLeader must be resolved to a concrete outcome before verification")
    }

    enterBarrier(s"verified-$c")
  }
  enterBarrier(s"after-$c")
}