override protected def outboundTransportSink()

in remote/src/main/scala/org/apache/pekko/remote/artery/tcp/ArteryTcpTransport.scala [125:244]


  /**
   * Creates the sink that ships the outbound envelopes of one stream to the remote node over TCP.
   *
   * Every envelope is copied into a `ByteString`, prefixed with a frame header and pushed through a
   * connection flow wrapped in `RestartFlow`. The TCP connection (with TLS when `tlsEnabled`) is
   * created through `Flow.lazyFlow`, and each new connection is prefixed with the connection header
   * of the stream. Any element coming back out of the connection flow results in an
   * `IllegalStateException`.
   *
   * @param outboundContext supplies the remote address and the association state used for log markers
   * @param streamId        id of the stream; the control stream gets `Int.MaxValue` restarts and a kill switch
   * @param bufferPool      pool each envelope buffer is released to after its bytes have been copied
   * @return the sink, materializing the `Future[Done]` of the terminal `Sink.ignore`
   */
  override protected def outboundTransportSink(
      outboundContext: OutboundContext,
      streamId: Int,
      bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = {

    val peerHost = outboundContext.remoteAddress.host.get
    val peerPort = outboundContext.remoteAddress.port.get
    // created unresolved, so no host name lookup is attempted at this point
    val peerSocketAddress = InetSocketAddress.createUnresolved(peerHost, peerPort)

    // Flow over a new outgoing TCP connection, using TLS when it is enabled.
    def tcpConnection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = {
      // local address built from the configured outbound client hostname (port 0), when one is set
      val bindAddress =
        settings.Advanced.Tcp.OutboundClientHostname.map(clientHostname => new InetSocketAddress(clientHostname, 0))

      if (!tlsEnabled)
        Tcp(system).outgoingConnection(
          peerSocketAddress,
          bindAddress,
          halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false
          connectTimeout = settings.Advanced.Tcp.ConnectionTimeout)
      else {
        val sslProvider = sslEngineProvider.get
        Tcp(system).outgoingConnectionWithTls(
          peerSocketAddress,
          createSSLEngine = () => sslProvider.createClientSSLEngine(peerHost, peerPort),
          bindAddress,
          options = Nil,
          connectTimeout = settings.Advanced.Tcp.ConnectionTimeout,
          idleTimeout = Duration.Inf,
          verifySession = session => optionToTry(sslProvider.verifyClientSession(peerHost, session)),
          closing = IgnoreComplete)
      }
    }

    // The connection flow wrapped in RestartFlow with backoff.
    def restartingConnection: Flow[ByteString, ByteString, NotUsed] = {
      // counts how many times the flow factory below has been invoked
      val attempts = new AtomicInteger(0)

      // optional uid of the remote system, passed to the log markers
      def remoteUid = outboundContext.associationState.uniqueRemoteAddress().map(_.uid)

      def logConnect(): Unit =
        if (log.isDebugEnabled)
          log.debug(
            RemoteLogMarker.connect(outboundContext.remoteAddress, remoteUid),
            "Outbound connection opened to [{}]",
            outboundContext.remoteAddress)

      def logDisconnected(): Unit =
        if (log.isDebugEnabled)
          log.debug(
            RemoteLogMarker.disconnected(outboundContext.remoteAddress, remoteUid),
            "Outbound connection closed to [{}]",
            outboundContext.remoteAddress)

      val newConnectionAttempt = () => {
        // the first flow built by this factory logs failures at warning level, later ones at debug level
        val failureLevel =
          if (attempts.incrementAndGet() != 1) Logging.DebugLevel
          else Logging.WarningLevel

        def lazyConnection(idleSwitch: OptionVal[SharedKillSwitch]) = {
          val logName =
            s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream"

          val connectOnDemand = Flow.lazyFlow(() => {
            // only open the actual connection if any new messages are sent
            logConnect()
            flightRecorder.tcpOutboundConnected(outboundContext.remoteAddress, streamName(streamId))
            if (idleSwitch.isDefined)
              outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(idleSwitch)

            // the connection header goes out ahead of any frame on a new connection
            Flow[ByteString].prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))).via(tcpConnection)
          })

          Flow[ByteString]
            .via(connectOnDemand)
            .mapError {
              // the shutdown signal passes through without the disconnect log entry
              case ArteryTransport.ShutdownSignal => ArteryTransport.ShutdownSignal
              case cause =>
                logDisconnected()
                cause
            }
            // a shutdown signal completes the flow instead of failing it
            .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal => Source.empty })
            .log(name = logName)
            .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = failureLevel))
        }

        if (streamId != ControlStreamId) {
          lazyConnection(OptionVal.None)
        } else {
          // must replace the KillSwitch when restarted
          val idleSwitch = KillSwitches.shared("outboundControlStreamIdleKillSwitch")
          Flow[ByteString].via(idleSwitch.flow).via(lazyConnection(OptionVal.Some(idleSwitch)))
        }
      }

      // Restart of inner connection part important in control stream, since system messages
      // are buffered and resent from the outer SystemMessageDelivery stage. No maxRestarts limit for control
      // stream. For message stream it's best effort retry a few times.
      val maxRestarts = if (streamId == ControlStreamId) Int.MaxValue else 3
      val backoff = settings.Advanced.OutboundRestartBackoff
      val restartSettings =
        RestartSettings(backoff, backoff * 5, 0.1).withMaxRestarts(maxRestarts, backoff)

      RestartFlow
        .withBackoff[ByteString, ByteString](restartSettings)(newConnectionAttempt)
        // silence "Restarting graph due to failure" logging by RestartFlow
        .addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
    }

    // Turns one envelope into a framed ByteString and releases its buffer to the pool.
    def frame(envelope: EnvelopeBuffer): ByteString = {
      val frameLength = envelope.byteBuffer.limit()
      flightRecorder.tcpOutboundSent(frameLength)

      // TODO Possible performance improvement, could we reduce the copying of bytes?
      val payload = ByteString(envelope.byteBuffer)
      // the bytes have been copied above, so the buffer can be released now
      bufferPool.release(envelope)

      TcpFraming.encodeFrameHeader(frameLength) ++ payload
    }

    Flow[EnvelopeBuffer]
      .map(frame)
      .via(restartingConnection)
      // nothing is expected to come back on an outbound connection
      .map { _ =>
        throw new IllegalStateException(
          s"Unexpected incoming bytes in outbound connection to [${outboundContext.remoteAddress}]")
      }
      .toMat(Sink.ignore)(Keep.right)
  }