CLDS_SORTED_LIST_INSERT_RESULT clds_sorted_list_insert()

in src/clds_sorted_list.c [725:983]


CLDS_SORTED_LIST_INSERT_RESULT clds_sorted_list_insert(CLDS_SORTED_LIST_HANDLE clds_sorted_list, CLDS_HAZARD_POINTERS_THREAD_HANDLE clds_hazard_pointers_thread, CLDS_SORTED_LIST_ITEM* item, int64_t* sequence_number)
{
    CLDS_SORTED_LIST_INSERT_RESULT result;

    if (
        /* Codes_SRS_CLDS_SORTED_LIST_01_011: [ If clds_sorted_list is NULL, clds_sorted_list_insert shall fail and return CLDS_SORTED_LIST_INSERT_ERROR. ]*/
        (clds_sorted_list == NULL) ||
        /* Codes_SRS_CLDS_SORTED_LIST_01_012: [ If item is NULL, clds_sorted_list_insert shall fail and return CLDS_SORTED_LIST_INSERT_ERROR. ]*/
        (item == NULL) ||
        /* Codes_SRS_CLDS_SORTED_LIST_01_013: [ If clds_hazard_pointers_thread is NULL, clds_sorted_list_insert shall fail and return CLDS_SORTED_LIST_INSERT_ERROR. ]*/
        (clds_hazard_pointers_thread == NULL) ||
        /* Codes_SRS_CLDS_SORTED_LIST_01_062: [ If the sequence_number argument is non-NULL, but no start sequence number was specified in clds_sorted_list_create, clds_sorted_list_insert shall fail and return CLDS_SORTED_LIST_INSERT_ERROR. ]*/
        ((sequence_number != NULL) && (clds_sorted_list->sequence_number == NULL))
        )
    {
        LogError("Invalid arguments: clds_sorted_list = %p, item = %p, clds_hazard_pointers_thread = %p, sequence_number = %p",
            clds_sorted_list, item, clds_hazard_pointers_thread, sequence_number);
        result = CLDS_SORTED_LIST_INSERT_ERROR;
    }
    else
    {
        /*Codes_SRS_CLDS_SORTED_LIST_42_001: [ clds_sorted_list_insert shall try the following until it acquires a write lock for the list: ]*/
        /*Codes_SRS_CLDS_SORTED_LIST_42_002: [ clds_sorted_list_insert shall increment the count of pending write operations. ]*/
        /*Codes_SRS_CLDS_SORTED_LIST_42_003: [ If the counter to lock the list for writes is non-zero then: ]*/
        /*Codes_SRS_CLDS_SORTED_LIST_42_004: [ clds_sorted_list_insert shall decrement the count of pending write operations. ]*/
        /*Codes_SRS_CLDS_SORTED_LIST_42_005: [ clds_sorted_list_insert shall wait for the counter to lock the list for writes to reach 0 and repeat. ]*/
        check_lock_and_begin_write_operation(clds_sorted_list);

        bool restart_needed;
        void* new_item_key = clds_sorted_list->get_item_key_cb(clds_sorted_list->get_item_key_cb_context, item);
        int64_t local_seq_no = 0;

        /* Codes_SRS_CLDS_SORTED_LIST_01_069: [ If no start sequence number was provided in clds_sorted_list_create and sequence_number is NULL, no sequence number computations shall be done. ]*/
        if (clds_sorted_list->sequence_number != NULL)
        {
            /* Codes_SRS_CLDS_SORTED_LIST_01_060: [ For each insert the order of the operation shall be computed based on the start sequence number passed to clds_sorted_list_create. ]*/
            local_seq_no = interlocked_increment_64(clds_sorted_list->sequence_number);

            /* Codes_SRS_CLDS_SORTED_LIST_01_061: [ If the sequence_number argument passed to clds_sorted_list_insert is NULL, the computed sequence number for the insert shall still be computed but it shall not be provided to the user. ]*/
            if (sequence_number != NULL)
            {
                *sequence_number = local_seq_no;
            }
        }

        /* Codes_SRS_CLDS_SORTED_LIST_01_047: [ clds_sorted_list_insert shall insert the item at its correct location making sure that items in the list are sorted according to the order given by item keys. ]*/

        do
        {
            CLDS_HAZARD_POINTER_RECORD_HANDLE previous_hp = NULL;
            CLDS_SORTED_LIST_ITEM* previous_item = NULL;
            CLDS_SORTED_LIST_ITEM* volatile_atomic* current_item_address = (CLDS_SORTED_LIST_ITEM* volatile_atomic*)&clds_sorted_list->head;
            result = CLDS_SORTED_LIST_INSERT_ERROR;
            uint64_t iteration_count = 0;

            do
            {
                if (++iteration_count > ITERATION_COUNT_LOG_LIMIT)
                {
                    LogInfo("clds_sorted_list_insert spun for %" PRIu64 " iterations", (uint64_t)ITERATION_COUNT_LOG_LIMIT);
                    iteration_count = 0;
                }

                // get the current_item value
                CLDS_SORTED_LIST_ITEM* current_item = interlocked_compare_exchange_pointer((void* volatile_atomic*)current_item_address, NULL, NULL);

                // clear any delete lock bit from what we read
                current_item = (CLDS_SORTED_LIST_ITEM*)((uintptr_t)current_item & ~0x1);

                // check if the item is NULL
                if (current_item == NULL)
                {
                    item->next = NULL;

                    // not found, so insert it here
                    if (previous_item != NULL)
                    {
                        // have a previous item, try to replace the NULL with the new item
                        // if there is something else than NULL there, restart, there were some major changes
                        if (interlocked_compare_exchange_pointer((void* volatile_atomic*)&previous_item->next, (void*)item, (void*)current_item) != NULL)
                        {
                            // let go of previous hazard pointer
                            clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                            restart_needed = true;
                            break;
                        }
                        else
                        {
                            clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                            restart_needed = false;

                            /* Codes_SRS_CLDS_SORTED_LIST_01_010: [ On success clds_sorted_list_insert shall return CLDS_SORTED_LIST_INSERT_OK. ]*/
                            result = CLDS_SORTED_LIST_INSERT_OK;
                            break;
                        }
                    }
                    else
                    {
                        // no previous item, replace the head, make sure it is a "clean" NULL, no lock bit set
                        if (interlocked_compare_exchange_pointer((void* volatile_atomic*)&clds_sorted_list->head, item, NULL) != NULL)
                        {
                            restart_needed = true;
                            break;
                        }
                        else
                        {
                            // insert done
                            restart_needed = false;

                            /* Codes_SRS_CLDS_SORTED_LIST_01_010: [ On success clds_sorted_list_insert shall return CLDS_SORTED_LIST_INSERT_OK. ]*/
                            result = CLDS_SORTED_LIST_INSERT_OK;
                            break;
                        }
                    }

                    break;
                }
                else
                {
                    // acquire hazard pointer
                    CLDS_HAZARD_POINTER_RECORD_HANDLE current_item_hp = clds_hazard_pointers_acquire(clds_hazard_pointers_thread, (void*)current_item);
                    if (current_item_hp == NULL)
                    {
                        if (previous_hp != NULL)
                        {
                            // let go of previous hazard pointer
                            clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                        }

                        if (clds_sorted_list->skipped_seq_no_cb != NULL)
                        {
                            /* Codes_SRS_CLDS_SORTED_LIST_01_079: [** If sequence numbers are generated and a skipped sequence number callback was provided to clds_sorted_list_create, when the item is indicated as already existing, the generated sequence number shall be indicated as skipped. ]*/
                            clds_sorted_list->skipped_seq_no_cb(clds_sorted_list->skipped_seq_no_cb_context, local_seq_no);
                        }

                        LogError("Cannot acquire hazard pointer");
                        restart_needed = false;
                        result = CLDS_SORTED_LIST_INSERT_ERROR;
                        break;
                    }
                    else
                    {
                        // now make sure the item has not changed. This also takes care of checking that the delete lock bit is not set
                        if (interlocked_compare_exchange_pointer((void* volatile_atomic*)current_item_address, NULL, NULL) != (void*)current_item)
                        {
                            if (previous_hp != NULL)
                            {
                                // let go of previous hazard pointer
                                clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                            }

                            // item changed, it is likely that the node is no longer reachable, so we should not use its memory, restart
                            clds_hazard_pointers_release(clds_hazard_pointers_thread, current_item_hp);

                            restart_needed = true;
                            break;
                        }
                        else
                        {
                            // we are in a stable state, at this point the previous node does not have a delete lock bit set
                            // compare the current item key to our key
                            void* current_item_key = clds_sorted_list->get_item_key_cb(clds_sorted_list->get_item_key_cb_context, (struct CLDS_SORTED_LIST_ITEM_TAG*)current_item);
                            int compare_result = clds_sorted_list->key_compare_cb(clds_sorted_list->key_compare_cb_context, new_item_key, current_item_key);

                            if (compare_result == 0)
                            {
                                // item already in the list
                                if (previous_item != NULL)
                                {
                                    clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                                }

                                clds_hazard_pointers_release(clds_hazard_pointers_thread, current_item_hp);
                                restart_needed = false;

                                if (clds_sorted_list->skipped_seq_no_cb != NULL)
                                {
                                    /* Codes_SRS_CLDS_SORTED_LIST_01_079: [** If sequence numbers are generated and a skipped sequence number callback was provided to clds_sorted_list_create, when the item is indicated as already existing, the generated sequence number shall be indicated as skipped. ]*/
                                    clds_sorted_list->skipped_seq_no_cb(clds_sorted_list->skipped_seq_no_cb_context, local_seq_no);
                                }

                                /* Codes_SRS_CLDS_SORTED_LIST_01_048: [ If the item with the given key already exists in the list, clds_sorted_list_insert shall fail and return CLDS_SORTED_LIST_INSERT_KEY_ALREADY_EXISTS. ]*/
                                result = CLDS_SORTED_LIST_INSERT_KEY_ALREADY_EXISTS;
                                break;
                            }
                            else if (compare_result < 0)
                            {
                                // need to insert between the previous and current node, since current node's key is higher than what we want to insert
                                item->next = current_item;

                                if (previous_item != NULL)
                                {
                                    // have a previous item
                                    if (interlocked_compare_exchange_pointer((void* volatile_atomic*)&previous_item->next, (void*)item, (void*)current_item) != (void*)current_item)
                                    {
                                        // let go of both hazard pointers
                                        clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                                        clds_hazard_pointers_release(clds_hazard_pointers_thread, current_item_hp);
                                        restart_needed = true;
                                        break;
                                    }
                                    else
                                    {
                                        // let go of both hazard pointers
                                        clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                                        clds_hazard_pointers_release(clds_hazard_pointers_thread, current_item_hp);
                                        restart_needed = false;

                                        /* Codes_SRS_CLDS_SORTED_LIST_01_010: [ On success clds_sorted_list_insert shall return 0. ]*/
                                        result = CLDS_SORTED_LIST_INSERT_OK;
                                        break;
                                    }
                                }
                                else
                                {
                                    if (interlocked_compare_exchange_pointer((void* volatile_atomic*)&clds_sorted_list->head, (void*)item, (void*)current_item) != current_item)
                                    {
                                        // let go of the hazard pointer
                                        clds_hazard_pointers_release(clds_hazard_pointers_thread, current_item_hp);
                                        restart_needed = true;
                                        break;
                                    }
                                    else
                                    {
                                        // let go of the hazard pointer
                                        clds_hazard_pointers_release(clds_hazard_pointers_thread, current_item_hp);
                                        restart_needed = false;

                                        /* Codes_SRS_CLDS_SORTED_LIST_01_010: [ On success clds_sorted_list_insert shall return CLDS_SORTED_LIST_INSERT_OK. ]*/
                                        result = CLDS_SORTED_LIST_INSERT_OK;
                                        break;
                                    }
                                }
                            }
                            else // item is less than the current, so move on
                            {
                                // we have a stable pointer to the current item, now simply set the previous to be this
                                if (previous_hp != NULL)
                                {
                                    // let go of previous hazard pointer
                                    clds_hazard_pointers_release(clds_hazard_pointers_thread, previous_hp);
                                }

                                previous_hp = current_item_hp;
                                previous_item = current_item;
                                current_item_address = (CLDS_SORTED_LIST_ITEM* volatile_atomic*)&current_item->next;
                            }
                        }
                    }
                }
            } while (1);
        } while (restart_needed);

        /*Codes_SRS_CLDS_SORTED_LIST_42_051: [ clds_sorted_list_insert shall decrement the count of pending write operations. ]*/
        end_write_operation(clds_sorted_list);
    }

    return result;
}