int mpsc_lock_free_queue_enqueue()

in src/mpsc_lock_free_queue.c [56:100]


/*
 * Pushes item onto the queue by making it the new enqueue_head.
 *
 * The previous head is stored in item->next, then the head is swapped to item
 * with a compare-exchange; if the head changed in between, the whole step is
 * retried until the swap succeeds.
 *
 * mpsc_lock_free_queue - queue to insert into; must not be NULL.
 * item                 - item to insert; must not be NULL. Its next field is
 *                        overwritten by this call.
 *
 * Returns 0 once the item has been linked in, or MU_FAILURE when either
 * argument is NULL (the queue is not touched in that case).
 *
 * Destroying the queue is assumed not to happen concurrently with this call.
 */
int mpsc_lock_free_queue_enqueue(MPSC_LOCK_FREE_QUEUE_HANDLE mpsc_lock_free_queue, MPSC_LOCK_FREE_QUEUE_ITEM* item)
{
    int result;

    if ((mpsc_lock_free_queue != NULL) &&
        (item != NULL))
    {
        /* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_006: [ mpsc_lock_free_queue_enqueue shall enqueue the item item in the queue. ]*/
        MPSC_LOCK_FREE_QUEUE_ITEM* observed_head;

        /* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_014: [** Concurrent mpsc_lock_free_queue_enqueue from multiple threads shall be safe. ]*/
        do
        {
            // snapshot the head: the compare-exchange of NULL against NULL is used purely as a read of the current value
            observed_head = interlocked_compare_exchange_pointer((void* volatile_atomic*)&mpsc_lock_free_queue->enqueue_head, NULL, NULL);

            // chain the new item in front of the snapshot before trying to publish it
            item->next = observed_head;

            // publish only if the head is still the snapshot; a mismatch means another enqueue got in first, so go around again
        } while (interlocked_compare_exchange_pointer((void* volatile_atomic*)&mpsc_lock_free_queue->enqueue_head, item, observed_head) != observed_head);

        /* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_007: [ On success it shall return 0. ]*/
        result = 0;
    }
    else
    {
        /* Codes_SRS_MPSC_LOCK_FREE_QUEUE_01_008: [ If mpsc_lock_free_queue or item is NULL, mpsc_lock_free_queue_enqueue shall fail and return a non-zero value. ]*/
        LogError("Bad arguments: mpsc_lock_free_queue = %p, item = %p",
            mpsc_lock_free_queue, item);
        result = MU_FAILURE;
    }

    return result;
}