in remote/src/main/scala/org/apache/pekko/remote/artery/Handshake.scala [77:225]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
import OutboundHandshake._
private var handshakeState: HandshakeState = Start
private var pendingMessage: OptionVal[OutboundEnvelope] = OptionVal.None
private var injectHandshakeTickScheduled = false
private val uniqueRemoteAddressAsyncCallback = getAsyncCallback[UniqueAddress] { _ =>
if (handshakeState != Completed) {
handshakeCompleted()
if (isAvailable(out))
pull(in)
}
}
// this must be a `val` because function equality is used when removing in postStop
private val uniqueRemoteAddressListener: UniqueAddress => Unit =
peer => uniqueRemoteAddressAsyncCallback.invoke(peer)
override protected def logSource: Class[_] = classOf[OutboundHandshake]
override def preStart(): Unit = {
scheduleOnce(HandshakeTimeout, timeout)
livenessProbeInterval match {
case d: FiniteDuration => scheduleWithFixedDelay(LivenessProbeTick, d, d)
case _ => // only used in control stream
}
}
override def postStop(): Unit = {
outboundContext.associationState.removeUniqueRemoteAddressListener(uniqueRemoteAddressListener)
super.postStop()
}
// InHandler
override def onPush(): Unit = {
if (handshakeState != Completed)
throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState].")
// inject a HandshakeReq once in a while to trigger a new handshake when destination
// system has been restarted
if (injectHandshakeTickScheduled) {
// out is always available here, except for if a liveness HandshakeReq was just pushed
if (isAvailable(out))
push(out, grab(in))
else {
if (pendingMessage.isDefined)
throw new IllegalStateException(s"pendingMessage expected to be empty")
pendingMessage = OptionVal.Some(grab(in))
}
} else {
pushHandshakeReq()
pendingMessage = OptionVal.Some(grab(in))
}
}
// OutHandler
override def onPull(): Unit = {
handshakeState match {
case Completed =>
pendingMessage match {
case OptionVal.Some(p) =>
push(out, p)
pendingMessage = OptionVal.None
case _ =>
if (!hasBeenPulled(in))
pull(in)
}
case Start =>
outboundContext.associationState.uniqueRemoteAddress() match {
case Some(_) =>
handshakeCompleted()
case None =>
// will pull when handshake reply is received (uniqueRemoteAddress populated)
handshakeState = ReqInProgress
scheduleWithFixedDelay(HandshakeRetryTick, retryInterval, retryInterval)
// The InboundHandshake stage will complete the AssociationState.uniqueRemoteAddress
// when it receives the HandshakeRsp reply
outboundContext.associationState.addUniqueRemoteAddressListener(uniqueRemoteAddressListener)
}
// always push a HandshakeReq as the first message
pushHandshakeReq()
case ReqInProgress => // will pull when handshake reply is received
}
}
private def pushHandshakeReq(): Unit = {
injectHandshakeTickScheduled = true
scheduleOnce(InjectHandshakeTick, injectHandshakeInterval)
outboundContext.associationState.lastUsedTimestamp.set(System.nanoTime())
if (isAvailable(out))
push(out, createHandshakeReqEnvelope())
}
private def pushLivenessProbeReq(): Unit = {
// The associationState.lastUsedTimestamp will be updated when the HandshakeRsp is received
// and that is the confirmation that the other system is alive, and will not be quarantined
// by the quarantine-idle-outbound-after even though no real messages have been sent.
if (handshakeState == Completed && isAvailable(out) && pendingMessage.isEmpty) {
val lastUsedDuration = (System.nanoTime() - outboundContext.associationState.lastUsedTimestamp.get()).nanos
if (lastUsedDuration >= livenessProbeInterval) {
log.info(
"Association to [{}] has been idle for [{}] seconds, sending HandshakeReq to validate liveness",
outboundContext.remoteAddress,
lastUsedDuration.toSeconds)
push(out, createHandshakeReqEnvelope())
}
}
}
private def createHandshakeReqEnvelope(): OutboundEnvelope = {
outboundEnvelopePool
.acquire()
.init(
recipient = OptionVal.None,
message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress),
sender = OptionVal.None)
}
private def handshakeCompleted(): Unit = {
handshakeState = Completed
cancelTimer(HandshakeRetryTick)
cancelTimer(HandshakeTimeout)
}
override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case InjectHandshakeTick =>
// next onPush message will trigger sending of HandshakeReq
injectHandshakeTickScheduled = false
case LivenessProbeTick =>
pushLivenessProbeReq()
case HandshakeRetryTick =>
if (isAvailable(out))
pushHandshakeReq()
case HandshakeTimeout =>
failStage(
new HandshakeTimeoutException(
s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms"))
case unknown =>
throw new IllegalArgumentException(s"Unknown timer key: $unknown")
}
setHandlers(in, out, this)
}