ASYNC_OPERATION_HANDLE link_transfer_async()

in src/link.c [1355:1544]


ASYNC_OPERATION_HANDLE link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, LINK_TRANSFER_RESULT* link_transfer_error, tickcounter_ms_t timeout)
{
    ASYNC_OPERATION_HANDLE result;

    if ((link == NULL) ||
        (link_transfer_error == NULL))
    {
        if (link_transfer_error != NULL)
        {
            *link_transfer_error = LINK_TRANSFER_ERROR;
        }

        LogError("Invalid arguments: link = %p, link_transfer_error = %p",
            link, link_transfer_error);
        result = NULL;
    }
    else
    {
        if (link->role != role_sender)
        {
            LogError("Link is not a sender link");
            *link_transfer_error = LINK_TRANSFER_ERROR;
            result = NULL;
        }
        else if (link->link_state != LINK_STATE_ATTACHED)
        {
            LogError("Link is not attached");
            *link_transfer_error = LINK_TRANSFER_ERROR;
            result = NULL;
        }
        else if (link->current_link_credit == 0)
        {
            *link_transfer_error = LINK_TRANSFER_BUSY;
            result = NULL;
        }
        else
        {
            result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler);
            if (result == NULL)
            {
                LogError("Error creating async operation");
                *link_transfer_error = LINK_TRANSFER_ERROR;
            }
            else
            {
                TRANSFER_HANDLE transfer = transfer_create(0);
                if (transfer == NULL)
                {
                    LogError("Error creating transfer");
                    *link_transfer_error = LINK_TRANSFER_ERROR;
                    async_operation_destroy(result);
                    result = NULL;
                }
                else
                {
                    sequence_no delivery_count = link->delivery_count + 1;
                    unsigned char delivery_tag_bytes[sizeof(delivery_count)];
                    delivery_tag delivery_tag;
                    bool settled;

                    (void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count));

                    delivery_tag.bytes = &delivery_tag_bytes;
                    delivery_tag.length = sizeof(delivery_tag_bytes);

                    if (link->snd_settle_mode == sender_settle_mode_unsettled)
                    {
                        settled = false;
                    }
                    else
                    {
                        settled = true;
                    }

                    if (transfer_set_delivery_tag(transfer, delivery_tag) != 0)
                    {
                        LogError("Failed setting delivery tag");
                        *link_transfer_error = LINK_TRANSFER_ERROR;
                        async_operation_destroy(result);
                        result = NULL;
                    }
                    else if (transfer_set_message_format(transfer, message_format) != 0)
                    {
                        LogError("Failed setting message format");
                        *link_transfer_error = LINK_TRANSFER_ERROR;
                        async_operation_destroy(result);
                        result = NULL;
                    }
                    else if (transfer_set_settled(transfer, settled) != 0)
                    {
                        LogError("Failed setting settled flag");
                        *link_transfer_error = LINK_TRANSFER_ERROR;
                        async_operation_destroy(result);
                        result = NULL;
                    }
                    else
                    {
                        AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);
                        if (transfer_value == NULL)
                        {
                            LogError("Failed creating transfer performative AMQP value");
                            *link_transfer_error = LINK_TRANSFER_ERROR;
                            async_operation_destroy(result);
                            result = NULL;
                        }
                        else
                        {
                            DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result);
                            if (pending_delivery == NULL)
                            {
                                LogError("Failed getting pending delivery");
                                *link_transfer_error = LINK_TRANSFER_ERROR;
                                async_operation_destroy(result);
                                result = NULL;
                            }
                            else
                            {
                                if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
                                {
                                    LogError("Failed getting current tick");
                                    *link_transfer_error = LINK_TRANSFER_ERROR;
                                    async_operation_destroy(result);
                                    result = NULL;
                                }
                                else
                                {
                                    LIST_ITEM_HANDLE delivery_instance_list_item;
                                    pending_delivery->timeout = timeout;
                                    pending_delivery->on_delivery_settled = on_delivery_settled;
                                    pending_delivery->callback_context = callback_context;
                                    pending_delivery->link = link;
                                    delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, result);

                                    if (delivery_instance_list_item == NULL)
                                    {
                                        LogError("Failed adding delivery to list");
                                        *link_transfer_error = LINK_TRANSFER_ERROR;
                                        async_operation_destroy(result);
                                        result = NULL;
                                    }
                                    else
                                    {
                                        /* here we should feed data to the transfer frame */
                                        switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
                                        {
                                        default:
                                        case SESSION_SEND_TRANSFER_ERROR:
                                            LogError("Failed session send transfer");
                                            if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) == 0)
                                            {
                                                async_operation_destroy(result);
                                            }

                                            *link_transfer_error = LINK_TRANSFER_ERROR;
                                            result = NULL;
                                            break;

                                        case SESSION_SEND_TRANSFER_BUSY:
                                            /* Ensure we remove from list again since sender will attempt to transfer again on flow on */
                                            LogError("Failed session send transfer");
                                            if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) == 0)
                                            {
                                                async_operation_destroy(result);
                                            }

                                            *link_transfer_error = LINK_TRANSFER_BUSY;
                                            result = NULL;
                                            break;

                                        case SESSION_SEND_TRANSFER_OK:
                                            link->delivery_count = delivery_count;
                                            link->current_link_credit--;
                                            break;
                                        }
                                    }
                                }
                            }

                            amqpvalue_destroy(transfer_value);
                        }
                    }

                    transfer_destroy(transfer);
                }
            }
        }
    }

    return result;
}