in provisioning/transport/mqtt/src/ProvisioningTransportHandlerMqtt.cs [233:314]
/// <summary>
/// Resolves the global device endpoint through DNS, attempts a TCP connection to each resolved
/// address in order until one succeeds, and then waits for the registration result that the
/// <see cref="ProvisioningChannelHandlerAdapter"/> in the channel pipeline reports.
/// </summary>
/// <param name="message">The registration message; its <c>GlobalDeviceEndpoint</c> is the host name that is resolved and connected to.</param>
/// <param name="tlsSettings">The TLS settings handed to the pipeline's <see cref="TlsHandler"/>.</param>
/// <param name="cancellationToken">Checked before each connection attempt and passed on to the channel handler adapter (and to DNS resolution on .NET 6 or later).</param>
/// <returns>The registration operation status produced by the channel handler adapter.</returns>
/// <exception cref="OperationCanceledException">Cancellation was requested before a connection attempt started.</exception>
/// <exception cref="ConnectException">Every connection attempt failed; the last failure is rethrown.</exception>
/// <exception cref="InvalidOperationException">No channel was established and no connection failure was recorded (for example, DNS returned no addresses).</exception>
private async Task<RegistrationOperationStatus> ProvisionOverTcpCommonAsync(
    ProvisioningTransportRegisterMessage message,
    ClientTlsSettings tlsSettings,
    CancellationToken cancellationToken)
{
    // The completion source is handed to the channel handler adapter. Run continuations
    // asynchronously so that whichever thread completes it does not also execute the
    // continuation of this method inline.
    var tcs = new TaskCompletionSource<RegistrationOperationStatus>(TaskCreationOptions.RunContinuationsAsynchronously);

    Func<Stream, SslStream> streamFactory = stream => new SslStream(stream, true, RemoteCertificateValidationCallback);

    Bootstrap bootstrap = new Bootstrap()
        .Group(s_eventLoopGroup)
        .Channel<TcpSocketChannel>()
        .Option(ChannelOption.TcpNodelay, true)
        .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
        .Handler(new ActionChannelInitializer<ISocketChannel>(ch =>
        {
            ch.Pipeline.AddLast(
                new ReadTimeoutHandler(ReadTimeoutSeconds),
                new TlsHandler(streamFactory, tlsSettings),
                MqttEncoder.Instance,
                new MqttDecoder(isServer: false, maxMessageSize: MaxMessageSize),
                new LoggingHandler(LogLevel.DEBUG),
                new ProvisioningChannelHandlerAdapter(message, tcs, cancellationToken));
        }));

    if (Logging.IsEnabled)
        Logging.Associate(bootstrap, this);

    // Only the .NET 6+ overload of GetHostAddressesAsync accepts a cancellation token.
#if NET6_0_OR_GREATER
    IPAddress[] addresses = await Dns.GetHostAddressesAsync(message.GlobalDeviceEndpoint, cancellationToken).ConfigureAwait(false);
#else
    IPAddress[] addresses = await Dns.GetHostAddressesAsync(message.GlobalDeviceEndpoint).ConfigureAwait(false);
#endif

    if (Logging.IsEnabled)
        Logging.Info(this, $"DNS resolved {addresses.Length} addresses.");

    IChannel channel = null;
    Exception lastException = null;

    // Try the resolved addresses one by one; the first successful connection wins.
    foreach (IPAddress address in addresses)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            if (Logging.IsEnabled)
                Logging.Info(this, $"Connecting to {address}.");

            channel = await bootstrap.ConnectAsync(address, Port).ConfigureAwait(false);
            break;
        }
        catch (AggregateException ae)
        {
            ae.Handle((ex) =>
            {
                if (ex is ConnectException) // We will handle DotNetty.Transport.Channels.ConnectException
                {
                    // Remember the failure and move on to the next address.
                    lastException = ex;

                    if (Logging.IsEnabled)
                        Logging.Info(this, $"ConnectException trying to connect to {address}: {ex}");

                    return true;
                }

                return false; // Let anything else stop the application.
            });
        }
    }

    if (channel == null)
    {
        string errorMessage = "Cannot connect to Provisioning Service.";

        if (Logging.IsEnabled)
            Logging.Error(this, errorMessage);

        // When DNS returns no addresses the loop never runs and there is no recorded failure.
        // ExceptionDispatchInfo.Capture(null) would throw an unrelated ArgumentNullException,
        // so report the actual condition instead.
        if (lastException == null)
        {
            throw new InvalidOperationException(
                $"{errorMessage} No connection to '{message.GlobalDeviceEndpoint}' was established and no connection error was recorded (DNS resolved {addresses.Length} addresses).");
        }

        // Rethrow the last connection failure while preserving its original stack trace.
        ExceptionDispatchInfo.Capture(lastException).Throw();
    }

    return await tcs.Task.ConfigureAwait(false);
}