in iothub/service/src/Amqp/IotHubConnection.cs [253:351]
private async Task<AmqpSession> CreateSessionAsync(TimeSpan timeout)
{
Logging.Enter(this, timeout, nameof(CreateSessionAsync));
TransportBase transport = null;
try
{
var timeoutHelper = new TimeoutHelper(timeout);
_refreshTokenTimer.Cancel();
AmqpSettings amqpSettings = CreateAmqpSettings();
if (_useWebSocketOnly)
{
// Try only AMQP transport over WebSocket
transport = _clientWebSocketTransport = (ClientWebSocketTransport)await CreateClientWebSocketTransportAsync(timeoutHelper.RemainingTime())
.ConfigureAwait(false);
}
else
{
TlsTransportSettings tlsTransportSettings = CreateTlsTransportSettings();
var amqpTransportInitiator = new AmqpTransportInitiator(amqpSettings, tlsTransportSettings);
try
{
transport = await amqpTransportInitiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}
catch (Exception e) when (!(e is AuthenticationException))
{
Logging.Error(this, e, nameof(CreateSessionAsync));
if (Fx.IsFatal(e))
{
throw;
}
// AMQP transport over TCP failed. Retry AMQP transport over WebSocket
if (timeoutHelper.RemainingTime() != TimeSpan.Zero)
{
transport = _clientWebSocketTransport = (ClientWebSocketTransport)await CreateClientWebSocketTransportAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
}
else
{
throw;
}
}
}
Logging.Info(this, $"Initialized {nameof(TransportBase)}, ws={_useWebSocketOnly}");
var amqpConnectionSettings = new AmqpConnectionSettings
{
MaxFrameSize = AmqpConstants.DefaultMaxFrameSize,
ContainerId = Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture), // Use a human readable link name to help with debugging
HostName = Credential.AmqpEndpoint.Host,
};
var amqpConnection = new AmqpConnection(transport, amqpSettings, amqpConnectionSettings);
await amqpConnection.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
Logging.Info(this, $"{nameof(AmqpConnection)} opened.");
var sessionSettings = new AmqpSessionSettings
{
Properties = new Fields(),
};
try
{
AmqpSession amqpSession = amqpConnection.CreateSession(sessionSettings);
await amqpSession.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
Logging.Info(this, $"{nameof(AmqpSession)} opened.");
// This adds itself to amqpConnection.Extensions
var cbsLink = new AmqpCbsLink(amqpConnection);
await SendCbsTokenAsync(cbsLink, timeoutHelper.RemainingTime()).ConfigureAwait(false);
return amqpSession;
}
catch (Exception ex) when (!ex.IsFatal())
{
Logging.Error(this, ex, nameof(CreateSessionAsync));
_clientWebSocketTransport?.Dispose();
_clientWebSocketTransport = null;
if (amqpConnection.TerminalException != null)
{
throw AmqpClientHelper.ToIotHubClientContract(amqpConnection.TerminalException);
}
amqpConnection.SafeClose(ex);
throw;
}
}
finally
{
Logging.Exit(this, timeout, nameof(CreateSessionAsync));
}
}