static CDI_THREAD PollThread()

in src/cdi/adapter.c [340:475]


static CDI_THREAD PollThread(void* ptr)
{
    PollThreadState* poll_thread_state_ptr = (PollThreadState*)ptr;
    AdapterConnectionState* adapter_con_ptr_array[CDI_MAX_SIMULTANEOUS_CONNECTIONS] = {0};
    int num_of_connections = 0;
    int connection_index = 0;

    // Allocate array of signals used to wake-up a poll thread used by a Tx connection. First signal is
    // connection_list_changed_signal. Next is an array of signals, grouped by connection.
    CdiSignalType tx_signal_array[1+3*CDI_MAX_SIMULTANEOUS_CONNECTIONS] = {0};
    tx_signal_array[kSignalIndexConnectionList] = poll_thread_state_ptr->connection_list_changed_signal;
    int num_signals = kSignalIndexArray;

    bool all_idle = true;
    while (true) {
        if (CdiOsSignalReadState(poll_thread_state_ptr->connection_list_changed_signal) && (0 == connection_index)) {
            // Make local copy of the connection list for this poll thread. This allows the connection list to be
            // externally updated without affecting the poll thread.
            CdiOsCritSectionReserve(poll_thread_state_ptr->connection_list_lock);

            CdiListIterator list_iterator;
            CdiListIteratorInit(&poll_thread_state_ptr->connection_list, &list_iterator);
            num_of_connections = 0;
            all_idle = true;
            num_signals = kSignalIndexArray;
            AdapterConnectionState* entry_ptr = NULL;
            poll_thread_state_ptr->only_transmit = true; // Default to only transmit. State is updated below.
            while (NULL != (entry_ptr = (AdapterConnectionState*)CdiListIteratorGetNext(&list_iterator))) {
                adapter_con_ptr_array[num_of_connections++] = entry_ptr;
                // If receiver or bi-directional then clear the only_transmit flag used to determine if poll thread can
                // sleep.
                if (kEndpointDirectionReceive == entry_ptr->direction ||
                    kEndpointDirectionBidirectional == entry_ptr->direction) {
                    poll_thread_state_ptr->only_transmit = false;
                }

                // If the Tx poll do work signal exists, add it to the array.
                if (entry_ptr->tx_poll_do_work_signal) {
                    tx_signal_array[num_signals++] = entry_ptr->tx_poll_do_work_signal;
                }

                if (kEndpointTypeControl == poll_thread_state_ptr->data_type) {
                    // Control interface uses Tx packet queue for notification signals.
                    if (!poll_thread_state_ptr->is_poll && entry_ptr->can_transmit) {
                        AdapterEndpointState* adapter_endpoint_ptr = entry_ptr->control_state.control_endpoint_handle;
                        tx_signal_array[num_signals++] =
                            CdiQueueGetPopWaitSignal(adapter_endpoint_ptr->tx_packet_queue_handle);
                    }
                } else {
                    // Data interface uses Endpoint Manager notification signals.
                    EndpointManagerHandle mgr_handle = EndpointManagerConnectionToEndpointManager(
                                        entry_ptr->data_state.cdi_connection_handle);
                    tx_signal_array[num_signals++] = EndpointManagerGetNotificationSignal(mgr_handle);
                }
            }
            CdiOsSignalClear(poll_thread_state_ptr->connection_list_changed_signal);
            CdiOsSignalSet(poll_thread_state_ptr->connection_list_processed_signal);
            CdiOsCritSectionRelease(poll_thread_state_ptr->connection_list_lock);
        }

        if (0 == num_of_connections) {
            // No connections, so exit the loop and the thread.
            break;
        }

        AdapterConnectionState* adapter_con_state_ptr = adapter_con_ptr_array[connection_index];

        if (kPollStart == adapter_con_state_ptr->poll_state) {
            // This connection is just starting to use the poll thread, so update initial data.
            adapter_con_state_ptr->load_state.top_time = CdiOsGetMicroseconds();
            adapter_con_state_ptr->load_state.start_time = adapter_con_state_ptr->load_state.top_time;
            adapter_con_state_ptr->load_state.idle_accumulator = 0;
            adapter_con_state_ptr->load_state.busy_accumulator = 0;
        }

        if (kPollStopped != adapter_con_state_ptr->poll_state) {
            // Connection is active, so allow poll thread to poll it.
            if (kEndpointTypeControl == poll_thread_state_ptr->data_type) {
                // Control interface (SDK probe/control protocol).
                ControlInterfacePoll(adapter_con_state_ptr, num_signals, tx_signal_array);
                // Control interface adapter's don't require polling, so wait for a notification.
                assert(!poll_thread_state_ptr->is_poll);
                CdiOsSignalsWait(tx_signal_array, num_signals, false, CDI_INFINITE, NULL);
            } else {
                // Data interface (user data payloads/packets and probe EFA packets).
                if (!DataPoll(adapter_con_state_ptr)) {
                    all_idle = false;
                }
                // For transmitter, If tx_poll_do_work_signal is set and all endpoints are idle then clear the signal,
                // ensuring that was ok to clear it.
                if (adapter_con_state_ptr->can_transmit &&
                    CdiOsSignalReadState(adapter_con_state_ptr->tx_poll_do_work_signal) && all_idle) {
                    CdiOsSignalClear(adapter_con_state_ptr->tx_poll_do_work_signal);
                    // To avoid the use of critical sections here, use atomic operations to ensure it was safe to clear
                    // the signal. If tx_in_flight_ref_count was incremented outside the scope of this function after we
                    // just cleared tx_poll_do_work_signal, then restore the signal's state (set it).
                    EndpointManagerHandle mgr_handle = EndpointManagerConnectionToEndpointManager(
                                                          adapter_con_state_ptr->data_state.cdi_connection_handle);
                    CdiEndpointHandle cdi_endpoint_handle = EndpointManagerGetFirstEndpoint(mgr_handle);
                    while (cdi_endpoint_handle) {
                        AdapterEndpointState* adapter_endpoint_ptr =
                            EndpointManagerEndpointToAdapterEndpoint(cdi_endpoint_handle);
                        if (CdiOsAtomicLoad32(&adapter_endpoint_ptr->tx_in_flight_ref_count)) {
                            CdiOsSignalSet(adapter_con_state_ptr->tx_poll_do_work_signal);
                            break;
                        }
                        cdi_endpoint_handle = EndpointManagerGetNextEndpoint(cdi_endpoint_handle);
                    }
                }
            }
        }

        // Advance to next connection.
        connection_index++;
        if (connection_index >= num_of_connections) {
            connection_index = 0;
            // If the poll thread is data type, only contains transmitters, uses a polling adapter and all endpoints for
            // all connections are currently idle then sleep until there is a notification.
            if (kEndpointTypeData == poll_thread_state_ptr->data_type && poll_thread_state_ptr->only_transmit &&
                poll_thread_state_ptr->is_poll && all_idle) {
#ifdef DEBUG_POLL_THREAD_SLEEP_TIME
                uint64_t start_time = CdiOsGetMicroseconds();
#endif
                uint32_t index = 0;
                CdiOsSignalsWait(tx_signal_array, num_signals, false, CDI_INFINITE, &index);
#ifdef DEBUG_POLL_THREAD_SLEEP_TIME
                CDI_LOG_THREAD(kLogInfo, "SigIdx=%d slept=%lu", index, CdiOsGetMicroseconds() - start_time);
#endif
            }
            all_idle = true;
        }
    }

    CdiLoggerThreadLogUnset();
    return 0; // Return code not used.
}