bool CdiOsSignalsWait()

in src/common/src/os_linux.c [759:915]


/**
 * Wait on an array of signals, either until all of them are active or until any one of them becomes active.
 *
 * @param signal_array Array of signal handles to wait on. Each handle is used as a pointer to a SignalInfo.
 * @param num_signals Number of entries in signal_array. Must not exceed CDI_MAX_WAIT_MULTIPLE, and must be non-zero
 *                    when wait_all is false.
 * @param wait_all true to wait until every signal is active, false to return as soon as any signal is active.
 * @param timeout_in_ms Timeout in milliseconds, or CDI_INFINITE to wait without a timeout.
 * @param ret_signal_index_ptr Optional (may be NULL). When wait_all is false, receives the index of the signal that
 *                             ended the wait. When wait_all is true, it is set to 1. In both modes it is set to
 *                             CDI_OS_SIG_TIMEOUT if the wait timed out. It is not written when the function fails
 *                             during argument validation or registration.
 *
 * @return true on success (including a timeout), false if num_signals is out of range, if registering with one of
 *         the signals failed, or (wait_all only) if waiting on an individual signal failed.
 */
bool CdiOsSignalsWait(CdiSignalType* signal_array, uint8_t num_signals, bool wait_all, uint32_t timeout_in_ms,
                      uint32_t* ret_signal_index_ptr)
{
    bool return_val = true;
    SignalInfo** signal_info_ptr_array = (SignalInfo**)signal_array;
    uint32_t i;
    uint32_t signal_count_array[CDI_MAX_WAIT_MULTIPLE];
    int j;
    bool keep_waiting = true;

    if (num_signals > CDI_MAX_WAIT_MULTIPLE) {
        ERROR_MESSAGE("Exceeded maximum number of wait signals[%d]", CDI_MAX_WAIT_MULTIPLE);
        return false;
    }

    if (wait_all) {
        // Wait for all signals to be set.
        uint64_t start_ms = 0;

        if (timeout_in_ms != CDI_INFINITE) {
            start_ms = CdiOsGetMilliseconds();
        }

        if (ret_signal_index_ptr) {
            *ret_signal_index_ptr = 1;
        }

        while (keep_waiting && return_val) {
            // Check if all signals are active. Keep making passes until one pass finds nothing left to wait on.
            keep_waiting = false;
            for (i = 0; i < num_signals; i++) {
                if (!(signal_info_ptr_array[i]->signal_count & 1)) {
                    // Signal is not active, wait on it. Initialize timed_out so it is well defined even if the wait
                    // fails without writing it.
                    bool timed_out = false;
                    uint32_t new_timeout_ms = CDI_INFINITE;

                    if (timeout_in_ms != CDI_INFINITE) {
                        // The timeout applies to the whole call, so only wait for whatever time remains.
                        uint64_t time_run_ms = CdiOsGetMilliseconds() - start_ms;
                        if (time_run_ms > timeout_in_ms) {
                            new_timeout_ms = 0;
                        } else {
                            new_timeout_ms = (uint32_t)(timeout_in_ms - time_run_ms);
                        }
                    }
                    return_val = CdiOsSignalWait((CdiSignalType)signal_info_ptr_array[i], new_timeout_ms, &timed_out);
                    if (timed_out) {
                        if (ret_signal_index_ptr) {
                            *ret_signal_index_ptr = CDI_OS_SIG_TIMEOUT;
                        }
                        keep_waiting = false;
                        break;
                    }
                    if (!return_val) {
                        // The wait failed. Stop now so that we neither block on the remaining signals nor let a
                        // later successful wait overwrite the failure.
                        break;
                    }
                    keep_waiting = true;
                }
            }
        }
        // The pthread_mutex functions imply a memory barrier. If we don't wait we must do our own.
        __sync_synchronize();

        return return_val;
    }

    if (0 == num_signals) {
        // The wait-any logic below uses signal 0's mutex and condition variable, so at least one signal is required.
        ERROR_MESSAGE("No signals to wait on");
        return false;
    }

    // First, see if any signals are active. The counts captured here are the baseline used below to detect a change.
    for (i = 0; i < num_signals; i++) {
        signal_count_array[i] = signal_info_ptr_array[i]->signal_count;
        if (signal_count_array[i] & 1) {
            keep_waiting = false;
            if (NULL != ret_signal_index_ptr) {
                *ret_signal_index_ptr = i;
            }
            break;
        }
    }

    if (keep_waiting) {
        // No signals currently active, we need to wait for the first signal and register with the remaining signals.
        SignalInfo* first_signal_ptr = signal_info_ptr_array[0];
        // One past the index of the last signal we successfully registered with. Only those get unregistered below.
        uint32_t registered_end = 1;
        struct timespec sTime;

        if (timeout_in_ms != CDI_INFINITE) {
            GetTimeout(&sTime, timeout_in_ms, kPreferredClock);
        }

        pthread_mutex_lock(&first_signal_ptr->mutex);
        // Register with all the other signals. We use atomics to increment the signal counts and set a pointer to our
        // signal. This is done to avoid needing to grab locks, since that will introduce messy race conditions.
        //
        // We need to have the lock to signal0 so that none of these signals can call signal0's condition variable until
        // we are sleeping on it.
        for (i = 1; i < num_signals; i++) {
            SignalInfo* other_signal_ptr = signal_info_ptr_array[i];

            int previous_count = __sync_fetch_and_add(&other_signal_ptr->num_other_sigs, 1);
            if (previous_count >= MAX_THREADS_WAITING) {
                ERROR_MESSAGE("__sync_fetch_and_add too high");
                // Undo our increment and stop. We must not claim a slot, otherwise the count would be decremented a
                // second time when the slot is released.
                __sync_fetch_and_sub(&other_signal_ptr->num_other_sigs, 1);
                return_val = false;
                keep_waiting = false;
                break;
            }

            // Claim the first free slot by atomically replacing NULL with a pointer to signal0.
            for (j = 0; j < MAX_THREADS_WAITING; j++) {
                if (__sync_bool_compare_and_swap(&other_signal_ptr->other_sigs_ptr_array[j], NULL, first_signal_ptr)) {
                    break;
                }
            }
            if (j == MAX_THREADS_WAITING) {
                ERROR_MESSAGE("MAX_THREADS_WAITING");
                // No free slot was found, so only the count needs to be rolled back.
                __sync_fetch_and_sub(&other_signal_ptr->num_other_sigs, 1);
                return_val = false;
                keep_waiting = false;
                break;
            }

            registered_end = i + 1;
        }

        while (keep_waiting) {
            // Check if a signal is set, by comparing against the counts captured before registering.
            for (i = 0; i < num_signals; i++) {
                if (signal_info_ptr_array[i]->signal_count != signal_count_array[i]) {
                    keep_waiting = false;
                    if (NULL != ret_signal_index_ptr) {
                        *ret_signal_index_ptr = i;
                    }
                    break;
                }
            }
            if (keep_waiting) {
                // No signal set, wait on the first condition variable.
                if (timeout_in_ms != CDI_INFINITE) {
                    int ret_code = pthread_cond_timedwait(&first_signal_ptr->condition, &first_signal_ptr->mutex,
                                                          &sTime);
                    if (ret_code) {
                        // Any non-zero return code is reported to the caller as a timeout.
                        if (ret_signal_index_ptr) {
                            *ret_signal_index_ptr = CDI_OS_SIG_TIMEOUT;
                        }
                        keep_waiting = false;
                        break;
                    }
                } else {
                    pthread_cond_wait(&first_signal_ptr->condition, &first_signal_ptr->mutex);
                }
            }
        }

        // Done with the lock.
        pthread_mutex_unlock(&first_signal_ptr->mutex);

        // Remove the registrations using atomics to avoid locks. Only signals we actually registered with are
        // visited, so a slot or count belonging to some other waiter is never released by mistake.
        for (i = 1; i < registered_end; i++) {
            SignalInfo* other_signal_ptr = signal_info_ptr_array[i];

            for (j = 0; j < MAX_THREADS_WAITING; j++) {
                if (__sync_bool_compare_and_swap(&other_signal_ptr->other_sigs_ptr_array[j], first_signal_ptr, NULL)) {
                    __sync_fetch_and_sub(&other_signal_ptr->num_other_sigs, 1);
                    break;
                }
            }
        }
    }
    else {
        // The pthread_mutex functions imply a memory barrier. If we don't wait we must do our own.
        __sync_synchronize();
    }

    return return_val;
}