in iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsTwinReceiverLinkHandler.java [100:175]
protected IotHubTransportMessage protonMessageToIoTHubMessage(AmqpsMessage protonMsg)
{
IotHubTransportMessage iotHubTransportMessage = super.protonMessageToIoTHubMessage(protonMsg);
MessageCallback messageCallback = clientConfiguration.getDeviceTwinMessageCallback();
Object messageContext = clientConfiguration.getDeviceTwinMessageContext();
iotHubTransportMessage.setMessageCallback(messageCallback);
iotHubTransportMessage.setMessageCallbackContext(messageContext);
iotHubTransportMessage.setMessageType(MessageType.DEVICE_TWIN);
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_UNKNOWN);
MessageAnnotations messageAnnotations = protonMsg.getMessageAnnotations();
if (messageAnnotations != null)
{
for (Map.Entry<Symbol, Object> entry : messageAnnotations.getValue().entrySet())
{
Symbol key = entry.getKey();
Object value = entry.getValue();
if (key.toString().equals(MESSAGE_ANNOTATION_FIELD_KEY_STATUS))
{
iotHubTransportMessage.setStatus(value.toString());
}
else if (key.toString().equals(MESSAGE_ANNOTATION_FIELD_KEY_VERSION))
{
iotHubTransportMessage.setVersion(Integer.parseInt(value.toString()));
}
else if (key.toString().equals(MESSAGE_ANNOTATION_FIELD_KEY_RESOURCE) && value.toString().equals(MESSAGE_ANNOTATION_FIELD_VALUE_PROPERTIES_DESIRED))
{
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_RESPONSE);
}
}
}
Properties properties = protonMsg.getProperties();
if (properties != null && properties.getCorrelationId() != null)
{
iotHubTransportMessage.setCorrelationId(properties.getCorrelationId().toString());
if (twinOperationCorrelationMap.containsKey(properties.getCorrelationId().toString()))
{
DeviceOperations deviceOperations = twinOperationCorrelationMap.get(properties.getCorrelationId().toString());
switch (deviceOperations)
{
case DEVICE_OPERATION_TWIN_GET_REQUEST:
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_TWIN_GET_RESPONSE);
break;
case DEVICE_OPERATION_TWIN_UPDATE_REPORTED_PROPERTIES_REQUEST:
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_TWIN_UPDATE_REPORTED_PROPERTIES_RESPONSE);
break;
case DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_REQUEST:
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_RESPONSE);
break;
case DEVICE_OPERATION_TWIN_UNSUBSCRIBE_DESIRED_PROPERTIES_REQUEST:
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_TWIN_UNSUBSCRIBE_DESIRED_PROPERTIES_RESPONSE);
break;
default:
log.error("Unrecognized device operation type during conversion of proton message into an iothub message");
}
this.twinOperationCorrelationMap.remove(properties.getCorrelationId().toString());
}
}
else if (iotHubTransportMessage.getDeviceOperationType() == DEVICE_OPERATION_UNKNOWN)
{
iotHubTransportMessage.setDeviceOperationType(DEVICE_OPERATION_TWIN_SUBSCRIBE_DESIRED_PROPERTIES_RESPONSE);
if (iotHubTransportMessage.getStatus() == null || iotHubTransportMessage.getStatus().isEmpty())
{
iotHubTransportMessage.setStatus(DEFAULT_STATUS_CODE);
}
}
return iotHubTransportMessage;
}