static void decode_message_value_callback()

in src/message_receiver.c [37:226]


/*
 * Callback receiving one decoded AMQP value belonging to the message that is
 * currently being received.
 *
 * context       - cast to MESSAGE_RECEIVER_INSTANCE* and dereferenced without
 *                 a NULL check; its decoded_message is the message being filled.
 * decoded_value - the decoded value; its descriptor selects which part of the
 *                 message gets populated.
 *
 * Recognized descriptors, tested in this order: application properties,
 * properties, delivery annotations, message annotations, header, footer,
 * AMQP value body, data body.
 *
 * Nothing is returned. Every failure (including a descriptor that matches none
 * of the above) is logged and recorded by setting decode_error to true on the
 * receiver instance; this function never resets that flag.
 */
static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
{
    MESSAGE_RECEIVER_INSTANCE* receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
    MESSAGE_HANDLE message = receiver->decoded_message;
    AMQP_VALUE section_descriptor = amqpvalue_get_inplace_descriptor(decoded_value);

    /* --- application properties --- */
    if (is_application_properties_type_by_descriptor(section_descriptor))
    {
        /* NOTE(review): unlike the annotation sections below, the complete decoded
           value is handed to the setter here, not its described value - confirm
           this is what message_set_application_properties expects */
        if (message_set_application_properties(message, decoded_value) != 0)
        {
            LogError("Error setting application properties on received message");
            receiver->decode_error = true;
        }

        return;
    }

    /* --- properties --- */
    if (is_properties_type_by_descriptor(section_descriptor))
    {
        PROPERTIES_HANDLE message_properties;

        if (amqpvalue_get_properties(decoded_value, &message_properties) != 0)
        {
            LogError("Error getting message properties");
            receiver->decode_error = true;
            return;
        }

        if (message_set_properties(message, message_properties) != 0)
        {
            LogError("Error setting message properties on received message");
            receiver->decode_error = true;
        }

        /* the handle obtained above is released whether or not the set succeeded */
        properties_destroy(message_properties);
        return;
    }

    /* --- delivery annotations --- */
    if (is_delivery_annotations_type_by_descriptor(section_descriptor))
    {
        annotations delivery_annotations_value = amqpvalue_get_inplace_described_value(decoded_value);

        if (delivery_annotations_value == NULL)
        {
            LogError("Error getting delivery annotations");
            receiver->decode_error = true;
            return;
        }

        if (message_set_delivery_annotations(message, delivery_annotations_value) != 0)
        {
            LogError("Error setting delivery annotations on received message");
            receiver->decode_error = true;
        }

        return;
    }

    /* --- message annotations --- */
    if (is_message_annotations_type_by_descriptor(section_descriptor))
    {
        annotations message_annotations_value = amqpvalue_get_inplace_described_value(decoded_value);

        if (message_annotations_value == NULL)
        {
            LogError("Error getting message annotations");
            receiver->decode_error = true;
            return;
        }

        if (message_set_message_annotations(message, message_annotations_value) != 0)
        {
            LogError("Error setting message annotations on received message");
            receiver->decode_error = true;
        }

        return;
    }

    /* --- header --- */
    if (is_header_type_by_descriptor(section_descriptor))
    {
        HEADER_HANDLE message_header;

        if (amqpvalue_get_header(decoded_value, &message_header) != 0)
        {
            LogError("Error getting message header");
            receiver->decode_error = true;
            return;
        }

        if (message_set_header(message, message_header) != 0)
        {
            LogError("Error setting message header on received message");
            receiver->decode_error = true;
        }

        /* the handle obtained above is released whether or not the set succeeded */
        header_destroy(message_header);
        return;
    }

    /* --- footer --- */
    if (is_footer_type_by_descriptor(section_descriptor))
    {
        annotations footer_value = amqpvalue_get_inplace_described_value(decoded_value);

        if (footer_value == NULL)
        {
            LogError("Error getting message footer");
            receiver->decode_error = true;
            return;
        }

        if (message_set_footer(message, footer_value) != 0)
        {
            LogError("Error setting message footer on received message");
            receiver->decode_error = true;
        }

        return;
    }

    /* --- body: AMQP value --- */
    if (is_amqp_value_type_by_descriptor(section_descriptor))
    {
        MESSAGE_BODY_TYPE current_body_type;
        AMQP_VALUE body_value;

        if (message_get_body_type(message, &current_body_type) != 0)
        {
            LogError("Error getting message body type");
            receiver->decode_error = true;
            return;
        }

        /* an AMQP value body is only accepted while the message has no body yet */
        if (current_body_type != MESSAGE_BODY_TYPE_NONE)
        {
            LogError("Body already set on received message");
            receiver->decode_error = true;
            return;
        }

        body_value = amqpvalue_get_inplace_described_value(decoded_value);
        if (body_value == NULL)
        {
            LogError("Error getting body AMQP value");
            receiver->decode_error = true;
            return;
        }

        if (message_set_body_amqp_value(message, body_value) != 0)
        {
            LogError("Error setting body AMQP value on received message");
            receiver->decode_error = true;
        }

        return;
    }

    /* --- body: data --- */
    if (is_data_type_by_descriptor(section_descriptor))
    {
        MESSAGE_BODY_TYPE current_body_type;
        AMQP_VALUE described_data;
        data payload;
        BINARY_DATA body_chunk;

        if (message_get_body_type(message, &current_body_type) != 0)
        {
            LogError("Error getting message body type");
            receiver->decode_error = true;
            return;
        }

        /* data is added to a message that has no body yet or already has a DATA body */
        if (!((current_body_type == MESSAGE_BODY_TYPE_NONE) ||
              (current_body_type == MESSAGE_BODY_TYPE_DATA)))
        {
            LogError("Message body type already set to something different than AMQP DATA");
            receiver->decode_error = true;
            return;
        }

        described_data = amqpvalue_get_inplace_described_value(decoded_value);
        if (described_data == NULL)
        {
            LogError("Error getting body DATA value");
            receiver->decode_error = true;
            return;
        }

        if (amqpvalue_get_data(described_data, &payload) != 0)
        {
            LogError("Error getting body DATA AMQP value");
            receiver->decode_error = true;
            return;
        }

        body_chunk.bytes = (const unsigned char*)payload.bytes;
        body_chunk.length = payload.length;
        if (message_add_body_amqp_data(message, body_chunk) != 0)
        {
            LogError("Error adding body DATA to received message");
            receiver->decode_error = true;
        }

        return;
    }

    /* the descriptor matched none of the handled kinds */
    LogError("Failed decoding descriptor");
    receiver->decode_error = true;
}