in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/mqtt/MqttTwin.java [235:351]
public IotHubTransportMessage receive()
{
synchronized (this.receivedMessagesLock)
{
IotHubTransportMessage message = null;
Pair<String, MqttMessage> messagePair = this.receivedMessages.peek();
if (messagePair != null)
{
String topic = messagePair.getKey();
if (topic != null && topic.length() > 0)
{
if (topic.length() > TWIN.length() && topic.startsWith(TWIN))
{
MqttMessage mqttMessage = messagePair.getValue();
byte[] data = mqttMessage.getPayload();
//remove this message from the queue as this is the correct handler
this.receivedMessages.poll();
if (topic.length() > RES.length() && topic.startsWith(RES))
{
// Tokenize on backslash
String[] topicTokens = topic.split(Pattern.quote("/"));
if (data != null && data.length > 0)
{
message = new IotHubTransportMessage(data, MessageType.DEVICE_TWIN);
}
else
{
// Case for $iothub/twin/res/{status}/?$rid={request id}
message = new IotHubTransportMessage(new byte[0], MessageType.DEVICE_TWIN); // empty body
}
message.setQualityOfService(mqttMessage.getQos());
message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_UNKNOWN);
// Case for $iothub/twin/res/{status}/?$rid={request id}&$version={new version}
if (topicTokens.length > STATUS_TOKEN)
{
message.setStatus(getStatus(topicTokens[STATUS_TOKEN]));
}
else
{
log.warn("Message received without status");
}
if (topicTokens.length > REQID_TOKEN)
{
String requestId = getRequestId(topicTokens[REQID_TOKEN]);
// MQTT does not have the concept of correlationId for request/response handling but it does have a requestId
// To handle this we are setting the correlationId to the requestId to better handle correlation
// whether we use MQTT or AMQP.
message.setRequestId(requestId);
message.setCorrelationId(requestId);
if (requestMap.containsKey(requestId))
{
switch (requestMap.remove(requestId))
{
case DEVICE_OPERATION_TWIN_GET_REQUEST:
message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_TWIN_GET_RESPONSE);
break;
case DEVICE_OPERATION_TWIN_UPDATE_REPORTED_PROPERTIES_REQUEST:
message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_TWIN_UPDATE_REPORTED_PROPERTIES_RESPONSE);
break;
default:
message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_UNKNOWN);
}
}
else
{
log.warn("Request ID cannot be null");
}
}
if (topicTokens.length > VERSION_TOKEN)
{
String version = getVersion(topicTokens[VERSION_TOKEN]);
if (version != null && !version.isEmpty())
{
message.setVersion(Integer.parseInt(version));
}
}
}
else if (topic.length() > PATCH.length() && topic.startsWith(PATCH))
{
if (topic.startsWith(PATCH + BACKSLASH + PROPERTIES + BACKSLASH + DESIRED))
{
if (data != null)
{
message = new IotHubTransportMessage(data, MessageType.DEVICE_TWIN);
message.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_RESPONSE);
message.setQualityOfService(mqttMessage.getQos());
}
// Case for $iothub/twin/PATCH/properties/desired/?$version={new version}
// Tokenize on backslash
String[] topicTokens = topic.split(Pattern.quote("/"));
if (topicTokens.length > PATCH_VERSION_TOKEN)
{
if (message != null)
{
message.setVersion(Integer.parseInt(getVersion(topicTokens[PATCH_VERSION_TOKEN])));
}
}
}
}
}
}
}
return message;
}
}