pekko-sample-distributed-data-java/src/multi-jvm/scala/sample/distributeddata/ShoppingCartSpec.scala (91 lines of code) (raw):

package sample.distributeddata import scala.concurrent.duration._ import org.apache.pekko.actor.testkit.typed.scaladsl.TestProbe import org.apache.pekko.actor.typed.ActorSystem import org.apache.pekko.actor.typed.scaladsl.adapter._ import org.apache.pekko.cluster.ddata.Replicator import org.apache.pekko.cluster.ddata.typed.scaladsl.DistributedData import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.GetReplicaCount import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.ReplicaCount import org.apache.pekko.cluster.typed.{ Cluster, Join } import org.apache.pekko.remote.testconductor.RoleName import org.apache.pekko.remote.testkit.MultiNodeConfig import org.apache.pekko.remote.testkit.MultiNodeSpec import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ object ShoppingCartSpec extends MultiNodeConfig { val node1 = role("node-1") val node2 = role("node-2") val node3 = role("node-3") commonConfig(ConfigFactory.parseString(""" pekko.loglevel = INFO pekko.actor.provider = "cluster" pekko.log-dead-letters-during-shutdown = off pekko.actor.serialization-bindings { "sample.distributeddata.ShoppingCart$LineItem" = jackson-cbor } """)) } class ShoppingCartSpecMultiJvmNode1 extends ShoppingCartSpec class ShoppingCartSpecMultiJvmNode2 extends ShoppingCartSpec class ShoppingCartSpecMultiJvmNode3 extends ShoppingCartSpec class ShoppingCartSpec extends MultiNodeSpec(ShoppingCartSpec) with STMultiNodeSpec { import ShoppingCartSpec._ import ShoppingCart._ override def initialParticipants = roles.size implicit val typedSystem: ActorSystem[Nothing] = system.toTyped val cluster = Cluster(typedSystem) val shoppingCart = system.spawnAnonymous(ShoppingCart.create("user-1")) def join(from: RoleName, to: RoleName): Unit = { runOn(from) { cluster.manager ! Join(node(to).address) } enterBarrier(from.name + "-joined") } "Demo of a replicated shopping cart" must { "join cluster" in within(20.seconds) { join(node1, node1) join(node2, node1) join(node3, node1) awaitAssert { val probe = TestProbe[ReplicaCount]() DistributedData(typedSystem).replicator ! GetReplicaCount(probe.ref) probe.expectMessage(Replicator.ReplicaCount(roles.size)) } enterBarrier("after-1") } "handle updates directly after start" in within(15.seconds) { runOn(node2) { shoppingCart ! new ShoppingCart.AddItem(new LineItem("1", "Apples", 2)) shoppingCart ! new ShoppingCart.AddItem(new LineItem("2", "Oranges", 3)) } enterBarrier("updates-done") awaitAssert { val probe = TestProbe[Cart]() shoppingCart ! new ShoppingCart.GetCart(probe.ref) val cart = probe.expectMessageType[Cart] cart.items.asScala.toSet should be(Set( new LineItem("1", "Apples", 2), new LineItem("2", "Oranges", 3))) } enterBarrier("after-2") } "handle updates from different nodes" in within(5.seconds) { runOn(node2) { shoppingCart ! new ShoppingCart.AddItem(new LineItem("1", "Apples", 5)) shoppingCart ! new ShoppingCart.RemoveItem("2") } runOn(node3) { shoppingCart ! new ShoppingCart.AddItem(new LineItem("3", "Bananas", 4)) } enterBarrier("updates-done") awaitAssert { val probe = TestProbe[Cart]() shoppingCart ! new ShoppingCart.GetCart(probe.ref) val cart = probe.expectMessageType[Cart] cart.items.asScala.toSet should be( Set(new LineItem("1", "Apples", 7), new LineItem("3", "Bananas", 4))) } enterBarrier("after-3") } } }