def send()

in remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala [346:458]


  /**
   * Offers `message` to one of this association's outbound queues, or drops it.
   *
   * While the association is quarantined only `ActorSelectionMessage` and
   * `ClearSystemMessageDelivery` are let through; anything else is logged at debug level and
   * discarded. Otherwise the message is wrapped in an [[OutboundEnvelope]] and routed as follows:
   *
   *  - `DeathWatchNotification` (when flushing is enabled and it should be sent): delayed until a
   *    `FlushBeforeDeathWatchNotification` actor completes its promise, then sent as a system message
   *  - other `SystemMessage`s: control queue, quarantining the association on overflow
   *  - priority `ActorSelectionMessage`, `ControlMessage`, `ClearSystemMessageDelivery` and
   *    `DaemonMsgCreate`: control queue
   *  - everything else: the queue chosen by `selectQueue(recipient)`
   *
   * A rejected offer publishes a `Dropped` event, records the overflow in the flight recorder and
   * forwards the envelope to dead letters. `ShuttingDown` thrown while enqueueing is silenced.
   *
   * @param message   the message to send
   * @param sender    the sending actor, if any
   * @param recipient the remote recipient, if any
   */
  def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = {

    def createOutboundEnvelope(): OutboundEnvelope =
      outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)

    // volatile read to see latest queue array
    @nowarn("msg=never used")
    val unused = queuesVisibility

    // Handles an envelope that could not be enqueued: publishes a Dropped event, records the
    // overflow in the flight recorder and forwards the envelope to dead letters.
    def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
      val removed = isRemovedAfterQuarantined()
      if (removed) recipient match {
        case OptionVal.Some(ref) => ref.cachedAssociation = null // don't use this Association instance any more
        case _                   =>
      }
      val reason =
        if (removed) "Due to removed unused quarantined association"
        else s"Due to overflow of send queue, size [$qSize]"
      transport.system.eventStream
        .publish(Dropped(message, reason, env.sender.getOrElse(ActorRef.noSender), recipient.getOrElse(deadletters)))

      flightRecorder.transportSendQueueOverflow(queueIndex)
      deadletters ! env
    }

    // Unwatch is only withheld when running with Cluster and the local system is terminating.
    def shouldSendUnwatch(): Boolean =
      !transport.provider.settings.HasCluster || !transport.system.isTerminating()

    // Same rule as for Unwatch, except that an address-terminated notification is always sent.
    def shouldSendDeathWatchNotification(d: DeathWatchNotification): Boolean =
      d.addressTerminated || !transport.provider.settings.HasCluster || !transport.system.isTerminating()

    // Offers a system message to the control queue unless it is an Unwatch or
    // DeathWatchNotification that should be withheld; overflow quarantines the association.
    def sendSystemMessage(outboundEnvelope: OutboundEnvelope): Unit = {
      outboundEnvelope.message match {
        // Skip the Unwatch only when it should NOT be sent, mirroring the
        // DeathWatchNotification guard below.
        case u: Unwatch if !shouldSendUnwatch() =>
          log.debug(
            "Not sending Unwatch of {} to {} because it will be notified when this member " +
            "has been removed from Cluster.",
            u.watcher,
            u.watchee)
        case d: DeathWatchNotification if !shouldSendDeathWatchNotification(d) =>
          log.debug(
            "Not sending DeathWatchNotification of {} to {} because it will be notified when this member " +
            "has been removed from Cluster.",
            d.actor,
            outboundEnvelope.recipient.getOrElse("unknown"))
        case _ =>
          if (!controlQueue.offer(outboundEnvelope)) {
            quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
            dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
          }
      }
    }

    val state = associationState
    val quarantined = state.isQuarantined()
    val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]

    // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) {
      if (quarantined && !messageIsClearSystemMessageDelivery) {
        log.debug(
          "Quarantine piercing attempt with message [{}] to [{}]",
          Logging.messageClassName(message),
          recipient.getOrElse(""))
        setupStopQuarantinedTimer()
      }
      try {
        val outboundEnvelope = createOutboundEnvelope()
        message match {
          case d: DeathWatchNotification if deathWatchNotificationFlushEnabled && shouldSendDeathWatchNotification(d) =>
            // The notification is handed to sendSystemMessage only after the flush promise completes.
            val flushingPromise = Promise[Done]()
            log.debug("Delaying death watch notification until flush has been sent. {}", d)
            transport.system.systemActorOf(
              FlushBeforeDeathWatchNotification
                .props(flushingPromise, settings.Advanced.DeathWatchNotificationFlushTimeout, this)
                .withDispatcher(Dispatchers.InternalDispatcherId),
              FlushBeforeDeathWatchNotification.nextName())
            flushingPromise.future.onComplete { _ =>
              log.debug("Sending death watch notification as flush is complete. {}", d)
              sendSystemMessage(outboundEnvelope)
            }(materializer.executionContext)
          case _: SystemMessage =>
            sendSystemMessage(outboundEnvelope)
          case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery =>
            // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
            if (!controlQueue.offer(outboundEnvelope)) {
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
            }
          case _: DaemonMsgCreate =>
            // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
            // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
            // First ordinary message may arrive earlier but then the resolve in the Decoder is retried
            // so that the first message can be delivered after the remote actor has been created.
            if (!controlQueue.offer(outboundEnvelope))
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
          case _ =>
            val queueIndex = selectQueue(recipient)
            val queue = queues(queueIndex)
            val offerOk = queue.offer(outboundEnvelope)
            if (!offerOk)
              dropped(queueIndex, queueSize, outboundEnvelope)
        }
      } catch {
        case ShuttingDown => // silence it
      }
    } else if (log.isDebugEnabled)
      log.debug(
        "Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
        Logging.messageClassName(message),
        sender.getOrElse(deadletters),
        recipient.getOrElse(recipient),
        remoteAddress)
  }