static int parse_incoming_twin_message()

in iothub_client/src/iothubtransport_amqp_twin_messenger.c [487:658]


/*
 * Parses a received TWIN message: retrieves its correlation ID, scans the
 * message annotations for the TWIN status-code and version properties, and
 * retrieves the (optional) message body.
 *
 * Parameters:
 *   message          - the received message to parse.
 *   correlation_id   - filled in by get_message_correlation_id(). If any later
 *                      step fails, the value is released with free() and reset
 *                      to NULL before returning.
 *                      NOTE(review): on success the caller presumably owns the
 *                      string and must free it - confirm against the callers.
 *   has_status_code  - set to true only if the TWIN_MESSAGE_PROPERTY_STATUS
 *                      annotation is present; in that case *status_code holds
 *                      its value. The annotation must be of type AMQP_TYPE_INT.
 *   has_version      - set to true only if the TWIN_MESSAGE_PROPERTY_VERSION
 *                      annotation is present; in that case *version holds its
 *                      value. The annotation must be of type AMQP_TYPE_LONG.
 *   has_twin_report  - set to true if the message carries exactly one AMQP
 *                      data body (returned through twin_report), false if the
 *                      message has no body. Not written on failure paths that
 *                      occur before the body is inspected.
 *   twin_report      - filled in by message_get_body_amqp_data_in_place().
 *                      NOTE(review): "in place" suggests the data references
 *                      memory owned by the message - confirm lifetime.
 *
 * Returns RESULT_OK on success, MU_FAILURE otherwise.
 */
static int parse_incoming_twin_message(MESSAGE_HANDLE message,
    char** correlation_id,
    bool* has_version, int64_t* version,
    bool* has_status_code, int* status_code,
    bool* has_twin_report, BINARY_DATA* twin_report)
{
    int result;

    if (get_message_correlation_id(message, correlation_id) != 0)
    {
        LogError("Failed retrieving correlation ID from received TWIN message.");
        result = MU_FAILURE;
    }
    else
    {
        annotations message_annotations;

        if (message_get_message_annotations(message, &message_annotations) != 0)
        {
            LogError("Failed getting TWIN message annotations");
            result = MU_FAILURE;
        }
        else
        {
            result = RESULT_OK;

            // Both properties are optional; assume absent until found below.
            *has_status_code = false;
            *has_version = false;

            if (message_annotations != NULL)
            {
                uint32_t pair_count;

                if (amqpvalue_get_map_pair_count((AMQP_VALUE)message_annotations, &pair_count) != 0)
                {
                    LogError("Failed getting TWIN message annotations count");
                    result = MU_FAILURE;
                }
                else
                {
                    uint32_t i;

                    // The loop stops at the first failure. Errors are recorded in
                    // 'result' instead of using 'break' so that the key/value pair
                    // obtained in the current iteration is always destroyed.
                    for (i = 0; (i < pair_count) && (result == RESULT_OK); i++)
                    {
                        AMQP_VALUE amqp_map_key;
                        AMQP_VALUE amqp_map_value;

                        if (amqpvalue_get_map_key_value_pair((AMQP_VALUE)message_annotations, i, &amqp_map_key, &amqp_map_value) != 0)
                        {
                            LogError("Failed getting AMQP map key/value pair (%lu)", (unsigned long)i);
                            result = MU_FAILURE;
                        }
                        else
                        {
                            const char* map_key_name;

                            if (amqpvalue_get_symbol(amqp_map_key, &map_key_name) != 0)
                            {
                                LogError("Failed getting AMQP value symbol");
                                result = MU_FAILURE;
                            }
                            else if (strcmp(TWIN_MESSAGE_PROPERTY_STATUS, map_key_name) == 0)
                            {
                                if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_INT)
                                {
                                    LogError("TWIN message status property expected to be INT");
                                    result = MU_FAILURE;
                                }
                                else if (amqpvalue_get_int(amqp_map_value, status_code) != 0)
                                {
                                    LogError("Failed getting TWIN message status code value");
                                    result = MU_FAILURE;
                                }
                                else
                                {
                                    *has_status_code = true;
                                }
                            }
                            else if (strcmp(TWIN_MESSAGE_PROPERTY_VERSION, map_key_name) == 0)
                            {
                                if (amqpvalue_get_type(amqp_map_value) != AMQP_TYPE_LONG)
                                {
                                    LogError("TWIN message version property expected to be LONG");
                                    result = MU_FAILURE;
                                }
                                else if (amqpvalue_get_long(amqp_map_value, version) != 0)
                                {
                                    LogError("Failed getting TWIN message version value");
                                    result = MU_FAILURE;
                                }
                                else
                                {
                                    *has_version = true;
                                }
                            }
                            // Any other annotation key is ignored.

                            // Released on success and on failure alike.
                            amqpvalue_destroy(amqp_map_value);
                            amqpvalue_destroy(amqp_map_key);
                        }
                    }
                }

                amqpvalue_destroy(message_annotations);
            }

            if (result == RESULT_OK)
            {
                MESSAGE_BODY_TYPE body_type;

                if (message_get_body_type(message, &body_type) != 0)
                {
                    LogError("Failed getting TWIN message body type");
                    result = MU_FAILURE;
                }
                else if (body_type == MESSAGE_BODY_TYPE_DATA)
                {
                    size_t body_count;

                    if (message_get_body_amqp_data_count(message, &body_count) != 0)
                    {
                        LogError("Failed getting TWIN message body count");
                        result = MU_FAILURE;
                    }
                    else if (body_count != 1)
                    {
                        LogError("Unexpected number of TWIN message bodies (%lu)", (unsigned long)body_count);
                        result = MU_FAILURE;
                    }
                    else if (message_get_body_amqp_data_in_place(message, 0, twin_report) != 0)
                    {
                        LogError("Failed getting TWIN message body");
                        result = MU_FAILURE;
                    }
                    else
                    {
                        *has_twin_report = true;
                    }
                }
                else if (body_type != MESSAGE_BODY_TYPE_NONE)
                {
                    LogError("Unexpected TWIN message body %d", body_type);
                    result = MU_FAILURE;
                }
                else
                {
                    *has_twin_report = false;
                }
            }
        }

        if (result != RESULT_OK)
        {
            // The correlation ID was retrieved successfully above; do not hand
            // it back to the caller when the overall parse failed.
            free(*correlation_id);
            *correlation_id = NULL;
        }
    }

    return result;
}