in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsSessionHandler.java [350:436]
/**
 * Attempts to send the given message over the sender link of this session that matches the message's type.
 * <p>
 * Subscription request messages (method subscribe / twin desired properties subscribe) are not sent over an
 * existing link; they are handed off to the corresponding subscription handler instead.
 *
 * @param message the message to send. Its connection device Id is compared against this session's device Id.
 * @return {@link SendResult#WRONG_DEVICE} if the message's connection device Id differs from this session's,
 * {@link SendResult#LINKS_NOT_OPEN} if no sender link handler is saved for the message's type,
 * {@link SendResult#SUBSCRIPTION_IN_PROGRESS} if the message is a twin message and a twin subscription is
 * still in progress, {@link SendResult#SUCCESS} if the sender link reports a successful delivery,
 * {@link SendResult#UNKNOWN_FAILURE} otherwise. For subscription request messages, the result of the
 * corresponding subscription handler is returned.
 */
SendResult sendMessage(Message message)
{
    String sessionDeviceId = this.clientConfiguration.getDeviceId();
    if (!sessionDeviceId.equals(message.getConnectionDeviceId()))
    {
        // Not expected in practice: this session handler is looked up from a map keyed by device Id, so the
        // message's device Id should match the key this handler was stored under.
        log.warn("Failed to send the message because this session belongs to a different device");
        return SendResult.WRONG_DEVICE;
    }

    // The SDK stamps a type on the twin and method messages it builds (users cannot construct those directly),
    // but user-created telemetry messages may have no type at all. Treat an unassigned type as telemetry.
    MessageType declaredType = message.getMessageType();
    MessageType effectiveType = (declaredType != null) ? declaredType : DEVICE_TELEMETRY;

    // Subscription change messages are not sent like regular messages; they open the corresponding links.
    if (message instanceof IotHubTransportMessage)
    {
        IotHubTransportMessage subscriptionCandidate = (IotHubTransportMessage) message;
        DeviceOperations requestedOperation = subscriptionCandidate.getDeviceOperationType();

        if (requestedOperation == DEVICE_OPERATION_METHOD_SUBSCRIBE_REQUEST)
        {
            return handleMethodSubscriptionRequest(subscriptionCandidate);
        }
        else if (requestedOperation == DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_REQUEST)
        {
            return handleTwinSubscriptionRequest(subscriptionCandidate);
        }
    }

    AmqpsSenderLinkHandler linkForType = this.senderLinkHandlers.get(effectiveType);
    if (linkForType == null)
    {
        // Without a saved sender link handler for this type, the message cannot be sent. Not expected in
        // practice: telemetry links are always opened, and twin/method messages can't be sent before their
        // subscription messages have opened the respective links.
        return SendResult.LINKS_NOT_OPEN;
    }

    if (effectiveType == DEVICE_TWIN)
    {
        // A non-null explicit subscription message means a user-initiated twin subscription (startTwin) has
        // not been acknowledged by the service yet.
        boolean twinSubscriptionPending = explicitInProgressTwinSubscriptionMessage != null;

        if (!twinSubscriptionPending)
        {
            // Implicit subscriptions are the ones this session handler sends on its own after a temporary
            // loss of connectivity, in order to re-establish all subscriptions that were active before the
            // disconnection. A desired properties entry here means such a message was sent to the service
            // but has not been acknowledged yet.
            for (SubscriptionType pendingSubscription : this.implicitInProgressSubscriptionMessages.values())
            {
                if (pendingSubscription == SubscriptionType.DESIRED_PROPERTIES_SUBSCRIPTION)
                {
                    twinSubscriptionPending = true;
                    break;
                }
            }
        }

        if (twinSubscriptionPending)
        {
            // No twin messages are sent while a twin subscription is in progress. The message is rejected
            // here; the connection layer will requeue it, and it gets another chance to send when the timer
            // task that checks for outgoing queued messages executes again.
            return SendResult.SUBSCRIPTION_IN_PROGRESS;
        }
    }

    AmqpsSendResult deliveryOutcome = linkForType.sendMessageAndGetDeliveryTag(message);
    return deliveryOutcome.isDeliverySuccessful() ? SendResult.SUCCESS : SendResult.UNKNOWN_FAILURE;
}