in remote/src/main/scala/org/apache/pekko/remote/artery/Codecs.scala [380:638]
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InboundCompressionAccess) = {
val logic = new TimerGraphStageLogic(shape)
with InboundCompressionAccessImpl
with InHandler
with OutHandler
with StageLogging {
// Timer key (attempts left, recipient path, parked envelope) used for the delayed recipient lookup in onTimer.
import Decoder.RetryResolveRemoteDeployedRecipient
// The inbound compression tables given to this stage; onPush records hits on them and
// onTimer runs their advertisements.
override val compressions = inboundCompressions
// Header builder created from the inbound compression tables. It is a single instance that
// onPush resets and re-fills for every envelope.
private val headerBuilder = HeaderBuilder.in(compressions)
// Turns the actor ref paths carried in the header into refs (used for both recipient and sender).
private val actorRefResolver: ActorRefResolveCacheWithAddress =
new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
// Recipient paths that were still unresolved after all retry attempts. onPush skips the delayed
// retry for these; onTimer clears the set once it holds 100 entries to keep it bounded.
private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]
// Delay between two delayed lookups of a recipient that could not be resolved yet.
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
// Number of attempts handed to the first RetryResolveRemoteDeployedRecipient timer.
private val retryResolveRemoteDeployedRecipientAttempts = 20
// adaptive sampling when rate > 1000 msg/s
// Number of envelopes pushed into this stage so far (incremented at the start of onPush).
private var messageCount = 0L
// Hits are only recorded when (messageCount & heavyHitterMask) == 0; recomputed on every Tick.
private var heavyHitterMask = 0 // 0 => no sampling, otherwise power of two - 1
// Message rate (messages per second, measured between two Ticks) below which every message is counted.
private val adaptiveSamplingRateThreshold = 1000
// System.nanoTime() and messageCount as observed at the previous Tick; used to derive the rate.
private var tickTimestamp = System.nanoTime()
private var tickMessageCount = 0L
// Log source class used by the StageLogging mixin.
override protected def logSource = classOf[Decoder]
/**
 * Starts the periodic timers of this stage: the one-second `Tick` used for the adaptive
 * sampling rate, plus one advertisement timer per enabled compression table kind.
 */
override def preStart(): Unit = {
  val tickInterval = 1.seconds
  scheduleWithFixedDelay(Tick, tickInterval, tickInterval)

  val compressionSettings = settings.Advanced.Compression

  // actor ref compression table advertisement, only when that compression is enabled
  if (compressionSettings.ActorRefs.Enabled) {
    val actorRefsInterval = compressionSettings.ActorRefs.AdvertisementInterval
    scheduleWithFixedDelay(AdvertiseActorRefsCompressionTable, actorRefsInterval, actorRefsInterval)
  }

  // class manifest compression table advertisement, only when that compression is enabled
  if (compressionSettings.Manifests.Enabled) {
    val manifestsInterval = compressionSettings.Manifests.AdvertisementInterval
    scheduleWithFixedDelay(AdvertiseClassManifestsCompressionTable, manifestsInterval, manifestsInterval)
  }
}
/**
 * Decodes one inbound envelope and then either pushes it downstream, parks it for a delayed
 * recipient lookup, or drops it and pulls the next element.
 *
 * Steps:
 *  1. Parse the header into the shared `headerBuilder`.
 *  2. Resolve recipient, sender and class manifest. Each lookup is guarded separately, so a
 *     failure there is logged and only yields `OptionVal.None`.
 *  3. Drop the message when recipient or sender is unresolved although the header neither
 *     carries a path for it nor flags it as absent, or when the manifest is unknown.
 *  4. Record compression hits for the sampled subset of messages (see `heavyHitterMask`).
 *  5. Push the decoded envelope, or, if the recipient is still unresolved, schedule a
 *     `RetryResolveRemoteDeployedRecipient` timer unless the path is banned.
 *
 * Any other non-fatal exception is logged, the message is dropped and the next one is pulled.
 */
override def onPush(): Unit =
  try {
    messageCount += 1
    val envelope = grab(in)
    // the builder is one shared instance, so the per-message fields are reset before parsing
    headerBuilder.resetMessageFields()
    envelope.parseHeader(headerBuilder)
    val originUid = headerBuilder.uid
    val association = inboundContext.association(originUid)

    val recipient: OptionVal[InternalActorRef] =
      try headerBuilder.recipientActorRef(originUid) match {
          case OptionVal.Some(ref) =>
            OptionVal(ref.asInstanceOf[InternalActorRef])
          case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined =>
            resolveRecipient(headerBuilder.recipientActorRefPath.get)
          case _ =>
            OptionVal.None
        }
      catch {
        case NonFatal(e) =>
          // probably version mismatch due to restarted system
          log.warning("Couldn't decompress recipient from originUid [{}]. {}", originUid, e)
          OptionVal.None
      }

    val sender: OptionVal[InternalActorRef] =
      try headerBuilder.senderActorRef(originUid) match {
          case OptionVal.Some(ref) =>
            OptionVal(ref.asInstanceOf[InternalActorRef])
          case OptionVal.None if headerBuilder.senderActorRefPath.isDefined =>
            OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get))
          case _ =>
            OptionVal.None
        }
      catch {
        case NonFatal(e) =>
          // probably version mismatch due to restarted system
          log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e)
          OptionVal.None
      }

    val classManifestOpt =
      try headerBuilder.manifest(originUid)
      catch {
        case NonFatal(e) =>
          // probably version mismatch due to restarted system
          log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e)
          OptionVal.None
      }

    if ((recipient.isEmpty && headerBuilder.recipientActorRefPath.isEmpty && !headerBuilder.isNoRecipient) ||
      (sender.isEmpty && headerBuilder.senderActorRefPath.isEmpty && !headerBuilder.isNoSender)) {
      log.debug(
        "Dropping message for unknown recipient/sender. It was probably sent from system [{}] with compression " +
        "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
        "that has already been discarded in the destination system.",
        originUid,
        headerBuilder.inboundActorRefCompressionTableVersion)
      pull(in)
    } else if (classManifestOpt.isEmpty) {
      // report the class manifest table version here, since it is the manifest lookup that failed
      log.debug(
        "Dropping message with unknown manifest. It was probably sent from system [{}] with compression " +
        "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
        "that has already been discarded in the destination system.",
        originUid,
        headerBuilder.inboundClassManifestCompressionTableVersion)
      pull(in)
    } else {
      val classManifest = classManifestOpt.get

      if ((messageCount & heavyHitterMask) == 0) {
        // --- hit refs and manifests for heavy-hitter counting
        association match {
          case OptionVal.Some(assoc) =>
            val remoteAddress = assoc.remoteAddress
            if (sender.isDefined)
              compressions.hitActorRef(originUid, remoteAddress, sender.get, 1)
            if (recipient.isDefined)
              compressions.hitActorRef(originUid, remoteAddress, recipient.get, 1)
            compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1)
          case _ =>
            // we don't want to record hits for compression while handshake is still in progress.
            log.debug(
              "Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
        }
        // --- end of hit refs and manifests for heavy-hitter counting
      }

      val decoded = inEnvelopePool
        .acquire()
        .init(
          recipient,
          sender,
          originUid,
          headerBuilder.serializer,
          classManifest,
          headerBuilder.flags,
          envelope,
          association,
          lane = 0)

      if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
        // The remote deployed actor might not be created yet when resolving the
        // recipient for the first message that is sent to it, best effort retry.
        // However, if the retried resolve isn't successful the ref is banned and
        // we will not do the delayed retry resolve again. The reason for that is
        // if many messages are sent to such dead refs the resolve process will slow
        // down other messages.
        val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
        if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
          // banned path: no delayed retry, deliver to whatever the resolver returns right now
          headerBuilder.recipientActorRefPath match {
            case OptionVal.Some(path) =>
              val ref = actorRefResolver.getOrCompute(path)
              if (ref.isInstanceOf[EmptyLocalActorRef])
                log.warning(
                  "Message for banned (terminated, unresolved) remote deployed recipient [{}].",
                  recipientActorRefPath)
              push(out, decoded.withRecipient(ref))
            case _ =>
              // NOTE(review): presumably never taken, the same path was unwrapped with .get just above
              log.warning(
                "Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].",
                recipientActorRefPath)
              pull(in)
          }
        } else
          // park the envelope; onTimer retries the lookup and pushes it once resolved or out of attempts
          scheduleOnce(
            RetryResolveRemoteDeployedRecipient(
              retryResolveRemoteDeployedRecipientAttempts,
              recipientActorRefPath,
              decoded),
            retryResolveRemoteDeployedRecipientInterval)
      } else {
        push(out, decoded)
      }
    }
  } catch {
    case NonFatal(e) =>
      log.warning("Dropping message due to: {}", e)
      pull(in)
  }
/**
 * Looks up the recipient for `path` through the actor ref resolver.
 *
 * @param path the recipient actor ref path taken from the message header
 * @return `OptionVal.None` when the lookup yields an `EmptyLocalActorRef` whose first path
 *         element is "remote"; otherwise the ref that was found (which may itself be an
 *         `EmptyLocalActorRef` for any other path)
 */
private def resolveRecipient(path: String): OptionVal[InternalActorRef] =
  actorRefResolver.getOrCompute(path) match {
    // unresolved ref below "remote": report as not found so the caller can retry later
    case unresolved: EmptyLocalActorRef if unresolved.path.elements.headOption.contains("remote") =>
      OptionVal.None
    case found =>
      OptionVal(found)
  }
/** Downstream demand is passed on by pulling the inlet. */
override def onPull(): Unit = {
  pull(in)
}
/**
 * Handles the timers of this stage.
 *
 *  - `Tick`: measures the message rate since the previous tick and picks the sampling mask
 *    used for compression hit counting.
 *  - `AdvertiseActorRefsCompressionTable` / `AdvertiseClassManifestsCompressionTable`: run the
 *    next advertisement of the respective compression table.
 *  - `RetryResolveRemoteDeployedRecipient`: retries the recipient lookup for a parked envelope
 *    and pushes it once resolved; after the last attempt the path is banned and the envelope
 *    is pushed with whatever the resolver returns.
 *
 * @param timerKey the key of the timer that fired
 * @throws IllegalArgumentException for any other timer key
 */
override protected def onTimer(timerKey: Any): Unit =
  timerKey match {
    case Tick =>
      val now = System.nanoTime()
      // never divide by zero, even if two ticks observe the same nanoTime
      val elapsedNanos = math.max(1L, now - tickTimestamp)
      val messagesSinceLastTick = messageCount - tickMessageCount
      val rate = messagesSinceLastTick * TimeUnit.SECONDS.toNanos(1) / elapsedNanos

      val previousMask = heavyHitterMask
      val nextMask = rate match {
        case r if r < adaptiveSamplingRateThreshold       => 0 // no sampling
        case r if r < adaptiveSamplingRateThreshold * 10  => (1 << 6) - 1 // sample every 64th message
        case r if r < adaptiveSamplingRateThreshold * 100 => (1 << 7) - 1 // sample every 128th message
        case _                                            => (1 << 8) - 1 // sample every 256th message
      }
      heavyHitterMask = nextMask

      if (previousMask > 0 && nextMask == 0)
        log.debug("Turning off adaptive sampling of compression hit counting")
      else if (previousMask != nextMask)
        log.debug("Turning on adaptive sampling ({}nth message) of compression hit counting", nextMask + 1)

      // remember this tick as the baseline for the next rate measurement
      tickTimestamp = now
      tickMessageCount = messageCount

    case AdvertiseActorRefsCompressionTable =>
      // TODO: optimise these operations, otherwise they stall the hotpath
      compressions.runNextActorRefAdvertisement()

    case AdvertiseClassManifestsCompressionTable =>
      // TODO: optimise these operations, otherwise they stall the hotpath
      compressions.runNextClassManifestAdvertisement()

    case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) =>
      val resolved = resolveRecipient(recipientPath)
      if (resolved.isDefined)
        push(out, inboundEnvelope.withRecipient(resolved.get))
      else if (attemptsLeft > 0)
        scheduleOnce(
          RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope),
          retryResolveRemoteDeployedRecipientInterval)
      else {
        // No more attempts left. If the retried resolve isn't successful the ref is banned and
        // we will not do the delayed retry resolve again. The reason for that is
        // if many messages are sent to such dead refs the resolve process will slow
        // down other messages.
        if (bannedRemoteDeployedActorRefs.size >= 100)
          bannedRemoteDeployedActorRefs.clear() // keep it bounded
        bannedRemoteDeployedActorRefs.add(recipientPath)
        val fallbackRecipient = actorRefResolver.getOrCompute(recipientPath)
        push(out, inboundEnvelope.withRecipient(fallbackRecipient))
      }

    case unknown =>
      throw new IllegalArgumentException(s"Unknown timer key: $unknown")
  }
setHandlers(in, out, this)
}
(logic, logic)
}