in common/inc/c_pal/tqueue_ll.h [399:503]
TQUEUE_POP_RESULT TQUEUE_LL_POP(C)(TQUEUE_LL(T) tqueue, T* item, void* copy_item_function_context, TQUEUE_CONDITION_FUNC(T) condition_function, void* condition_function_context) \
{ \
TQUEUE_POP_RESULT result; \
if ( \
/* Codes_SRS_TQUEUE_01_025: [ If tqueue is NULL then TQUEUE_POP(T) shall fail and return TQUEUE_POP_INVALID_ARG. ]*/ \
(tqueue == NULL) || \
/* Codes_SRS_TQUEUE_01_027: [ If item is NULL then TQUEUE_POP(T) shall fail and return TQUEUE_POP_INVALID_ARG. ]*/ \
(item == NULL) \
) \
{ \
LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p, " MU_TOSTRING(T) "*=%p, void* copy_item_function_context, TQUEUE_CONDITION_FUNC(T) condition_function, void* condition_function_context", \
tqueue, item); \
result = TQUEUE_POP_INVALID_ARG; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_072: [ TQUEUE_POP(T) shall acquire in shared mode the lock used to guard the growing of the queue. ]*/ \
TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue); \
srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
{ \
/* Codes_SRS_TQUEUE_01_026: [ TQUEUE_POP(T) shall execute the following actions until it is either able to pop the item from the queue or the queue is empty: ] */ \
do \
{ \
/* Codes_SRS_TQUEUE_01_028: [ TQUEUE_POP(T) shall obtain the current head queue by calling interlocked_add_64. ]*/ \
int64_t current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
/* Codes_SRS_TQUEUE_01_029: [ TQUEUE_POP(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/ \
int64_t current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
if (current_tail >= current_head) \
{ \
/* Codes_SRS_TQUEUE_01_035: [ If the queue is empty (current tail >= current head), TQUEUE_POP(T) shall return TQUEUE_POP_QUEUE_EMPTY. ]*/ \
result = TQUEUE_POP_QUEUE_EMPTY; \
break; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_030: [ Using interlocked_compare_exchange, TQUEUE_PUSH(T) shall set the tail array entry state to POPPING (from USED). ]*/ \
uint32_t index = (uint32_t)(current_tail % tqueue_ptr->queue_size); \
if (interlocked_compare_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_POPPING, QUEUE_ENTRY_STATE_USED) != QUEUE_ENTRY_STATE_USED) \
{ \
/* Codes_SRS_TQUEUE_01_036: [ If the state of the array entry corresponding to the tail is not USED, TQUEUE_POP(T) shall try again. ]*/ \
continue; \
} \
else \
{ \
bool should_pop; \
/* Codes_SRS_TQUEUE_01_039: [ If condition_function is not NULL: ]*/ \
if (condition_function != NULL) \
{ \
/* Codes_SRS_TQUEUE_01_040: [ TQUEUE_POP(T) shall call condition_function with condition_function_context and a pointer to the array entry value whose state was changed to POPPING. ] */ \
should_pop = condition_function(condition_function_context, (T*)&tqueue_ptr->queue[index].value); \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_042: [ Otherwise, shall proceed with the pop. ]*/ \
should_pop = true; \
} \
if (!should_pop) \
{ \
/* Codes_SRS_TQUEUE_01_041: [ If condition_function returns false, TQUEUE_POP(T) shall set the state to USED by using interlocked_exchange and return TQUEUE_POP_REJECTED. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED); \
result = TQUEUE_POP_REJECTED; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_031: [ TQUEUE_POP(T) shall replace the tail value with the tail value obtained earlier + 1 by using interlocked_exchange_64. ]*/ \
if (interlocked_compare_exchange_64(&tqueue_ptr->tail, current_tail + 1, current_tail) != current_tail) \
{ \
/* Codes_SRS_TQUEUE_01_044: [ If incrementing the tail by using interlocked_compare_exchange_64 does not succeed, TQUEUE_POP(T) shall revert the state of the array entry to USED and retry. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED); \
continue; \
} \
else \
{ \
if (tqueue_ptr->copy_item_function == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_032: [ If a copy_item_function was not specified in TQUEUE_CREATE(T): ]*/ \
/* Codes_SRS_TQUEUE_01_033: [ TQUEUE_POP(T) shall copy array entry value whose state was changed to POPPING to item. ]*/ \
(void)memcpy((void*)item, (void*)&tqueue_ptr->queue[index].value, sizeof(T)); \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_037: [ If copy_item_function and sispose_item_function were specified in TQUEUE_CREATE(T): ]*/ \
/* Codes_SRS_TQUEUE_01_038: [ TQUEUE_POP(T) shall call copy_item_function with copy_item_function_context as context, the array entry value whose state was changed to POPPING to item as pop_src and item as pop_dst. ]*/ \
tqueue_ptr->copy_item_function(copy_item_function_context, item, (T*)&tqueue_ptr->queue[index].value); \
} \
if (tqueue_ptr->dispose_item_function != NULL) \
{ \
/* Codes_SRS_TQUEUE_01_045: [ TQUEUE_POP(T) shall call dispose_item_function with dispose_item_function_context as context and the array entry value whose state was changed to POPPING as item. ]*/ \
tqueue_ptr->dispose_item_function(tqueue_ptr->dispose_item_function_context, (T*)&tqueue_ptr->queue[index].value); \
} \
/* Codes_SRS_TQUEUE_01_034: [ TQUEUE_POP(T) shall set the state to NOT_USED by using interlocked_exchange, succeed and return TQUEUE_POP_OK. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_NOT_USED); \
result = TQUEUE_POP_OK; \
} \
} \
break; \
} \
} \
} while (1); \
/* Codes_SRS_TQUEUE_01_073: [ TQUEUE_POP(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
} \
} \
return result; \
} \