in iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs [326:404]
/// <summary>
/// Builds the MQTT CONNECT packet for this device or module and writes it to the channel.
/// </summary>
/// <remarks>
/// Non-fatal exceptions raised while building or writing the packet are logged and handed to
/// <c>ShutdownOnErrorAsync</c> instead of being propagated to the caller.
/// </remarks>
/// <param name="context">The channel handler context the CONNECT packet is written to.</param>
private async Task ConnectAsync(IChannelHandlerContext context)
{
    if (Logging.IsEnabled)
    {
        Logging.Enter(this, context.Name, nameof(ConnectAsync));
    }

    try
    {
        // The client Id is the device Id alone, or "deviceId/moduleId" when a module Id is set.
        string clientId;
        if (string.IsNullOrWhiteSpace(_moduleId))
        {
            clientId = _deviceId;
        }
        else
        {
            clientId = $"{_deviceId}/{_moduleId}";
        }

        // A password is only fetched when a password provider exists; otherwise a client certificate is expected.
        string secret = null;
        if (_passwordProvider == null)
        {
            Debug.Assert(_mqttTransportSettings.ClientCertificate != null);
        }
        else
        {
            secret = await _passwordProvider.GetPasswordAsync().ConfigureAwait(true);
        }

        // NOTE: this local keeps its name because nameof(usernameString) is part of the logged text below.
        string usernameString = $"{_iotHubHostName}/{clientId}/?{ClientApiVersionHelper.ApiVersionQueryStringLatest}&{DeviceClientTypeParam}={Uri.EscapeDataString(_productInfo.ToString())}";

        bool hasAuthChain = !string.IsNullOrWhiteSpace(_mqttTransportSettings.AuthenticationChain);
        if (hasAuthChain)
        {
            usernameString = usernameString + $"&{AuthChainParam}={Uri.EscapeDataString(_mqttTransportSettings.AuthenticationChain)}";
        }

        // Devices or modules that pass in a model Id get plug and play features enabled: the SDK appends
        // the model Id to the username carried by the MQTT CONNECT packet.
        bool hasModelId = !string.IsNullOrWhiteSpace(_options?.ModelId);
        if (hasModelId)
        {
            usernameString = usernameString + $"&{ModelIdParam}={Uri.EscapeDataString(_options.ModelId)}";
        }

        if (Logging.IsEnabled)
        {
            Logging.Info(this, $"{nameof(usernameString)}={usernameString}", nameof(ConnectAsync));
        }

        var connectPacket = new ConnectPacket();
        connectPacket.ClientId = clientId;
        connectPacket.HasUsername = true;
        connectPacket.Username = usernameString;
        connectPacket.HasPassword = !string.IsNullOrEmpty(secret);
        connectPacket.Password = secret;
        connectPacket.KeepAliveInSeconds = _mqttTransportSettings.KeepAliveInSeconds;
        connectPacket.CleanSession = _mqttTransportSettings.CleanSession;
        connectPacket.HasWill = _mqttTransportSettings.HasWill;

        if (connectPacket.HasWill)
        {
            // The will is composed as a publish packet on the telemetry topic; only its payload and topic name
            // are copied into the CONNECT packet, while the will QoS is taken from the will message itself.
            PublishPacket willPacket = await ComposePublishPacketAsync(
                    context,
                    _willMessage.Message,
                    _mqttTransportSettings.PublishToServerQoS,
                    GetTelemetryTopicName())
                .ConfigureAwait(true);

            connectPacket.WillMessage = willPacket.Payload;
            connectPacket.WillQualityOfService = _willMessage.QoS;
            connectPacket.WillRetain = false;
            connectPacket.WillTopicName = willPacket.TopicName;
        }

        // The state flag is set and the connect-timeout check is scheduled before the packet is written.
        _stateFlags = StateFlags.Connecting;
        ScheduleCheckConnectTimeoutAsync(context);

        await WriteMessageAsync(context, connectPacket, s_shutdownOnWriteErrorHandler).ConfigureAwait(true);

        // Once the write has completed, record the activity time and schedule the keep-alive.
        _lastChannelActivityTime = DateTime.UtcNow;
        ScheduleKeepConnectionAliveAsync(context);
    }
    catch (Exception ex) when (!ex.IsFatal())
    {
        if (Logging.IsEnabled)
        {
            Logging.Error(this, $"A non-fatal exception occurred while opening the MQTT connection, will shut down: {ex}", nameof(ConnectAsync));
        }

        ShutdownOnErrorAsync(context, ex);
    }
    finally
    {
        if (Logging.IsEnabled)
        {
            Logging.Exit(this, context.Name, nameof(ConnectAsync));
        }
    }
}