def createLogicAndMaterializedValue()

in remote/src/main/scala/org/apache/pekko/remote/artery/Codecs.scala [380:638]


  /**
   * Creates the stage logic that decodes inbound envelopes: it parses the header, resolves
   * recipient/sender/manifest (possibly via the inbound compression tables), records sampled
   * compression hits and emits the initialised inbound envelope downstream.
   *
   * The same object is returned twice because the logic also mixes in
   * `InboundCompressionAccessImpl` and therefore doubles as the materialized value.
   *
   * @param inheritedAttributes not used by this implementation
   * @return the stage logic, and that same logic exposed as [[InboundCompressionAccess]]
   */
  def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InboundCompressionAccess) = {
    val logic = new TimerGraphStageLogic(shape)
      with InboundCompressionAccessImpl
      with InHandler
      with OutHandler
      with StageLogging {
      import Decoder.RetryResolveRemoteDeployedRecipient

      override val compressions = inboundCompressions

      private val headerBuilder = HeaderBuilder.in(compressions)
      private val actorRefResolver: ActorRefResolveCacheWithAddress =
        new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
      // Recipient paths whose delayed resolve attempts were exhausted; such paths skip the retry.
      private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]
      // Upper bound of the banned set; the set is cleared when a new ban would exceed it.
      private val maxBannedRemoteDeployedActorRefs = 100

      private val retryResolveRemoteDeployedRecipientInterval = 50.millis
      private val retryResolveRemoteDeployedRecipientAttempts = 20

      // adaptive sampling when rate > 1000 msg/s
      private var messageCount = 0L
      private var heavyHitterMask = 0 // 0 => no sampling, otherwise power of two - 1
      private val adaptiveSamplingRateThreshold = 1000
      // Snapshot taken at the previous Tick, used to compute the message rate between ticks.
      private var tickTimestamp = System.nanoTime()
      private var tickMessageCount = 0L

      override protected def logSource = classOf[Decoder]

      /** Starts the periodic rate tick and, when enabled, the compression table advertisement timers. */
      override def preStart(): Unit = {
        val tickDelay = 1.seconds
        scheduleWithFixedDelay(Tick, tickDelay, tickDelay)

        if (settings.Advanced.Compression.ActorRefs.Enabled) {
          val d = settings.Advanced.Compression.ActorRefs.AdvertisementInterval
          scheduleWithFixedDelay(AdvertiseActorRefsCompressionTable, d, d)
        }
        if (settings.Advanced.Compression.Manifests.Enabled) {
          val d = settings.Advanced.Compression.Manifests.AdvertisementInterval
          scheduleWithFixedDelay(AdvertiseClassManifestsCompressionTable, d, d)
        }
      }

      /**
       * Decodes one envelope. The element is either pushed downstream, dropped (followed by a new
       * `pull(in)`), or parked in a timer that retries resolving a not yet available recipient.
       */
      override def onPush(): Unit =
        try {
          messageCount += 1
          val envelope = grab(in)
          headerBuilder.resetMessageFields()
          envelope.parseHeader(headerBuilder)

          val originUid = headerBuilder.uid
          val association = inboundContext.association(originUid)

          val recipient: OptionVal[InternalActorRef] =
            try headerBuilder.recipientActorRef(originUid) match {
                case OptionVal.Some(ref) =>
                  OptionVal(ref.asInstanceOf[InternalActorRef])
                case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined =>
                  resolveRecipient(headerBuilder.recipientActorRefPath.get)
                case _ =>
                  OptionVal.None
              }
            catch {
              case NonFatal(e) =>
                // probably version mismatch due to restarted system
                log.warning("Couldn't decompress recipient from originUid [{}]. {}", originUid, e)
                OptionVal.None
            }

          val sender: OptionVal[InternalActorRef] =
            try headerBuilder.senderActorRef(originUid) match {
                case OptionVal.Some(ref) =>
                  OptionVal(ref.asInstanceOf[InternalActorRef])
                case OptionVal.None if headerBuilder.senderActorRefPath.isDefined =>
                  OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get))
                case _ =>
                  OptionVal.None
              }
            catch {
              case NonFatal(e) =>
                // probably version mismatch due to restarted system
                log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e)
                OptionVal.None
            }

          val classManifestOpt =
            try headerBuilder.manifest(originUid)
            catch {
              case NonFatal(e) =>
                // probably version mismatch due to restarted system
                log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e)
                OptionVal.None
            }

          // A ref that is missing without a literal path and without the explicit "no recipient/sender"
          // marker could not be looked up, so the message cannot be delivered.
          if ((recipient.isEmpty && headerBuilder.recipientActorRefPath.isEmpty && !headerBuilder.isNoRecipient) ||
            (sender.isEmpty && headerBuilder.senderActorRefPath.isEmpty && !headerBuilder.isNoSender)) {
            log.debug(
              "Dropping message for unknown recipient/sender. It was probably sent from system [{}] with compression " +
              "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
              "that has already been discarded in the destination system.",
              originUid,
              headerBuilder.inboundActorRefCompressionTableVersion)
            pull(in)
          } else if (classManifestOpt.isEmpty) {
            // Report the class manifest table version here, it is the manifest lookup that failed.
            log.debug(
              "Dropping message with unknown manifest. It was probably sent from system [{}] with compression " +
              "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
              "that has already been discarded in the destination system.",
              originUid,
              headerBuilder.inboundClassManifestCompressionTableVersion)
            pull(in)
          } else {
            val classManifest = classManifestOpt.get

            // With a mask of 0 every message is counted, otherwise only every (mask + 1)th message.
            if ((messageCount & heavyHitterMask) == 0) {
              // --- hit refs and manifests for heavy-hitter counting
              association match {
                case OptionVal.Some(assoc) =>
                  val remoteAddress = assoc.remoteAddress
                  if (sender.isDefined)
                    compressions.hitActorRef(originUid, remoteAddress, sender.get, 1)

                  if (recipient.isDefined)
                    compressions.hitActorRef(originUid, remoteAddress, recipient.get, 1)

                  compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1)
                case _ =>
                  // we don't want to record hits for compression while handshake is still in progress.
                  log.debug(
                    "Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
              }
              // --- end of hit refs and manifests for heavy-hitter counting
            }

            val decoded = inEnvelopePool
              .acquire()
              .init(
                recipient,
                sender,
                originUid,
                headerBuilder.serializer,
                classManifest,
                headerBuilder.flags,
                envelope,
                association,
                lane = 0)

            if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
              // The remote deployed actor might not be created yet when resolving the
              // recipient for the first message that is sent to it, best effort retry.
              // However, if the retried resolve isn't successful the ref is banned and
              // we will not do the delayed retry resolve again. The reason for that is
              // if many messages are sent to such dead refs the resolve process will slow
              // down other messages.
              // The path is defined here, otherwise the message was dropped by the first check above.
              val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
              if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
                // Banned: no delayed retry, deliver to whatever the resolver returns right now.
                val ref = actorRefResolver.getOrCompute(recipientActorRefPath)
                if (ref.isInstanceOf[EmptyLocalActorRef])
                  log.warning(
                    "Message for banned (terminated, unresolved) remote deployed recipient [{}].",
                    recipientActorRefPath)
                push(out, decoded.withRecipient(ref))
              } else
                scheduleOnce(
                  RetryResolveRemoteDeployedRecipient(
                    retryResolveRemoteDeployedRecipientAttempts,
                    recipientActorRefPath,
                    decoded),
                  retryResolveRemoteDeployedRecipientInterval)
            } else {
              push(out, decoded)
            }
          }
        } catch {
          case NonFatal(e) =>
            log.warning("Dropping message due to: {}", e)
            pull(in)
        }

      /**
       * Looks up the recipient for `path`.
       *
       * @return `OptionVal.None` only when the lookup gives an `EmptyLocalActorRef` whose first path
       *         element is "remote" (the case that is retried by the caller), otherwise the
       *         looked up ref, which may itself be an `EmptyLocalActorRef`
       */
      private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
        actorRefResolver.getOrCompute(path) match {
          case empty: EmptyLocalActorRef =>
            val pathElements = empty.path.elements
            if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
            else OptionVal(empty)
          case ref => OptionVal(ref)
        }
      }

      override def onPull(): Unit = pull(in)

      /**
       * Handles the periodic rate tick, the compression table advertisements and the delayed
       * recipient resolve retries.
       */
      override protected def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case Tick =>
            val now = System.nanoTime()
            // at least 1 ns to avoid division by zero
            val d = math.max(1, now - tickTimestamp)
            // messages per second since the previous tick
            val rate = (messageCount - tickMessageCount) * TimeUnit.SECONDS.toNanos(1) / d
            val oldHeavyHitterMask = heavyHitterMask
            heavyHitterMask =
              if (rate < adaptiveSamplingRateThreshold) 0 // no sampling
              else if (rate < adaptiveSamplingRateThreshold * 10) (1 << 6) - 1 // sample every 64th message
              else if (rate < adaptiveSamplingRateThreshold * 100) (1 << 7) - 1 // sample every 128th message
              else (1 << 8) - 1 // sample every 256th message
            if (oldHeavyHitterMask > 0 && heavyHitterMask == 0)
              log.debug("Turning off adaptive sampling of compression hit counting")
            else if (oldHeavyHitterMask != heavyHitterMask)
              log.debug("Turning on adaptive sampling ({}nth message) of compression hit counting", heavyHitterMask + 1)
            tickMessageCount = messageCount
            tickTimestamp = now

          case AdvertiseActorRefsCompressionTable =>
            compressions
              .runNextActorRefAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath

          case AdvertiseClassManifestsCompressionTable =>
            compressions
              .runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath

          case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) =>
            resolveRecipient(recipientPath) match {
              case OptionVal.Some(recipient) =>
                push(out, inboundEnvelope.withRecipient(recipient))
              case _ =>
                if (attemptsLeft > 0)
                  scheduleOnce(
                    RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope),
                    retryResolveRemoteDeployedRecipientInterval)
                else {
                  // No more attempts left. If the retried resolve isn't successful the ref is banned and
                  // we will not do the delayed retry resolve again. The reason for that is
                  // if many messages are sent to such dead refs the resolve process will slow
                  // down other messages.
                  if (bannedRemoteDeployedActorRefs.size >= maxBannedRemoteDeployedActorRefs) {
                    // keep it bounded
                    bannedRemoteDeployedActorRefs.clear()
                  }
                  bannedRemoteDeployedActorRefs.add(recipientPath)

                  // Deliver with whatever the resolver returns instead of dropping the message.
                  val recipient = actorRefResolver.getOrCompute(recipientPath)
                  push(out, inboundEnvelope.withRecipient(recipient))
                }
            }

          case unknown => throw new IllegalArgumentException(s"Unknown timer key: $unknown")
        }
      }

      setHandlers(in, out, this)
    }

    (logic, logic)
  }