public override async Task RegisterAsync()

in provisioning/transport/amqp/src/ProvisioningTransportHandlerAmqp.cs [75:208]


        /// <summary>
        /// Registers a device over AMQP: selects an authentication strategy from the type of
        /// <c>message.Security</c>, opens the connection and links, sends the registration request,
        /// and then polls the operation status until it is no longer "assigning" or "unassigned".
        /// </summary>
        /// <param name="message">The registration message; must not be null.</param>
        /// <param name="cancellationToken">
        /// Caller-supplied cancellation token. It is linked with an internal timeout token
        /// (<c>s_timeoutConstant</c>), so whichever fires first cancels the operation.
        /// </param>
        /// <returns>
        /// The result produced by <c>ConvertToProvisioningRegistrationResult</c> from the final
        /// operation's <c>RegistrationState</c>.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="OperationCanceledException">
        /// Cancellation was already requested before any work started (this check runs before the
        /// <c>try</c> block, so it is not wrapped).
        /// </exception>
        /// <exception cref="ProvisioningTransportException">
        /// Thrown as-is when raised by the transport, or used to wrap any other exception raised inside
        /// the <c>try</c> block (including cancellation and the unsupported-security-provider case).
        /// </exception>
        public override async Task<DeviceRegistrationResult> RegisterAsync(
            ProvisioningTransportRegisterMessage message,
            CancellationToken cancellationToken)
        {
            if (Logging.IsEnabled)
                Logging.Enter(this, $"{nameof(ProvisioningTransportHandlerAmqp)}.{nameof(RegisterAsync)}");

            // Thrown before the try/finally below, so it propagates unwrapped.
            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            // We need to create a LinkedTokenSource to include both the default timeout and the cancellation token
            // AMQP library started supporting CancellationToken starting from version 2.5.5
            // To preserve current behavior, we will honor both the legacy timeout and the cancellation token parameter.
            using var timeoutTokenSource = new CancellationTokenSource(s_timeoutConstant);
            using var cancellationTokenSourceBundle = CancellationTokenSource.CreateLinkedTokenSource(timeoutTokenSource.Token, cancellationToken);

            CancellationToken bundleCancellationToken = cancellationTokenSourceBundle.Token;

            // Fail fast if the caller already cancelled; also outside the try, so not wrapped.
            bundleCancellationToken.ThrowIfCancellationRequested();

            try
            {
                AmqpAuthStrategy authStrategy;

                // Pick the authentication strategy that matches the concrete security provider type.
                if (message.Security is SecurityProviderTpm tpm)
                {
                    authStrategy = new AmqpAuthStrategyTpm(tpm);
                }
                else if (message.Security is SecurityProviderX509 x509)
                {
                    authStrategy = new AmqpAuthStrategyX509(x509);
                }
                else if (message.Security is SecurityProviderSymmetricKey key)
                {
                    authStrategy = new AmqpAuthStrategySymmetricKey(key);
                }
                else
                {
                    // NOTE(review): this is thrown inside the try, so the catch below wraps it in a
                    // ProvisioningTransportException rather than letting NotSupportedException escape.
                    throw new NotSupportedException(
                        $"{nameof(message.Security)} must be of type {nameof(SecurityProviderTpm)}, " +
                        $"{nameof(SecurityProviderX509)} or {nameof(SecurityProviderSymmetricKey)}");
                }

                if (Logging.IsEnabled)
                    Logging.Associate(authStrategy, this);

                bool useWebSocket = FallbackType == TransportFallbackType.WebSocketOnly;

                var builder = new UriBuilder
                {
                    Scheme = useWebSocket ? WebSocketConstants.Scheme : AmqpConstants.SchemeAmqps,
                    Host = message.GlobalDeviceEndpoint,
                    Port = Port,
                };

                string registrationId = message.Security.GetRegistrationID();
                string linkEndpoint = $"{message.IdScope}/registrations/{registrationId}";

                // The using declaration disposes the connection on every exit path, including failures.
                using AmqpClientConnection connection = authStrategy.CreateConnection(builder.Uri, message.IdScope);

                await authStrategy.OpenConnectionAsync(connection, useWebSocket, Proxy, RemoteCertificateValidationCallback, bundleCancellationToken).ConfigureAwait(false);
                bundleCancellationToken.ThrowIfCancellationRequested();

                await CreateLinksAsync(
                        connection,
                        linkEndpoint,
                        message.ProductInfo,
                        bundleCancellationToken)
                    .ConfigureAwait(false);

                bundleCancellationToken.ThrowIfCancellationRequested();

                string correlationId = Guid.NewGuid().ToString();

                // Only send a registration body when the caller supplied a non-empty payload.
                DeviceRegistration deviceRegistration = (message.Payload != null && message.Payload.Length > 0)
                    ? new DeviceRegistration { Payload = new JRaw(message.Payload) }
                    : null;

                RegistrationOperationStatus operation = await RegisterDeviceAsync(connection, correlationId, deviceRegistration, bundleCancellationToken).ConfigureAwait(false);

                string operationId = operation.OperationId;

                // Poll with operationId until registration complete.
                while (string.CompareOrdinal(operation.Status, RegistrationOperationStatus.OperationStatusAssigning) == 0
                    || string.CompareOrdinal(operation.Status, RegistrationOperationStatus.OperationStatusUnassigned) == 0)
                {
                    bundleCancellationToken.ThrowIfCancellationRequested();

                    // Wait for the retry-after value on the last operation when present; otherwise use
                    // the default polling interval with jitter.
                    await Task.Delay(
                        operation.RetryAfter ?? RetryJitter.GenerateDelayWithJitterForRetry(s_defaultOperationPollingInterval),
                        bundleCancellationToken).ConfigureAwait(false);

                    try
                    {
                        operation = await OperationStatusLookupAsync(
                            connection,
                            operationId,
                            correlationId,
                            bundleCancellationToken)
                        .ConfigureAwait(false);
                    }
                    catch (ProvisioningTransportException e) when (e.ErrorDetails is ProvisioningErrorDetailsAmqp amqp && e.IsTransient)
                    {
                        // Transient lookup failure: keep the previous operation (its status keeps the
                        // loop running) and only adopt the retry-after value from the error details.
                        operation.RetryAfter = amqp.RetryAfter;
                    }
                }

                if (string.CompareOrdinal(operation.Status, RegistrationOperationStatus.OperationStatusAssigned) == 0)
                {
                    authStrategy.SaveCredentials(operation);
                }

                // Explicit close happens on the success path only; other paths rely on disposal.
                await connection.CloseAsync(bundleCancellationToken).ConfigureAwait(false);

                return ConvertToProvisioningRegistrationResult(operation.RegistrationState);
            }
            catch (Exception ex) when (!(ex is ProvisioningTransportException))
            {
                if (Logging.IsEnabled)
                    Logging.Error(this, $"{nameof(ProvisioningTransportHandlerAmqp)} threw exception {ex}", nameof(RegisterAsync));

                // Wrap everything that is not already a transport exception.
                // NOTE(review): the trailing 'true' is presumably the "is transient" flag - confirm
                // against the ProvisioningTransportException constructor.
                throw new ProvisioningTransportException("AMQP transport exception", ex, true);
            }
            finally
            {
                if (Logging.IsEnabled)
                    Logging.Exit(this, $"{nameof(ProvisioningTransportHandlerAmqp)}.{nameof(RegisterAsync)}");
            }
        }