SESSION_SEND_TRANSFER_RESULT session_send_transfer()

in src/session.c [1447:1695]


/*
 * Encodes a transfer performative plus its payload chunks and hands the resulting frame(s)
 * to the connection of the session that owns link_endpoint.
 *
 * link_endpoint    - link endpoint whose session is used for sending; must not be NULL.
 * transfer         - transfer performative; its handle, delivery-id and "more" fields are
 *                    overwritten by this function; must not be NULL.
 * payloads         - array of payload_count chunks; may be NULL only when payload_count is 0.
 * payload_count    - number of entries in payloads.
 * delivery_id      - receives the session's next_outgoing_id used for this transfer; must not
 *                    be NULL. It is written before encoding starts, so it may be set even
 *                    when the call subsequently fails.
 * on_send_complete - forwarded to connection_encode_frame (only with the last frame when the
 *                    payload is split over several frames).
 * callback_context - forwarded to connection_encode_frame together with on_send_complete.
 *
 * Returns SESSION_SEND_TRANSFER_OK on success, SESSION_SEND_TRANSFER_BUSY when the remote
 * incoming window is 0, and SESSION_SEND_TRANSFER_ERROR on any other failure (invalid
 * arguments, session not mapped, payload total not representable in 32 bits, peer max frame
 * size too small for the performative, allocation or encoding failure).
 */
SESSION_SEND_TRANSFER_RESULT session_send_transfer(LINK_ENDPOINT_HANDLE link_endpoint, TRANSFER_HANDLE transfer, PAYLOAD* payloads, size_t payload_count, delivery_number* delivery_id, ON_SEND_COMPLETE on_send_complete, void* callback_context)
{
    SESSION_SEND_TRANSFER_RESULT result;

    /* Codes_S_R_S_SESSION_01_054: [If link_endpoint or transfer is NULL, session_send_transfer shall fail and return a non-zero value.] */
    /* delivery_id is written unconditionally below and payloads is indexed whenever payload_count is non-zero, so both are rejected here too instead of being dereferenced. */
    if ((link_endpoint == NULL) ||
        (transfer == NULL) ||
        (delivery_id == NULL) ||
        ((payloads == NULL) && (payload_count > 0)))
    {
        result = SESSION_SEND_TRANSFER_ERROR;
    }
    else
    {
        LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
        SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session;

        /* Codes_S_R_S_SESSION_01_059: [When session_send_transfer is called while the session is not in the MAPPED state, session_send_transfer shall fail and return a non-zero value.] */
        if (session_instance->session_state != SESSION_STATE_MAPPED)
        {
            result = SESSION_SEND_TRANSFER_ERROR;
        }
        else
        {
            size_t payload_size = 0;
            size_t i;

            /* Sum the chunk lengths; stop early on a chunk that does not fit in 32 bits or on size_t wrap-around. */
            for (i = 0; i < payload_count; i++)
            {
                if ((payloads[i].length > UINT32_MAX) ||
                    (payload_size + payloads[i].length < payload_size))
                {
                    break;
                }

                payload_size += payloads[i].length;
            }

            /* i < payload_count means the loop above bailed out; the total itself must also fit in 32 bits because it is narrowed to uint32_t when splitting. */
            if ((i < payload_count) ||
                (payload_size > UINT32_MAX))
            {
                result = SESSION_SEND_TRANSFER_ERROR;
            }
            else
            {
                if (session_instance->remote_incoming_window == 0)
                {
                    result = SESSION_SEND_TRANSFER_BUSY;
                }
                else
                {
                    /* Codes_S_R_S_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */
                    /* Codes_S_R_S_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */
                    *delivery_id = session_instance->next_outgoing_id;
                    if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) ||
                        (transfer_set_delivery_id(transfer, *delivery_id) != 0) ||
                        (transfer_set_more(transfer, false) != 0))
                    {
                        /* Codes_S_R_S_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
                        result = SESSION_SEND_TRANSFER_ERROR;
                    }
                    else
                    {
                        AMQP_VALUE transfer_value;

                        transfer_value = amqpvalue_create_transfer(transfer);
                        if (transfer_value == NULL)
                        {
                            /* Codes_S_R_S_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
                            result = SESSION_SEND_TRANSFER_ERROR;
                        }
                        else
                        {
                            /* bytes reserved per frame in addition to the encoded performative (AMQP 1.0 frames carry a fixed 8-byte header) */
                            const uint32_t frame_header_size = 8;
                            uint32_t available_frame_size = 0;
                            size_t encoded_size = 0;

                            /* The last two conditions reject a peer max frame size that cannot even hold the header plus the performative; */
                            /* without them the unsigned subtractions below would wrap around to a huge "available" size. */
                            if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) ||
                                (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0) ||
                                (available_frame_size < frame_header_size) ||
                                (encoded_size > (size_t)(available_frame_size - frame_header_size)))
                            {
                                result = SESSION_SEND_TRANSFER_ERROR;
                            }
                            else
                            {
                                /* from here on available_frame_size is the number of payload bytes that fit in one frame */
                                available_frame_size -= frame_header_size;
                                available_frame_size -= (uint32_t)encoded_size;

                                if (available_frame_size >= payload_size)
                                {
                                    /* Codes_S_R_S_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */
                                    if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0)
                                    {
                                        /* Codes_S_R_S_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */
                                        result = SESSION_SEND_TRANSFER_ERROR;
                                    }
                                    else
                                    {
                                        /* Codes_S_R_S_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
                                        session_instance->next_outgoing_id++;
                                        session_instance->remote_incoming_window--;
                                        session_instance->outgoing_window--;

                                        /* Codes_S_R_S_SESSION_01_053: [On success, session_send_transfer shall return 0.] */
                                        result = SESSION_SEND_TRANSFER_OK;
                                    }
                                }
                                else if (available_frame_size == 0)
                                {
                                    /* Not a single payload byte fits next to the performative: splitting could never make progress (the loop below would spin forever emitting empty frames). */
                                    result = SESSION_SEND_TRANSFER_ERROR;
                                }
                                else
                                {
                                    /* position of the first payload byte that has not been placed in a frame yet */
                                    size_t current_payload_index = 0;
                                    uint32_t current_payload_pos = 0;

                                    /* break it down into different deliveries */
                                    while (payload_size > 0)
                                    {
                                        uint32_t transfer_frame_payload_count = 0;
                                        uint32_t current_transfer_frame_payload_size = (uint32_t)payload_size;
                                        uint32_t byte_counter;
                                        size_t temp_current_payload_index = current_payload_index;
                                        uint32_t temp_current_payload_pos = current_payload_pos;
                                        AMQP_VALUE multi_transfer_amqp_value;
                                        PAYLOAD* transfer_frame_payloads;
                                        size_t payload_len;
                                        uint32_t transfer_frame_payload_len;
                                        /* every frame except the one carrying the final bytes is flagged "more" */
                                        bool more = (available_frame_size < payload_size);

                                        if (current_transfer_frame_payload_size > available_frame_size)
                                        {
                                            current_transfer_frame_payload_size = available_frame_size;
                                        }

                                        if (transfer_set_more(transfer, more) != 0)
                                        {
                                            break;
                                        }

                                        multi_transfer_amqp_value = amqpvalue_create_transfer(transfer);
                                        if (multi_transfer_amqp_value == NULL)
                                        {
                                            break;
                                        }

                                        /* dry run: walk the chunks to find out how many of them this frame touches */
                                        byte_counter = current_transfer_frame_payload_size;
                                        while (byte_counter > 0)
                                        {
                                            if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter)
                                            {
                                                /* more data than we need */
                                                temp_current_payload_pos += byte_counter;
                                                byte_counter = 0;
                                            }
                                            else
                                            {
                                                byte_counter -= (uint32_t)payloads[temp_current_payload_index].length - temp_current_payload_pos;
                                                temp_current_payload_index++;
                                                temp_current_payload_pos = 0;
                                            }
                                        }

                                        /* number of chunk slots needed: (last touched index - first touched index + 1), clamped to UINT32_MAX which is treated as failure */
                                        payload_len = safe_subtract_size_t(temp_current_payload_index, current_payload_index);
                                        payload_len = safe_add_size_t(payload_len, 1);
                                        transfer_frame_payload_len = payload_len < UINT32_MAX ? (uint32_t)payload_len : UINT32_MAX;

                                        /* calloc(count, size) lets the allocator reject a count * size product that would overflow */
                                        if (transfer_frame_payload_len == UINT32_MAX ||
                                           (transfer_frame_payloads = (PAYLOAD*)calloc(transfer_frame_payload_len, sizeof(PAYLOAD))) == NULL)
                                        {
                                            amqpvalue_destroy(multi_transfer_amqp_value);
                                            break;
                                        }

                                        /* copy data: only the chunk descriptors are filled in, they point into the caller's buffers */
                                        byte_counter = current_transfer_frame_payload_size;
                                        transfer_frame_payload_count = 0;

                                        while (byte_counter > 0 && transfer_frame_payload_count < transfer_frame_payload_len)
                                        {
                                            if (payloads[current_payload_index].length - current_payload_pos > byte_counter)
                                            {
                                                /* more data than we need */
                                                transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
                                                transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter;
                                                current_payload_pos += byte_counter;
                                                byte_counter = 0;
                                            }
                                            else
                                            {
                                                /* copy entire payload and move to the next */
                                                transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
                                                transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos;
                                                byte_counter -= (uint32_t)payloads[current_payload_index].length - current_payload_pos;
                                                current_payload_index++;
                                                current_payload_pos = 0;
                                            }

                                            transfer_frame_payload_count++;
                                        }

                                        if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count,
                                            /* only fire the send complete callback on the last frame of the multi frame transfer */
                                            more ? NULL : on_send_complete,
                                            callback_context) != 0)
                                        {
                                            free(transfer_frame_payloads);
                                            amqpvalue_destroy(multi_transfer_amqp_value);
                                            break;
                                        }

                                        free(transfer_frame_payloads);
                                        amqpvalue_destroy(multi_transfer_amqp_value);
                                        payload_size -= current_transfer_frame_payload_size;
                                    }

                                    /* a non-zero remainder means the loop was left through one of the breaks above; frames encoded before the failure have already been handed to the connection */
                                    if (payload_size > 0)
                                    {
                                        result = SESSION_SEND_TRANSFER_ERROR;
                                    }
                                    else
                                    {
                                        /* Codes_S_R_S_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
                                        /* NOTE(review): the id and both windows advance by one per call even though several frames were sent - confirm this matches the intended flow-control accounting. */
                                        session_instance->next_outgoing_id++;
                                        session_instance->remote_incoming_window--;
                                        session_instance->outgoing_window--;

                                        result = SESSION_SEND_TRANSFER_OK;
                                    }
                                }
                            }

                            amqpvalue_destroy(transfer_value);
                        }
                    }
                }
            }
        }
    }

    return result;
}