TQUEUE_POP_RESULT TQUEUE_LL_POP()

in common/inc/c_pal/tqueue_ll.h [399:503]


/* TQUEUE_LL_POP: tries to remove the entry at the tail of the queue and hand its value to the caller through item.                                              */ \
/*                                                                                                                                                                */ \
/* Parameters:                                                                                                                                                    */ \
/*   tqueue                         - queue handle; must not be NULL.                                                                                             */ \
/*   item                           - destination for the popped value; must not be NULL.                                                                         */ \
/*   copy_item_function_context     - passed as context to the queue's copy_item_function when one is set; unused on the memcpy path.                             */ \
/*   condition_function             - optional predicate; when not NULL it is called with a pointer to the candidate entry value and decides whether to pop.     */ \
/*   condition_function_context     - passed as context to condition_function.                                                                                    */ \
/*                                                                                                                                                                */ \
/* Returns:                                                                                                                                                       */ \
/*   TQUEUE_POP_INVALID_ARG  - tqueue or item is NULL (nothing else is touched).                                                                                  */ \
/*   TQUEUE_POP_QUEUE_EMPTY  - the tail value read was >= the head value read.                                                                                    */ \
/*   TQUEUE_POP_REJECTED     - condition_function returned false; the entry is put back in the USED state and tail is not advanced.                               */ \
/*   TQUEUE_POP_OK           - the value was copied to item and the entry was marked NOT_USED.                                                                    */ \
/*                                                                                                                                                                */ \
/* The body loops (without waiting) until one of the last three outcomes is reached; result is assigned on every path that leaves the loop.                       */ \
TQUEUE_POP_RESULT TQUEUE_LL_POP(C)(TQUEUE_LL(T) tqueue, T* item, void* copy_item_function_context, TQUEUE_CONDITION_FUNC(T) condition_function, void* condition_function_context) \
{                                                                                                                                                                   \
    TQUEUE_POP_RESULT result;                                                                                                                                       \
    if (                                                                                                                                                            \
        /* Codes_SRS_TQUEUE_01_025: [ If tqueue is NULL then TQUEUE_POP(T) shall fail and return TQUEUE_POP_INVALID_ARG. ]*/                                        \
        (tqueue == NULL) ||                                                                                                                                         \
        /* Codes_SRS_TQUEUE_01_027: [ If item is NULL then TQUEUE_POP(T) shall fail and return TQUEUE_POP_INVALID_ARG. ]*/                                          \
        (item == NULL)                                                                                                                                              \
       )                                                                                                                                                            \
    {                                                                                                                                                               \
        /* NOTE(review): only tqueue and item are formatted with %p; the remaining three parameters appear in the message as text without their values. */        \
        LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p, " MU_TOSTRING(T) "*=%p, void* copy_item_function_context, TQUEUE_CONDITION_FUNC(T) condition_function, void* condition_function_context", \
            tqueue, item);                                                                                                                                          \
        result = TQUEUE_POP_INVALID_ARG;                                                                                                                            \
    }                                                                                                                                                               \
    else                                                                                                                                                            \
    {                                                                                                                                                               \
        /* Codes_SRS_TQUEUE_01_072: [ TQUEUE_POP(T) shall acquire in shared mode the lock used to guard the growing of the queue. ]*/                               \
        TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue);                                                                         \
        srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock);                                                                                                       \
        {                                                                                                                                                           \
            /* Codes_SRS_TQUEUE_01_026: [ TQUEUE_POP(T) shall execute the following actions until it is either able to pop the item from the queue or the queue is empty: ] */ \
            /* Note: a `continue` inside this do/while(1) jumps to the (always true) condition, so it restarts the body and re-reads head and tail. */             \
            do                                                                                                                                                      \
            {                                                                                                                                                       \
                /* Adding 0 leaves the counter unchanged; the call is used only to obtain the current value. */                                                     \
                /* Codes_SRS_TQUEUE_01_028: [ TQUEUE_POP(T) shall obtain the current head queue by calling interlocked_add_64. ]*/                                  \
                int64_t current_head = interlocked_add_64(&tqueue_ptr->head, 0);                                                                                    \
                /* Codes_SRS_TQUEUE_01_029: [ TQUEUE_POP(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/                                  \
                int64_t current_tail = interlocked_add_64(&tqueue_ptr->tail, 0);                                                                                    \
                if (current_tail >= current_head)                                                                                                                   \
                {                                                                                                                                                   \
                    /* Codes_SRS_TQUEUE_01_035: [ If the queue is empty (current tail >= current head), TQUEUE_POP(T) shall return TQUEUE_POP_QUEUE_EMPTY. ]*/      \
                    result = TQUEUE_POP_QUEUE_EMPTY;                                                                                                                \
                    break;                                                                                                                                          \
                }                                                                                                                                                   \
                else                                                                                                                                                \
                {                                                                                                                                                   \
                    /* Codes_SRS_TQUEUE_01_030: [ Using interlocked_compare_exchange, TQUEUE_PUSH(T) shall set the tail array entry state to POPPING (from USED). ]*/ \
                    /* The tail counter is mapped onto the entry array by taking it modulo queue_size. */                                                           \
                    uint32_t index = (uint32_t)(current_tail % tqueue_ptr->queue_size);                                                                             \
                    /* Claim the entry: the result of the compare-exchange is checked against USED, any other value means the entry was not claimed by this call. */ \
                    if (interlocked_compare_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_POPPING, QUEUE_ENTRY_STATE_USED) != QUEUE_ENTRY_STATE_USED) \
                    {                                                                                                                                               \
                        /* Codes_SRS_TQUEUE_01_036: [ If the state of the array entry corresponding to the tail is not USED, TQUEUE_POP(T) shall try again. ]*/     \
                        continue;                                                                                                                                   \
                    }                                                                                                                                               \
                    else                                                                                                                                            \
                    {                                                                                                                                               \
                        /* From here the entry state is POPPING; every path below stores a new state (USED or NOT_USED) before leaving or retrying. */             \
                        bool should_pop;                                                                                                                            \
                        /* Codes_SRS_TQUEUE_01_039: [ If condition_function is not NULL: ]*/                                                                        \
                        if (condition_function != NULL)                                                                                                             \
                        {                                                                                                                                           \
                            /* The predicate runs before the tail is advanced, while the entry is in the POPPING state. */                                          \
                            /* Codes_SRS_TQUEUE_01_040: [ TQUEUE_POP(T) shall call condition_function with condition_function_context and a pointer to the array entry value whose state was changed to POPPING. ] */ \
                            should_pop = condition_function(condition_function_context, (T*)&tqueue_ptr->queue[index].value);                                       \
                        }                                                                                                                                           \
                        else                                                                                                                                        \
                        {                                                                                                                                           \
                            /* Codes_SRS_TQUEUE_01_042: [ Otherwise, shall proceed with the pop. ]*/                                                                \
                            should_pop = true;                                                                                                                      \
                        }                                                                                                                                           \
                        if (!should_pop)                                                                                                                            \
                        {                                                                                                                                           \
                            /* Rejected: the entry goes back to USED and tail is left as it was, so the value stays in the queue. */                                \
                            /* Codes_SRS_TQUEUE_01_041: [ If condition_function returns false, TQUEUE_POP(T) shall set the state to USED by using interlocked_exchange and return TQUEUE_POP_REJECTED. ]*/ \
                            (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED);                                                    \
                            result = TQUEUE_POP_REJECTED;                                                                                                           \
                        }                                                                                                                                           \
                        else                                                                                                                                        \
                        {                                                                                                                                           \
                            /* Advance tail only if it still holds the value read at the top of this iteration. */                                                  \
                            /* Codes_SRS_TQUEUE_01_031: [ TQUEUE_POP(T) shall replace the tail value with the tail value obtained earlier + 1 by using interlocked_exchange_64. ]*/ \
                            if (interlocked_compare_exchange_64(&tqueue_ptr->tail, current_tail + 1, current_tail) != current_tail)                                 \
                            {                                                                                                                                       \
                                /* Codes_SRS_TQUEUE_01_044: [ If incrementing the tail by using interlocked_compare_exchange_64 does not succeed, TQUEUE_POP(T) shall revert the state of the array entry to USED and retry. ]*/ \
                                (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED);                                                \
                                continue;                                                                                                                           \
                            }                                                                                                                                       \
                            else                                                                                                                                    \
                            {                                                                                                                                       \
                                /* Tail was advanced by this call; move the value out of the entry before marking it NOT_USED. */                                   \
                                if (tqueue_ptr->copy_item_function == NULL)                                                                                         \
                                {                                                                                                                                   \
                                    /* Codes_SRS_TQUEUE_01_032: [ If a copy_item_function was not specified in TQUEUE_CREATE(T): ]*/                                \
                                    /* Codes_SRS_TQUEUE_01_033: [ TQUEUE_POP(T) shall copy array entry value whose state was changed to POPPING to item. ]*/        \
                                    (void)memcpy((void*)item, (void*)&tqueue_ptr->queue[index].value, sizeof(T));                                                   \
                                }                                                                                                                                   \
                                else                                                                                                                                \
                                {                                                                                                                                   \
                                    /* Codes_SRS_TQUEUE_01_037: [ If copy_item_function and sispose_item_function were specified in TQUEUE_CREATE(T): ]*/           \
                                    /* Codes_SRS_TQUEUE_01_038: [ TQUEUE_POP(T) shall call copy_item_function with copy_item_function_context as context, the array entry value whose state was changed to POPPING to item as pop_src and item as pop_dst. ]*/ \
                                    tqueue_ptr->copy_item_function(copy_item_function_context, item, (T*)&tqueue_ptr->queue[index].value);                          \
                                }                                                                                                                                   \
                                /* The dispose check is independent of which copy path ran above. */                                                                \
                                if (tqueue_ptr->dispose_item_function != NULL)                                                                                      \
                                {                                                                                                                                   \
                                    /* Codes_SRS_TQUEUE_01_045: [ TQUEUE_POP(T) shall call dispose_item_function with dispose_item_function_context as context and the array entry value whose state was changed to POPPING as item. ]*/ \
                                    tqueue_ptr->dispose_item_function(tqueue_ptr->dispose_item_function_context, (T*)&tqueue_ptr->queue[index].value);              \
                                }                                                                                                                                   \
                                /* Codes_SRS_TQUEUE_01_034: [ TQUEUE_POP(T) shall set the state to NOT_USED by using interlocked_exchange, succeed and return TQUEUE_POP_OK. ]*/ \
                                (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_NOT_USED);                      \
                                result = TQUEUE_POP_OK;                                                                                                             \
                            }                                                                                                                                       \
                        }                                                                                                                                           \
                        /* Reached for both TQUEUE_POP_REJECTED and TQUEUE_POP_OK; the retry path above leaves via `continue` instead. */                           \
                        break;                                                                                                                                      \
                    }                                                                                                                                               \
                }                                                                                                                                                   \
            } while (1);                                                                                                                                            \
            /* Both `break` statements land here, so the shared lock is released on every path that acquired it. */                                                 \
            /* Codes_SRS_TQUEUE_01_073: [ TQUEUE_POP(T) shall release in shared mode the lock used to guard the growing of the queue. ] */                          \
            srw_lock_ll_release_shared(&tqueue_ptr->resize_lock);                                                                                                   \
        }                                                                                                                                                           \
    }                                                                                                                                                               \
    return result;                                                                                                                                                  \
}                                                                                                                                                                   \