in cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala [46:231]
def apply(
settings: dd.ReplicatorSettings,
underlyingReplicator: Option[pekko.actor.ActorRef]): Behavior[SReplicator.Command] = {
Behaviors.setup { ctx =>
val classicReplicator = underlyingReplicator match {
case Some(ref) => ref
case None =>
// FIXME perhaps add supervisor for restarting, see PR https://github.com/akka/akka/pull/25988
val classicReplicatorProps = dd.Replicator.props(settings)
ctx.actorOf(classicReplicatorProps, name = "underlying")
}
def withState(
subscribeAdapters: Map[
ActorRef[JReplicator.SubscribeResponse[ReplicatedData]], ActorRef[dd.Replicator.SubscribeResponse[
ReplicatedData]]]): Behavior[SReplicator.Command] = {
def stopSubscribeAdapter(
subscriber: ActorRef[JReplicator.SubscribeResponse[ReplicatedData]]): Behavior[SReplicator.Command] = {
subscribeAdapters.get(subscriber) match {
case Some(adapter) =>
// will be unsubscribed from classicReplicator via Terminated
ctx.stop(adapter)
withState(subscribeAdapters - subscriber)
case None => // already unsubscribed or terminated
Behaviors.same
}
}
Behaviors
.receive[SReplicator.Command] { (ctx, msg) =>
msg match {
case cmd: SReplicator.Get[_] =>
classicReplicator.tell(dd.Replicator.Get(cmd.key, cmd.consistency), sender = cmd.replyTo.toClassic)
Behaviors.same
case cmd: JReplicator.Get[d] =>
implicit val timeout: Timeout = Timeout(cmd.consistency.timeout match {
case java.time.Duration.ZERO => localAskTimeout
case t => t.asScala + additionalAskTimeout
})
import ctx.executionContext
val reply =
(classicReplicator ? dd.Replicator.Get(cmd.key, cmd.consistency.toClassic))
.mapTo[dd.Replicator.GetResponse[d]]
.map {
case rsp: dd.Replicator.GetSuccess[d] =>
JReplicator.GetSuccess(rsp.key)(rsp.dataValue)
case rsp: dd.Replicator.NotFound[d] => JReplicator.NotFound(rsp.key)
case rsp: dd.Replicator.GetFailure[d] => JReplicator.GetFailure(rsp.key)
case rsp: dd.Replicator.GetDataDeleted[d] => JReplicator.GetDataDeleted(rsp.key)
}
.recover {
case _ => JReplicator.GetFailure(cmd.key)
}
reply.foreach { cmd.replyTo ! _ }
Behaviors.same
case cmd: SReplicator.Update[_] =>
classicReplicator.tell(
dd.Replicator.Update(cmd.key, cmd.writeConsistency, None)(cmd.modify),
sender = cmd.replyTo.toClassic)
Behaviors.same
case cmd: JReplicator.Update[d] =>
implicit val timeout: Timeout = Timeout(cmd.writeConsistency.timeout match {
case java.time.Duration.ZERO => localAskTimeout
case t => t.asScala + additionalAskTimeout
})
import ctx.executionContext
val reply =
(classicReplicator ? dd.Replicator.Update(cmd.key, cmd.writeConsistency.toClassic, None)(cmd.modify))
.mapTo[dd.Replicator.UpdateResponse[d]]
.map {
case rsp: dd.Replicator.UpdateSuccess[d] => JReplicator.UpdateSuccess(rsp.key)
case rsp: dd.Replicator.UpdateTimeout[d] => JReplicator.UpdateTimeout(rsp.key)
case rsp: dd.Replicator.ModifyFailure[d] =>
JReplicator.ModifyFailure(rsp.key, rsp.errorMessage, rsp.cause)
case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key)
case rsp: dd.Replicator.UpdateDataDeleted[d] => JReplicator.UpdateDataDeleted(rsp.key)
}
.recover {
case _ => JReplicator.UpdateTimeout(cmd.key)
}
reply.foreach { cmd.replyTo ! _ }
Behaviors.same
case cmd: SReplicator.Subscribe[_] =>
// For the Scala API the SubscribeResponse messages can be sent directly to the subscriber
classicReplicator.tell(
dd.Replicator.Subscribe(cmd.key, cmd.subscriber.toClassic),
sender = cmd.subscriber.toClassic)
Behaviors.same
case cmd: JReplicator.Subscribe[ReplicatedData] @unchecked =>
// For the Java API the Changed/Deleted messages must be mapped to the JReplicator.Changed/Deleted class.
// That is done with an adapter, and we have to keep track of the lifecycle of the original
// subscriber and stop the adapter when the original subscriber is stopped.
val adapter: ActorRef[dd.Replicator.SubscribeResponse[ReplicatedData]] = ctx.spawnMessageAdapter {
rsp =>
InternalSubscribeResponse(rsp, cmd.subscriber)
}
classicReplicator.tell(
dd.Replicator.Subscribe(cmd.key, adapter.toClassic),
sender = pekko.actor.ActorRef.noSender)
ctx.watch(cmd.subscriber)
withState(subscribeAdapters.updated(cmd.subscriber, adapter))
case InternalSubscribeResponse(rsp, subscriber) =>
rsp match {
case chg: dd.Replicator.Changed[_] => subscriber ! JReplicator.Changed(chg.key)(chg.dataValue)
case del: dd.Replicator.Deleted[_] => subscriber ! JReplicator.Deleted(del.key)
}
Behaviors.same
case cmd: SReplicator.Unsubscribe[_] =>
classicReplicator.tell(
dd.Replicator.Unsubscribe(cmd.key, cmd.subscriber.toClassic),
sender = cmd.subscriber.toClassic)
Behaviors.same
case cmd: JReplicator.Unsubscribe[ReplicatedData] @unchecked =>
stopSubscribeAdapter(cmd.subscriber)
case cmd: SReplicator.Delete[_] =>
classicReplicator.tell(dd.Replicator.Delete(cmd.key, cmd.consistency), sender = cmd.replyTo.toClassic)
Behaviors.same
case cmd: JReplicator.Delete[d] =>
implicit val timeout: Timeout = Timeout(cmd.consistency.timeout match {
case java.time.Duration.ZERO => localAskTimeout
case t => t.asScala + additionalAskTimeout
})
import ctx.executionContext
val reply =
(classicReplicator ? dd.Replicator.Delete(cmd.key, cmd.consistency.toClassic))
.mapTo[dd.Replicator.DeleteResponse[d]]
.map {
case rsp: dd.Replicator.DeleteSuccess[d] => JReplicator.DeleteSuccess(rsp.key)
case rsp: dd.Replicator.ReplicationDeleteFailure[d] =>
JReplicator.DeleteFailure(rsp.key)
case rsp: dd.Replicator.DataDeleted[d] => JReplicator.DataDeleted(rsp.key)
case rsp: dd.Replicator.StoreFailure[d] => JReplicator.StoreFailure(rsp.key)
}
.recover {
case _ => JReplicator.DeleteFailure(cmd.key)
}
reply.foreach { cmd.replyTo ! _ }
Behaviors.same
case SReplicator.GetReplicaCount(replyTo) =>
classicReplicator.tell(dd.Replicator.GetReplicaCount, sender = replyTo.toClassic)
Behaviors.same
case JReplicator.GetReplicaCount(replyTo) =>
implicit val timeout = Timeout(localAskTimeout)
import ctx.executionContext
val reply =
(classicReplicator ? dd.Replicator.GetReplicaCount)
.mapTo[dd.Replicator.ReplicaCount]
.map(rsp => JReplicator.ReplicaCount(rsp.n))
reply.foreach { replyTo ! _ }
Behaviors.same
case SReplicator.FlushChanges | JReplicator.FlushChanges =>
classicReplicator.tell(dd.Replicator.FlushChanges, sender = pekko.actor.ActorRef.noSender)
Behaviors.same
case unexpected =>
throw new RuntimeException(s"Unexpected message: ${unexpected.getClass}") // compiler exhaustiveness check pleaser
}
}
.receiveSignal {
case (_, Terminated(ref: ActorRef[JReplicator.SubscribeResponse[ReplicatedData]] @unchecked)) =>
stopSubscribeAdapter(ref)
}
}
withState(Map.empty)
}
}