in iothub_client/src/iothubtransport_amqp_twin_messenger.c [487:658]
static int parse_incoming_twin_message(MESSAGE_HANDLE message,
char** correlation_id,
bool* has_version, int64_t* version,
bool* has_status_code, int* status_code,
bool* has_twin_report, BINARY_DATA* twin_report)
{
int result;
if (get_message_correlation_id(message, correlation_id) != 0)
{
LogError("Failed retrieving correlation ID from received TWIN message.");
result = MU_FAILURE;
}
else
{
annotations message_annotations;
if (message_get_message_annotations(message, &message_annotations) != 0)
{
LogError("Failed getting TWIN message annotations");
result = MU_FAILURE;
}
else
{
result = RESULT_OK;
if (message_annotations == NULL)
{
*has_version = false;
*has_status_code = false;
}
else
{
uint32_t pair_count;
if (amqpvalue_get_map_pair_count((AMQP_VALUE)message_annotations, &pair_count) != 0)
{
LogError("Failed getting TWIN message annotations count");
result = MU_FAILURE;
}
else
{
uint32_t i;
*has_status_code = false;
*has_version = false;
for (i = 0; i < pair_count; i++)
{
AMQP_VALUE amqp_map_key;
AMQP_VALUE amqp_map_value;
if (amqpvalue_get_map_key_value_pair((AMQP_VALUE)message_annotations, i, &amqp_map_key, &amqp_map_value) != 0)
{
LogError("Failed getting AMQP map key/value pair (%d)", i);
result = MU_FAILURE;
}
else
{
const char* map_key_name;
if (amqpvalue_get_symbol(amqp_map_key, &map_key_name) != 0)
{
LogError("Failed getting AMQP value symbol");
result = MU_FAILURE;
break;
}
else
{
if (strcmp(TWIN_MESSAGE_PROPERTY_STATUS, map_key_name) == 0)
{
if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_INT)
{
LogError("TWIN message status property expected to be INT");
result = MU_FAILURE;
break;
}
else if (amqpvalue_get_int(amqp_map_value, status_code) != 0)
{
LogError("Failed getting TWIN message status code value");
result = MU_FAILURE;
break;
}
else
{
*has_status_code = true;
}
}
else if (strcmp(TWIN_MESSAGE_PROPERTY_VERSION, map_key_name) == 0)
{
if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_LONG)
{
LogError("TWIN message version property expected to be LONG");
result = MU_FAILURE;
break;
}
else if (amqpvalue_get_long(amqp_map_value, version) != 0)
{
LogError("Failed getting TWIN message version value");
result = MU_FAILURE;
break;
}
else
{
*has_version = true;
}
}
amqpvalue_destroy(amqp_map_value);
}
amqpvalue_destroy(amqp_map_key);
}
}
}
amqpvalue_destroy(message_annotations);
}
if (result == RESULT_OK)
{
MESSAGE_BODY_TYPE body_type;
if (message_get_body_type(message, &body_type) != 0)
{
LogError("Failed getting TWIN message body type");
result = MU_FAILURE;
}
else if (body_type == MESSAGE_BODY_TYPE_DATA)
{
size_t body_count;
if (message_get_body_amqp_data_count(message, &body_count) != 0)
{
LogError("Failed getting TWIN message body count");
result = MU_FAILURE;
}
else if (body_count != 1)
{
LogError("Unexpected number of TWIN message bodies (%lu)", (unsigned long)body_count);
result = MU_FAILURE;
}
else if (message_get_body_amqp_data_in_place(message, 0, twin_report) != 0)
{
LogError("Failed getting TWIN message body");
result = MU_FAILURE;
}
else
{
*has_twin_report = true;
}
}
else if (body_type != MESSAGE_BODY_TYPE_NONE)
{
LogError("Unexpected TWIN message body %d", body_type);
result = MU_FAILURE;
}
else
{
*has_twin_report = false;
}
}
}
if (result != RESULT_OK)
{
free(*correlation_id);
*correlation_id = NULL;
}
}
return result;
}