override def createLogic()

in remote/src/main/scala/org/apache/pekko/remote/artery/Handshake.scala [77:225]


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
      import OutboundHandshake._

      // Progress of the handshake: begins at Start, becomes ReqInProgress while waiting for the
      // peer's unique address, and ends at Completed (see onPull and handshakeCompleted).
      private var handshakeState: HandshakeState = Start
      // An envelope grabbed from `in` that could not be pushed immediately; emitted by the next onPull.
      private var pendingMessage: OptionVal[OutboundEnvelope] = OptionVal.None
      // Set to true whenever pushHandshakeReq runs and reset to false when InjectHandshakeTick fires;
      // while false, the next onPush injects a fresh HandshakeReq before the regular message.
      private var injectHandshakeTickScheduled = false

      // Invoked (through uniqueRemoteAddressListener) once the peer's unique address is known.
      // Marks the handshake as completed and, if downstream is already waiting, starts pulling
      // regular messages from upstream. Does nothing when the handshake had already completed.
      private val uniqueRemoteAddressAsyncCallback = getAsyncCallback[UniqueAddress] { _ =>
        val alreadyCompleted = handshakeState == Completed
        if (!alreadyCompleted) {
          handshakeCompleted()
          if (isAvailable(out)) pull(in)
        }
      }
      // this must be a `val` because function equality is used when removing in postStop
      // Forwards the notification to the async callback; the very same instance is passed to both
      // addUniqueRemoteAddressListener (onPull) and removeUniqueRemoteAddressListener (postStop).
      private val uniqueRemoteAddressListener: UniqueAddress => Unit =
        uniqueRemoteAddressAsyncCallback.invoke(_)

      // Log under the OutboundHandshake class rather than this anonymous stage-logic class.
      override protected def logSource: Class[_] = classOf[OutboundHandshake]

      // Arms the overall handshake timeout and, when a finite liveness probe interval is
      // configured, a repeating LivenessProbeTick timer with that interval.
      override def preStart(): Unit = {
        scheduleOnce(HandshakeTimeout, timeout)
        livenessProbeInterval match {
          case probeEvery: FiniteDuration =>
            scheduleWithFixedDelay(LivenessProbeTick, probeEvery, probeEvery)
          case _ =>
            // non-finite interval: liveness probing is only used in the control stream
            ()
        }
      }

      // Detaches this stage's listener from the association state before delegating to the
      // parent's postStop. The removal is attempted unconditionally, although the listener is
      // only registered from onPull when the peer's unique address was not yet known.
      override def postStop(): Unit = {
        val association = outboundContext.associationState
        association.removeUniqueRemoteAddressListener(uniqueRemoteAddressListener)
        super.postStop()
      }

      // InHandler
      // Handles a regular outbound message arriving from upstream.
      override def onPush(): Unit = {
        // Upstream is only pulled after the handshake has completed, so any other state is a bug.
        if (handshakeState != Completed)
          throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState].")

        if (!injectHandshakeTickScheduled) {
          // The inject interval has elapsed: send a HandshakeReq ahead of this message so that a
          // restarted destination system triggers a new handshake, and buffer the message itself
          // until the next onPull.
          pushHandshakeReq()
          pendingMessage = OptionVal.Some(grab(in))
        } else if (isAvailable(out)) {
          // Normal case: pass the message straight through.
          push(out, grab(in))
        } else {
          // `out` is only unavailable here when a liveness HandshakeReq was pushed just before,
          // so park the message until downstream pulls again.
          if (pendingMessage.isDefined)
            throw new IllegalStateException(s"pendingMessage expected to be empty")
          pendingMessage = OptionVal.Some(grab(in))
        }
      }

      // OutHandler
      // Handles downstream demand; what happens depends on how far the handshake has progressed.
      override def onPull(): Unit =
        handshakeState match {
          case ReqInProgress =>
            // Nothing to do yet: upstream is pulled once the handshake reply has been received.
            ()

          case Start =>
            if (outboundContext.associationState.uniqueRemoteAddress().isDefined) {
              // The peer's unique address is already known, no reply needs to be awaited.
              handshakeCompleted()
            } else {
              // Upstream is pulled later, when the handshake reply arrives and the
              // uniqueRemoteAddress has been populated.
              handshakeState = ReqInProgress
              scheduleWithFixedDelay(HandshakeRetryTick, retryInterval, retryInterval)

              // The InboundHandshake stage completes AssociationState.uniqueRemoteAddress when it
              // receives the HandshakeRsp reply; the listener is notified at that point.
              outboundContext.associationState.addUniqueRemoteAddressListener(uniqueRemoteAddressListener)
            }

            // In both cases the very first element emitted is a HandshakeReq.
            pushHandshakeReq()

          case Completed =>
            pendingMessage match {
              case OptionVal.Some(buffered) =>
                // Emit the message that was parked by onPush before asking upstream for more.
                push(out, buffered)
                pendingMessage = OptionVal.None
              case _ =>
                if (!hasBeenPulled(in)) pull(in)
            }
        }

      // Emits a HandshakeReq when downstream can accept it. Regardless of whether it was emitted,
      // the InjectHandshakeTick timer is (re)armed and the association's lastUsedTimestamp is
      // refreshed to the current System.nanoTime().
      private def pushHandshakeReq(): Unit = {
        injectHandshakeTickScheduled = true
        scheduleOnce(InjectHandshakeTick, injectHandshakeInterval)
        val lastUsed = outboundContext.associationState.lastUsedTimestamp
        lastUsed.set(System.nanoTime())
        if (isAvailable(out)) {
          val handshakeReq = createHandshakeReqEnvelope()
          push(out, handshakeReq)
        }
      }

      // Sends a HandshakeReq as a liveness probe when the association has been idle for at least
      // livenessProbeInterval. The probe is skipped unless the handshake has completed, downstream
      // has demand and no message is parked in pendingMessage.
      //
      // lastUsedTimestamp is not touched here: it is updated when the HandshakeRsp is received,
      // which confirms that the other system is alive, so the association is not quarantined by
      // quarantine-idle-outbound-after even though no real messages have been sent.
      private def pushLivenessProbeReq(): Unit = {
        val canProbe = handshakeState == Completed && isAvailable(out) && pendingMessage.isEmpty
        if (canProbe) {
          val idleNanos = System.nanoTime() - outboundContext.associationState.lastUsedTimestamp.get()
          val lastUsedDuration = idleNanos.nanos
          if (lastUsedDuration >= livenessProbeInterval) {
            log.info(
              "Association to [{}] has been idle for [{}] seconds, sending HandshakeReq to validate liveness",
              outboundContext.remoteAddress,
              lastUsedDuration.toSeconds)
            val probe = createHandshakeReqEnvelope()
            push(out, probe)
          }
        }
      }

      // Acquires an envelope from the pool and initialises it with a HandshakeReq from the local
      // to the remote address, with neither recipient nor sender set.
      private def createHandshakeReqEnvelope(): OutboundEnvelope = {
        val envelope = outboundEnvelopePool.acquire()
        val request = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress)
        envelope.init(recipient = OptionVal.None, message = request, sender = OptionVal.None)
      }

      // Switches to the Completed state and cancels the timers that only matter while the
      // handshake is still pending (retry and overall timeout).
      private def handshakeCompleted(): Unit = {
        handshakeState = Completed
        List[Any](HandshakeRetryTick, HandshakeTimeout).foreach(key => cancelTimer(key))
      }

      // Dispatches on the timer keys scheduled by this stage; any other key is treated as a bug.
      override protected def onTimer(timerKey: Any): Unit =
        timerKey match {
          case HandshakeTimeout =>
            // The handshake did not complete in time: fail the whole stage.
            val reason =
              s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms"
            failStage(new HandshakeTimeoutException(reason))
          case HandshakeRetryTick =>
            // Retry only when downstream can take the request; otherwise wait for the next tick.
            if (isAvailable(out)) pushHandshakeReq()
          case LivenessProbeTick =>
            pushLivenessProbeReq()
          case InjectHandshakeTick =>
            // Clearing the flag makes the next onPush send a HandshakeReq before its message.
            injectHandshakeTickScheduled = false
          case other =>
            throw new IllegalArgumentException(s"Unknown timer key: $other")
        }

      setHandlers(in, out, this)
    }