in provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs [75:208]
public override async Task<DeviceRegistrationResult> RegisterAsync(
ProvisioningTransportRegisterMessage message,
CancellationToken cancellationToken)
{
if (Logging.IsEnabled)
Logging.Enter(this, $"{nameof(ProvisioningTransportHandlerAmqp)}.{nameof(RegisterAsync)}");
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
// We need to create a LinkedTokenSource to include both the default timeout and the cancellation token
// AMQP library started supporting CancellationToken starting from version 2.5.5
// To preserve current behavior, we will honor both the legacy timeout and the cancellation token parameter.
using var timeoutTokenSource = new CancellationTokenSource(s_timeoutConstant);
using var cancellationTokenSourceBundle = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken);
CancellationToken bundleCancellationToken = cancellationTokenSourceBundle.Token;
bundleCancellationToken.ThrowIfCancellationRequested();
try
{
AmqpAuthStrategy authStrategy;
if (message.Security is SecurityProviderTpm tpm)
{
authStrategy = new AmqpAuthStrategyTpm(tpm);
}
else if (message.Security is SecurityProviderX509 x509)
{
authStrategy = new AmqpAuthStrategyX509(x509);
}
else if (message.Security is SecurityProviderSymmetricKey key)
{
authStrategy = new AmqpAuthStrategySymmetricKey(key);
}
else
{
throw new NotSupportedException(
$"{nameof(message.Security)} must be of type {nameof(SecurityProviderTpm)}, " +
$"{nameof(SecurityProviderX509)} or {nameof(SecurityProviderSymmetricKey)}");
}
if (Logging.IsEnabled)
Logging.Associate(authStrategy, this);
bool useWebSocket = FallbackType == TransportFallbackType.WebSocketOnly;
var builder = new UriBuilder
{
Scheme = useWebSocket ? WebSocketConstants.Scheme : AmqpConstants.SchemeAmqps,
Host = message.GlobalDeviceEndpoint,
Port = Port,
};
string registrationId = message.Security.GetRegistrationID();
string linkEndpoint = $"{message.IdScope}/registrations/{registrationId}";
using AmqpClientConnection connection = authStrategy.CreateConnection(builder.Uri, message.IdScope);
await authStrategy.OpenConnectionAsync(connection, useWebSocket, Proxy, RemoteCertificateValidationCallback, bundleCancellationToken).ConfigureAwait(false);
bundleCancellationToken.ThrowIfCancellationRequested();
await CreateLinksAsync(
connection,
linkEndpoint,
message.ProductInfo,
bundleCancellationToken)
.ConfigureAwait(false);
bundleCancellationToken.ThrowIfCancellationRequested();
string correlationId = Guid.NewGuid().ToString();
DeviceRegistration deviceRegistration = (message.Payload != null && message.Payload.Length > 0)
? new DeviceRegistration { Payload = new JRaw(message.Payload) }
: null;
RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration, bundleCancellationToken).ConfigureAwait(false);
// Poll with operationId until registration complete.
int attempts = 0;
string operationId = operation.OperationId;
// Poll with operationId until registration complete.
while (string.CompareOrdinal(operation.Status, RegistrationOperationStatus.OperationStatusAssigning) == 0
|| string.CompareOrdinal(operation.Status, RegistrationOperationStatus.OperationStatusUnassigned) == 0)
{
bundleCancellationToken.ThrowIfCancellationRequested();
await Task.Delay(
operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPollingInterval),
bundleCancellationToken).ConfigureAwait(false);
try
{
operation = await OperationStatusLookupAsync(
connection,
operationId,
correlationId,
bundleCancellationToken)
.ConfigureAwait(false);
}
catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient)
{
operation.RetryAfter = amqp.RetryAfter;
}
attempts++;
}
if (string.CompareOrdinal(operation.Status, RegistrationOperationStatus.OperationStatusAssigned) == 0)
{
authStrategy.SaveCredentials(operation);
}
await connection.CloseAsync(bundleCancellationToken).ConfigureAwait(false);
return ConvertToProvisioningRegistrationResult(operation.RegistrationState);
}
catch (Exception ex) when (!(ex is ProvisioningTransportException))
{
if (Logging.IsEnabled)
Logging.Error(this, $"{nameof(ProvisioningTransportHandlerAmqp)} threw exception {ex}", nameof(RegisterAsync));
throw new ProvisioningTransportException($"AMQP transport exception", ex, true);
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, $"{nameof(ProvisioningTransportHandlerAmqp)}.{nameof(RegisterAsync)}");
}
}