in remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala [347:459]
def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = {
def createOutboundEnvelope(): OutboundEnvelope =
outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)
// volatile read to see latest queue array
@nowarn("msg=never used")
val unused = queuesVisibility
def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
val removed = isRemovedAfterQuarantined()
if (removed) recipient match {
case OptionVal.Some(ref) => ref.cachedAssociation = null // don't use this Association instance any more
case _ =>
}
val reason =
if (removed) "Due to removed unused quarantined association"
else s"Due to overflow of send queue, size [$qSize]"
transport.system.eventStream
.publish(Dropped(message, reason, env.sender.getOrElse(ActorRef.noSender), recipient.getOrElse(deadletters)))
flightRecorder.transportSendQueueOverflow(queueIndex)
deadletters ! env
}
def shouldSendUnwatch(): Boolean =
!transport.provider.settings.HasCluster || !transport.system.isTerminating()
def shouldSendDeathWatchNotification(d: DeathWatchNotification): Boolean =
d.addressTerminated || !transport.provider.settings.HasCluster || !transport.system.isTerminating()
def sendSystemMessage(outboundEnvelope: OutboundEnvelope): Unit = {
outboundEnvelope.message match {
case u: Unwatch if shouldSendUnwatch() =>
log.debug(
"Not sending Unwatch of {} to {} because it will be notified when this member " +
"has been removed from Cluster.",
u.watcher,
u.watchee)
case d: DeathWatchNotification if !shouldSendDeathWatchNotification(d) =>
log.debug(
"Not sending DeathWatchNotification of {} to {} because it will be notified when this member " +
"has been removed from Cluster.",
d.actor,
outboundEnvelope.recipient.getOrElse("unknown"))
case _ =>
if (!controlQueue.offer(outboundEnvelope)) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
}
}
val state = associationState
val quarantined = state.isQuarantined()
val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) {
if (quarantined && !messageIsClearSystemMessageDelivery) {
log.debug(
"Quarantine piercing attempt with message [{}] to [{}]",
Logging.messageClassName(message),
recipient.getOrElse(""))
setupStopQuarantinedTimer()
}
try {
val outboundEnvelope = createOutboundEnvelope()
message match {
case d: DeathWatchNotification if deathWatchNotificationFlushEnabled && shouldSendDeathWatchNotification(d) =>
val flushingPromise = Promise[Done]()
log.debug("Delaying death watch notification until flush has been sent. {}", d)
transport.system.systemActorOf(
FlushBeforeDeathWatchNotification
.props(flushingPromise, settings.Advanced.DeathWatchNotificationFlushTimeout, this)
.withDispatcher(Dispatchers.InternalDispatcherId),
FlushBeforeDeathWatchNotification.nextName())
flushingPromise.future.onComplete { _ =>
log.debug("Sending death watch notification as flush is complete. {}", d)
sendSystemMessage(outboundEnvelope)
}(materializer.executionContext)
case _: SystemMessage =>
sendSystemMessage(outboundEnvelope)
case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery =>
// ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
if (!controlQueue.offer(outboundEnvelope)) {
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case _: DaemonMsgCreate =>
// DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
// remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
// First ordinary message may arrive earlier but then the resolve in the Decoder is retried
// so that the first message can be delivered after the remote actor has been created.
if (!controlQueue.offer(outboundEnvelope))
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
case _ =>
val queueIndex = selectQueue(recipient)
val queue = queues(queueIndex)
val offerOk = queue.offer(outboundEnvelope)
if (!offerOk)
dropped(queueIndex, queueSize, outboundEnvelope)
}
} catch {
case ShuttingDown => // silence it
}
} else if (log.isDebugEnabled)
log.debug(
"Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
Logging.messageClassName(message),
sender.getOrElse(deadletters),
recipient.getOrElse(recipient),
remoteAddress)
}