in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/mqtt/MqttDirectMethod.java [119:179]
public IotHubTransportMessage receive()
{
    // Inspects the head of the received-messages queue and, when its topic
    // carries both the METHOD and POST prefixes, removes it from the queue
    // and converts it into a DEVICE_METHODS transport message.
    //
    // Returns null (leaving the queue untouched) when the queue is empty,
    // when the head entry has a null/empty topic, or when the topic does not
    // match the prefixes above.
    // NOTE(review): the queue is presumably shared with other topic
    // handlers, which is why non-matching entries are only peeked - confirm.
    synchronized (this.receivedMessagesLock)
    {
        Pair<String, MqttMessage> head = this.receivedMessages.peek();
        if (head == null)
        {
            return null;
        }

        String topic = head.getKey();
        if (topic == null || topic.length() == 0)
        {
            return null;
        }

        // Read the MQTT message and its payload before the prefix checks.
        MqttMessage mqttMessage = head.getValue();
        byte[] payload = mqttMessage.getPayload();

        // The topic must be strictly longer than the prefix and start with it.
        if (topic.length() <= METHOD.length() || !topic.startsWith(METHOD))
        {
            return null;
        }

        if (topic.length() <= POST.length() || !topic.startsWith(POST))
        {
            return null;
        }

        // This handler owns the message: take it off the queue.
        this.receivedMessages.poll();

        // Case for $iothub/methods/POST/{method name}/?$rid={request id}
        TopicParser parser = new TopicParser(topic);

        // A missing or empty payload is represented by a zero-length body.
        byte[] body = (payload != null && payload.length > 0) ? payload : new byte[0];
        IotHubTransportMessage result = new IotHubTransportMessage(body, MessageType.DEVICE_METHODS);

        // Start as UNKNOWN; upgraded below only when a request id is present.
        result.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_UNKNOWN);
        result.setQualityOfService(mqttMessage.getQos());

        result.setMethodName(parser.getMethodName(METHOD_TOKEN));

        String requestId = parser.getRequestId(REQID_TOKEN);
        if (requestId == null)
        {
            // The message is still returned, but stays marked as UNKNOWN.
            log.warn("Request ID cannot be null");
        }
        else
        {
            result.setRequestId(requestId);
            result.setDeviceOperationType(DeviceOperations.DEVICE_OPERATION_METHOD_RECEIVE_REQUEST);
        }

        return result;
    }
}