common/inc/c_pal/tqueue_ll.h (507 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#ifndef TQUEUE_LL_H
#define TQUEUE_LL_H
#ifdef __cplusplus
#include <cinttypes>
#else // __cplusplus
#include <stdbool.h>
#include <inttypes.h>
#endif // __cplusplus
#include "c_pal/interlocked.h"
#include "c_pal/srw_lock_ll.h"
#include "c_pal/thandle_ll.h"
#include "umock_c/umock_c_prod.h"
#define TQUEUE_PUSH_RESULT_VALUES \
TQUEUE_PUSH_OK, \
TQUEUE_PUSH_INVALID_ARG, \
TQUEUE_PUSH_QUEUE_FULL, \
TQUEUE_PUSH_ERROR \
MU_DEFINE_ENUM(TQUEUE_PUSH_RESULT, TQUEUE_PUSH_RESULT_VALUES);
#define TQUEUE_POP_RESULT_VALUES \
TQUEUE_POP_OK, \
TQUEUE_POP_INVALID_ARG, \
TQUEUE_POP_QUEUE_EMPTY, \
TQUEUE_POP_REJECTED
MU_DEFINE_ENUM(TQUEUE_POP_RESULT, TQUEUE_POP_RESULT_VALUES);
#define QUEUE_ENTRY_STATE_VALUES \
QUEUE_ENTRY_STATE_NOT_USED, \
QUEUE_ENTRY_STATE_PUSHING, \
QUEUE_ENTRY_STATE_USED, \
QUEUE_ENTRY_STATE_POPPING \
MU_DEFINE_ENUM(QUEUE_ENTRY_STATE, QUEUE_ENTRY_STATE_VALUES);
/*TQUEUE is backed by a THANDLE build on the structure below*/
#define TQUEUE_STRUCT_TYPE_NAME_TAG(T) MU_C2(TQUEUE_TYPEDEF_NAME(T), _TAG)
#define TQUEUE_TYPEDEF_NAME(T) MU_C2(TQUEUE_STRUCT_, T)
#define TQUEUE_ENTRY_STRUCT_TYPE_NAME_TAG(T) MU_C2(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T), _TAG)
#define TQUEUE_ENTRY_STRUCT_TYPE_NAME(T) MU_C2(TQUEUE_ENTRY_STRUCT_, T)
/* This introduces the name for the copy item function */
#define TQUEUE_DEFINE_COPY_ITEM_FUNCTION_TYPE_NAME(T) MU_C2(TQUEUE_COPY_ITEM_FUNC_TYPE_, T)
#define TQUEUE_COPY_ITEM_FUNC(T) TQUEUE_DEFINE_COPY_ITEM_FUNCTION_TYPE_NAME(T)
/* This introduces the name for the dispose item function */
#define TQUEUE_DEFINE_DISPOSE_ITEM_FUNCTION_TYPE_NAME(T) MU_C2(TQUEUE_DISPOSE_ITEM_FUNC_TYPE_, T)
#define TQUEUE_DISPOSE_ITEM_FUNC(T) TQUEUE_DEFINE_DISPOSE_ITEM_FUNCTION_TYPE_NAME(T)
/* This introduces the name for the pop condition function */
#define TQUEUE_DEFINE_CONDITION_FUNCTION_TYPE_NAME(T) MU_C2(TQUEUE_CONDITION_FUNC_TYPE_, T)
#define TQUEUE_CONDITION_FUNC(T) TQUEUE_DEFINE_CONDITION_FUNCTION_TYPE_NAME(T)
/*TQUEUE_DEFINE_STRUCT_TYPE(T) introduces the base type that holds the queue typed as T*/
#define TQUEUE_DEFINE_STRUCT_TYPE(T) \
typedef void (*TQUEUE_DEFINE_COPY_ITEM_FUNCTION_TYPE_NAME(T))(void* context, T* dst, T* src); \
typedef void (*TQUEUE_DEFINE_DISPOSE_ITEM_FUNCTION_TYPE_NAME(T))(void* context, T* item); \
typedef bool (*TQUEUE_DEFINE_CONDITION_FUNCTION_TYPE_NAME(T))(void* context, T* item); \
typedef struct TQUEUE_ENTRY_STRUCT_TYPE_NAME_TAG(T) \
{ \
union \
{ \
volatile_atomic int32_t state; \
volatile_atomic QUEUE_ENTRY_STATE state_as_enum; \
}; \
T value; \
} TQUEUE_ENTRY_STRUCT_TYPE_NAME(T); \
typedef struct TQUEUE_STRUCT_TYPE_NAME_TAG(T) \
{ \
volatile_atomic int64_t head; \
volatile_atomic int64_t tail; \
TQUEUE_COPY_ITEM_FUNC(T) copy_item_function; \
TQUEUE_DISPOSE_ITEM_FUNC(T) dispose_item_function; \
void* dispose_item_function_context; \
uint32_t queue_size; \
uint32_t max_size; \
SRW_LOCK_LL resize_lock; \
TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)* queue; \
} TQUEUE_TYPEDEF_NAME(T); \
/*TQUEUE is-a THANDLE*/
/*given a type "T" TQUEUE_LL(T) expands to the name of the type. */
#define TQUEUE_LL(T) THANDLE(TQUEUE_TYPEDEF_NAME(T))
/*because TQUEUE is a THANDLE, all THANDLE's macro APIs are useable with TQUEUE.*/
/*the below are just shortcuts of THANDLE's public ones*/
#define TQUEUE_LL_INITIALIZE(T) THANDLE_INITIALIZE(TQUEUE_TYPEDEF_NAME(T))
#define TQUEUE_LL_ASSIGN(T) THANDLE_ASSIGN(TQUEUE_TYPEDEF_NAME(T))
#define TQUEUE_LL_MOVE(T) THANDLE_MOVE(TQUEUE_TYPEDEF_NAME(T))
#define TQUEUE_LL_INITIALIZE_MOVE(T) THANDLE_INITIALIZE_MOVE(TQUEUE_TYPEDEF_NAME(T))
/*introduces a new name for a function that returns a TQUEUE_LL(T)*/
#define TQUEUE_LL_CREATE_NAME(C) MU_C2(TQUEUE_LL_CREATE_, C)
#define TQUEUE_LL_CREATE(C) TQUEUE_LL_CREATE_NAME(C)
/*introduces a new name for the push function */
#define TQUEUE_LL_PUSH_NAME(C) MU_C2(TQUEUE_LL_PUSH_, C)
#define TQUEUE_LL_PUSH(C) TQUEUE_LL_PUSH_NAME(C)
/*introduces a new name for the pop function */
#define TQUEUE_LL_POP_NAME(C) MU_C2(TQUEUE_LL_POP_, C)
#define TQUEUE_LL_POP(C) TQUEUE_LL_POP_NAME(C)
/*introduces a new name for the get_volatile_count function */
#define TQUEUE_LL_GET_VOLATILE_COUNT_NAME(C) MU_C2(TQUEUE_LL_GET_VOLATILE_COUNT_, C)
#define TQUEUE_LL_GET_VOLATILE_COUNT(C) TQUEUE_LL_GET_VOLATILE_COUNT_NAME(C)
/*introduces a function declaration for tqueue_create*/
#define TQUEUE_LL_CREATE_DECLARE(C, T) MOCKABLE_FUNCTION(, TQUEUE_LL(T), TQUEUE_LL_CREATE(C), uint32_t, initial_queue_size, uint32_t, max_queue_size, TQUEUE_COPY_ITEM_FUNC(T), copy_item_function, TQUEUE_DISPOSE_ITEM_FUNC(T), dispose_item_function, void*, dispose_item_function_context);
/*introduces a function declaration for tqueue_push*/
#define TQUEUE_LL_PUSH_DECLARE(C, T) MOCKABLE_FUNCTION(, TQUEUE_PUSH_RESULT, TQUEUE_LL_PUSH(C), TQUEUE_LL(T), tqueue, T*, item, void*, copy_item_function_context);
/*introduces a function declaration for tqueue_pop*/
#define TQUEUE_LL_POP_DECLARE(C, T) MOCKABLE_FUNCTION(, TQUEUE_POP_RESULT, TQUEUE_LL_POP(C), TQUEUE_LL(T), tqueue, T*, item, void*, copy_item_function_context, TQUEUE_CONDITION_FUNC(T), condition_function, void*, condition_function_context);
/*introduces a function declaration for tqueue_get_volatile_count*/
#define TQUEUE_LL_GET_VOLATILE_COUNT_DECLARE(C, T) MOCKABLE_FUNCTION(, int64_t, TQUEUE_LL_GET_VOLATILE_COUNT(C), TQUEUE_LL(T), tqueue);
/*introduces a name for the function that free's a TQUEUE when it's ref count got to 0*/
#define TQUEUE_LL_FREE_NAME(C) MU_C2(TQUEUE_LL_FREE_, C)
/*introduces a function definition for freeing the allocated resources for a TQUEUE*/
#define TQUEUE_LL_FREE_DEFINE(C, T) \
static void TQUEUE_LL_FREE_NAME(C)(TQUEUE_TYPEDEF_NAME(T)* tqueue) \
{ \
if (tqueue == NULL) \
{ \
LogError("invalid arguments " MU_TOSTRING(TQUEUE_TYPEDEF_NAME(T)) "* tqueue=%p", \
tqueue); \
} \
else \
{ \
if (tqueue->dispose_item_function == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_008: [ If dispose_item_function passed to TQUEUE_CREATE(T) is NULL, TQUEUE_DISPOSE_FUNC(T) shall return. ]*/ \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_009: [ Otherwise, TQUEUE_DISPOSE_FUNC(T) shall obtain the current queue head by calling interlocked_add_64. ]*/ \
int64_t current_head = interlocked_add_64((volatile_atomic int64_t*)&tqueue->head, 0); \
/* Codes_SRS_TQUEUE_01_010: [ TQUEUE_DISPOSE_FUNC(T) shall obtain the current queue tail by calling interlocked_add_64. ]*/ \
int64_t current_tail = interlocked_add_64((volatile_atomic int64_t*)&tqueue->tail, 0); \
for (int64_t pos = current_tail; pos < current_head; pos++) \
{ \
uint32_t index = (uint32_t)(pos % tqueue->queue_size); \
/* Codes_SRS_TQUEUE_01_011: [ For each item in the queue, dispose_item_function shall be called with dispose_function_context and a pointer to the array entry value (T*). ]*/ \
tqueue->dispose_item_function(tqueue->dispose_item_function_context, &tqueue->queue[index].value); \
} \
} \
/* Codes_SRS_TQUEUE_01_056: [ The lock initialized in TQUEUE_CREATE(T) shall be de-initialized. ] */ \
srw_lock_ll_deinit(&tqueue->resize_lock); \
/* Codes_SRS_TQUEUE_01_057: [ The array backing the queue shall be freed. ] */ \
free(tqueue->queue); \
} \
} \
/*introduces a function definition for tqueue_create*/
#define TQUEUE_LL_CREATE_DEFINE(C, T) \
TQUEUE_LL(T) TQUEUE_LL_CREATE(C)(uint32_t initial_queue_size, uint32_t max_queue_size, TQUEUE_COPY_ITEM_FUNC(T) copy_item_function, TQUEUE_DISPOSE_ITEM_FUNC(T) dispose_item_function, void* dispose_item_function_context) \
{ \
TQUEUE_TYPEDEF_NAME(T)* result; \
bool is_copy_item_function_NULL = (copy_item_function == NULL); \
bool is_dispose_item_function_NULL = (dispose_item_function == NULL); \
if ( \
/* Codes_SRS_TQUEUE_01_046: [ If initial_queue_size is 0, TQUEUE_CREATE(T) shall fail and return NULL. ]*/ \
(initial_queue_size == 0) || \
/* Codes_SRS_TQUEUE_01_047: [ If initial_queue_size is greater than max_queue_size, TQUEUE_CREATE(T) shall fail and return NULL. ]*/ \
(initial_queue_size > max_queue_size) || \
/* Codes_SRS_TQUEUE_01_048: [ If any of copy_item_function and dispose_item_function are NULL and at least one of them is not NULL, TQUEUE_CREATE(T) shall fail and return NULL. ]*/ \
((is_copy_item_function_NULL || is_dispose_item_function_NULL) && \
!(is_copy_item_function_NULL && is_dispose_item_function_NULL)) \
) \
{ \
LogError("Invalid arguments: uint32_t initial_queue_size=%" PRIu32 ", uint32_t max_queue_size=%" PRIu32 ", " MU_TOSTRING(TQUEUE_COPY_ITEM_FUNC(T)) " copy_item_function=%p, " MU_TOSTRING(TQUEUE_DISPOSE_ITEM_FUNC(T)) " dispose_item_function=%p, void* dispose_item_function_context=%p", \
initial_queue_size, max_queue_size, copy_item_function, dispose_item_function, dispose_item_function_context); \
result = NULL; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_049: [ TQUEUE_CREATE(T) shall call THANDLE_MALLOC with TQUEUE_DISPOSE_FUNC(T) as dispose function. ] */ \
result = THANDLE_MALLOC(TQUEUE_TYPEDEF_NAME(C))(TQUEUE_LL_FREE_NAME(C)); \
if (result == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_071: [ If there are any failures then TQUEUE_CREATE(T) shall fail and return NULL. ]*/ \
LogError("failure in " MU_TOSTRING(THANDLE_MALLOC(TQUEUE_TYPEDEF_NAME(C))) ""); \
/*return as is*/ \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_050: [ TQUEUE_CREATE(T) shall allocate memory for an array of size size containing elements of type T. ] */ \
result->queue = malloc_2(initial_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T))); \
if (result->queue == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_071: [ If there are any failures then TQUEUE_CREATE(T) shall fail and return NULL. ]*/ \
LogError("failure in malloc_2(%" PRIu32 ", sizeof(" MU_TOSTRING(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) ")=%zu)", \
initial_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T))); \
} \
else \
{ \
result->queue_size = initial_queue_size; \
result->max_size = max_queue_size; \
result->copy_item_function = copy_item_function; \
result->dispose_item_function = dispose_item_function; \
result->dispose_item_function_context = dispose_item_function_context; \
/* Codes_SRS_TQUEUE_01_051: [ TQUEUE_CREATE(T) shall initialize the head and tail of the list with 0 by using interlocked_exchange_64. ] */ \
(void)interlocked_exchange_64(&result->head, 0); \
(void)interlocked_exchange_64(&result->tail, 0); \
for (uint32_t i = 0; i < initial_queue_size; i++) \
{ \
/* Codes_SRS_TQUEUE_01_052: [ TQUEUE_CREATE(T) shall initialize the state for each entry in the array used for the queue with NOT_USED by using interlocked_exchange. ] */ \
(void)interlocked_exchange(&result->queue[i].state, QUEUE_ENTRY_STATE_NOT_USED); \
} \
/* Codes_SRS_TQUEUE_01_053: [ TQUEUE_CREATE(T) shall initialize a SRW_LOCK_LL to be used for locking the queue when it needs to grow in size. ] */ \
(void)srw_lock_ll_init(&result->resize_lock); \
/* Codes_SRS_TQUEUE_01_054: [ TQUEUE_CREATE(T) shall succeed and return a non-NULL value. ] */ \
/*return as is*/ \
goto all_ok; \
} \
THANDLE_FREE(TQUEUE_TYPEDEF_NAME(C))(result); \
result = NULL; \
} \
} \
all_ok: \
return result; \
}
/*introduces a function definition for tqueue_push*/
#define TQUEUE_LL_PUSH_DEFINE(C, T) \
TQUEUE_PUSH_RESULT TQUEUE_LL_PUSH(C)(TQUEUE_LL(T) tqueue, T* item, void* copy_item_function_context) \
{ \
TQUEUE_PUSH_RESULT result; \
if ( \
/* Codes_SRS_TQUEUE_01_012: [ If tqueue is NULL then TQUEUE_PUSH(T) shall fail and return TQUEUE_PUSH_INVALID_ARG. ]*/ \
(tqueue == NULL) || \
/* Codes_SRS_TQUEUE_01_013: [ If item is NULL then TQUEUE_PUSH(T) shall fail and return TQUEUE_PUSH_INVALID_ARG. ]*/ \
(item == NULL) \
) \
{ \
LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p, const " MU_TOSTRING(T) "* item=%p, void* copy_item_function_context=%p", \
tqueue, item, copy_item_function_context); \
result = TQUEUE_PUSH_INVALID_ARG; \
} \
else \
{ \
TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue); \
bool unlock_needed = true; \
/* Codes_SRS_TQUEUE_01_058: [ TQUEUE_PUSH(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
/* Codes_SRS_TQUEUE_01_014: [ TQUEUE_PUSH(T) shall execute the following actions until it is either able to push the item in the queue or the queue is full: ]*/ \
do \
{ \
/* Codes_SRS_TQUEUE_01_015: [ TQUEUE_PUSH(T) shall obtain the current head queue by calling interlocked_add_64. ]*/ \
int64_t current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
/* Codes_SRS_TQUEUE_01_016: [ TQUEUE_PUSH(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/ \
int64_t current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
/* Codes_SRS_TQUEUE_01_060: [ If the queue is full (current head >= current tail + queue size): ]*/ \
if (current_head >= current_tail + tqueue_ptr->queue_size) \
{ \
/* greater cannot really happen */ \
if (tqueue_ptr->queue_size >= tqueue_ptr->max_size) \
{ \
/* Codes_SRS_TQUEUE_01_061: [ If the current queue size is equal to the max queue size, TQUEUE_PUSH(T) shall return TQUEUE_PUSH_QUEUE_FULL. ]*/ \
result = TQUEUE_PUSH_QUEUE_FULL; \
break; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_062: [ If the current queue size is less than the max queue size: ] */ \
/* Codes_SRS_TQUEUE_01_063: [ TQUEUE_PUSH(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
/* Codes_SRS_TQUEUE_01_064: [ TQUEUE_PUSH(T) shall acquire in exclusive mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_acquire_exclusive(&tqueue_ptr->resize_lock); \
/* Codes_SRS_TQUEUE_01_075: [ TQUEUE_PUSH(T) shall obtain again the current head or the queue. ]*/ \
current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
/* Codes_SRS_TQUEUE_01_076: [ TQUEUE_PUSH(T) shall obtain again the current tail or the queue. ]*/ \
current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
/* Codes_SRS_TQUEUE_01_074: [ If the size of the queue did not change after acquiring the lock in shared mode: ]*/ \
if (current_head < current_tail + tqueue_ptr->queue_size) \
{ \
/* queue was resized by another thread or now there's space, do nothing */ \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_067: [ TQUEUE_PUSH(T) shall double the size of the queue. ]*/ \
uint32_t new_queue_size = tqueue_ptr->queue_size * 2; \
if (new_queue_size > tqueue_ptr->max_size) \
{ \
/* Codes_SRS_TQUEUE_01_070: [ If the newly computed queue size is higher than the max_queue_size value passed to TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall use max_queue_size as the new queue size. ]*/ \
new_queue_size = tqueue_ptr->max_size; \
} \
/* Codes_SRS_TQUEUE_01_068: [ TQUEUE_PUSH(T) shall reallocate the array used to store the queue items based on the newly computed size. ]*/ \
TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)* temp_queue = realloc_2(tqueue_ptr->queue, new_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T))); \
if (temp_queue == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_069: [ If reallocation fails, TQUEUE_PUSH(T) shall return TQUEUE_PUSH_ERROR. ]*/ \
LogError("realloc_2(tqueue_ptr->queue=%p, new_queue_size=%" PRIu32 ", sizeof(" MU_TOSTRING(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) ")=%zu) failed", \
tqueue_ptr->queue, new_queue_size, sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T))); \
result = TQUEUE_PUSH_ERROR; \
/* Codes_SRS_TQUEUE_01_065: [ TQUEUE_PUSH(T) shall release in exclusive mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_release_exclusive(&tqueue_ptr->resize_lock); \
/* No need to take the lock again, we are going to return anyway */ \
unlock_needed = false; \
break; \
} \
else \
{ \
tqueue_ptr->queue = temp_queue; \
/* Codes_SRS_TQUEUE_01_077: [ TQUEUE_PUSH(T) shall move the entries between the tail index and the array end like below: ]*/ \
uint32_t elements_in_queue = (uint32_t)(current_head - current_tail); \
uint32_t tail_index = current_tail % tqueue_ptr->queue_size; \
uint32_t copy_item_count = tqueue_ptr->queue_size - tail_index; \
uint32_t new_tail_index = new_queue_size - copy_item_count; \
/* Codes_SRS_TQUEUE_01_078: [ Entries at the tail shall be moved to the end of the resized array ]*/ \
/* Please see diagram in the spec */ \
if (copy_item_count > 0) \
{ \
(void)memmove(&tqueue_ptr->queue[new_tail_index], &tqueue_ptr->queue[tail_index], sizeof(TQUEUE_ENTRY_STRUCT_TYPE_NAME(T)) * copy_item_count); \
} \
for (uint32_t i = tail_index; i < new_tail_index; i++) \
{ \
(void)interlocked_exchange(&tqueue_ptr->queue[i].state, QUEUE_ENTRY_STATE_NOT_USED); \
} \
(void)interlocked_exchange_64(&tqueue_ptr->tail, new_tail_index); \
(void)interlocked_exchange_64(&tqueue_ptr->head, new_tail_index + elements_in_queue); \
tqueue_ptr->queue_size = new_queue_size; \
} \
} \
\
/* Codes_SRS_TQUEUE_01_065: [ TQUEUE_PUSH(T) shall release in exclusive mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_release_exclusive(&tqueue_ptr->resize_lock); \
/* Codes_SRS_TQUEUE_01_066: [ TQUEUE_PUSH(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
continue; \
} \
} \
else \
{ \
uint32_t index = (uint32_t)(current_head % tqueue_ptr->queue_size); \
/* Codes_SRS_TQUEUE_01_017: [ Using interlocked_compare_exchange, TQUEUE_PUSH(T) shall change the head array entry state to PUSHING (from NOT_USED). ]*/ \
if (interlocked_compare_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_PUSHING, QUEUE_ENTRY_STATE_NOT_USED) != QUEUE_ENTRY_STATE_NOT_USED) \
{ \
/* Codes_SRS_TQUEUE_01_023: [ If the state of the array entry corresponding to the head is not NOT_USED, TQUEUE_PUSH(T) shall retry the whole push. ]*/ \
/* likely queue full */ \
continue; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_018: [ Using interlocked_compare_exchange_64, TQUEUE_PUSH(T) shall replace the head value with the head value obtained earlier + 1. ]*/ \
if (interlocked_compare_exchange_64(&tqueue_ptr->head, current_head + 1, current_head) != current_head) \
{ \
/* Codes_SRS_TQUEUE_01_043: [ If the queue head has changed, TQUEUE_PUSH(T) shall set the state back to NOT_USED and retry the push. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_NOT_USED); \
continue; \
} \
else \
{ \
if (tqueue_ptr->copy_item_function == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_019: [ If no copy_item_function was specified in TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall copy the value of item into the array entry value whose state was changed to PUSHING. ]*/ \
(void)memcpy((void*)&tqueue_ptr->queue[index].value, (void*)item, sizeof(T)); \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_024: [ If a copy_item_function was specified in TQUEUE_CREATE(T), TQUEUE_PUSH(T) shall call the copy_item_function with copy_item_function_context as context, a pointer to the array entry value whose state was changed to PUSHING as push_dst and item as push_src. ] */ \
tqueue_ptr->copy_item_function(copy_item_function_context, &tqueue_ptr->queue[index].value, item); \
} \
/* Codes_SRS_TQUEUE_01_020: [ TQUEUE_PUSH(T) shall set the state to USED by using interlocked_exchange. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED); \
/* Codes_SRS_TQUEUE_01_021: [ TQUEUE_PUSH(T) shall succeed and return TQUEUE_PUSH_OK. ]*/ \
result = TQUEUE_PUSH_OK; \
break; \
} \
} \
} \
} while (1); \
\
/* Codes_SRS_TQUEUE_01_059: [ TQUEUE_PUSH(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
if (unlock_needed) \
{ \
srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
} \
} \
return result; \
} \
/*introduces a function definition for tqueue_pop*/
#define TQUEUE_LL_POP_DEFINE(C, T) \
TQUEUE_POP_RESULT TQUEUE_LL_POP(C)(TQUEUE_LL(T) tqueue, T* item, void* copy_item_function_context, TQUEUE_CONDITION_FUNC(T) condition_function, void* condition_function_context) \
{ \
TQUEUE_POP_RESULT result; \
if ( \
/* Codes_SRS_TQUEUE_01_025: [ If tqueue is NULL then TQUEUE_POP(T) shall fail and return TQUEUE_POP_INVALID_ARG. ]*/ \
(tqueue == NULL) || \
/* Codes_SRS_TQUEUE_01_027: [ If item is NULL then TQUEUE_POP(T) shall fail and return TQUEUE_POP_INVALID_ARG. ]*/ \
(item == NULL) \
) \
{ \
LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p, " MU_TOSTRING(T) "*=%p, void* copy_item_function_context, TQUEUE_CONDITION_FUNC(T) condition_function, void* condition_function_context", \
tqueue, item); \
result = TQUEUE_POP_INVALID_ARG; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_072: [ TQUEUE_POP(T) shall acquire in shared mode the lock used to guard the growing of the queue. ]*/ \
TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue); \
srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
{ \
/* Codes_SRS_TQUEUE_01_026: [ TQUEUE_POP(T) shall execute the following actions until it is either able to pop the item from the queue or the queue is empty: ] */ \
do \
{ \
/* Codes_SRS_TQUEUE_01_028: [ TQUEUE_POP(T) shall obtain the current head queue by calling interlocked_add_64. ]*/ \
int64_t current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
/* Codes_SRS_TQUEUE_01_029: [ TQUEUE_POP(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/ \
int64_t current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
if (current_tail >= current_head) \
{ \
/* Codes_SRS_TQUEUE_01_035: [ If the queue is empty (current tail >= current head), TQUEUE_POP(T) shall return TQUEUE_POP_QUEUE_EMPTY. ]*/ \
result = TQUEUE_POP_QUEUE_EMPTY; \
break; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_030: [ Using interlocked_compare_exchange, TQUEUE_PUSH(T) shall set the tail array entry state to POPPING (from USED). ]*/ \
uint32_t index = (uint32_t)(current_tail % tqueue_ptr->queue_size); \
if (interlocked_compare_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_POPPING, QUEUE_ENTRY_STATE_USED) != QUEUE_ENTRY_STATE_USED) \
{ \
/* Codes_SRS_TQUEUE_01_036: [ If the state of the array entry corresponding to the tail is not USED, TQUEUE_POP(T) shall try again. ]*/ \
continue; \
} \
else \
{ \
bool should_pop; \
/* Codes_SRS_TQUEUE_01_039: [ If condition_function is not NULL: ]*/ \
if (condition_function != NULL) \
{ \
/* Codes_SRS_TQUEUE_01_040: [ TQUEUE_POP(T) shall call condition_function with condition_function_context and a pointer to the array entry value whose state was changed to POPPING. ] */ \
should_pop = condition_function(condition_function_context, (T*)&tqueue_ptr->queue[index].value); \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_042: [ Otherwise, shall proceed with the pop. ]*/ \
should_pop = true; \
} \
if (!should_pop) \
{ \
/* Codes_SRS_TQUEUE_01_041: [ If condition_function returns false, TQUEUE_POP(T) shall set the state to USED by using interlocked_exchange and return TQUEUE_POP_REJECTED. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED); \
result = TQUEUE_POP_REJECTED; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_031: [ TQUEUE_POP(T) shall replace the tail value with the tail value obtained earlier + 1 by using interlocked_exchange_64. ]*/ \
if (interlocked_compare_exchange_64(&tqueue_ptr->tail, current_tail + 1, current_tail) != current_tail) \
{ \
/* Codes_SRS_TQUEUE_01_044: [ If incrementing the tail by using interlocked_compare_exchange_64 does not succeed, TQUEUE_POP(T) shall revert the state of the array entry to USED and retry. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_USED); \
continue; \
} \
else \
{ \
if (tqueue_ptr->copy_item_function == NULL) \
{ \
/* Codes_SRS_TQUEUE_01_032: [ If a copy_item_function was not specified in TQUEUE_CREATE(T): ]*/ \
/* Codes_SRS_TQUEUE_01_033: [ TQUEUE_POP(T) shall copy array entry value whose state was changed to POPPING to item. ]*/ \
(void)memcpy((void*)item, (void*)&tqueue_ptr->queue[index].value, sizeof(T)); \
} \
else \
{ \
/* Codes_SRS_TQUEUE_01_037: [ If copy_item_function and sispose_item_function were specified in TQUEUE_CREATE(T): ]*/ \
/* Codes_SRS_TQUEUE_01_038: [ TQUEUE_POP(T) shall call copy_item_function with copy_item_function_context as context, the array entry value whose state was changed to POPPING to item as pop_src and item as pop_dst. ]*/ \
tqueue_ptr->copy_item_function(copy_item_function_context, item, (T*)&tqueue_ptr->queue[index].value); \
} \
if (tqueue_ptr->dispose_item_function != NULL) \
{ \
/* Codes_SRS_TQUEUE_01_045: [ TQUEUE_POP(T) shall call dispose_item_function with dispose_item_function_context as context and the array entry value whose state was changed to POPPING as item. ]*/ \
tqueue_ptr->dispose_item_function(tqueue_ptr->dispose_item_function_context, (T*)&tqueue_ptr->queue[index].value); \
} \
/* Codes_SRS_TQUEUE_01_034: [ TQUEUE_POP(T) shall set the state to NOT_USED by using interlocked_exchange, succeed and return TQUEUE_POP_OK. ]*/ \
(void)interlocked_exchange(&tqueue_ptr->queue[index].state, QUEUE_ENTRY_STATE_NOT_USED); \
result = TQUEUE_POP_OK; \
} \
} \
break; \
} \
} \
} while (1); \
/* Codes_SRS_TQUEUE_01_073: [ TQUEUE_POP(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
} \
} \
return result; \
} \
/*introduces a function definition for tqueue_pop*/
#define TQUEUE_LL_GET_VOLATILE_COUNT_DEFINE(C, T) \
int64_t TQUEUE_LL_GET_VOLATILE_COUNT(C)(TQUEUE_LL(T) tqueue) \
{ \
int64_t result; \
/* Codes_SRS_TQUEUE_22_001: [ If tqueue is NULL then TQUEUE_GET_VOLATILE_COUNT(T) shall return zero. ]*/ \
if (tqueue == NULL) \
{ \
LogError("Invalid arguments: TQUEUE_LL(" MU_TOSTRING(T) ") tqueue=%p.", tqueue); \
result = 0; \
} \
else \
{ \
TQUEUE_TYPEDEF_NAME(T)* tqueue_ptr = THANDLE_GET_T(TQUEUE_TYPEDEF_NAME(T))(tqueue); \
/* Codes_SRS_TQUEUE_01_080: [ TQUEUE_GET_VOLATILE_COUNT(T) shall acquire in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_acquire_shared(&tqueue_ptr->resize_lock); \
{ \
int64_t current_tail = 0; \
int64_t current_head = 0; \
do \
{ \
/* Codes_SRS_TQUEUE_22_003: [ TQUEUE_GET_VOLATILE_COUNT(T) shall obtain the current tail queue by calling interlocked_add_64. ]*/ \
current_tail = interlocked_add_64(&tqueue_ptr->tail, 0); \
/* Codes_SRS_TQUEUE_22_002: [ TQUEUE_GET_VOLATILE_COUNT(T) shall obtain the current head queue by calling interlocked_add_64. ]*/ \
current_head = interlocked_add_64(&tqueue_ptr->head, 0); \
/* Codes_SRS_TQUEUE_22_006: [ TQUEUE_GET_VOLATILE_COUNT(T) shall obtain the current tail queue again by calling interlocked_add_64 and compare with the previosuly obtained tail value. The tail value is valid only if it has not changed. ]*/ \
} while (current_tail != interlocked_add_64(&tqueue_ptr->tail, 0)); \
\
if (current_tail >= current_head) \
{ \
/* Codes_SRS_TQUEUE_22_004: [ If the queue is empty (current tail >= current head), TQUEUE_GET_VOLATILE_COUNT(T) shall return zero. ]*/ \
result = 0; \
} \
else \
{ \
/* Codes_SRS_TQUEUE_22_005: [ TQUEUE_GET_VOLATILE_COUNT(T) shall return the item count of the queue. ]*/ \
result = current_head - current_tail; \
} \
/* Codes_SRS_TQUEUE_01_081: [ TQUEUE_GET_VOLATILE_COUNT(T) shall release in shared mode the lock used to guard the growing of the queue. ] */ \
srw_lock_ll_release_shared(&tqueue_ptr->resize_lock); \
} \
} \
return result; \
} \
/*macro to be used in headers*/ \
#define TQUEUE_LL_TYPE_DECLARE(C, T, ...) \
/*hint: have TQUEUE_DEFINE_STRUCT_TYPE(T) before TQUEUE_LL_TYPE_DECLARE*/ \
THANDLE_LL_TYPE_DECLARE(TQUEUE_TYPEDEF_NAME(C), TQUEUE_TYPEDEF_NAME(T)) \
TQUEUE_LL_CREATE_DECLARE(C, T) \
TQUEUE_LL_PUSH_DECLARE(C, T) \
TQUEUE_LL_POP_DECLARE(C, T) \
TQUEUE_LL_GET_VOLATILE_COUNT_DECLARE(C, T) \
/*macro to be used in .c*/ \
#define TQUEUE_LL_TYPE_DEFINE(C, T, ...) \
/*hint: have THANDLE_TYPE_DEFINE(TQUEUE_TYPEDEF_NAME(T)) before TQUEUE_LL_TYPE_DEFINE*/ \
TQUEUE_LL_FREE_DEFINE(C, T) \
TQUEUE_LL_CREATE_DEFINE(C, T) \
TQUEUE_LL_PUSH_DEFINE(C, T) \
TQUEUE_LL_POP_DEFINE(C, T) \
TQUEUE_LL_GET_VOLATILE_COUNT_DEFINE(C, T) \
#endif /*TQUEUE_LL_H*/