CLDS_HASH_TABLE_INSERT_RESULT clds_hash_table_insert()

in src/clds_hash_table.c [327:506]


CLDS_HASH_TABLE_INSERT_RESULT clds_hash_table_insert(CLDS_HASH_TABLE_HANDLE clds_hash_table, CLDS_HAZARD_POINTERS_THREAD_HANDLE clds_hazard_pointers_thread, void* key, CLDS_HASH_TABLE_ITEM* value, int64_t* sequence_number)
{
    CLDS_HASH_TABLE_INSERT_RESULT result;

    if (
        /* Codes_SRS_CLDS_HASH_TABLE_01_010: [ If clds_hash_table is NULL, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ERROR. ]*/
        (clds_hash_table == NULL) ||
        /* Codes_SRS_CLDS_HASH_TABLE_01_011: [ If key is NULL, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ERROR. ]*/
        (key == NULL) ||
        /* Codes_SRS_CLDS_HASH_TABLE_01_012: [ If clds_hazard_pointers_thread is NULL, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ERROR. ]*/
        (clds_hazard_pointers_thread == NULL) ||
        /* Codes_SRS_CLDS_HASH_TABLE_01_062: [ If the sequence_number argument is non-NULL, but no start sequence number was specified in clds_hash_table_create, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ERROR. ]*/
        ((sequence_number != NULL) && (clds_hash_table->sequence_number == NULL))
        )
    {
        LogError("Invalid arguments: CLDS_HASH_TABLE_HANDLE clds_hash_table=%p, CLDS_HAZARD_POINTERS_THREAD_HANDLE clds_hazard_pointers_thread=%p, void* key=%p, CLDS_HASH_TABLE_ITEM* value=%p, int64_t* sequence_number=%p",
            clds_hash_table, clds_hazard_pointers_thread, key, value, sequence_number);
        result = CLDS_HASH_TABLE_INSERT_ERROR;
    }
    else
    {
        /* Codes_SRS_CLDS_HASH_TABLE_42_032: [ clds_hash_table_insert shall try the following until it acquires a write lock for the table: ]*/
        /* Codes_SRS_CLDS_HASH_TABLE_42_033: [ clds_hash_table_insert shall increment the count of pending write operations. ]*/
        /* Codes_SRS_CLDS_HASH_TABLE_42_034: [ If the counter to lock the table for writes is non-zero then: ]*/
        /* Codes_SRS_CLDS_HASH_TABLE_42_035: [ clds_hash_table_insert shall decrement the count of pending write operations. ]*/
        /* Codes_SRS_CLDS_HASH_TABLE_42_036: [ clds_hash_table_insert shall wait for the counter to lock the table for writes to reach 0 and repeat. ]*/
        check_lock_and_begin_write_operation(clds_hash_table);

        bool restart_needed;
        CLDS_SORTED_LIST_HANDLE bucket_list = NULL;
        HASH_TABLE_ITEM* hash_table_item = CLDS_SORTED_LIST_GET_VALUE(HASH_TABLE_ITEM, value);
        uint64_t hash;
        BUCKET_ARRAY* current_bucket_array;
        int32_t bucket_count;
        uint64_t bucket_index;
        bool found_in_lower_levels = false;

        // find or allocate a new bucket array
        current_bucket_array = get_first_bucket_array(clds_hash_table);
        bucket_count = interlocked_add(&current_bucket_array->bucket_count, 0);

        (void)interlocked_increment(&current_bucket_array->pending_insert_count);

        // compute the hash
        /* Codes_SRS_CLDS_HASH_TABLE_01_038: [ clds_hash_table_insert shall hash the key by calling the compute_hash function passed to clds_hash_table_create. ]*/
        hash = clds_hash_table->compute_hash(key);

        found_in_lower_levels = false;

        // check if the key exists in the lower level bucket arrays
        BUCKET_ARRAY* find_bucket_array = current_bucket_array;
        BUCKET_ARRAY* next_bucket_array = interlocked_compare_exchange_pointer((void* volatile_atomic*)&find_bucket_array->next_bucket, NULL, NULL);

        if (next_bucket_array != NULL)
        {
            // wait for all outstanding inserts in the lower levels to complete
            do
            {
                if (interlocked_add(&next_bucket_array->pending_insert_count, 0) == 0)
                {
                    break;
                }
            } while (1);
        }

        find_bucket_array = next_bucket_array;
        while (find_bucket_array != NULL)
        {
            next_bucket_array = interlocked_compare_exchange_pointer((void* volatile_atomic*)&find_bucket_array->next_bucket, NULL, NULL);

            bucket_index = hash % interlocked_add(&find_bucket_array->bucket_count, 0);
            bucket_list = interlocked_compare_exchange_pointer((void* volatile_atomic*)&find_bucket_array->hash_table[bucket_index], NULL, NULL);

            if (bucket_list != NULL)
            {
                CLDS_SORTED_LIST_ITEM* sorted_list_item = clds_sorted_list_find_key(bucket_list, clds_hazard_pointers_thread, key);
                if (sorted_list_item != NULL)
                {
                    clds_sorted_list_node_release(sorted_list_item);

                    found_in_lower_levels = true;
                    break;
                }
            }

            find_bucket_array = next_bucket_array;
        }

        if (found_in_lower_levels)
        {
            result = CLDS_HASH_TABLE_INSERT_KEY_ALREADY_EXISTS;
        }
        else
        {
            (void)interlocked_increment(&current_bucket_array->item_count);

            // find the bucket
            /* Codes_SRS_CLDS_HASH_TABLE_01_018: [ clds_hash_table_insert shall obtain the bucket index to be used by calling compute_hash and passing to it the key value. ]*/
            bucket_index = hash % bucket_count;

            do
            {
                // do we have a list here or do we create one?
                bucket_list = interlocked_compare_exchange_pointer((void* volatile_atomic*)&current_bucket_array->hash_table[bucket_index], NULL, NULL);
                if (bucket_list != NULL)
                {
                    restart_needed = false;
                }
                else
                {
                    // create a list
                    /* Codes_SRS_CLDS_HASH_TABLE_01_019: [ If no sorted list exists at the determined bucket index then a new list shall be created. ]*/
                    /* Codes_SRS_CLDS_HASH_TABLE_01_071: [ When a new list is created, the start sequence number passed to clds_hash_tabel_create shall be passed as the start_sequence_number argument. ]*/
                    bucket_list = clds_sorted_list_create(clds_hash_table->clds_hazard_pointers, get_item_key_cb, clds_hash_table, key_compare_cb, clds_hash_table, clds_hash_table->sequence_number, clds_hash_table->sequence_number == NULL ? NULL : on_sorted_list_skipped_seq_no, clds_hash_table);
                    if (bucket_list == NULL)
                    {
                        /* Codes_SRS_CLDS_HASH_TABLE_01_022: [ If any error is encountered while inserting the key/value pair, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ERROR. ]*/
                        LogError("Cannot allocate list for hash table bucket");
                        restart_needed = false;
                    }
                    else
                    {
                        // now put the list in the bucket
                        if (interlocked_compare_exchange_pointer((void* volatile_atomic*)&current_bucket_array->hash_table[bucket_index], bucket_list, NULL) != NULL)
                        {
                            // oops, someone else inserted a new list, just bail on our list and restart
                            clds_sorted_list_destroy(bucket_list);
                            restart_needed = true;
                        }
                        else
                        {
                            // set new list
                            restart_needed = false;
                        }
                    }
                }
            } while (restart_needed);

            if (bucket_list == NULL)
            {
                LogError("Cannot acquire bucket list");
                result = CLDS_HASH_TABLE_INSERT_ERROR;
            }
            else
            {
                CLDS_SORTED_LIST_INSERT_RESULT list_insert_result;

                /* Codes_SRS_CLDS_HASH_TABLE_01_020: [ A new sorted list item shall be created by calling clds_sorted_list_node_create. ]*/
                hash_table_item->key = key;

                /* Codes_SRS_CLDS_HASH_TABLE_01_021: [ The new sorted list node shall be inserted in the sorted list at the identified bucket by calling clds_sorted_list_insert. ]*/
                /* Codes_SRS_CLDS_HASH_TABLE_01_059: [ For each insert the order of the operation shall be computed by passing sequence_number to clds_sorted_list_insert. ]*/
                list_insert_result = clds_sorted_list_insert(bucket_list, clds_hazard_pointers_thread, (void*)value, sequence_number);

                if (list_insert_result == CLDS_SORTED_LIST_INSERT_KEY_ALREADY_EXISTS)
                {
                    /* Codes_SRS_CLDS_HASH_TABLE_01_046: [ If the key already exists in the hash table, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ALREADY_EXISTS. ]*/
                    result = CLDS_HASH_TABLE_INSERT_KEY_ALREADY_EXISTS;
                }
                else if (list_insert_result != CLDS_SORTED_LIST_INSERT_OK)
                {
                    /* Codes_SRS_CLDS_HASH_TABLE_01_022: [ If any error is encountered while inserting the key/value pair, clds_hash_table_insert shall fail and return CLDS_HASH_TABLE_INSERT_ERROR. ]*/
                    LogError("Cannot insert hash table item into list");
                    result = CLDS_HASH_TABLE_INSERT_ERROR;
                }
                else
                {
                    /* Codes_SRS_CLDS_HASH_TABLE_01_009: [ On success clds_hash_table_insert shall return CLDS_HASH_TABLE_INSERT_OK. ]*/
                    result = CLDS_HASH_TABLE_INSERT_OK;
                }
            }
        }

        (void)interlocked_decrement(&current_bucket_array->pending_insert_count);

        /* Codes_SRS_CLDS_HASH_TABLE_42_063: [ clds_hash_table_insert shall decrement the count of pending write operations. ]*/
        end_write_operation(clds_hash_table);
    }
    return result;
}