in common/inc/c_pal/tqueue_ll.h [240:395]
/* TQUEUE_LL_PUSH(C): pushes one item at the head of the queue, growing the backing array (up to max_size) when the queue is full. */ \
/* tqueue: queue to push into. item: source value that gets copied into the queue entry. */ \
/* copy_item_function_context: passed unchanged to the queue's copy_item_function when one is set; otherwise only logged on invalid arguments. */ \
/* Returns TQUEUE_PUSH_INVALID_ARG when tqueue or item is NULL, TQUEUE_PUSH_QUEUE_FULL when the queue is full and queue_size has reached max_size, */ \
/* TQUEUE_PUSH_ERROR when the array reallocation fails and TQUEUE_PUSH_OK when the item was stored. */ \
TQUEUE_PUSH_RESULT TQUEUE_LL_PUSH(C)(TQUEUE_LL(T) tqueue, T* item, void* copy_item_function_context) \
{ \
    TQUEUE_PUSH_RESULT result; \
    if ( \
        /* Codes_SRS_TQUEUE_01_012: [ If tqueue is NULL then TQUEUE_PUSH(T) shall fail and return TQUEUE_PUSH_INVALID_ARG. ]*/ \
        (tqueue == NULL) || \
        /* Codes_SRS_TQUEUE_01_013: [ If item is NULL then TQUEUE_PUSH(T) shall fail and return TQUEUE_PUSH_INVALID_ARG. ]*/ \
        (item == NULL) \
       ) \
    { \
        LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p, const " MU_TOSTRING(T) "* item=%p, void* copy_item_function_context=%p", \
            tqueue, item, copy_item_function_context); \
        result = TQUEUE_PUSH_INVALID_ARG; \
    } \
    else \
    { \
        TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue); \
        /* Tracks whether the shared lock is still held when the loop below is left; only the reallocation failure path leaves it without the lock */ \
        bool unlock_needed = true; \
        /* Codes_SRS_TQUEUE_01_058: [ TQUEUE_PUSH(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */ \
        srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
        /* Codes_SRS_TQUEUE_01_014: [ TQUEUE_PUSH(T) shall execute the following actions until it is either able to push the item in the queue or the queue is full: ]*/ \
        do \
        { \
            /* Codes_SRS_TQUEUE_01_015: [ TQUEUE_PUSH(T) shall obtain the current head queue by calling interlocked_add_64. ]*/ \
            int64_t current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
            /* Codes_SRS_TQUEUE_01_016: [ TQUEUE_PUSH(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/ \
            int64_t current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
            /* Codes_SRS_TQUEUE_01_060: [ If the queue is full (current head >= current tail + queue size): ]*/ \
            if (current_head >= current_tail + tqueue_ptr->queue_size) \
            { \
                /* greater cannot really happen */ \
                if (tqueue_ptr->queue_size >= tqueue_ptr->max_size) \
                { \
                    /* Codes_SRS_TQUEUE_01_061: [ If the current queue size is equal to the max queue size, TQUEUE_PUSH(T) shall return TQUEUE_PUSH_QUEUE_FULL. ]*/ \
                    result = TQUEUE_PUSH_QUEUE_FULL; \
                    break; \
                } \
                else \
                { \
                    /* Codes_SRS_TQUEUE_01_062: [ If the current queue size is less than the max queue size: ] */ \
                    /* Codes_SRS_TQUEUE_01_063: [ TQUEUE_PUSH(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
                    srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
                    /* Codes_SRS_TQUEUE_01_064: [ TQUEUE_PUSH(T) shall acquire in exclusive mode the lock used to guard the growing of the queue. ] */ \
                    srw_lock_ll_acquire_exclusive(&tqueue_ptr->resize_lock); \
                    /* The lock was dropped between the shared release and the exclusive acquire, so head and tail are read again before deciding to grow */ \
                    /* Codes_SRS_TQUEUE_01_075: [ TQUEUE_PUSH(T) shall obtain again the current head or the queue. ]*/ \
                    current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
                    /* Codes_SRS_TQUEUE_01_076: [ TQUEUE_PUSH(T) shall obtain again the current tail or the queue. ]*/ \
                    current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
                    /* Codes_SRS_TQUEUE_01_074: [ If the size of the queue did not change after acquiring the lock in shared mode: ]*/ \
                    if (current_head < current_tail + tqueue_ptr->queue_size) \
                    { \
                        /* queue was resized by another thread or now there's space, do nothing */ \
                    } \
                    else \
                    { \
                        /* Codes_SRS_TQUEUE_01_067: [ TQUEUE_PUSH(T) shall double the size of the queue. ]*/ \
                        /* new_queue_size is a uint32_t, so a doubled value above UINT32_MAX would wrap around to a size smaller than the current one. */ \
                        /* The clamp decision is therefore made before multiplying: queue_size > max_size / 2 is equivalent to queue_size * 2 > max_size without the overflow. */ \
                        uint32_t new_queue_size; \
                        if (tqueue_ptr->queue_size > tqueue_ptr->max_size / 2) \
                        { \
                            /* Codes_SRS_TQUEUE_01_070: [ If the newly computed queue size is higher than the max_queue_size value passed to TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall use max_queue_size as the new queue size. ]*/ \
                            new_queue_size = tqueue_ptr->max_size; \
                        } \
                        else \
                        { \
                            new_queue_size = tqueue_ptr->queue_size * 2; \
                        } \
                        /* Codes_SRS_TQUEUE_01_068: [ TQUEUE_PUSH(T) shall reallocate the array used to store the queue items based on the newly computed size. ]*/ \
                        TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)* temp_queue = realloc_2(tqueue_ptr->queue, new_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T))); \
                        if (temp_queue == NULL) \
                        { \
                            /* Codes_SRS_TQUEUE_01_069: [ If reallocation fails, TQUEUE_PUSH(T) shall return TQUEUE_PUSH_ERROR. ]*/ \
                            LogError("realloc_2(tqueue_ptr->queue=%p, new_queue_size=%" PRIu32 ", sizeof(" MU_TOSTRING(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) ")=%zu) failed", \
                                tqueue_ptr->queue, new_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T))); \
                            result = TQUEUE_PUSH_ERROR; \
                            /* Codes_SRS_TQUEUE_01_065: [ TQUEUE_PUSH(T) shall release in exclusive mode the lock used to guard the growing of the queue. ] */ \
                            srw_lock_ll_release_exclusive(&tqueue_ptr->resize_lock); \
                            /* No need to take the lock again, we are going to return anyway */ \
                            unlock_needed = false; \
                            break; \
                        } \
                        else \
                        { \
                            tqueue_ptr->queue = temp_queue; \
                            /* Codes_SRS_TQUEUE_01_077: [ TQUEUE_PUSH(T) shall move the entries between the tail index and the array end like below: ]*/ \
                            uint32_t elements_in_queue = (uint32_t)(current_head - current_tail); \
                            /* Index arithmetic below still uses the old queue_size; the field is only updated once the entries have been moved */ \
                            uint32_t tail_index = current_tail % tqueue_ptr->queue_size; \
                            uint32_t copy_item_count = tqueue_ptr->queue_size - tail_index; \
                            uint32_t new_tail_index = new_queue_size - copy_item_count; \
                            /* Codes_SRS_TQUEUE_01_078: [ Entries at the tail shall be moved to the end of the resized array ]*/ \
                            /* Please see diagram in the spec */ \
                            if (copy_item_count > 0) \
                            { \
                                /* memmove because source and destination ranges can overlap when the array grew by less than copy_item_count entries */ \
                                (void)memmove(&tqueue_ptr->queue[new_tail_index], &tqueue_ptr->queue[tail_index], sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) * copy_item_count); \
                            } \
                            /* Entries between the old tail index and the new tail index no longer hold items; mark them NOT_USED so pushes can claim them */ \
                            for (uint32_t i = tail_index; i < new_tail_index; i++) \
                            { \
                                (void)interlocked_exchange(&tqueue_ptr->queue[i].state, QUEUE_ENTRY_STATE_NOT_USED); \
                            } \
                            /* Rebase tail and head onto the new layout, keeping the same number of elements between them */ \
                            (void)interlocked_exchange_64(&tqueue_ptr->tail, new_tail_index); \
                            (void)interlocked_exchange_64(&tqueue_ptr->head, new_tail_index + elements_in_queue); \
                            tqueue_ptr->queue_size = new_queue_size; \
                        } \
                    } \
                    \
                    /* Codes_SRS_TQUEUE_01_065: [ TQUEUE_PUSH(T) shall release in exclusive mode the lock used to guard the growing of the queue. ] */ \
                    srw_lock_ll_release_exclusive(&tqueue_ptr->resize_lock); \
                    /* Codes_SRS_TQUEUE_01_066: [ TQUEUE_PUSH(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */ \
                    srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
                    /* Retry the push from the top with freshly read head and tail */ \
                    continue; \
                } \
            } \
            else \
            { \
                uint32_t index = (uint32_t)(current_head % tqueue_ptr->queue_size); \
                /* Codes_SRS_TQUEUE_01_017: [ Using interlocked_compare_exchange, TQUEUE_PUSH(T) shall change the head array entry state to PUSHING (from NOT_USED). ]*/ \
                if (interlocked_compare_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_PUSHING, QUEUE_ENTRY_STATE_NOT_USED) != QUEUE_ENTRY_STATE_NOT_USED) \
                { \
                    /* Codes_SRS_TQUEUE_01_023: [ If the state of the array entry corresponding to the head is not NOT_USED, TQUEUE_PUSH(T) shall retry the whole push. ]*/ \
                    /* likely queue full */ \
                    continue; \
                } \
                else \
                { \
                    /* Codes_SRS_TQUEUE_01_018: [ Using interlocked_compare_exchange_64, TQUEUE_PUSH(T) shall replace the head value with the head value obtained earlier + 1. ]*/ \
                    if (interlocked_compare_exchange_64(&tqueue_ptr->head, current_head + 1, current_head) != current_head) \
                    { \
                        /* Codes_SRS_TQUEUE_01_043: [ If the queue head has changed, TQUEUE_PUSH(T) shall set the state back to NOT_USED and retry the push. ]*/ \
                        (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_NOT_USED); \
                        continue; \
                    } \
                    else \
                    { \
                        /* The entry is claimed (state PUSHING) and head has been advanced; fill in the value before publishing it as USED */ \
                        if (tqueue_ptr->copy_item_function == NULL) \
                        { \
                            /* Codes_SRS_TQUEUE_01_019: [ If no copy_item_function was specified in TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall copy the value of item into the array entry value whose state was changed to PUSHING. ]*/ \
                            (void)memcpy((void*)&tqueue_ptr->queue[index].value, (void*)item, sizeof(T)); \
                        } \
                        else \
                        { \
                            /* Codes_SRS_TQUEUE_01_024: [ If a copy_item_function was specified in TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall call the copy_item_function with copy_item_function_context as context, a pointer to the array entry value whose state was changed to PUSHING as push_dst and item as push_src. ] */ \
                            tqueue_ptr->copy_item_function(copy_item_function_context, &tqueue_ptr->queue[index].value, item); \
                        } \
                        /* Codes_SRS_TQUEUE_01_020: [ TQUEUE_PUSH(T) shall set the state to USED by using interlocked_exchange. ]*/ \
                        (void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED); \
                        /* Codes_SRS_TQUEUE_01_021: [ TQUEUE_PUSH(T) shall succeed and return TQUEUE_PUSH_OK. ]*/ \
                        result = TQUEUE_PUSH_OK; \
                        break; \
                    } \
                } \
            } \
        } while (1); \
        \
        /* Codes_SRS_TQUEUE_01_059: [ TQUEUE_PUSH(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
        if (unlock_needed) \
        { \
            srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
        } \
    } \
    return result; \
} \