static SEND_ONE_MESSAGE_RESULT send_one_message()

in src/message_sender.c [216:640]


static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message_sender, ASYNC_OPERATION_HANDLE pending_send, MESSAGE_HANDLE message)
{
    SEND_ONE_MESSAGE_RESULT result;

    size_t encoded_size;
    size_t total_encoded_size = 0;
    MESSAGE_BODY_TYPE message_body_type;
    message_format message_format;

    if ((message_get_body_type(message, &message_body_type) != 0) ||
        (message_get_message_format(message, &message_format) != 0))
    {
        LogError("Failure getting message body type and/or message format");
        result = SEND_ONE_MESSAGE_ERROR;
    }
    else
    {
        // header
        HEADER_HANDLE header = NULL;
        AMQP_VALUE header_amqp_value = NULL;
        PROPERTIES_HANDLE properties = NULL;
        AMQP_VALUE properties_amqp_value = NULL;
        AMQP_VALUE application_properties = NULL;
        AMQP_VALUE application_properties_value = NULL;
        AMQP_VALUE body_amqp_value = NULL;
        size_t body_data_count = 0;
        AMQP_VALUE msg_annotations = NULL;
        bool is_error = false;

        // message header
        if ((message_get_header(message, &header) == 0) &&
            (header != NULL))
        {
            header_amqp_value = amqpvalue_create_header(header);
            if (header_amqp_value == NULL)
            {
                LogError("Cannot create header AMQP value");
                is_error = true;
            }
            else
            {
                if (amqpvalue_get_encoded_size(header_amqp_value, &encoded_size) != 0)
                {
                    LogError("Cannot obtain header encoded size");
                    is_error = true;
                }
                else
                {
                    total_encoded_size += encoded_size;
                }
            }

        }

        // message annotations
        if ((!is_error) &&
            (message_get_message_annotations(message, &msg_annotations) == 0) &&
            (msg_annotations != NULL))
        {
            if (amqpvalue_get_encoded_size(msg_annotations, &encoded_size) != 0)
            {
                LogError("Cannot obtain message annotations encoded size");
                is_error = true;
            }
            else
            {
                total_encoded_size += encoded_size;
            }
        }

        // properties
        if ((!is_error) &&
            (message_get_properties(message, &properties) == 0) &&
            (properties != NULL))
        {
            properties_amqp_value = amqpvalue_create_properties(properties);
            if (properties_amqp_value == NULL)
            {
                LogError("Cannot create message properties AMQP value");
                is_error = true;
            }
            else
            {
                if (amqpvalue_get_encoded_size(properties_amqp_value, &encoded_size) != 0)
                {
                    LogError("Cannot obtain message properties encoded size");
                    is_error = true;
                }
                else
                {
                    total_encoded_size += encoded_size;
                }
            }
        }

        // application properties
        if ((!is_error) &&
            (message_get_application_properties(message, &application_properties) == 0) &&
            (application_properties != NULL))
        {
            application_properties_value = amqpvalue_create_application_properties(application_properties);
            if (application_properties_value == NULL)
            {
                LogError("Cannot create application properties AMQP value");
                is_error = true;
            }
            else
            {
                if (amqpvalue_get_encoded_size(application_properties_value, &encoded_size) != 0)
                {
                    LogError("Cannot obtain application properties encoded size");
                    is_error = true;
                }
                else
                {
                    total_encoded_size += encoded_size;
                }
            }
        }

        if (is_error)
        {
            result = SEND_ONE_MESSAGE_ERROR;
        }
        else
        {
            result = SEND_ONE_MESSAGE_OK;

            // body - amqp data
            switch (message_body_type)
            {
            default:
                LogError("Unknown body type");
                result = SEND_ONE_MESSAGE_ERROR;
                break;

            case MESSAGE_BODY_TYPE_VALUE:
            {
                AMQP_VALUE message_body_amqp_value;
                if (message_get_body_amqp_value_in_place(message, &message_body_amqp_value) != 0)
                {
                    LogError("Cannot obtain AMQP value from body");
                    result = SEND_ONE_MESSAGE_ERROR;
                }
                else
                {
                    body_amqp_value = amqpvalue_create_amqp_value(message_body_amqp_value);
                    if (body_amqp_value == NULL)
                    {
                        LogError("Cannot create body AMQP value");
                        result = SEND_ONE_MESSAGE_ERROR;
                    }
                    else
                    {
                        if (amqpvalue_get_encoded_size(body_amqp_value, &encoded_size) != 0)
                        {
                            LogError("Cannot get body AMQP value encoded size");
                            result = SEND_ONE_MESSAGE_ERROR;
                        }
                        else
                        {
                            total_encoded_size += encoded_size;
                        }
                    }
                }

                break;
            }

            case MESSAGE_BODY_TYPE_DATA:
            {
                BINARY_DATA binary_data;
                size_t i;

                if (message_get_body_amqp_data_count(message, &body_data_count) != 0)
                {
                    LogError("Cannot get body AMQP data count");
                    result = SEND_ONE_MESSAGE_ERROR;
                }
                else
                {
                    if (body_data_count == 0)
                    {
                        LogError("Body data count is zero");
                        result = SEND_ONE_MESSAGE_ERROR;
                    }
                    else
                    {
                        for (i = 0; i < body_data_count; i++)
                        {
                            if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                            {
                                LogError("Cannot get body AMQP data %u", (unsigned int)i);
                                result = SEND_ONE_MESSAGE_ERROR;
                            }
                            else
                            {
                                AMQP_VALUE body_amqp_data;
                                amqp_binary binary_value;
                                binary_value.bytes = binary_data.bytes;
                                binary_value.length = (uint32_t)binary_data.length;
                                body_amqp_data = amqpvalue_create_data(binary_value);
                                if (body_amqp_data == NULL)
                                {
                                    LogError("Cannot create body AMQP data");
                                    result = SEND_ONE_MESSAGE_ERROR;
                                }
                                else
                                {
                                    if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0)
                                    {
                                        LogError("Cannot get body AMQP data encoded size");
                                        result = SEND_ONE_MESSAGE_ERROR;
                                    }
                                    else
                                    {
                                        total_encoded_size += encoded_size;
                                    }

                                    amqpvalue_destroy(body_amqp_data);
                                }
                            }
                        }
                    }
                }
                break;
            }
            }

            if (result == 0)
            {
                void* data_bytes = malloc(total_encoded_size);
                PAYLOAD payload;
                payload.bytes = (const unsigned char*)data_bytes;
                payload.length = 0;
                result = SEND_ONE_MESSAGE_OK;

                if (header != NULL)
                {
                    if (amqpvalue_encode(header_amqp_value, encode_bytes, &payload) != 0)
                    {
                        LogError("Cannot encode header value");
                        result = SEND_ONE_MESSAGE_ERROR;
                    }

                    log_message_chunk(message_sender, "Header:", header_amqp_value);
                }

                if ((result == SEND_ONE_MESSAGE_OK) && (msg_annotations != NULL))
                {
                    if (amqpvalue_encode(msg_annotations, encode_bytes, &payload) != 0)
                    {
                        LogError("Cannot encode message annotations value");
                        result = SEND_ONE_MESSAGE_ERROR;
                    }

                    log_message_chunk(message_sender, "Message Annotations:", msg_annotations);
                }

                if ((result == SEND_ONE_MESSAGE_OK) && (properties != NULL))
                {
                    if (amqpvalue_encode(properties_amqp_value, encode_bytes, &payload) != 0)
                    {
                        LogError("Cannot encode message properties value");
                        result = SEND_ONE_MESSAGE_ERROR;
                    }

                    log_message_chunk(message_sender, "Properties:", properties_amqp_value);
                }

                if ((result == SEND_ONE_MESSAGE_OK) && (application_properties != NULL))
                {
                    if (amqpvalue_encode(application_properties_value, encode_bytes, &payload) != 0)
                    {
                        LogError("Cannot encode application properties value");
                        result = SEND_ONE_MESSAGE_ERROR;
                    }

                    log_message_chunk(message_sender, "Application properties:", application_properties_value);
                }

                if (result == SEND_ONE_MESSAGE_OK)
                {
                    switch (message_body_type)
                    {
                    default:
                        LogError("Unknown message type");
                        result = SEND_ONE_MESSAGE_ERROR;
                        break;

                    case MESSAGE_BODY_TYPE_VALUE:
                    {
                        if (amqpvalue_encode(body_amqp_value, encode_bytes, &payload) != 0)
                        {
                            LogError("Cannot encode body AMQP value");
                            result = SEND_ONE_MESSAGE_ERROR;
                        }

                        log_message_chunk(message_sender, "Body - amqp value:", body_amqp_value);
                        break;
                    }
                    case MESSAGE_BODY_TYPE_DATA:
                    {
                        BINARY_DATA binary_data;
                        size_t i;

                        for (i = 0; i < body_data_count; i++)
                        {
                            if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0)
                            {
                                LogError("Cannot get AMQP data %u", (unsigned int)i);
                                result = SEND_ONE_MESSAGE_ERROR;
                            }
                            else
                            {
                                AMQP_VALUE body_amqp_data;
                                amqp_binary binary_value;
                                binary_value.bytes = binary_data.bytes;
                                binary_value.length = (uint32_t)binary_data.length;
                                body_amqp_data = amqpvalue_create_data(binary_value);
                                if (body_amqp_data == NULL)
                                {
                                    LogError("Cannot create body AMQP data %u", (unsigned int)i);
                                    result = SEND_ONE_MESSAGE_ERROR;
                                }
                                else
                                {
                                    if (amqpvalue_encode(body_amqp_data, encode_bytes, &payload) != 0)
                                    {
                                        LogError("Cannot encode body AMQP data %u", (unsigned int)i);
                                        result = SEND_ONE_MESSAGE_ERROR;
                                        break;
                                    }

                                    amqpvalue_destroy(body_amqp_data);
                                }
                            }
                        }
                        break;
                    }
                    }
                }

                if (result == SEND_ONE_MESSAGE_OK)
                {
                    ASYNC_OPERATION_HANDLE transfer_async_operation;
                    LINK_TRANSFER_RESULT link_transfer_error;
                    MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send);
                    message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING;

                    transfer_async_operation = link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, &link_transfer_error, message_with_callback->timeout);
                    if (transfer_async_operation == NULL)
                    {
                        if (link_transfer_error == LINK_TRANSFER_BUSY)
                        {
                            message_with_callback->message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
                            result = SEND_ONE_MESSAGE_BUSY;
                        }
                        else
                        {
                            LogError("Error in link transfer");
                            result = SEND_ONE_MESSAGE_ERROR;
                        }
                    }
                    else
                    {
                        // For messages that get atomically sent and settled by link_transfer_async,
                        // on_delivery_settled is invoked and the message destroyed.
                        // So at this point we shall verify if the message still exists and is in the queue.
                        if (is_message_in_queue(message_sender, pending_send))
                        {
                            message_with_callback->transfer_async_operation = transfer_async_operation;
                        }

                        result = SEND_ONE_MESSAGE_OK;
                    }
                }

                free(data_bytes);

                if (body_amqp_value != NULL)
                {
                    amqpvalue_destroy(body_amqp_value);
                }
            }
        }

        if (header != NULL)
        {
            header_destroy(header);
        }

        if (header_amqp_value != NULL)
        {
            amqpvalue_destroy(header_amqp_value);
        }

        if (msg_annotations != NULL)
        {
            annotations_destroy(msg_annotations);
        }

        if (application_properties != NULL)
        {
            amqpvalue_destroy(application_properties);
        }

        if (application_properties_value != NULL)
        {
            amqpvalue_destroy(application_properties_value);
        }

        if (properties_amqp_value != NULL)
        {
            amqpvalue_destroy(properties_amqp_value);
        }

        if (properties != NULL)
        {
            properties_destroy(properties);
        }
    }

    return result;
}