override def createLogic()

in remote/src/main/scala/org/apache/pekko/remote/artery/SystemMessageDelivery.scala [96:323]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver with StageLogging {

      // Set to true by the preStart callback once attaching to the control subject has completed;
      // onPull does nothing until then.
      private var replyObserverAttached = false
      private var seqNo = 0L // sequence number for the first message will be 1
      // Incarnation of the association that the current sequence numbers belong to; compared
      // against the live association state in onPush/tryResend to detect a new incarnation.
      private var incarnation = outboundContext.associationState.incarnation
      // Envelopes (wrapped in SystemMessageEnvelope by onPush) that have not been acknowledged yet.
      private val unacknowledged = new ArrayDeque[OutboundEnvelope]
      // Envelopes queued for pushing downstream as soon as `out` is available (see tryResend).
      private var resending = new ArrayDeque[OutboundEnvelope]
      // Set when upstream finished while messages were still unacknowledged; the stage is then
      // completed by clearUnacknowledged once the buffer is empty.
      private var stopping = false

      // Maximum time without an acknowledgement while messages are pending (see checkGiveUp).
      private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos
      // System.nanoTime() of the latest Ack/Nack handled, or of the moment the first pending
      // message was buffered.
      private var ackTimestamp = System.nanoTime()

      // Addresses are read from the outbound context on every access.
      private def localAddress = outboundContext.localAddress
      private def remoteAddress = outboundContext.remoteAddress

      // Text used in log messages: the unique remote address when the association state has one,
      // otherwise the plain remote address.
      private def remoteAddressLogParam: String = {
        val uniqueAddress = outboundContext.associationState.uniqueRemoteAddress()
        uniqueAddress.getOrElse(remoteAddress).toString
      }

      // Use the SystemMessageDelivery class as the source of log entries written by this logic.
      override protected def logSource: Class[_] = classOf[SystemMessageDelivery]

      /**
       * Attaches this logic as observer of the control subject. Once the attach has completed the
       * stage is marked as attached and, if downstream demand is already waiting, upstream is pulled.
       */
      override def preStart(): Unit = {
        val onAttached = getAsyncCallback[Done] { _ =>
          replyObserverAttached = true
          // onPull from downstream may already have been called while we were not attached yet
          if (isAvailable(out)) pull(in)
        }
        val attached = outboundContext.controlSubject.attach(this)
        attached.foreach(onAttached.invoke)(ExecutionContexts.parasitic)
      }

      /**
       * Routes everything still unacknowledged to dead letters, quarantines the association if
       * anything was pending, and detaches this observer from the control subject.
       */
      override def postStop(): Unit = {
        // capture the count before the buffer is emptied, it is needed for the quarantine reason
        val pendingCount = unacknowledged.size
        sendUnacknowledgedToDeadLetters()
        unacknowledged.clear()
        if (pendingCount > 0) {
          val reason = s"SystemMessageDelivery stopped with [$pendingCount] pending system messages."
          outboundContext.quarantine(reason)
        }
        outboundContext.controlSubject.detach(this)
      }

      /**
       * Upstream is done. If messages are still waiting for an acknowledgement the stage is kept
       * alive and only flagged as stopping; otherwise the default completion handling applies.
       */
      override def onUpstreamFinish(): Unit = {
        val hasPending = !unacknowledged.isEmpty
        if (hasPending) stopping = true
        else super.onUpstreamFinish()
      }

      /**
       * Handles the resend tick: gives up if no acknowledgement arrived in time, otherwise starts a
       * new resend round of all unacknowledged envelopes (unless one is still in progress) and
       * schedules the next tick while anything remains unacknowledged.
       *
       * Throws IllegalArgumentException for any other timer key.
       */
      override protected def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case ResendTick =>
            checkGiveUp()
            val noResendInProgress = resending.isEmpty
            if (noResendInProgress && !unacknowledged.isEmpty) {
              resending = unacknowledged.clone()
              tryResend()
            }
            // evaluated again on purpose: tryResend() may have cleared the buffer
            val stillPending = !unacknowledged.isEmpty
            if (stillPending) scheduleOnce(ResendTick, resendInterval)

          case other =>
            throw new IllegalArgumentException(s"Unknown timer key: $other")
        }
      }

      // ControlMessageObserver, external call.
      // Ack/Nack replies coming from the remote address of this association are handed over to the
      // stage through the async callbacks; every other control message is ignored.
      override def notify(inboundEnvelope: InboundEnvelope): Unit = {
        inboundEnvelope.message match {
          case ack: Ack if ack.from.address == remoteAddress    => ackCallback.invoke(ack)
          case nack: Nack if nack.from.address == remoteAddress => nackCallback.invoke(nack)
          case _                                                => // not interested
        }
      }

      // ControlMessageObserver, external call but on graph logic machinery thread (getAsyncCallback safe).
      // Completes the stage when the control subject completed successfully, fails it otherwise.
      override def controlSubjectCompleted(signal: Try[Done]): Unit = {
        val onCompleted = getAsyncCallback[Try[Done]] {
          case Failure(cause) => failStage(cause)
          case Success(_)     => completeStage()
        }
        onCompleted.invoke(signal)
      }

      // Async entry point for Ack replies handed over by notify.
      private val ackCallback = getAsyncCallback[Ack](received => ack(received.seqNo))

      // Async entry point for Nack replies handed over by notify. A Nack carries the highest
      // sequence number the remote side has acknowledged, so it is handled like an Ack for that
      // number and additionally logged. Nacks for sequence numbers we never sent are ignored.
      private val nackCallback = getAsyncCallback[Nack] { received =>
        val highestAcked = received.seqNo
        if (highestAcked <= seqNo) {
          ack(highestAcked)
          log.warning(
            "Received negative acknowledgement of system message from [{}], highest acknowledged [{}]",
            outboundContext.remoteAddress,
            highestAcked)
          // No immediate resend: Nack should be very rare (connection issue), the scheduled tick
          // will take care of resending.
        }
      }

      /**
       * Records that a reply was received now and, when `n` is a sequence number that has been
       * used, drops all buffered envelopes up to and including `n`.
       */
      private def ack(n: Long): Unit = {
        ackTimestamp = System.nanoTime()
        val isUsedSeqNo = n <= seqNo
        if (isUsedSeqNo) clearUnacknowledged(n)
      }

      /**
       * Removes envelopes from the head of `unacknowledged` while their sequence number is
       * `<= ackedSeqNo`.
       *
       * When the buffer becomes empty the pending resend tick is cancelled, and if the stage is
       * `stopping` (upstream already finished) the stage is completed.
       *
       * @param ackedSeqNo highest sequence number acknowledged by the remote side
       */
      @tailrec private def clearUnacknowledged(ackedSeqNo: Long): Unit = {
        if (!unacknowledged.isEmpty &&
          unacknowledged.peek().message.asInstanceOf[SystemMessageEnvelope].seqNo <= ackedSeqNo) {
          unacknowledged.removeFirst()
          // The resend timer is scheduled under the ResendTick key, so that is the key that has to
          // be cancelled; passing the interval duration here would not match any timer.
          if (unacknowledged.isEmpty)
            cancelTimer(ResendTick)

          if (stopping && unacknowledged.isEmpty)
            completeStage()
          else
            clearUnacknowledged(ackedSeqNo)
        }
      }

      /**
       * Pushes the next envelope queued in `resending` downstream, provided that `out` is
       * available and the queue is not empty. If the association has a new incarnation the
       * delivery state is cleared before the polled envelope is pushed.
       */
      private def tryResend(): Unit = {
        val canPush = isAvailable(out) && !resending.isEmpty
        if (canPush) {
          val next = resending.poll()

          if (log.isDebugEnabled) {
            next.message match {
              case SystemMessageEnvelope(msg, n, _) =>
                log.debug("Resending system message [{}] [{}]", Logging.simpleName(msg), n)
              case _ =>
                log.debug("Resending control message [{}]", Logging.simpleName(next.message))
            }
          }

          val currentIncarnation = outboundContext.associationState.incarnation
          if (incarnation != currentIncarnation) {
            log.debug("Noticed new incarnation of [{}] from tryResend, clear state", remoteAddressLogParam)
            clear()
          }

          pushCopy(next)
        }
      }

      // Always push a copy downstream: the buffered envelope instance is mutable, so it is
      // important that the buffered instance itself is never sent.
      private def pushCopy(outboundEnvelope: OutboundEnvelope): Unit =
        push(out, outboundEnvelope.copy())

      // InHandler
      // Dispatches on the type of the grabbed message:
      //  - SystemMessage / AckedDeliveryMessage: wrapped in a SystemMessageEnvelope with the next
      //    sequence number, buffered until acknowledged and sent (or queued for sending)
      //  - HandshakeReq: passed on when `out` is available
      //  - ClearSystemMessageDelivery: resets the delivery state
      //  - anything else: sent without acknowledgement tracking
      override def onPush(): Unit = {
        val outboundEnvelope = grab(in)
        outboundEnvelope.message match {
          case msg @ (_: SystemMessage | _: AckedDeliveryMessage) =>
            if (unacknowledged.size < maxBufferSize) {
              // First message of a sequence: remember which incarnation it belongs to. Later
              // messages: a changed incarnation means the old state is obsolete; clear() also
              // resets seqNo to 0 so that numbering starts over at 1 below.
              if (seqNo == 0) {
                incarnation = outboundContext.associationState.incarnation
              } else if (incarnation != outboundContext.associationState.incarnation) {
                log.debug("Noticed new incarnation of [{}] from onPush, clear state", remoteAddressLogParam)
                clear()
              }
              seqNo += 1
              // Nothing pending: the give-up clock starts now. Otherwise verify that the already
              // pending messages have not been waiting for an acknowledgement for too long.
              if (unacknowledged.isEmpty)
                ackTimestamp = System.nanoTime()
              else
                checkGiveUp()
              val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress))
              unacknowledged.offer(sendEnvelope)
              scheduleOnce(ResendTick, resendInterval)
              // Push right away only when nothing else is queued ahead of it, otherwise append to
              // the resend queue and let tryResend push the head of the queue.
              if (resending.isEmpty && isAvailable(out))
                pushCopy(sendEnvelope)
              else {
                resending.offer(sendEnvelope)
                tryResend()
              }
            } else {
              // buffer overflow
              // the message goes to dead letters and the next element is requested from upstream
              outboundContext.quarantine(reason = s"System message delivery buffer overflow, size [$maxBufferSize]")
              deadLetters ! outboundEnvelope
              pull(in)
            }
          case _: HandshakeReq =>
            // pass on HandshakeReq
            // (only when `out` is available; there is no queueing in this branch)
            if (isAvailable(out))
              pushCopy(outboundEnvelope)
          case ClearSystemMessageDelivery(i) =>
            // only honoured when the requested incarnation is not newer than the tracked one
            if (i <= incarnation) {
              log.debug("Clear system message delivery of [{}]", remoteAddressLogParam)
              clear()
            }
            pull(in)
          case _ =>
            // e.g. ActorSystemTerminating or ActorSelectionMessage with PriorityMessage, no need for acked delivery
            // Not buffered in `unacknowledged`; pushed as-is, or queued behind pending resends.
            if (resending.isEmpty && isAvailable(out))
              push(out, outboundEnvelope)
            else {
              resending.offer(outboundEnvelope)
              tryResend()
            }
        }
      }

      /**
       * Throws GaveUpSystemMessageException when messages are pending and more than
       * `giveUpAfterNanos` have passed since `ackTimestamp` was last updated.
       */
      private def checkGiveUp(): Unit = {
        // evaluated lazily so that the clock is only read when something is actually pending
        def ackOverdue: Boolean = System.nanoTime() - ackTimestamp > giveUpAfterNanos

        if (!unacknowledged.isEmpty && ackOverdue) {
          val reason =
            s"Gave up sending system message to [${outboundContext.remoteAddress}] after " +
            s"${outboundContext.settings.Advanced.GiveUpSystemMessageAfter.pretty}."
          throw new GaveUpSystemMessageException(reason)
        }
      }

      /**
       * Resets the delivery state: unacknowledged envelopes are sent to dead letters, the sequence
       * number starts over, the tracked incarnation is refreshed from the association state, both
       * buffers are emptied and the pending resend tick is cancelled.
       */
      private def clear(): Unit = {
        sendUnacknowledgedToDeadLetters()
        seqNo = 0L // sequence number for the first message will be 1
        incarnation = outboundContext.associationState.incarnation
        unacknowledged.clear()
        resending.clear()
        // The resend timer is scheduled under the ResendTick key, so that is the key that has to
        // be cancelled; passing the interval duration here would not match any timer.
        cancelTimer(ResendTick)
      }

      // Sends every envelope that is still unacknowledged to dead letters, oldest first.
      // The buffer itself is left untouched, clearing it is up to the caller.
      private def sendUnacknowledgedToDeadLetters(): Unit =
        unacknowledged.forEach((pending: OutboundEnvelope) => deadLetters ! pending)

      // OutHandler
      // Downstream demand: ask upstream for a new element when nothing is queued for resending,
      // `in` has not been pulled already and the stage is not stopping; otherwise try to push the
      // next queued envelope. Ignored until attached to the control subject, preStart pulls then.
      override def onPull(): Unit = {
        if (replyObserverAttached) {
          val nothingQueued = resending.isEmpty
          val mayPullUpstream = nothingQueued && !hasBeenPulled(in) && !stopping
          if (mayPullUpstream) pull(in)
          else tryResend()
        }
      }

      // This logic is both the InHandler and the OutHandler, so it is registered for both ports.
      setHandlers(in, out, this)
    }