in pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ReplicatedCache.scala [25:68]
/**
 * Creates the behavior of the replicated cache.
 *
 * Entries are not kept in a single map: each entry key is assigned to a
 * top-level `LWWMap` whose replicator key is `"cache-" + bucket`, where the
 * bucket is derived from the entry key's hash code (see `dataKey`).
 *
 *  - `PutInCache(key, value)` adds the pair to the map of the key's bucket,
 *    using `WriteLocal`.
 *  - `Evict(key)` removes the key from the map of its bucket, using `WriteLocal`.
 *  - `GetFromCache(key, replyTo)` reads the bucket with `ReadLocal` and answers
 *    `replyTo` with `Cached(key, Some(value))` when the entry is present, or
 *    `Cached(key, None)` when the entry or the whole bucket is missing.
 *
 * Update acknowledgements, and get responses other than success / not-found,
 * are consumed without any further action.
 */
def apply(): Behavior[Command] = Behaviors.setup { context =>
  DistributedData.withReplicatorMessageAdapter[Command, LWWMap[String, String]] { replicator =>
    // Implicitly required by `:+` and passed explicitly to `remove` on LWWMap.
    implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

    // Maps an entry key onto the replicator key of the bucket holding it.
    // The remainder is taken before `abs`, so the index always lies in 0..99.
    def dataKey(entryKey: String): LWWMapKey[String, String] = {
      val bucket = math.abs(entryKey.hashCode % 100)
      LWWMapKey(s"cache-$bucket")
    }

    // Applies `change` to the bucket owning `entryKey` (starting from an empty
    // map if the bucket does not exist yet); the replicator's answer is routed
    // back to this actor wrapped in InternalUpdateResponse.
    def updateBucket(entryKey: String)(change: LWWMap[String, String] => LWWMap[String, String]): Unit =
      replicator.askUpdate(
        replyAdapter => Update(dataKey(entryKey), LWWMap.empty[String, String], WriteLocal, replyAdapter)(change),
        InternalUpdateResponse.apply)

    Behaviors.receiveMessage[Command] {
      case PutInCache(key, value) =>
        updateBucket(key)(_ :+ (key -> value))
        Behaviors.same

      case Evict(key) =>
        updateBucket(key)(_.remove(node, key))
        Behaviors.same

      case GetFromCache(key, replyTo) =>
        // Carry the entry key and the requester along with the replicator's
        // answer so the reply can be produced once the answer arrives.
        replicator.askGet(
          replyAdapter => Get(dataKey(key), ReadLocal, replyAdapter),
          response => InternalGetResponse(key, replyTo, response))
        Behaviors.same

      case InternalGetResponse(key, replyTo, response) =>
        response match {
          case success @ GetSuccess(_, _) =>
            replyTo ! Cached(key, success.dataValue.get(key))
          case _: NotFound[_] =>
            // The bucket itself does not exist, so the entry cannot either.
            replyTo ! Cached(key, None)
          case _ =>
            () // ok: any other get response is dropped without a reply
        }
        Behaviors.same

      case _: InternalUpdateResponse =>
        Behaviors.same // ok
    }
  }
}