in src/message_sender.c [216:640]
static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
{
SEND_ONE_MESSAGE_RESULT result;
size_t encoded_size;
size_t total_encoded_size = 0;
MESSAGE_BODY_TYPE message_body_type;
message_format message_format;
if ((message_get_body_type(message, &message_body_type) != 0) ||
(message_get_message_format(message, &message_format) != 0))
{
LogError("Failure getting message body type and/or message format");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
// header
HEADER_HANDLE header = NULL;
AMQP_VALUE header_amqp_value = NULL;
PROPERTIES_HANDLE properties = NULL;
AMQP_VALUE properties_amqp_value = NULL;
AMQP_VALUE application_properties = NULL;
AMQP_VALUE application_properties_value = NULL;
AMQP_VALUE body_amqp_value = NULL;
size_t body_data_count = 0;
AMQP_VALUE msg_annotations = NULL;
bool is_error = false;
// message header
if ((message_get_header(message, &header) == 0) &&
(header != NULL))
{
header_amqp_value = amqpvalue_create_header(header);
if (header_amqp_value == NULL)
{
LogError("Cannot create header AMQP value");
is_error = true;
}
else
{
if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
{
LogError("Cannot obtain header encoded size");
is_error = true;
}
else
{
total_encoded_size += encoded_size;
}
}
}
// message annotations
if ((!is_error) &&
(message_get_message_annotations(message, &msg_annotations) == 0) &&
(msg_annotations != NULL))
{
if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
{
LogError("Cannot obtain message annotations encoded size");
is_error = true;
}
else
{
total_encoded_size += encoded_size;
}
}
// properties
if ((!is_error) &&
(message_get_properties(message, &properties) == 0) &&
(properties != NULL))
{
properties_amqp_value = amqpvalue_create_properties(properties);
if (properties_amqp_value == NULL)
{
LogError("Cannot create message properties AMQP value");
is_error = true;
}
else
{
if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
{
LogError("Cannot obtain message properties encoded size");
is_error = true;
}
else
{
total_encoded_size += encoded_size;
}
}
}
// application properties
if ((!is_error) &&
(message_get_application_properties(message, &application_properties) == 0) &&
(application_properties != NULL))
{
application_properties_value = amqpvalue_create_application_properties(application_properties);
if (application_properties_value == NULL)
{
LogError("Cannot create application properties AMQP value");
is_error = true;
}
else
{
if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
{
LogError("Cannot obtain application properties encoded size");
is_error = true;
}
else
{
total_encoded_size += encoded_size;
}
}
}
if (is_error)
{
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
result = SEND_ONE_MESSAGE_OK;
// body - amqp data
switch (message_body_type)
{
default:
LogError("Unknown body type");
result = SEND_ONE_MESSAGE_ERROR;
break;
case MESSAGE_BODY_TYPE_VALUE:
{
AMQP_VALUE message_body_amqp_value;
if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
{
LogError("Cannot obtain AMQP value from body");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
if (body_amqp_value == NULL)
{
LogError("Cannot create body AMQP value");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
{
LogError("Cannot get body AMQP value encoded size");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
total_encoded_size += encoded_size;
}
}
}
break;
}
case MESSAGE_BODY_TYPE_DATA:
{
BINARY_DATA binary_data;
size_t i;
if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
{
LogError("Cannot get body AMQP data count");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (body_data_count == 0)
{
LogError("Body data count is zero");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
for (i = 0; i < body_data_count; i++)
{
if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
{
LogError("Cannot get body AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
AMQP_VALUE body_amqp_data;
amqp_binary binary_value;
binary_value.bytes = binary_data.bytes;
binary_value.length = (uint32_t)binary_data.length;
body_amqp_data = amqpvalue_create_data(binary_value);
if (body_amqp_data == NULL)
{
LogError("Cannot create body AMQP data");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
{
LogError("Cannot get body AMQP data encoded size");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
total_encoded_size += encoded_size;
}
amqpvalue_destroy(body_amqp_data);
}
}
}
}
}
break;
}
}
if (result == 0)
{
void* data_bytes = malloc(total_encoded_size);
PAYLOAD payload;
payload.bytes = (const unsigned char*)data_bytes;
payload.length = 0;
result = SEND_ONE_MESSAGE_OK;
if (header != NULL)
{
if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
{
LogError("Cannot encode header value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Header:", header_amqp_value);
}
if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
{
if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
{
LogError("Cannot encode message annotations value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
}
if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
{
if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
{
LogError("Cannot encode message properties value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Properties:", properties_amqp_value);
}
if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
{
if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
{
LogError("Cannot encode application properties value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Application properties:", application_properties_value);
}
if (result == SEND_ONE_MESSAGE_OK)
{
switch (message_body_type)
{
default:
LogError("Unknown message type");
result = SEND_ONE_MESSAGE_ERROR;
break;
case MESSAGE_BODY_TYPE_VALUE:
{
if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
{
LogError("Cannot encode body AMQP value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
break;
}
case MESSAGE_BODY_TYPE_DATA:
{
BINARY_DATA binary_data;
size_t i;
for (i = 0; i < body_data_count; i++)
{
if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
{
LogError("Cannot get AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
AMQP_VALUE body_amqp_data;
amqp_binary binary_value;
binary_value.bytes = binary_data.bytes;
binary_value.length = (uint32_t)binary_data.length;
body_amqp_data = amqpvalue_create_data(binary_value);
if (body_amqp_data == NULL)
{
LogError("Cannot create body AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
{
LogError("Cannot encode body AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
break;
}
amqpvalue_destroy(body_amqp_data);
}
}
}
break;
}
}
}
if (result == SEND_ONE_MESSAGE_OK)
{
ASYNC_OPERATION_HANDLE transfer_async_operation;
LINK_TRANSFER_RESULT link_transfer_error;
MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;
transfer_async_operation = link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, &link_transfer_error, message_with_callback->timeout);
if (transfer_async_operation == NULL)
{
if (link_transfer_error == LINK_TRANSFER_BUSY)
{
message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
result = SEND_ONE_MESSAGE_BUSY;
}
else
{
LogError("Error in link transfer");
result = SEND_ONE_MESSAGE_ERROR;
}
}
else
{
// For messages that get atomically sent and settled by link_transfer_async,
// on_delivery_settled is invoked and the message destroyed.
// So at this point we shall verify if the message still exists and is in the queue.
if (is_message_in_queue(message_sender, pending_send))
{
message_with_callback->transfer_async_operation = transfer_async_operation;
}
result = SEND_ONE_MESSAGE_OK;
}
}
free(data_bytes);
if (body_amqp_value != NULL)
{
amqpvalue_destroy(body_amqp_value);
}
}
}
if (header != NULL)
{
header_destroy(header);
}
if (header_amqp_value != NULL)
{
amqpvalue_destroy(header_amqp_value);
}
if (msg_annotations != NULL)
{
annotations_destroy(msg_annotations);
}
if (application_properties != NULL)
{
amqpvalue_destroy(application_properties);
}
if (application_properties_value != NULL)
{
amqpvalue_destroy(application_properties_value);
}
if (properties_amqp_value != NULL)
{
amqpvalue_destroy(properties_amqp_value);
}
if (properties != NULL)
{
properties_destroy(properties);
}
}
return result;
}