in src/message_receiver.c [37:226]
/*
 * Callback handed one decoded AMQP value at a time while a received message
 * is being decoded.
 *
 * context       - the MESSAGE_RECEIVER_INSTANCE the value belongs to (passed
 *                 as void* and cast back here).
 * decoded_value - the described AMQP value that was just decoded.
 *
 * The value's descriptor selects which part of receiver->decoded_message gets
 * populated. The function returns nothing; every failure is reported by
 * logging an error and setting receiver->decode_error to true. The flag is
 * only ever set here, never cleared.
 */
static void decode_message_value_callback(void* context, AMQP_VALUE decoded_value)
{
    MESSAGE_RECEIVER_INSTANCE* receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
    MESSAGE_HANDLE message = receiver->decoded_message;
    AMQP_VALUE section_descriptor = amqpvalue_get_inplace_descriptor(decoded_value);

    /* Application properties: the whole decoded value is handed to the setter. */
    if (is_application_properties_type_by_descriptor(section_descriptor))
    {
        if (message_set_application_properties(message, decoded_value) != 0)
        {
            LogError("Error setting application properties on received message");
            receiver->decode_error = true;
        }
        return;
    }

    /* Properties: extract a handle, apply it, then destroy the local handle
       whether or not applying it succeeded. */
    if (is_properties_type_by_descriptor(section_descriptor))
    {
        PROPERTIES_HANDLE message_properties;

        if (amqpvalue_get_properties(decoded_value, &message_properties) != 0)
        {
            LogError("Error getting message properties");
            receiver->decode_error = true;
            return;
        }

        if (message_set_properties(message, message_properties) != 0)
        {
            LogError("Error setting message properties on received message");
            receiver->decode_error = true;
        }
        properties_destroy(message_properties);
        return;
    }

    /* Delivery annotations: the in-place described value is passed straight
       to the setter. */
    if (is_delivery_annotations_type_by_descriptor(section_descriptor))
    {
        annotations delivery_section = amqpvalue_get_inplace_described_value(decoded_value);

        if (delivery_section == NULL)
        {
            LogError("Error getting delivery annotations");
            receiver->decode_error = true;
            return;
        }

        if (message_set_delivery_annotations(message, delivery_section) != 0)
        {
            LogError("Error setting delivery annotations on received message");
            receiver->decode_error = true;
        }
        return;
    }

    /* Message annotations: handled the same way as delivery annotations. */
    if (is_message_annotations_type_by_descriptor(section_descriptor))
    {
        annotations message_section = amqpvalue_get_inplace_described_value(decoded_value);

        if (message_section == NULL)
        {
            LogError("Error getting message annotations");
            receiver->decode_error = true;
            return;
        }

        if (message_set_message_annotations(message, message_section) != 0)
        {
            LogError("Error setting message annotations on received message");
            receiver->decode_error = true;
        }
        return;
    }

    /* Header: extract a handle, apply it, then destroy the local handle
       whether or not applying it succeeded. */
    if (is_header_type_by_descriptor(section_descriptor))
    {
        HEADER_HANDLE message_header;

        if (amqpvalue_get_header(decoded_value, &message_header) != 0)
        {
            LogError("Error getting message header");
            receiver->decode_error = true;
            return;
        }

        if (message_set_header(message, message_header) != 0)
        {
            LogError("Error setting message header on received message");
            receiver->decode_error = true;
        }
        header_destroy(message_header);
        return;
    }

    /* Footer: the in-place described value is passed straight to the setter. */
    if (is_footer_type_by_descriptor(section_descriptor))
    {
        annotations footer_section = amqpvalue_get_inplace_described_value(decoded_value);

        if (footer_section == NULL)
        {
            LogError("Error getting message footer");
            receiver->decode_error = true;
            return;
        }

        if (message_set_footer(message, footer_section) != 0)
        {
            LogError("Error setting message footer on received message");
            receiver->decode_error = true;
        }
        return;
    }

    /* AMQP value body: accepted only while the message has no body yet. */
    if (is_amqp_value_type_by_descriptor(section_descriptor))
    {
        MESSAGE_BODY_TYPE current_body_type;
        AMQP_VALUE body_value;

        if (message_get_body_type(message, &current_body_type) != 0)
        {
            LogError("Error getting message body type");
            receiver->decode_error = true;
            return;
        }

        if (current_body_type != MESSAGE_BODY_TYPE_NONE)
        {
            LogError("Body already set on received message");
            receiver->decode_error = true;
            return;
        }

        body_value = amqpvalue_get_inplace_described_value(decoded_value);
        if (body_value == NULL)
        {
            LogError("Error getting body AMQP value");
            receiver->decode_error = true;
            return;
        }

        if (message_set_body_amqp_value(message, body_value) != 0)
        {
            LogError("Error setting body AMQP value on received message");
            receiver->decode_error = true;
        }
        return;
    }

    /* Data body: accepted while the body is unset or already of DATA type;
       the decoded bytes are added to the message body. */
    if (is_data_type_by_descriptor(section_descriptor))
    {
        MESSAGE_BODY_TYPE current_body_type;
        AMQP_VALUE data_section;
        data payload;
        BINARY_DATA chunk;

        if (message_get_body_type(message, &current_body_type) != 0)
        {
            LogError("Error getting message body type");
            receiver->decode_error = true;
            return;
        }

        if (!((current_body_type == MESSAGE_BODY_TYPE_NONE) ||
              (current_body_type == MESSAGE_BODY_TYPE_DATA)))
        {
            LogError("Message body type already set to something different than AMQP DATA");
            receiver->decode_error = true;
            return;
        }

        data_section = amqpvalue_get_inplace_described_value(decoded_value);
        if (data_section == NULL)
        {
            LogError("Error getting body DATA value");
            receiver->decode_error = true;
            return;
        }

        if (amqpvalue_get_data(data_section, &payload) != 0)
        {
            LogError("Error getting body DATA AMQP value");
            receiver->decode_error = true;
            return;
        }

        chunk.bytes = (const unsigned char*)payload.bytes;
        chunk.length = payload.length;
        if (message_add_body_amqp_data(message, chunk) != 0)
        {
            LogError("Error adding body DATA to received message");
            receiver->decode_error = true;
        }
        return;
    }

    /* None of the descriptor checks above matched. */
    LogError("Failed decoding descriptor");
    receiver->decode_error = true;
}