def isLocalSender()

in distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala [1737:1841]


  /** `true` when the address of `replyTo` does not have global scope. */
  def isLocalSender(): Boolean = {
    val senderAddress = replyTo.path.address
    !senderAddress.hasGlobalScope
  }

  /**
   * Handles an update request for `key`.
   *
   * The locally stored value (if any) is run through `modify`, the outcome is merged into or
   * wrapped in a `DataEnvelope`, and the envelope is stored with `setData`. What happens next
   * depends on `writeConsistency` and on whether the key is durable:
   *
   *  - local update, not durable: `UpdateSuccess` is sent to `replyTo` right away
   *  - local update, durable: the durable store is asked to persist the envelope, with a
   *    `StoreReply` addressed to `replyTo`
   *  - otherwise: a `WriteAggregator` child actor is created, and for durable keys the durable
   *    store is additionally asked to persist, with the `StoreReply` addressed to that aggregator
   *
   * If the stored value is `DeletedData`, `modify` is not invoked and `UpdateDataDeleted` is sent
   * to `replyTo`. If anything evaluated inside the `Try` fails, `ModifyFailure` is sent to
   * `replyTo` instead.
   *
   * @param key key of the entry to update
   * @param modify function producing the new value; it receives `None` when nothing is stored
   *               locally for the key, otherwise `Some` of the stored data
   * @param writeConsistency consistency level requested for this update
   * @param req optional request context, passed through unchanged to the reply messages
   */
  def receiveUpdate[A <: ReplicatedData](
      key: KeyR,
      modify: Option[A] => A,
      writeConsistency: WriteConsistency,
      req: Option[Any]): Unit = {
    val current = getData(key.id)

    // A delta-aware value that currently carries no delta is represented by NoDeltaPlaceholder.
    def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] =
      d.delta match {
        case None           => Some(NoDeltaPlaceholder)
        case some @ Some(_) => some
      }

    // Compute the candidate envelope together with the delta (if any) produced by `modify`.
    // Exceptions thrown in here end up as a Failure and are reported as ModifyFailure below.
    val attempt = Try {
      current match {
        case Some(deleted @ DataEnvelope(DeletedData, _, _)) =>
          // already deleted: leave untouched, `modify` is not called
          (deleted, None)
        case Some(stored @ DataEnvelope(currentData, _, _)) =>
          modify(Some(currentData.asInstanceOf[A])) match {
            case updated: DeltaReplicatedData if deltaCrdtEnabled =>
              (stored.merge(updated.resetDelta.asInstanceOf[currentData.T]), deltaOrPlaceholder(updated))
            case updated =>
              (stored.merge(updated.asInstanceOf[currentData.T]), None)
          }
        case None =>
          modify(None) match {
            case created: DeltaReplicatedData if deltaCrdtEnabled =>
              (DataEnvelope(created.resetDelta), deltaOrPlaceholder(created))
            case created => (DataEnvelope(created), None)
          }
      }
    }

    attempt match {
      case Failure(cause) =>
        log.debug("Received Update for key [{}], failed: {}", key, cause.getMessage)
        replyTo ! ModifyFailure(key, "Update failed: " + cause.getMessage, cause, req)

      case Success((DataEnvelope(DeletedData, _, _), _)) =>
        log.debug("Received Update for deleted key [{}].", key)
        replyTo ! UpdateDataDeleted(key, req)

      case Success((candidate, maybeDelta)) =>
        log.debug("Received Update for key [{}].", key)

        // Register the delta (nothing to do when the value is not DeltaReplicatedData).
        // This has to happen before setData, so that the latest delta version is used.
        maybeDelta.foreach(d => deltaPropagationSelector.update(key.id, d))
        val storedEnvelope = setData(key.id, candidate)

        val durableKey = isDurable(key.id)
        if (!isLocalUpdate(writeConsistency)) {
          // Decide what the aggregator gets to write: the full envelope, optionally accompanied
          // by a versioned Delta, or an envelope whose data is the delta itself.
          // NoDeltaPlaceholder is checked before the type patterns.
          val (payload, payloadDelta) = maybeDelta match {
            case Some(NoDeltaPlaceholder) => (storedEnvelope, None)
            case Some(causal: RequiresCausalDeliveryOfDeltas) =>
              val version = deltaPropagationSelector.currentVersion(key.id)
              (storedEnvelope, Some(Delta(storedEnvelope.copy(data = causal), version, version)))
            case Some(plain) => (storedEnvelope.copy(data = plain), None)
            case None        => (storedEnvelope, None)
          }

          // With RequiresCausalDeliveryOfDeltas a deterministic order is used, so that sequence
          // numbers of subsequent updates are in sync on the destination nodes.
          // The order is also kept when prefer-oldest is enabled.
          val shuffle =
            !settings.preferOldest && !payloadDelta.exists(_.requiresCausalDeliveryOfDeltas)

          val excludeExiting = writeConsistency match {
            case _: WriteAll          => true
            case _: WriteMajorityPlus => true
            case _                    => false
          }

          val aggregatorProps = WriteAggregator
            .props(
              key,
              payload,
              payloadDelta,
              writeConsistency,
              req,
              selfUniqueAddress,
              nodesForReadWrite(excludeExiting),
              unreachable,
              shuffle,
              replyTo,
              durableKey)
            .withDispatcher(context.props.dispatcher)
          val writeAggregator = context.actorOf(aggregatorProps)

          // For durable keys the store outcome is reported to the aggregator, not to replyTo.
          if (durableKey) {
            durableStore ! Store(
              key.id,
              new DurableDataEnvelope(storedEnvelope),
              Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
          }
        } else if (durableKey) {
          // Local update of a durable key: the StoreReply is addressed to replyTo.
          durableStore ! Store(
            key.id,
            new DurableDataEnvelope(storedEnvelope),
            Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
        } else {
          replyTo ! UpdateSuccess(key, req)
        }
    }
  }