in remote/src/main/scala/org/apache/pekko/remote/artery/tcp/ArteryTcpTransport.scala [125:244]
/**
 * Creates the sink that ships outbound envelopes of the given stream to the remote node
 * over TCP, or over TLS when `tlsEnabled` is set.
 *
 * Every envelope is copied into a `ByteString`, prefixed with a frame header and sent through
 * a connection flow wrapped in a `RestartFlow`. The connection itself is only established once
 * the first element arrives, and each new connection starts with the connection header for
 * `streamId`. Any bytes received from the remote side on this connection fail the stream with
 * an `IllegalStateException`.
 *
 * @param outboundContext supplies the remote address and association state of the peer
 * @param streamId        id of the outbound stream; the control stream is restarted without limit
 * @param bufferPool      pool to which each envelope buffer is released after it has been copied
 * @return a sink whose materialized future is the one produced by `Sink.ignore`
 */
override protected def outboundTransportSink(
    outboundContext: OutboundContext,
    streamId: Int,
    bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = {
  // `.get` throws right away if the remote address carries no host or no port.
  val remoteHost = outboundContext.remoteAddress.host.get
  val remotePort = outboundContext.remoteAddress.port.get
  val unresolvedRemote = InetSocketAddress.createUnresolved(remoteHost, remotePort)
  val isControlStream = streamId == ControlStreamId

  // A `def` on purpose: the local address and the connection flow are built anew for every
  // connection attempt.
  def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = {
    val localAddress =
      settings.Advanced.Tcp.OutboundClientHostname.map(clientHostname => new InetSocketAddress(clientHostname, 0))

    if (!tlsEnabled) {
      Tcp(system).outgoingConnection(
        unresolvedRemote,
        localAddress,
        halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false
        connectTimeout = settings.Advanced.Tcp.ConnectionTimeout)
    } else {
      val sslProvider = sslEngineProvider.get
      Tcp(system).outgoingConnectionWithTls(
        unresolvedRemote,
        createSSLEngine = () => sslProvider.createClientSSLEngine(remoteHost, remotePort),
        localAddress,
        options = Nil,
        connectTimeout = settings.Advanced.Tcp.ConnectionTimeout,
        idleTimeout = Duration.Inf,
        verifySession = session => optionToTry(sslProvider.verifyClientSession(remoteHost, session)),
        closing = IgnoreComplete)
    }
  }

  def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
    // Counts how many times the inner flow has been created by the RestartFlow.
    val materializations = new AtomicInteger(0)

    // Only looked up when a debug message is actually written.
    def remoteUid = outboundContext.associationState.uniqueRemoteAddress().map(_.uid)

    def logConnect(): Unit =
      if (log.isDebugEnabled) {
        log.debug(
          RemoteLogMarker.connect(outboundContext.remoteAddress, remoteUid),
          "Outbound connection opened to [{}]",
          outboundContext.remoteAddress)
      }

    def logDisconnected(): Unit =
      if (log.isDebugEnabled) {
        log.debug(
          RemoteLogMarker.disconnected(outboundContext.remoteAddress, remoteUid),
          "Outbound connection closed to [{}]",
          outboundContext.remoteAddress)
      }

    val newConnectionFlow = () => {
      // Failures of the first created flow are logged at warning level, later ones at debug level.
      val failureLevel =
        if (materializations.incrementAndGet() == 1) Logging.WarningLevel else Logging.DebugLevel

      def lazilyConnected(idleKillSwitch: OptionVal[SharedKillSwitch]) =
        Flow[ByteString]
          .via(Flow.lazyFlow(() => {
            // only open the actual connection if any new messages are sent
            logConnect()
            flightRecorder.tcpOutboundConnected(outboundContext.remoteAddress, streamName(streamId))
            if (idleKillSwitch.isDefined)
              outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(idleKillSwitch)
            // The connection header is the first thing written on every new connection.
            val connectionHeader = Source.single(TcpFraming.encodeConnectionHeader(streamId))
            Flow[ByteString].prepend(connectionHeader).via(connectionFlow)
          }))
          .mapError {
            // A shutdown is passed through untouched; any other failure is reported as a disconnect.
            case ArteryTransport.ShutdownSignal => ArteryTransport.ShutdownSignal
            case cause =>
              logDisconnected()
              cause
          }
          // Turns the shutdown signal into a regular completion of the stream.
          .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal => Source.empty })
          .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream")
          .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = failureLevel))

      if (isControlStream) {
        // must replace the KillSwitch when restarted
        val idleKillSwitch = KillSwitches.shared("outboundControlStreamIdleKillSwitch")
        Flow[ByteString].via(idleKillSwitch.flow).via(lazilyConnected(OptionVal.Some(idleKillSwitch)))
      } else {
        lazilyConnected(OptionVal.None)
      }
    }

    // Restart of inner connection part important in control stream, since system messages
    // are buffered and resent from the outer SystemMessageDelivery stage. No maxRestarts limit for control
    // stream. For message stream it's best effort retry a few times.
    val maxRestarts = if (isControlStream) Int.MaxValue else 3
    val backoff = settings.Advanced.OutboundRestartBackoff
    val restartSettings =
      RestartSettings(backoff, backoff * 5, 0.1).withMaxRestarts(maxRestarts, backoff)

    RestartFlow
      .withBackoff[ByteString, ByteString](restartSettings)(newConnectionFlow)
      // silence "Restarting graph due to failure" logging by RestartFlow
      .addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
  }

  // Copies the envelope payload, returns the buffer to the pool and prepends the frame header.
  def toFrame(env: EnvelopeBuffer) = {
    val size = env.byteBuffer.limit()
    flightRecorder.tcpOutboundSent(size)
    // TODO Possible performance improvement, could we reduce the copying of bytes?
    val bytes = ByteString(env.byteBuffer)
    bufferPool.release(env)
    TcpFraming.encodeFrameHeader(size) ++ bytes
  }

  Flow[EnvelopeBuffer]
    .map(toFrame)
    .via(connectionFlowWithRestart)
    // Nothing is expected to arrive on an outbound connection, so any element is an error.
    .map(_ =>
      throw new IllegalStateException(
        s"Unexpected incoming bytes in outbound connection to [${outboundContext.remoteAddress}]"))
    .toMat(Sink.ignore)(Keep.right)
}