in remote/src/main/scala/org/apache/pekko/remote/artery/SystemMessageDelivery.scala [96:323]
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver with StageLogging {
private var replyObserverAttached = false
private var seqNo = 0L // sequence number for the first message will be 1
private var incarnation = outboundContext.associationState.incarnation
private val unacknowledged = new ArrayDeque[OutboundEnvelope]
private var resending = new ArrayDeque[OutboundEnvelope]
private var stopping = false
private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos
private var ackTimestamp = System.nanoTime()
private def localAddress = outboundContext.localAddress
private def remoteAddress = outboundContext.remoteAddress
private def remoteAddressLogParam: String =
outboundContext.associationState.uniqueRemoteAddress().getOrElse(remoteAddress).toString
override protected def logSource: Class[_] = classOf[SystemMessageDelivery]
override def preStart(): Unit = {
val callback = getAsyncCallback[Done] { _ =>
replyObserverAttached = true
if (isAvailable(out))
pull(in) // onPull from downstream already called
}
outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContexts.parasitic)
}
override def postStop(): Unit = {
val pendingCount = unacknowledged.size
sendUnacknowledgedToDeadLetters()
unacknowledged.clear()
if (pendingCount > 0)
outboundContext.quarantine(s"SystemMessageDelivery stopped with [$pendingCount] pending system messages.")
outboundContext.controlSubject.detach(this)
}
override def onUpstreamFinish(): Unit = {
if (unacknowledged.isEmpty)
super.onUpstreamFinish()
else
stopping = true
}
override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case ResendTick =>
checkGiveUp()
if (resending.isEmpty && !unacknowledged.isEmpty) {
resending = unacknowledged.clone()
tryResend()
}
if (!unacknowledged.isEmpty)
scheduleOnce(ResendTick, resendInterval)
case other =>
throw new IllegalArgumentException(s"Unknown timer key: $other")
}
// ControlMessageObserver, external call
override def notify(inboundEnvelope: InboundEnvelope): Unit = {
inboundEnvelope.message match {
case ack: Ack => if (ack.from.address == remoteAddress) ackCallback.invoke(ack)
case nack: Nack => if (nack.from.address == remoteAddress) nackCallback.invoke(nack)
case _ => // not interested
}
}
// ControlMessageObserver, external call but on graph logic machinery thread (getAsyncCallback safe)
override def controlSubjectCompleted(signal: Try[Done]): Unit = {
getAsyncCallback[Try[Done]] {
case Success(_) => completeStage()
case Failure(cause) => failStage(cause)
}.invoke(signal)
}
private val ackCallback = getAsyncCallback[Ack] { reply =>
ack(reply.seqNo)
}
private val nackCallback = getAsyncCallback[Nack] { reply =>
if (reply.seqNo <= seqNo) {
ack(reply.seqNo)
log.warning(
"Received negative acknowledgement of system message from [{}], highest acknowledged [{}]",
outboundContext.remoteAddress,
reply.seqNo)
// Nack should be very rare (connection issue) so no urgency of resending, it will be resent
// by the scheduled tick.
}
}
private def ack(n: Long): Unit = {
ackTimestamp = System.nanoTime()
if (n <= seqNo)
clearUnacknowledged(n)
}
@tailrec private def clearUnacknowledged(ackedSeqNo: Long): Unit = {
if (!unacknowledged.isEmpty &&
unacknowledged.peek().message.asInstanceOf[SystemMessageEnvelope].seqNo <= ackedSeqNo) {
unacknowledged.removeFirst()
if (unacknowledged.isEmpty)
cancelTimer(resendInterval)
if (stopping && unacknowledged.isEmpty)
completeStage()
else
clearUnacknowledged(ackedSeqNo)
}
}
private def tryResend(): Unit = {
if (isAvailable(out) && !resending.isEmpty) {
val env = resending.poll()
if (log.isDebugEnabled) {
env.message match {
case SystemMessageEnvelope(msg, n, _) =>
log.debug("Resending system message [{}] [{}]", Logging.simpleName(msg), n)
case _ =>
log.debug("Resending control message [{}]", Logging.simpleName(env.message))
}
}
if (incarnation != outboundContext.associationState.incarnation) {
log.debug("Noticed new incarnation of [{}] from tryResend, clear state", remoteAddressLogParam)
clear()
}
pushCopy(env)
}
}
// important to not send the buffered instance, since it's mutable
private def pushCopy(outboundEnvelope: OutboundEnvelope): Unit = {
push(out, outboundEnvelope.copy())
}
// InHandler
override def onPush(): Unit = {
val outboundEnvelope = grab(in)
outboundEnvelope.message match {
case msg @ (_: SystemMessage | _: AckedDeliveryMessage) =>
if (unacknowledged.size < maxBufferSize) {
if (seqNo == 0) {
incarnation = outboundContext.associationState.incarnation
} else if (incarnation != outboundContext.associationState.incarnation) {
log.debug("Noticed new incarnation of [{}] from onPush, clear state", remoteAddressLogParam)
clear()
}
seqNo += 1
if (unacknowledged.isEmpty)
ackTimestamp = System.nanoTime()
else
checkGiveUp()
val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress))
unacknowledged.offer(sendEnvelope)
scheduleOnce(ResendTick, resendInterval)
if (resending.isEmpty && isAvailable(out))
pushCopy(sendEnvelope)
else {
resending.offer(sendEnvelope)
tryResend()
}
} else {
// buffer overflow
outboundContext.quarantine(reason = s"System message delivery buffer overflow, size [$maxBufferSize]")
deadLetters ! outboundEnvelope
pull(in)
}
case _: HandshakeReq =>
// pass on HandshakeReq
if (isAvailable(out))
pushCopy(outboundEnvelope)
case ClearSystemMessageDelivery(i) =>
if (i <= incarnation) {
log.debug("Clear system message delivery of [{}]", remoteAddressLogParam)
clear()
}
pull(in)
case _ =>
// e.g. ActorSystemTerminating or ActorSelectionMessage with PriorityMessage, no need for acked delivery
if (resending.isEmpty && isAvailable(out))
push(out, outboundEnvelope)
else {
resending.offer(outboundEnvelope)
tryResend()
}
}
}
private def checkGiveUp(): Unit = {
if (!unacknowledged.isEmpty && (System.nanoTime() - ackTimestamp > giveUpAfterNanos))
throw new GaveUpSystemMessageException(
s"Gave up sending system message to [${outboundContext.remoteAddress}] after " +
s"${outboundContext.settings.Advanced.GiveUpSystemMessageAfter.pretty}.")
}
private def clear(): Unit = {
sendUnacknowledgedToDeadLetters()
seqNo = 0L // sequence number for the first message will be 1
incarnation = outboundContext.associationState.incarnation
unacknowledged.clear()
resending.clear()
cancelTimer(resendInterval)
}
private def sendUnacknowledgedToDeadLetters(): Unit = {
val iter = unacknowledged.iterator
while (iter.hasNext()) {
deadLetters ! iter.next()
}
}
// OutHandler
override def onPull(): Unit = {
if (replyObserverAttached) { // otherwise it will be pulled after attached
if (resending.isEmpty && !hasBeenPulled(in) && !stopping)
pull(in)
else
tryResend()
}
}
setHandlers(in, out, this)
}