in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/mqtt/MqttIotHubConnection.java [442:507]
public void onMessageArrived(int messageId)
{
    // Handles a notification that an MQTT message has arrived.
    //
    // The message is pulled from the first handler that has one, trying direct methods, then twin, then
    // device messaging. If none yields a message, the listener is told about the failure. Otherwise the
    // message is (for non-zero QoS) recorded with messageId for later acknowledgement, tagged with the
    // callback/context configured for its message type, and handed to the listener.

    // Each later handler is only consulted when every earlier one returned null.
    IotHubTransportMessage incoming = this.directMethod.receive();
    if (incoming != null)
    {
        log.trace("Received MQTT device method message ({})", incoming);
    }
    else if ((incoming = deviceTwin.receive()) != null)
    {
        log.trace("Received MQTT device twin message ({})", incoming);
    }
    else if ((incoming = deviceMessaging.receive()) != null)
    {
        log.trace("Received MQTT device messaging message ({})", incoming);
    }

    if (incoming == null)
    {
        // Nothing could be parsed, so no ack is recorded for this message id; the service is expected to
        // re-send it.
        this.listener.onMessageReceived(null, new TransportException("Message sent from service could not be parsed"));
        log.warn("Received message that could not be parsed. That message has been ignored.");
        return;
    }

    if (incoming.getQualityOfService() != 0)
    {
        // Remember which MQTT message id belongs to this message so it can be acknowledged later.
        log.trace("MQTT received message so it has been added to the messages to acknowledge list ({})", incoming);
        this.receivedMessagesToAcknowledge.put(incoming, messageId);
    }
    else
    {
        // QoS 0 messages (direct method and twin messages are always sent this way) need no
        // acknowledgement from this SDK.
        log.trace("MQTT received message with QoS 0 so it has not been added to the messages to acknowledge list ({})", incoming);
    }

    // Attach the user callback and context that match the kind of message received.
    switch (incoming.getMessageType())
    {
        case DEVICE_METHODS:
            incoming.setMessageCallback(this.config.getDirectMethodsMessageCallback());
            incoming.setMessageCallbackContext(this.config.getDirectMethodsMessageContext());
            break;
        case DEVICE_TWIN:
            incoming.setMessageCallback(this.config.getDeviceTwinMessageCallback());
            incoming.setMessageCallbackContext(this.config.getDeviceTwinMessageContext());
            break;
        case DEVICE_TELEMETRY:
            // Telemetry callbacks are looked up by the message's input name.
            incoming.setMessageCallback(this.config.getDeviceTelemetryMessageCallback(incoming.getInputName()));
            incoming.setMessageCallbackContext(this.config.getDeviceTelemetryMessageContext(incoming.getInputName()));
            break;
        case UNKNOWN:
        default:
            // No callback is configured for other message types.
            break;
    }

    this.listener.onMessageReceived(incoming, null);
}