in src/mpsc_lock_free_queue.c [56:100]
int mpsc_lock_free_queue_enqueue(MPSC_LOCK_FREE_QUEUE_HANDLE mpsc_lock_free_queue, MPSC_LOCK_FREE_QUEUE_ITEM* item)
{
int result;
if ((mpsc_lock_free_queue == NULL) ||
(item == NULL))
{
/* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_008: [ If mpsc_lock_free_queue or item is NULL, mpsc_lock_free_queue_enqueue shall fail and return a non-zero value. ]*/
LogError("Bad arguments: mpsc_lock_free_queue = %p, item = %p",
mpsc_lock_free_queue, item);
result = MU_FAILURE;
}
else
{
/* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_006: [ mpsc_lock_free_queue_enqueue shall enqueue the item item in the queue. ]*/
// enqueue simply puts the item in the enqueue_head
// it is assumed that destroy is not called concurrently with enqueue
do
{
/* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_014: [** Concurrent mpsc_lock_free_queue_enqueue from multiple threads shall be safe. ]*/
// first get the current enqueue_head
MPSC_LOCK_FREE_QUEUE_ITEM* current_head = interlocked_compare_exchange_pointer((void* volatile_atomic*)&mpsc_lock_free_queue->enqueue_head, NULL, NULL);
// now set the item's next to the head
item->next = current_head;
// now simply exchange the head if it has not changed in the meanwhile
if (interlocked_compare_exchange_pointer((void* volatile_atomic*) &mpsc_lock_free_queue->enqueue_head, item, current_head) == current_head)
{
// yay, we have inserted the item, simply bail out, we are done!
break;
}
// unfortunatelly someone else tried to insert while we were replacing the head, so restart the insert
} while (1);
/* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_007: [ On success it shall return 0. ]*/
result = 0;
}
return result;
}