public IotHubTransportMessage receive()

in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/mqtt/MqttTwin.java [235:351]


    public IotHubTransportMessage receive()
    {
        synchronized (this.receivedMessagesLock)
        {
            IotHubTransportMessage message = null;

            Pair<String, MqttMessage> messagePair = this.receivedMessages.peek();

            if (messagePair != null)
            {
                String topic = messagePair.getKey();

                if (topic != null && topic.length() > 0)
                {
                    if (topic.length() > TWIN.length() && topic.startsWith(TWIN))
                    {
                        MqttMessage mqttMessage = messagePair.getValue();
                        byte[] data = mqttMessage.getPayload();

                        //remove this message from the queue as this is the correct handler
                        this.receivedMessages.poll();

                        if (topic.length() > RES.length() && topic.startsWith(RES))
                        {
                            // Tokenize on backslash
                            String[] topicTokens = topic.split(Pattern.quote("/"));
                            if (data != null && data.length > 0)
                            {
                                message = new IotHubTransportMessage(data, MessageType.DEVICE_TWIN);
                            }
                            else
                            {
                                // Case for $iothub/twin/res/{status}/?$rid={request id}
                                message = new IotHubTransportMessage(new byte[0], MessageType.DEVICE_TWIN); // empty body
                            }

                            message.setQualityOfService(mqttMessage.getQos());

                            message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_UNKNOWN);

                            // Case for $iothub/twin/res/{status}/?$rid={request id}&$version={new version}
                            if (topicTokens.length > STATUS_TOKEN)
                            {
                                message.setStatus(getStatus(topicTokens[STATUS_TOKEN]));
                            }
                            else
                            {
                                log.warn("Message received without status");
                            }

                            if (topicTokens.length > REQID_TOKEN)
                            {
                                String requestId = getRequestId(topicTokens[REQID_TOKEN]);
                                // MQTT does not have the concept of correlationId for request/response handling but it does have a requestId
                                // To handle this we are setting the correlationId to the requestId to better handle correlation
                                // whether we use MQTT or AMQP.
                                message.setRequestId(requestId);
                                message.setCorrelationId(requestId);
                                if (requestMap.containsKey(requestId))
                                {
                                    switch (requestMap.remove(requestId))
                                    {
                                        case DEVICE_OPERATION_TWIN_GET_REQUEST:
                                            message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_TWIN_GET_RESPONSE);
                                            break;
                                        case DEVICE_OPERATION_TWIN_UPDATE_REPORTED_PROPERTIES_REQUEST:
                                            message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_TWIN_UPDATE_REPORTED_PROPERTIES_RESPONSE);
                                            break;
                                        default:
                                            message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_UNKNOWN);
                                    }
                                }
                                else
                                {
                                    log.warn("Request ID cannot be null");
                                }
                            }

                            if (topicTokens.length > VERSION_TOKEN)
                            {
                                String version = getVersion(topicTokens[VERSION_TOKEN]);
                                if (version != null && !version.isEmpty())
                                {
                                    message.setVersion(Integer.parseInt(version));
                                }
                            }
                        }
                        else if (topic.length() > PATCH.length() && topic.startsWith(PATCH))
                        {
                            if (topic.startsWith(PATCH + BACKSLASH + PROPERTIES + BACKSLASH + DESIRED))
                            {
                                if (data != null)
                                {
                                    message = new IotHubTransportMessage(data, MessageType.DEVICE_TWIN);
                                    message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_RESPONSE);
                                    message.setQualityOfService(mqttMessage.getQos());
                                }

                                // Case for $iothub/twin/PATCH/properties/desired/?$version={new version}
                                // Tokenize on backslash
                                String[] topicTokens = topic.split(Pattern.quote("/"));
                                if (topicTokens.length > PATCH_VERSION_TOKEN)
                                {
                                    if (message != null)
                                    {
                                        message.setVersion(Integer.parseInt(getVersion(topicTokens[PATCH_VERSION_TOKEN])));
                                    }
                                }
                            }
                        }
                    }
                }
            }

            return message;
        }
    }