in remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala [946:1063]
/**
 * Registers callbacks on `streamCompleted` that decide what happens to the outbound stream's
 * queue slot once the stream has ended.
 *
 * On normal completion, or on a failure that is treated as restartable, the slot in `queues` is
 * replaced by a `LazyQueueWrapper` (see `lazyRestart` below). For shutdown-like outcomes the
 * `materializing` latch is released and/or the timers are cancelled instead. When
 * `restartCounter` refuses another restart, the actor system is terminated.
 *
 * The callbacks are executed on `materializer.executionContext`.
 *
 * @param streamName name of the stream, used in log messages and in the flight recorder event
 * @param queueIndex index into `queues` of the queue feeding this stream; compared against
 *                   `ControlQueueIndex` to enable the control-stream specific handling
 * @param queueCapacity capacity passed to `createQueue` when a replacement queue is created
 * @param streamCompleted future observed for the normal completion or the failure of the stream
 * @param restart function handed to the `LazyQueueWrapper`, which invokes it to start the stream again
 */
private def attachOutboundStreamRestart(
streamName: String,
queueIndex: Int,
queueCapacity: Int,
streamCompleted: Future[Done],
restart: () => Unit): Unit = {
// Resets the per-stream state and installs a lazily restarting queue in place of the current one.
// The stream itself is not started here; that is left to the wrapper installed below.
def lazyRestart(): Unit = {
flightRecorder.transportRestartOutbound(remoteAddress, streamName)
outboundCompressionAccess = Vector.empty
if (queueIndex == ControlQueueIndex) {
// Control stream only: install a fresh latch and drop the cached control ingress.
// Presumably this makes callers of outboundControlIngress wait for the restarted stream to
// materialize again - TODO confirm against outboundControlIngress.
materializing = new CountDownLatch(1)
_outboundControlIngress = OptionVal.None
}
// LazyQueueWrapper will invoke the `restart` function when first message is offered
val wrappedRestartFun: () => Unit = () => {
restart()
}
// The `if` below has no braces, so only the assignment to `queues(queueIndex)` is conditional.
// The write to `queuesVisibility` that follows is always performed, and it comes after the
// array update.
if (!isRemovedAfterQuarantined())
queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), wrappedRestartFun)
queuesVisibility = true // volatile write for visibility of the queues array
}
// Execution context used by both `foreach` callbacks registered below.
implicit val ec = materializer.executionContext
// The stream completed without a failure.
streamCompleted.foreach { _ =>
if (transport.isShutdown || isRemovedAfterQuarantined()) {
// shutdown as expected
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
} else {
log.debug("{} to [{}] was completed. It will be restarted if used again.", streamName, remoteAddress)
lazyRestart()
}
}
// The stream failed. The cases are tried from top to bottom, so the shutdown/removed guard takes
// precedence over the AeronTerminated and AbruptTerminationException cases further down.
streamCompleted.failed.foreach {
case ArteryTransport.ShutdownSignal =>
// shutdown as expected
cancelAllTimers()
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case cause if transport.isShutdown || isRemovedAfterQuarantined() =>
// don't restart after shutdown, but log some details so we notice
// for the TCP transport the ShutdownSignal is "converted" to StreamTcpException
// NOTE(review): the `s` prefix on the template below is redundant, the literal contains no
// `$` substitutions.
if (!cause.isInstanceOf[StreamTcpException])
log.warning(
s"{} to [{}] failed after shutdown. {}: {}",
streamName,
remoteAddress,
cause.getClass.getName,
cause.getMessage)
cancelAllTimers()
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case _: AeronTerminated =>
// shutdown already in progress
// NOTE(review): unlike the two cases above, the `materializing` latch is not counted down
// here or in the next case - confirm that this is intended.
cancelAllTimers()
case _: AbruptTerminationException =>
// ActorSystem shutdown
cancelAllTimers()
case cause =>
// it might have been stopped as expected due to idle or quarantine
// for the TCP transport the exception is "converted" to StreamTcpException
// Each flag is true when the failure is the signal itself, or, failing that, when
// `getStopReason(queueIndex)` contains the signal.
val stoppedIdle = cause == OutboundStreamStopIdleSignal ||
getStopReason(queueIndex).contains(OutboundStreamStopIdleSignal)
val stoppedQuarantined = cause == OutboundStreamStopQuarantinedSignal ||
getStopReason(queueIndex).contains(OutboundStreamStopQuarantinedSignal)
// for some cases restart unconditionally, without counting restarts
val bypassRestartCounter = cause match {
case _: GaveUpMessageException => true
case _ => stoppedIdle || stoppedQuarantined
}
// A failed control stream triggers a quarantine, unless the stream was stopped because of a
// quarantine in the first place or the failure is a handshake timeout.
if (queueIndex == ControlQueueIndex && !stoppedQuarantined) {
cause match {
case _: HandshakeTimeoutException => // ok, quarantine not possible without UID
case _ =>
// Must quarantine in case all system messages haven't been delivered.
// See also comment in the stoppedIdle case below
// NOTE(review): no such comment is visible in the stoppedIdle branch of this method -
// confirm or update the reference above.
quarantine(s"Outbound control stream restarted. $cause")
}
}
// True when the failure is a StreamTcpException whose cause is a ConnectException.
def isConnectException: Boolean =
cause.isInstanceOf[StreamTcpException] && cause.getCause != null && cause.getCause
.isInstanceOf[ConnectException]
if (stoppedIdle) {
log.debug("{} to [{}] was idle and stopped. It will be restarted if used again.", streamName, remoteAddress)
lazyRestart()
} else if (stoppedQuarantined) {
log.debug(
"{} to [{}] was quarantined and stopped. It will be restarted if used again.",
streamName,
remoteAddress)
lazyRestart()
// `stoppedIdle` and `stoppedQuarantined` are both false when the next condition is evaluated,
// so `bypassRestartCounter` can only be true for a GaveUpMessageException at this point.
// `||` short-circuits: `restartCounter.restart()` is not invoked when the counter is bypassed.
} else if (bypassRestartCounter || restartCounter.restart()) {
// ConnectException may happen repeatedly and are already logged in connectionFlowWithRestart
if (isConnectException)
log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
else
log.warning("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
lazyRestart()
} else {
// `restartCounter.restart()` returned false: give up and terminate the actor system.
// `${cause.getMessage}` is spliced into the template by the `s` interpolator, while the
// `{}` placeholders are left for the logger to fill from the arguments that follow.
log.error(
cause,
s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName,
remoteAddress,
advancedSettings.OutboundMaxRestarts,
advancedSettings.OutboundRestartTimeout.toSeconds)
cancelAllTimers()
transport.system.terminate()
}
}
}