TQUEUE_PUSH_RESULT TQUEUE_LL_PUSH()

in common/inc/c_pal/tqueue_ll.h [240:395]


/* TQUEUE_LL_PUSH(C): pushes one item at the head of the queue, growing the item array (never beyond max_size) when the queue is full.   \
 *                                                                                                                                      \
 * Parameters:                                                                                                                          \
 *   tqueue                     - queue to push into; must not be NULL.                                                                 \
 *   item                       - pointer to the value to push; must not be NULL.                                                       \
 *   copy_item_function_context - handed as first argument to the queue's copy_item_function when one is set; otherwise only logged.    \
 *                                                                                                                                      \
 * Returns:                                                                                                                             \
 *   TQUEUE_PUSH_INVALID_ARG when tqueue or item is NULL,                                                                               \
 *   TQUEUE_PUSH_QUEUE_FULL  when the queue is full and queue_size has already reached max_size,                                        \
 *   TQUEUE_PUSH_ERROR       when reallocating the item array fails,                                                                    \
 *   TQUEUE_PUSH_OK          once the item has been stored and its entry marked USED.                                                   \
 *                                                                                                                                      \
 * Locking: the push itself runs with resize_lock held in shared mode; only the grow path holds it in exclusive mode.                   \
 */                                                                                                                                     \
TQUEUE_PUSH_RESULT TQUEUE_LL_PUSH(C)(TQUEUE_LL(T) tqueue, T* item, void* copy_item_function_context)                                                                \
{                                                                                                                                                                   \
    TQUEUE_PUSH_RESULT result;                                                                                                                                      \
    if (                                                                                                                                                            \
        /* Codes_SRS_TQUEUE_01_012: [ If tqueue is NULL then TQUEUE_PUSH(T) shall fail and return TQUEUE_PUSH_INVALID_ARG. ]*/                                      \
        (tqueue == NULL) ||                                                                                                                                         \
        /* Codes_SRS_TQUEUE_01_013: [ If item is NULL then TQUEUE_PUSH(T) shall fail and return TQUEUE_PUSH_INVALID_ARG. ]*/                                        \
        (item == NULL)                                                                                                                                              \
       )                                                                                                                                                            \
    {                                                                                                                                                               \
        LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p, const " MU_TOSTRING(T) "* item=%p, void* copy_item_function_context=%p",              \
            tqueue, item, copy_item_function_context);                                                                                                              \
        result = TQUEUE_PUSH_INVALID_ARG;                                                                                                                           \
    }                                                                                                                                                               \
    else                                                                                                                                                            \
    {                                                                                                                                                               \
        TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue);                                                                         \
        /* cleared only on the realloc failure path, which leaves the loop with no lock held */                                                                     \
        bool unlock_needed = true;                                                                                                                                  \
        /* Codes_SRS_TQUEUE_01_058: [ TQUEUE_PUSH(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */                             \
        srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock);                                                                                                       \
        /* Codes_SRS_TQUEUE_01_014: [ TQUEUE_PUSH(T) shall execute the following actions until it is either able to push the item in the queue or the queue is full: ]*/ \
        do                                                                                                                                                          \
        {                                                                                                                                                           \
            /* Codes_SRS_TQUEUE_01_015: [ TQUEUE_PUSH(T) shall obtain the current head queue by calling interlocked_add_64. ]*/                                     \
            int64_t current_head = interlocked_add_64(&tqueue_ptr->head, 0);                                                                                        \
            /* Codes_SRS_TQUEUE_01_016: [ TQUEUE_PUSH(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/                                     \
            int64_t current_tail = interlocked_add_64(&tqueue_ptr->tail, 0);                                                                                        \
            /* Codes_SRS_TQUEUE_01_060: [ If the queue is full (current head >= current tail + queue size): ]*/                                                     \
            if (current_head >= current_tail + tqueue_ptr->queue_size)                                                                                              \
            {                                                                                                                                                       \
                /* greater cannot really happen */                                                                                                                  \
                if (tqueue_ptr->queue_size >= tqueue_ptr->max_size)                                                                                                 \
                {                                                                                                                                                   \
                    /* Codes_SRS_TQUEUE_01_061: [ If the current queue size is equal to the max queue size, TQUEUE_PUSH(T) shall return TQUEUE_PUSH_QUEUE_FULL. ]*/ \
                    result = TQUEUE_PUSH_QUEUE_FULL;                                                                                                                \
                    break;                                                                                                                                          \
                }                                                                                                                                                   \
                else                                                                                                                                                \
                {                                                                                                                                                   \
                    /* Codes_SRS_TQUEUE_01_062: [ If the current queue size is less than the max queue size: ] */                                                   \
                    /* Codes_SRS_TQUEUE_01_063: [ TQUEUE_PUSH(T) shall release in shared mode the lock used to guard the growing of the queue. ] */                 \
                    srw_lock_ll_release_shared(&tqueue_ptr->resize_lock);                                                                                           \
                    /* Codes_SRS_TQUEUE_01_064: [ TQUEUE_PUSH(T) shall acquire in exclusive mode the lock used to guard the growing of the queue. ] */              \
                    srw_lock_ll_acquire_exclusive(&tqueue_ptr->resize_lock);                                                                                        \
                    /* the lock was dropped between release and acquire, so head and tail have to be sampled again */                                               \
                    /* Codes_SRS_TQUEUE_01_075: [ TQUEUE_PUSH(T) shall obtain again the current head or the queue. ]*/                                              \
                    current_head = interlocked_add_64(&tqueue_ptr->head, 0);                                                                                        \
                    /* Codes_SRS_TQUEUE_01_076: [ TQUEUE_PUSH(T) shall obtain again the current tail or the queue. ]*/                                              \
                    current_tail = interlocked_add_64(&tqueue_ptr->tail, 0);                                                                                        \
                    /* Codes_SRS_TQUEUE_01_074: [ If the size of the queue did not change after acquiring the lock in shared mode: ]*/                              \
                    if (current_head < current_tail + tqueue_ptr->queue_size)                                                                                       \
                    {                                                                                                                                               \
                        /* queue was resized by another thread or now there's space, do nothing */                                                                  \
                    }                                                                                                                                               \
                    else                                                                                                                                            \
                    {                                                                                                                                               \
                        /* Codes_SRS_TQUEUE_01_067: [ TQUEUE_PUSH(T) shall double the size of the queue. ]*/                                                        \
                        uint32_t new_queue_size;                                                                                                                    \
                        /* (queue_size > max_size / 2) is equivalent to (queue_size * 2 > max_size), but it cannot wrap around for large queue sizes */             \
                        if (tqueue_ptr->queue_size > tqueue_ptr->max_size / 2)                                                                                      \
                        {                                                                                                                                           \
                            /* Codes_SRS_TQUEUE_01_070: [ If the newly computed queue size is higher than the max_queue_size value passed to TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall use max_queue_size as the new queue size. ]*/ \
                            new_queue_size = tqueue_ptr->max_size;                                                                                                  \
                        }                                                                                                                                           \
                        else                                                                                                                                        \
                        {                                                                                                                                           \
                            new_queue_size = tqueue_ptr->queue_size * 2;                                                                                            \
                        }                                                                                                                                           \
                        /* Codes_SRS_TQUEUE_01_068: [ TQUEUE_PUSH(T) shall reallocate the array used to store the queue items based on the newly computed size. ]*/ \
                        TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)* temp_queue = realloc_2(tqueue_ptr->queue, new_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)));      \
                        if (temp_queue == NULL)                                                                                                                     \
                        {                                                                                                                                           \
                            /* Codes_SRS_TQUEUE_01_069: [ If reallocation fails, TQUEUE_PUSH(T) shall return TQUEUE_PUSH_ERROR. ]*/                                 \
                            LogError("realloc_2(tqueue_ptr->queue=%p, new_queue_size=%" PRIu32 ", sizeof(" MU_TOSTRING(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) ")=%zu) failed", \
                                tqueue_ptr->queue, new_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)));                                                       \
                            result = TQUEUE_PUSH_ERROR;                                                                                                             \
                            /* Codes_SRS_TQUEUE_01_065: [ TQUEUE_PUSH(T) shall release in exclusive mode the lock used to guard the growing of the queue. ] */      \
                            srw_lock_ll_release_exclusive(&tqueue_ptr->resize_lock);                                                                                \
                            /* No need to take the lock again, we are going to return anyway */                                                                     \
                            unlock_needed = false;                                                                                                                  \
                            break;                                                                                                                                  \
                        }                                                                                                                                           \
                        else                                                                                                                                        \
                        {                                                                                                                                           \
                            tqueue_ptr->queue = temp_queue;                                                                                                         \
                            /* Codes_SRS_TQUEUE_01_077: [ TQUEUE_PUSH(T) shall move the entries between the tail index and the array end like below: ]*/            \
                            uint32_t elements_in_queue = (uint32_t)(current_head - current_tail);                                                                   \
                            /* indexes are still computed with the old queue_size, it is only updated after the entries have been moved */                          \
                            uint32_t tail_index = (uint32_t)(current_tail % tqueue_ptr->queue_size);                                                                \
                            uint32_t copy_item_count = tqueue_ptr->queue_size - tail_index;                                                                         \
                            uint32_t new_tail_index = new_queue_size - copy_item_count;                                                                             \
                            /* Codes_SRS_TQUEUE_01_078: [ Entries at the tail shall be moved to the end of the resized array ]*/                                    \
                            /* Please see diagram in the spec */                                                                                                    \
                            if (copy_item_count > 0)                                                                                                                \
                            {                                                                                                                                       \
                                (void)memmove(&tqueue_ptr->queue[new_tail_index], &tqueue_ptr->queue[tail_index], sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) * copy_item_count); \
                            }                                                                                                                                       \
                            /* the gap opened between the old and the new tail position holds no items, mark every entry in it as NOT_USED */                       \
                            for (uint32_t i = tail_index; i < new_tail_index; i++)                                                                                  \
                            {                                                                                                                                       \
                                (void)interlocked_exchange(&tqueue_ptr->queue[i].state, QUEUE_ENTRY_STATE_NOT_USED);                                                \
                            }                                                                                                                                       \
                            /* tail and head are rebased onto array indexes of the resized array, keeping the number of elements unchanged */                       \
                            (void)interlocked_exchange_64(&tqueue_ptr->tail, new_tail_index);                                                                       \
                            (void)interlocked_exchange_64(&tqueue_ptr->head, new_tail_index + elements_in_queue);                                                   \
                            tqueue_ptr->queue_size = new_queue_size;                                                                                                \
                        }                                                                                                                                           \
                    }                                                                                                                                               \
                                                                                                                                                                    \
                    /* Codes_SRS_TQUEUE_01_065: [ TQUEUE_PUSH(T) shall release in exclusive mode the lock used to guard the growing of the queue. ] */              \
                    srw_lock_ll_release_exclusive(&tqueue_ptr->resize_lock);                                                                                        \
                    /* Codes_SRS_TQUEUE_01_066: [ TQUEUE_PUSH(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */                 \
                    srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock);                                                                                           \
                    /* retry the push from the top, now holding the shared lock again */                                                                            \
                    continue;                                                                                                                                       \
                }                                                                                                                                                   \
            }                                                                                                                                                       \
            else                                                                                                                                                    \
            {                                                                                                                                                       \
                uint32_t index = (uint32_t)(current_head % tqueue_ptr->queue_size);                                                                                 \
                /* Codes_SRS_TQUEUE_01_017: [ Using interlocked_compare_exchange, TQUEUE_PUSH(T) shall change the head array entry state to PUSHING (from NOT_USED). ]*/ \
                if (interlocked_compare_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_PUSHING, QUEUE_ENTRY_STATE_NOT_USED) != QUEUE_ENTRY_STATE_NOT_USED) \
                {                                                                                                                                                   \
                    /* Codes_SRS_TQUEUE_01_023: [ If the state of the array entry corresponding to the head is not NOT_USED, TQUEUE_PUSH(T) shall retry the whole push. ]*/ \
                    /* likely queue full */                                                                                                                         \
                    continue;                                                                                                                                       \
                }                                                                                                                                                   \
                else                                                                                                                                                \
                {                                                                                                                                                   \
                    /* Codes_SRS_TQUEUE_01_018: [ Using interlocked_compare_exchange_64, TQUEUE_PUSH(T) shall replace the head value with the head value obtained earlier + 1. ]*/ \
                    if (interlocked_compare_exchange_64(&tqueue_ptr->head, current_head + 1, current_head) != current_head)                                         \
                    {                                                                                                                                               \
                        /* Codes_SRS_TQUEUE_01_043: [ If the queue head has changed, TQUEUE_PUSH(T) shall set the state back to NOT_USED and retry the push. ]*/    \
                        (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_NOT_USED);                                                    \
                        continue;                                                                                                                                   \
                    }                                                                                                                                               \
                    else                                                                                                                                            \
                    {                                                                                                                                               \
                        /* the entry is in PUSHING state and head has been advanced, so the value can now be written into the entry */                              \
                        if (tqueue_ptr->copy_item_function == NULL)                                                                                                 \
                        {                                                                                                                                           \
                            /* Codes_SRS_TQUEUE_01_019: [ If no copy_item_function was specified in TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall copy the value of item into the array entry value whose state was changed to PUSHING. ]*/ \
                            (void)memcpy((void*)&tqueue_ptr->queue[index].value, (void*)item, sizeof(T));                                                           \
                        }                                                                                                                                           \
                        else                                                                                                                                        \
                        {                                                                                                                                           \
                            /* Codes_SRS_TQUEUE_01_024: [ If a copy_item_function was specified in TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall call the copy_item_function with copy_item_function_context as context, a pointer to the array entry value whose state was changed to PUSHING as push_dst and item as push_src. ] */ \
                            tqueue_ptr->copy_item_function(copy_item_function_context, &tqueue_ptr->queue[index].value, item);                                      \
                        }                                                                                                                                           \
                        /* Codes_SRS_TQUEUE_01_020: [ TQUEUE_PUSH(T) shall set the state to USED by using interlocked_exchange. ]*/                                 \
                        (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED);                                                        \
                        /* Codes_SRS_TQUEUE_01_021: [ TQUEUE_PUSH(T) shall succeed and return TQUEUE_PUSH_OK. ]*/                                                   \
                        result = TQUEUE_PUSH_OK;                                                                                                                    \
                        break;                                                                                                                                      \
                    }                                                                                                                                               \
                }                                                                                                                                                   \
            }                                                                                                                                                       \
        } while (1);                                                                                                                                                \
                                                                                                                                                                    \
        /* Codes_SRS_TQUEUE_01_059: [ TQUEUE_PUSH(T) shall release in shared mode the lock used to guard the growing of the queue. ] */                             \
        if (unlock_needed)                                                                                                                                          \
        {                                                                                                                                                           \
            srw_lock_ll_release_shared(&tqueue_ptr->resize_lock);                                                                                                   \
        }                                                                                                                                                           \
    }                                                                                                                                                               \
    return result;                                                                                                                                                  \
}                                                                                                                                                                   \