in pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala [35:136]
/**
 * Builds the behavior of the shopping cart actor belonging to `userId`.
 *
 * Cart contents are held in a replicated `LWWMap` stored under the key `"cart-" + userId`;
 * every read and write is issued through the replicator message adapter.
 *
 * @param userId identifies the cart owner and is embedded in the replicated data key
 * @return the behavior handling cart commands and the internal replicator responses
 */
def apply(userId: String): Behavior[Command] = Behaviors.setup { context =>
  DistributedData.withReplicatorMessageAdapter[Command, LWWMap[String, LineItem]] { replicator =>
    implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

    // Capitalised on purpose: the handlers below match on it as a stable identifier.
    val CartKey = LWWMapKey[String, LineItem]("cart-" + userId)

    // Asks the replicator to apply `change` to the cart with writeMajority; the outcome is
    // delivered back to this actor wrapped in an InternalUpdateResponse.
    def modifyCart(change: LWWMap[String, LineItem] => LWWMap[String, LineItem]): Unit =
      replicator.askUpdate(
        updateReplyTo =>
          Update(CartKey, LWWMap.empty[String, LineItem], writeMajority, updateReplyTo)(change),
        response => InternalUpdateResponse(response))

    // Returns `data` with `item` stored under its product id. If an entry for that product
    // already exists, the value in its third field is added to `item.quantity` first.
    def withItem(data: LWWMap[String, LineItem], item: LineItem): LWWMap[String, LineItem] = {
      val entry = data.get(item.productId) match {
        case Some(LineItem(_, _, quantityInCart)) =>
          item.copy(quantity = quantityInCart + item.quantity)
        case None =>
          item
      }
      data :+ (item.productId -> entry)
    }

    // Requests removal of the entry stored under `productId`.
    def dropItem(productId: String): Unit =
      modifyCart(_.remove(node, productId))

    // GetCart and the replicator responses that belong to it.
    def cartQueries: PartialFunction[Command, Behavior[Command]] = {
      case GetCart(replyTo) =>
        replicator.askGet(
          getReplyTo => Get(CartKey, readMajority, getReplyTo),
          response => InternalGetResponse(replyTo, response))
        Behaviors.same

      case InternalGetResponse(replyTo, found @ GetSuccess(CartKey, _)) =>
        replyTo ! Cart(found.get(CartKey).entries.values.toSet)
        Behaviors.same

      case InternalGetResponse(replyTo, NotFound(CartKey, _)) =>
        // No data stored under the key: answer with an empty cart.
        replyTo ! Cart(Set.empty)
        Behaviors.same

      case InternalGetResponse(replyTo, GetFailure(CartKey, _)) =>
        // The majority read failed; retry with a local read instead.
        replicator.askGet(
          getReplyTo => Get(CartKey, ReadLocal, getReplyTo),
          response => InternalGetResponse(replyTo, response))
        Behaviors.same
    }

    // AddItem: merge the item into the replicated cart.
    def itemAdditions: PartialFunction[Command, Behavior[Command]] = {
      case AddItem(item) =>
        modifyCart(current => withItem(current, item))
        Behaviors.same
    }

    // RemoveItem and the replicator responses that belong to it.
    def itemRemovals: PartialFunction[Command, Behavior[Command]] = {
      case RemoveItem(productId) =>
        // Read from a majority of nodes before removing, since an ORMap remove
        // must have seen the item to be able to remove it.
        replicator.askGet(
          getReplyTo => Get(CartKey, readMajority, getReplyTo),
          response => InternalRemoveItem(productId, response))
        Behaviors.same

      case InternalRemoveItem(productId, GetSuccess(CartKey, _)) =>
        dropItem(productId)
        Behaviors.same

      case InternalRemoveItem(productId, GetFailure(CartKey, _)) =>
        // The majority read failed; remove on a best-effort basis using the local value.
        dropItem(productId)
        Behaviors.same

      case InternalRemoveItem(_, NotFound(CartKey, _)) =>
        // No cart data exists, so there is nothing to remove.
        Behaviors.same
    }

    // Outcomes of the updates issued by modifyCart.
    def updateOutcomes: PartialFunction[Command, Behavior[Command]] = {
      case InternalUpdateResponse(_: UpdateSuccess[_]) =>
        Behaviors.same

      // A timed-out update is tolerated: it will eventually be replicated.
      case InternalUpdateResponse(_: UpdateTimeout[_]) =>
        Behaviors.same

      // NOTE(review): keep this case last - UpdateTimeout is presumably a kind of
      // UpdateFailure, in which case the order of these two cases matters; confirm.
      case InternalUpdateResponse(failure: UpdateFailure[_]) =>
        throw new IllegalStateException("Unexpected failure: " + failure)
    }

    // Handlers are tried in this order; a message none of them matches is left unhandled.
    val handlers =
      cartQueries
        .orElse(itemAdditions)
        .orElse(itemRemovals)
        .orElse(updateOutcomes)

    Behaviors.receiveMessagePartial(handlers)
  }
}