private def attachOutboundStreamRestart()

in remote/src/main/scala/org/apache/pekko/remote/artery/Association.scala [945:1062]


  /**
   * Registers callbacks on `streamCompleted` that decide what happens after the outbound
   * stream has terminated.
   *
   * Successful completion:
   *  - if the transport is shut down or this association was removed after quarantine, only the
   *    `materializing` latch is counted down;
   *  - otherwise the queue is replaced by a lazy wrapper so the stream is restarted on next use.
   *
   * Failed completion:
   *  - `ArteryTransport.ShutdownSignal`, or any failure after shutdown/removal: timers are
   *    cancelled and the latch is counted down (non-`StreamTcpException` causes are logged);
   *  - `AeronTerminated` / `AbruptTerminationException`: timers are cancelled only;
   *  - anything else: the stream is lazily restarted, or the actor system is terminated when
   *    `restartCounter` does not allow another restart.
   *
   * @param streamName      name of the stream, used in log messages and the flight recorder
   * @param queueIndex      index into `queues` of the queue feeding this stream
   * @param queueCapacity   capacity passed to `createQueue` when the queue is re-created
   * @param streamCompleted future that completes or fails when the stream terminates
   * @param restart         function handed to the lazy queue wrapper to start the stream again
   */
  private def attachOutboundStreamRestart(
      streamName: String,
      queueIndex: Int,
      queueCapacity: Int,
      streamCompleted: Future[Done],
      restart: () => Unit): Unit = {

    // the stream is not supposed to come back once the transport is shut down
    // or the association has been removed after quarantine
    def stopExpected: Boolean =
      transport.isShutdown || isRemovedAfterQuarantined()

    // countDown the latch in case threads are waiting on the latch in outboundControlIngress method
    def releaseMaterializingLatch(): Unit =
      materializing.countDown()

    def lazyRestart(): Unit = {
      flightRecorder.transportRestartOutbound(remoteAddress, streamName)
      outboundCompressionAccess = Vector.empty
      if (queueIndex == ControlQueueIndex) {
        materializing = new CountDownLatch(1)
        _outboundControlIngress = OptionVal.None
      }

      if (!isRemovedAfterQuarantined()) {
        // LazyQueueWrapper will invoke the `restart` function when first message is offered
        queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), () => restart())
      }

      // volatile write for visibility of the queues array
      queuesVisibility = true
    }

    // Handles a failure that is neither an expected shutdown nor a termination in progress.
    def restartOrTerminate(cause: Throwable): Unit = {
      // it might have been stopped as expected due to idle or quarantine
      // for the TCP transport the exception is "converted" to StreamTcpException
      val idleStop = cause == OutboundStreamStopIdleSignal ||
        getStopReason(queueIndex).contains(OutboundStreamStopIdleSignal)
      val quarantineStop = cause == OutboundStreamStopQuarantinedSignal ||
        getStopReason(queueIndex).contains(OutboundStreamStopQuarantinedSignal)

      // for some cases restart unconditionally, without counting restarts
      val skipRestartCounter = cause match {
        case _: GaveUpMessageException => true
        case _                         => idleStop || quarantineStop
      }

      cause match {
        case _: HandshakeTimeoutException => // ok, quarantine not possible without UID
        case _ if queueIndex == ControlQueueIndex && !quarantineStop =>
          // Must quarantine in case all system messages haven't been delivered.
          quarantine(s"Outbound control stream restarted. $cause")
        case _ => // only a failed control stream leads to quarantine
      }

      // a StreamTcpException whose underlying cause is a ConnectException
      def causedByConnectFailure: Boolean = cause match {
        case tcpFailure: StreamTcpException =>
          tcpFailure.getCause match {
            case _: ConnectException => true
            case _                   => false
          }
        case _ => false
      }

      if (idleStop) {
        log.debug("{} to [{}] was idle and stopped. It will be restarted if used again.", streamName, remoteAddress)
        lazyRestart()
      } else if (quarantineStop) {
        log.debug(
          "{} to [{}] was quarantined and stopped. It will be restarted if used again.",
          streamName,
          remoteAddress)
        lazyRestart()
      } else if (skipRestartCounter || restartCounter.restart()) {
        // ConnectException may happen repeatedly and are already logged in connectionFlowWithRestart
        if (causedByConnectFailure)
          log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
        else
          log.warning("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
        lazyRestart()
      } else {
        // restart budget exhausted: give up and bring the whole system down
        log.error(
          cause,
          s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
          streamName,
          remoteAddress,
          advancedSettings.OutboundMaxRestarts,
          advancedSettings.OutboundRestartTimeout.toSeconds)
        cancelAllTimers()
        transport.system.terminate()
      }
    }

    implicit val ec = materializer.executionContext

    // normal completion of the stream
    streamCompleted.foreach { _ =>
      if (stopExpected) {
        // shutdown as expected
        releaseMaterializingLatch()
      } else {
        log.debug("{} to [{}] was completed. It will be restarted if used again.", streamName, remoteAddress)
        lazyRestart()
      }
    }

    // failed completion of the stream
    streamCompleted.failed.foreach {
      case ArteryTransport.ShutdownSignal =>
        // shutdown as expected
        cancelAllTimers()
        releaseMaterializingLatch()
      case cause if stopExpected =>
        // don't restart after shutdown, but log some details so we notice
        cause match {
          case _: StreamTcpException =>
          // for the TCP transport the ShutdownSignal is "converted" to StreamTcpException
          case _ =>
            log.warning(
              s"{} to [{}] failed after shutdown. {}: {}",
              streamName,
              remoteAddress,
              cause.getClass.getName,
              cause.getMessage)
        }
        cancelAllTimers()
        releaseMaterializingLatch()
      case _: AeronTerminated | _: AbruptTerminationException =>
        // shutdown already in progress (Aeron) or ActorSystem shutdown (abrupt termination)
        cancelAllTimers()
      case cause =>
        restartOrTerminate(cause)
    }
  }