in distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala [1737:1841]
/**
 * Tells whether the current `replyTo` is considered a local sender.
 *
 * The decision is based solely on the address of `replyTo`'s path: the sender
 * is treated as local when that address does not have global scope.
 *
 * @return `true` if the address of `replyTo` lacks global scope, `false` otherwise
 */
def isLocalSender(): Boolean = {
  val replyAddress = replyTo.path.address
  !replyAddress.hasGlobalScope
}
/**
 * Handles an update request for `key`: applies `modify` to the current local value,
 * stores the result locally and replies to `replyTo`, either directly or through a
 * `WriteAggregator` child actor, depending on `writeConsistency`.
 *
 * Outcomes, as implemented below:
 *  - the local entry is `DeletedData`: nothing is modified and `UpdateDataDeleted` is sent to `replyTo`;
 *  - `modify` (or the subsequent merge) throws a non-fatal exception: `ModifyFailure` is sent to `replyTo`
 *    and no data is stored;
 *  - `isLocalUpdate(writeConsistency)` is true: `UpdateSuccess` is sent to `replyTo`, or, for a durable key,
 *    the durable store is asked to store the envelope and is given the success/failure replies for `replyTo`;
 *  - otherwise: a `WriteAggregator` is created to carry out the write towards other nodes; for a durable
 *    key the durable store's reply is directed to that aggregator.
 *
 * @param key              key of the entry to update
 * @param modify           function from the current value (`None` when there is no local data) to the new value
 * @param writeConsistency consistency level requested for this write
 * @param req              optional request context, passed through unchanged into every reply
 * @tparam A               type of the replicated data held under `key`
 */
def receiveUpdate[A <: ReplicatedData](
key: KeyR,
modify: Option[A] => A,
writeConsistency: WriteConsistency,
req: Option[Any]): Unit = {
// Current local envelope for this key, if any.
val localValue = getData(key.id)
// Returns the delta carried by `d`, or `NoDeltaPlaceholder` when `d` has none,
// so the result is always defined for delta-capable data.
def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] = {
d.delta match {
case s @ Some(_) => s
case None => Some(NoDeltaPlaceholder)
}
}
// Compute the pair (new envelope, optional delta). Non-fatal exceptions thrown by
// `modify` or by the merge are captured by Try and reported as ModifyFailure below.
Try {
localValue match {
// Key is already deleted: pass the envelope through untouched; it is recognised
// as deleted in the Success match below.
case Some(envelope @ DataEnvelope(DeletedData, _, _)) =>
(envelope, None)
// Existing data: apply `modify` and merge the result into the stored envelope.
// NOTE(review): `existing.asInstanceOf[A]` is an unchecked cast; a type mismatch would
// presumably surface as a ClassCastException inside `modify`, which the Try captures.
case Some(envelope @ DataEnvelope(existing, _, _)) =>
modify(Some(existing.asInstanceOf[A])) match {
// Delta-CRDTs enabled: merge the state with its delta reset, and keep the
// delta (or the placeholder) separately for propagation.
case d: DeltaReplicatedData if deltaCrdtEnabled =>
(envelope.merge(d.resetDelta.asInstanceOf[existing.T]), deltaOrPlaceholder(d))
// Not delta-capable, or delta-CRDTs disabled: merge full state, no delta.
case d =>
(envelope.merge(d.asInstanceOf[existing.T]), None)
}
// No local data yet: `modify` creates the initial value, wrapped in a new envelope.
case None =>
modify(None) match {
case d: DeltaReplicatedData if deltaCrdtEnabled =>
(DataEnvelope(d.resetDelta), deltaOrPlaceholder(d))
case d => (DataEnvelope(d), None)
}
}
} match {
// Update of a deleted key: report it to the requester and change nothing.
case Success((DataEnvelope(DeletedData, _, _), _)) =>
log.debug("Received Update for deleted key [{}].", key)
replyTo ! UpdateDataDeleted(key, req)
case Success((envelope, delta)) =>
log.debug("Received Update for key [{}].", key)
// handle the delta: register it with the delta propagation selector, if there is one
delta match {
case Some(d) => deltaPropagationSelector.update(key.id, d)
case None => // not DeltaReplicatedData
}
// note that it's important to do deltaPropagationSelector.update before setData,
// so that the latest delta version is used
val newEnvelope = setData(key.id, envelope)
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
// Local-only write: reply directly, or let the durable store reply to `replyTo`
// with UpdateSuccess / StoreFailure once it has handled the Store request.
if (durable)
durableStore ! Store(
key.id,
new DurableDataEnvelope(newEnvelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
else
replyTo ! UpdateSuccess(key, req)
} else {
// Decide what is handed to the WriteAggregator: the envelope to write and an
// optional separate Delta.
// NOTE(review): the order of these cases may matter if NoDeltaPlaceholder is itself
// a RequiresCausalDeliveryOfDeltas — confirm against its definition before reordering.
val (writeEnvelope, writeDelta) = delta match {
// No real delta available: write the full state.
case Some(NoDeltaPlaceholder) => (newEnvelope, None)
// Delta needing causal delivery: write the full envelope and additionally pass a
// Delta built from the current version for this key (used for both version
// arguments; presumably the from/to sequence numbers — TODO confirm).
case Some(d: RequiresCausalDeliveryOfDeltas) =>
val v = deltaPropagationSelector.currentVersion(key.id)
(newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v)))
// Other delta: write an envelope whose data is the delta itself.
case Some(d) => (newEnvelope.copy(data = d), None)
// Not delta-capable data: write the full state.
case None => (newEnvelope, None)
}
// When RequiresCausalDeliveryOfDeltas use deterministic order so that sequence numbers
// of subsequent updates are in sync on the destination nodes.
// The order is also kept when prefer-oldest is enabled.
val shuffle = !(settings.preferOldest || writeDelta.exists(_.requiresCausalDeliveryOfDeltas))
// WriteMajorityPlus and WriteAll ask nodesForReadWrite to exclude exiting nodes;
// every other consistency level does not.
val excludeExiting = writeConsistency match {
case _: WriteMajorityPlus | _: WriteAll => true
case _ => false
}
// Child actor that performs the write towards the selected nodes; it is given
// `replyTo` and runs on the same dispatcher as this actor.
val writeAggregator =
context.actorOf(
WriteAggregator
.props(
key,
writeEnvelope,
writeDelta,
writeConsistency,
req,
selfUniqueAddress,
nodesForReadWrite(excludeExiting),
unreachable,
shuffle,
replyTo,
durable)
.withDispatcher(context.props.dispatcher))
// Durable key: the store's success/failure reply goes to the aggregator rather than
// to `replyTo` (presumably so the aggregator can take the local durable write into
// account before answering — TODO confirm in WriteAggregator).
if (durable) {
durableStore ! Store(
key.id,
new DurableDataEnvelope(newEnvelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
// `modify` or the merge threw: log at debug level and report the failure to the requester.
case Failure(e) =>
log.debug("Received Update for key [{}], failed: {}", key, e.getMessage)
replyTo ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req)
}
}