private void sendQueuedMessages()

in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java [940:1008]


    private void sendQueuedMessages()
    {
        int messagesAttemptedToBeProcessed = 0;
        Message message = messagesToSend.poll();
        while (message != null && messagesAttemptedToBeProcessed < MAX_MESSAGES_TO_SEND_PER_CALLBACK)
        {
            messagesAttemptedToBeProcessed++;
            SendResult sendResult = sendQueuedMessage(message);

            // If no active device sessions are responsible for sending messages for the device that the transport message belongs to,
            // then either the device session is reconnecting and the message should be requeued, or the device session
            // was unregistered by the user and the message should report that it failed to send before the device session was unregistered.
            if (sendResult == SendResult.WRONG_DEVICE)
            {
                AmqpsSessionHandler reconnectingDeviceSessionHandler = this.reconnectingDeviceSessionHandlers.get(message.getConnectionDeviceId());
                if (reconnectingDeviceSessionHandler != null)
                {
                    log.trace("Amqp message failed to send because its AMQP session is currently reconnecting. Adding it back to messages to send queue ({})", message);
                    TransportException transportException = new TransportException("Amqp message failed to send because its AMQP session is currently reconnecting");
                    transportException.setRetryable(true);
                    this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
                }
                else
                {
                    TransportException transportException = new TransportException("Message failed to send because it belonged to a device that was unregistered from the AMQP connetion");
                    transportException.setRetryable(false);
                    this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
                }
            }
            else if (sendResult == SendResult.DUPLICATE_SUBSCRIPTION_MESSAGE)
            {
                // No need to send a twin/method subscription message if you are already subscribed or if the subscription is in progress
                log.trace("Attempted to send subscription message while the subscription was already in progress. Discarding the message ({})", message);
            }
            else if (sendResult == SendResult.SUBSCRIPTION_IN_PROGRESS)
            {
                // Proton-j doesn't handle the scenario of sending twin/method messages on links that haven't been opened remotely yet, so hold
                // off on sending them until the subscription has finished.
                log.trace("Attempted to send twin/method message while the twin/method subscription was in progress. Adding it back to messages to send queue to try again after the subscription has finished ({})", message);
                TransportException transportException = new TransportException("Subscription in progress needs to be completed before this message can be sent");
                transportException.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
            }
            else if (sendResult == SendResult.LINKS_NOT_OPEN)
            {
                // Shouldn't happen. If it does, it signals that we have a bug in this SDK.
                log.warn("Failed to send a message because its AMQP links were not open yet. Adding it back to messages to send queue ({})", message);
                TransportException transportException = new TransportException("Amqp links not open for this message");
                transportException.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
            }
            else if (sendResult == SendResult.UNKNOWN_FAILURE)
            {
                // Shouldn't happen. If it does, it signals that we have a bug in this SDK.
                log.warn("Unknown failure occurred while attempting to send. Adding it back to messages to send queue ({})", message);
                TransportException transportException = new TransportException("Unknown failure");
                transportException.setRetryable(true);
                this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
            }

            message = messagesToSend.poll();
        }

        if (message != null)
        {
            //message was polled out of list, but loop exited from processing too many messages before it could process this message, so re-queue it for later
            messagesToSend.add(message);
        }
    }