in pekko-sample-distributed-data-scala/src/main/scala/sample/distributeddata/VotingService.scala [32:113]
def apply(): Behavior[Command] = Behaviors.setup { context =>
DistributedData.withReplicatorMessageAdapter[Command, Flag] { replicatorFlag =>
DistributedData.withReplicatorMessageAdapter[Command, PNCounterMap[String]] { replicatorCounters =>
implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress
val OpenedKey = FlagKey("contestOpened")
val ClosedKey = FlagKey("contestClosed")
val CountersKey = PNCounterMapKey[String]("contestCounters")
replicatorFlag.subscribe(OpenedKey, InternalSubscribeResponse.apply)
def start = Behaviors.receiveMessagePartial[Command] {
case Open =>
replicatorFlag.askUpdate(
askReplyTo => Update(OpenedKey, Flag(), WriteAll(5.seconds), askReplyTo)(_.switchOn),
InternalUpdateResponse.apply)
becomeOpen()
case InternalSubscribeResponse(c @ Changed(OpenedKey)) if c.get(OpenedKey).enabled =>
becomeOpen()
case GetVotes(replyTo) =>
replyTo ! Votes(Map.empty, open = false)
Behaviors.same
}
def becomeOpen() = {
replicatorFlag.unsubscribe(OpenedKey)
replicatorFlag.subscribe(ClosedKey, InternalSubscribeResponse.apply)
Behaviors.receiveMessagePartial(open.orElse(getVotes(open = true)))
}
def open: PartialFunction[Command, Behavior[Command]] = {
case Vote(participant) =>
replicatorCounters.askUpdate(
askReplyTo =>
Update(CountersKey, PNCounterMap[String](), WriteLocal, askReplyTo)(_.incrementBy(participant, 1)),
InternalUpdateResponse.apply)
Behaviors.same
case InternalUpdateResponse(_: UpdateSuccess[_]) => Behaviors.same
case Close =>
replicatorFlag.askUpdate(
askReplyTo => Update(ClosedKey, Flag(), WriteAll(5.seconds), askReplyTo)(_.switchOn),
InternalUpdateResponse.apply)
Behaviors.receiveMessagePartial(getVotes(open = false))
case InternalSubscribeResponse(c @ Changed(ClosedKey)) if c.get(ClosedKey).enabled =>
Behaviors.receiveMessagePartial(getVotes(open = false))
case InternalSubscribeResponse(Changed(OpenedKey)) => Behaviors.same
}
def getVotes(open: Boolean): PartialFunction[Command, Behavior[Command]] = {
case GetVotes(replyTo) =>
replicatorCounters.askGet(
askReplyTo => Get(CountersKey, ReadAll(3.seconds), askReplyTo),
rsp => InternalGetResponse(replyTo, rsp))
Behaviors.same
case InternalGetResponse(replyTo, g @ GetSuccess(CountersKey, _)) =>
val data = g.get(CountersKey)
replyTo ! Votes(data.entries, open)
Behaviors.same
case InternalGetResponse(replyTo, NotFound(CountersKey, _)) =>
replyTo ! Votes(Map.empty, open)
Behaviors.same
case InternalGetResponse(_, _: GetFailure[_]) => Behaviors.same
case InternalUpdateResponse(_: UpdateSuccess[_]) => Behaviors.same
}
start
}
}
}