def apply()

in pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala [35:136]


  def apply(userId: String): Behavior[Command] = Behaviors.setup { context =>
    DistributedData.withReplicatorMessageAdapter[Command, LWWMap[String, LineItem]] { replicator =>
      implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

      val DataKey = LWWMapKey[String, LineItem]("cart-" + userId)

      def behavior = Behaviors.receiveMessagePartial(
        receiveGetCart
          .orElse(receiveAddItem)
          .orElse(receiveRemoveItem)
          .orElse(receiveOther))

      def receiveGetCart: PartialFunction[Command, Behavior[Command]] = {
        case GetCart(replyTo) =>
          replicator.askGet(
            askReplyTo => Get(DataKey, readMajority, askReplyTo),
            rsp => InternalGetResponse(replyTo, rsp))

          Behaviors.same

        case InternalGetResponse(replyTo, g @ GetSuccess(DataKey, _)) =>
          val data = g.get(DataKey)
          val cart = Cart(data.entries.values.toSet)
          replyTo ! cart
          Behaviors.same

        case InternalGetResponse(replyTo, NotFound(DataKey, _)) =>
          replyTo ! Cart(Set.empty)
          Behaviors.same

        case InternalGetResponse(replyTo, GetFailure(DataKey, _)) =>
          // ReadMajority failure, try again with local read
          replicator.askGet(
            askReplyTo => Get(DataKey, ReadLocal, askReplyTo),
            rsp => InternalGetResponse(replyTo, rsp))

          Behaviors.same
      }

      def receiveAddItem: PartialFunction[Command, Behavior[Command]] = {
        case AddItem(item) =>
          replicator.askUpdate(
            askReplyTo =>
              Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, askReplyTo) {
                cart => updateCart(cart, item)
              },
            InternalUpdateResponse.apply)

          Behaviors.same
      }

      def updateCart(data: LWWMap[String, LineItem], item: LineItem): LWWMap[String, LineItem] = {
        data.get(item.productId) match {
          case Some(LineItem(_, _, existingQuantity)) =>
            data :+ (item.productId -> item.copy(quantity = existingQuantity + item.quantity))
          case None => data :+ (item.productId -> item)
        }
      }

      def receiveRemoveItem: PartialFunction[Command, Behavior[Command]] = {
        case RemoveItem(productId) =>
          // Try to fetch latest from a majority of nodes first, since ORMap
          // remove must have seen the item to be able to remove it.
          replicator.askGet(
            askReplyTo => Get(DataKey, readMajority, askReplyTo),
            rsp => InternalRemoveItem(productId, rsp))

          Behaviors.same

        case InternalRemoveItem(productId, GetSuccess(DataKey, _)) =>
          removeItem(productId)
          Behaviors.same

        case InternalRemoveItem(productId, GetFailure(DataKey, _)) =>
          // ReadMajority failed, fall back to best effort local value
          removeItem(productId)
          Behaviors.same

        case InternalRemoveItem(_, NotFound(DataKey, _)) =>
          // nothing to remove
          Behaviors.same
      }

      def removeItem(productId: String): Unit = {
        replicator.askUpdate(
          askReplyTo =>
            Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, askReplyTo) {
              _.remove(node, productId)
            },
          InternalUpdateResponse.apply)
      }

      def receiveOther: PartialFunction[Command, Behavior[Command]] = {
        case InternalUpdateResponse(_: UpdateSuccess[_]) => Behaviors.same
        case InternalUpdateResponse(_: UpdateTimeout[_]) => Behaviors.same
        // UpdateTimeout, will eventually be replicated
        case InternalUpdateResponse(e: UpdateFailure[_]) => throw new IllegalStateException("Unexpected failure: " + e)
      }

      behavior
    }
  }