in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java [940:1008]
private void sendQueuedMessages()
{
int messagesAttemptedToBeProcessed = 0;
Message message = messagesToSend.poll();
while (message != null && messagesAttemptedToBeProcessed < MAX_MESSAGES_TO_SEND_PER_CALLBACK)
{
messagesAttemptedToBeProcessed++;
SendResult sendResult = sendQueuedMessage(message);
// If no active device sessions are responsible for sending messages for the device that the transport message belongs to,
// then either the device session is reconnecting and the message should be requeued, or the device session
// was unregistered by the user and the message should report that it failed to send before the device session was unregistered.
if (sendResult == SendResult.WRONG_DEVICE)
{
AmqpsSessionHandler reconnectingDeviceSessionHandler = this.reconnectingDeviceSessionHandlers.get(message.getConnectionDeviceId());
if (reconnectingDeviceSessionHandler != null)
{
log.trace("Amqp message failed to send because its AMQP session is currently reconnecting. Adding it back to messages to send queue ({})", message);
TransportException transportException = new TransportException("Amqp message failed to send because its AMQP session is currently reconnecting");
transportException.setRetryable(true);
this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
}
else
{
TransportException transportException = new TransportException("Message failed to send because it belonged to a device that was unregistered from the AMQP connetion");
transportException.setRetryable(false);
this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
}
}
else if (sendResult == SendResult.DUPLICATE_SUBSCRIPTION_MESSAGE)
{
// No need to send a twin/method subscription message if you are already subscribed or if the subscription is in progress
log.trace("Attempted to send subscription message while the subscription was already in progress. Discarding the message ({})", message);
}
else if (sendResult == SendResult.SUBSCRIPTION_IN_PROGRESS)
{
// Proton-j doesn't handle the scenario of sending twin/method messages on links that haven't been opened remotely yet, so hold
// off on sending them until the subscription has finished.
log.trace("Attempted to send twin/method message while the twin/method subscription was in progress. Adding it back to messages to send queue to try again after the subscription has finished ({})", message);
TransportException transportException = new TransportException("Subscription in progress needs to be completed before this message can be sent");
transportException.setRetryable(true);
this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
}
else if (sendResult == SendResult.LINKS_NOT_OPEN)
{
// Shouldn't happen. If it does, it signals that we have a bug in this SDK.
log.warn("Failed to send a message because its AMQP links were not open yet. Adding it back to messages to send queue ({})", message);
TransportException transportException = new TransportException("Amqp links not open for this message");
transportException.setRetryable(true);
this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
}
else if (sendResult == SendResult.UNKNOWN_FAILURE)
{
// Shouldn't happen. If it does, it signals that we have a bug in this SDK.
log.warn("Unknown failure occurred while attempting to send. Adding it back to messages to send queue ({})", message);
TransportException transportException = new TransportException("Unknown failure");
transportException.setRetryable(true);
this.listener.onMessageSent(message, message.getConnectionDeviceId(), transportException);
}
message = messagesToSend.poll();
}
if (message != null)
{
//message was polled out of list, but loop exited from processing too many messages before it could process this message, so re-queue it for later
messagesToSend.add(message);
}
}