in src/link.c [1355:1544]
ASYNC_OPERATION_HANDLE link_transfer_async(LINK_HANDLE link, message_format message_format, PAYLOAD* payloads, size_t payload_count, ON_DELIVERY_SETTLED on_delivery_settled, void* callback_context, LINK_TRANSFER_RESULT* link_transfer_error, tickcounter_ms_t timeout)
{
ASYNC_OPERATION_HANDLE result;
if ((link == NULL) ||
(link_transfer_error == NULL))
{
if (link_transfer_error != NULL)
{
*link_transfer_error = LINK_TRANSFER_ERROR;
}
LogError("Invalid arguments: link = %p, link_transfer_error = %p",
link, link_transfer_error);
result = NULL;
}
else
{
if (link->role != role_sender)
{
LogError("Link is not a sender link");
*link_transfer_error = LINK_TRANSFER_ERROR;
result = NULL;
}
else if (link->link_state != LINK_STATE_ATTACHED)
{
LogError("Link is not attached");
*link_transfer_error = LINK_TRANSFER_ERROR;
result = NULL;
}
else if (link->current_link_credit == 0)
{
*link_transfer_error = LINK_TRANSFER_BUSY;
result = NULL;
}
else
{
result = CREATE_ASYNC_OPERATION(DELIVERY_INSTANCE, link_transfer_cancel_handler);
if (result == NULL)
{
LogError("Error creating async operation");
*link_transfer_error = LINK_TRANSFER_ERROR;
}
else
{
TRANSFER_HANDLE transfer = transfer_create(0);
if (transfer == NULL)
{
LogError("Error creating transfer");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else
{
sequence_no delivery_count = link->delivery_count + 1;
unsigned char delivery_tag_bytes[sizeof(delivery_count)];
delivery_tag delivery_tag;
bool settled;
(void)memcpy(delivery_tag_bytes, &delivery_count, sizeof(delivery_count));
delivery_tag.bytes = &delivery_tag_bytes;
delivery_tag.length = sizeof(delivery_tag_bytes);
if (link->snd_settle_mode == sender_settle_mode_unsettled)
{
settled = false;
}
else
{
settled = true;
}
if (transfer_set_delivery_tag(transfer, delivery_tag) != 0)
{
LogError("Failed setting delivery tag");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else if (transfer_set_message_format(transfer, message_format) != 0)
{
LogError("Failed setting message format");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else if (transfer_set_settled(transfer, settled) != 0)
{
LogError("Failed setting settled flag");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else
{
AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);
if (transfer_value == NULL)
{
LogError("Failed creating transfer performative AMQP value");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else
{
DELIVERY_INSTANCE* pending_delivery = GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, result);
if (pending_delivery == NULL)
{
LogError("Failed getting pending delivery");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else
{
if (tickcounter_get_current_ms(link->tick_counter, &pending_delivery->start_tick) != 0)
{
LogError("Failed getting current tick");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else
{
LIST_ITEM_HANDLE delivery_instance_list_item;
pending_delivery->timeout = timeout;
pending_delivery->on_delivery_settled = on_delivery_settled;
pending_delivery->callback_context = callback_context;
pending_delivery->link = link;
delivery_instance_list_item = singlylinkedlist_add(link->pending_deliveries, result);
if (delivery_instance_list_item == NULL)
{
LogError("Failed adding delivery to list");
*link_transfer_error = LINK_TRANSFER_ERROR;
async_operation_destroy(result);
result = NULL;
}
else
{
/* here we should feed data to the transfer frame */
switch (session_send_transfer(link->link_endpoint, transfer, payloads, payload_count, &pending_delivery->delivery_id, (settled) ? on_send_complete : NULL, delivery_instance_list_item))
{
default:
case SESSION_SEND_TRANSFER_ERROR:
LogError("Failed session send transfer");
if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) == 0)
{
async_operation_destroy(result);
}
*link_transfer_error = LINK_TRANSFER_ERROR;
result = NULL;
break;
case SESSION_SEND_TRANSFER_BUSY:
/* Ensure we remove from list again since sender will attempt to transfer again on flow on */
LogError("Failed session send transfer");
if (singlylinkedlist_remove(link->pending_deliveries, delivery_instance_list_item) == 0)
{
async_operation_destroy(result);
}
*link_transfer_error = LINK_TRANSFER_BUSY;
result = NULL;
break;
case SESSION_SEND_TRANSFER_OK:
link->delivery_count = delivery_count;
link->current_link_credit--;
break;
}
}
}
}
amqpvalue_destroy(transfer_value);
}
}
transfer_destroy(transfer);
}
}
}
}
return result;
}