in iothub/device/src/Transport/AmqpIot/AmqpIotSession.cs [278:348]
/// <summary>
/// Builds the settings for a receiving AMQP link, attaches the link to <paramref name="amqpSession"/>,
/// opens it and wraps the opened link in an <see cref="AmqpIotReceivingLink"/>.
/// </summary>
/// <param name="deviceIdentity">Identity supplying the transport settings, product info and device Id used for the link.</param>
/// <param name="amqpSession">The session the new link is attached to; it is closed via <c>SafeClose</c> when opening fails with a resource exception.</param>
/// <param name="senderSettleMode">Value assigned to the link's <c>SndSettleMode</c>.</param>
/// <param name="receiverSettleMode">Value assigned to the link's <c>RcvSettleMode</c>.</param>
/// <param name="deviceTemplate">Passed to <c>BuildLinkAddress</c> to form the link's source address.</param>
/// <param name="moduleTemplate">Passed to <c>BuildLinkAddress</c> to form the link's source address.</param>
/// <param name="linkSuffix">Used as the link name.</param>
/// <param name="correlationId">When not null, added to the link as the channel correlation Id property.</param>
/// <param name="cancellationToken">Token passed to the link's <c>OpenAsync</c> call.</param>
/// <returns>The wrapper around the opened receiving link.</returns>
/// <exception cref="IotHubCommunicationException">
/// Thrown when the failure converts to an <see cref="AmqpIotResourceException"/>; the session is closed first.
/// </exception>
private static async Task<AmqpIotReceivingLink> OpenReceivingAmqpLinkAsync(
    IDeviceIdentity deviceIdentity,
    AmqpSession amqpSession,
    byte? senderSettleMode,
    byte? receiverSettleMode,
    string deviceTemplate,
    string moduleTemplate,
    string linkSuffix,
    string correlationId,
    CancellationToken cancellationToken)
{
    if (Logging.IsEnabled)
    {
        Logging.Enter(typeof(AmqpIotSession), deviceIdentity, nameof(OpenReceivingAmqpLinkAsync));
    }

    uint linkCredit = deviceIdentity.AmqpTransportSettings.PrefetchCount;

    // Properties are assigned in a fixed order so any failure while computing
    // the addresses surfaces at the same point as before.
    var settings = new AmqpLinkSettings();
    settings.LinkName = linkSuffix;
    // NOTE(review): Role = true is the AMQP "receiver" role flag - confirm against the AMQP library docs.
    settings.Role = true;
    settings.TotalLinkCredit = linkCredit;
    // Flow frames are only sent automatically when a non-zero prefetch count is configured.
    settings.AutoSendFlow = linkCredit > 0;
    settings.Source = new Source { Address = BuildLinkAddress(deviceIdentity, deviceTemplate, moduleTemplate) };
    settings.Target = new Target { Address = deviceIdentity.IotHubConnectionString.DeviceId };
    settings.SndSettleMode = senderSettleMode;
    settings.RcvSettleMode = receiverSettleMode;

    settings.AddProperty(AmqpIotConstants.ClientVersion, deviceIdentity.ProductInfo.ToString());
    settings.AddProperty(AmqpIotConstants.ApiVersion, ClientApiVersionHelper.ApiVersionString);

    string authChain = deviceIdentity.AmqpTransportSettings.AuthenticationChain;
    if (!string.IsNullOrWhiteSpace(authChain))
    {
        settings.AddProperty(AmqpIotConstants.AuthChain, authChain);
    }

    if (correlationId != null)
    {
        settings.AddProperty(AmqpIotConstants.ChannelCorrelationId, correlationId);
    }

    try
    {
        var link = new ReceivingAmqpLink(settings);
        link.AttachTo(amqpSession);
        await link.OpenAsync(cancellationToken).ConfigureAwait(false);

        return new AmqpIotReceivingLink(link);
    }
    catch (Exception e) when (!e.IsFatal())
    {
        Exception converted = AmqpIotExceptionAdapter.ConvertToIotHubException(e, amqpSession);

        // The adapter returned the same instance: rethrow and keep the original stack trace.
        if (ReferenceEquals(e, converted))
        {
            throw;
        }

        // A resource exception closes the session and is reported to the caller as a communication failure.
        if (converted is AmqpIotResourceException)
        {
            amqpSession.SafeClose();
            throw new IotHubCommunicationException(converted.Message, converted);
        }

        // 'converted' is a new exception object that has not been thrown yet, so throwing it directly is correct.
        throw converted;
    }
    finally
    {
        if (Logging.IsEnabled)
        {
            Logging.Exit(typeof(AmqpIotSession), deviceIdentity, nameof(OpenReceivingAmqpLinkAsync));
        }
    }
}