CdiReturnStatus CdiAdapterCreateConnection()

in src/cdi/adapter.c [582:754]


/**
 * Create an adapter connection and attach it to a poll thread.
 *
 * The function allocates a generic AdapterConnectionState, fills it in from the supplied configuration, invokes the
 * adapter's CreateConnection() function and then either adds the connection to an existing poll thread (when
 * shared_thread_id is greater than zero and a poll thread with the same ID is already in the adapter's
 * poll_thread_list) or creates a new pinned poll thread for it. The adapter's adapter_lock is held for the entire
 * duration of the call.
 *
 * @param config_data_ptr Pointer to the connection configuration data. Its cdi_adapter_handle member is dereferenced
 *                        unconditionally, so it must be valid.
 * @param return_handle_ptr Address where the new connection handle is written. It is set before the adapter's
 *                          CreateConnection() is invoked and reset to NULL if any step fails.
 *
 * @return kCdiStatusOk on success. Otherwise kCdiStatusNotEnoughMemory, kCdiStatusAllocationFailed,
 *         kCdiStatusInvalidParameter, kCdiStatusFatal, kCdiStatusCreateThreadFailed or the status returned by the
 *         adapter's CreateConnection() function.
 */
CdiReturnStatus CdiAdapterCreateConnection(CdiAdapterConnectionConfigData* config_data_ptr,
                                           AdapterConnectionHandle* return_handle_ptr)
{
    // NOTE: Since the caller is the application's thread, use SDK_LOG_GLOBAL() for any logging in this function.
    CdiReturnStatus rs = kCdiStatusOk;

    // The lock is held until the end of the function; poll_thread_list is read and modified below.
    CdiOsCritSectionReserve(config_data_ptr->cdi_adapter_handle->adapter_lock);

    // Allocate a generic connection state structure.
    AdapterConnectionState* adapter_con_state_ptr = CdiOsMemAllocZero(sizeof(*adapter_con_state_ptr));
    if (adapter_con_state_ptr == NULL) {
        rs = kCdiStatusNotEnoughMemory;
    }

    if (kCdiStatusOk == rs) {
        if (!CdiOsSignalCreate(&adapter_con_state_ptr->shutdown_signal)) {
            rs = kCdiStatusAllocationFailed;
        }
    }

    if (kCdiStatusOk == rs) {
        // Link endpoint to its adapter, queue message function and log.
        adapter_con_state_ptr->adapter_state_ptr = config_data_ptr->cdi_adapter_handle;
        adapter_con_state_ptr->data_state.cdi_connection_handle = config_data_ptr->cdi_connection_handle;
        adapter_con_state_ptr->log_handle = config_data_ptr->log_handle;
        adapter_con_state_ptr->data_state.connection_cb_ptr = config_data_ptr->connection_cb_ptr;
        adapter_con_state_ptr->data_state.connection_user_cb_param = config_data_ptr->connection_user_cb_param;

        // Remember what kind of endpoint this is.
        adapter_con_state_ptr->direction = config_data_ptr->direction;
        adapter_con_state_ptr->can_transmit = (kEndpointDirectionSend == adapter_con_state_ptr->direction ||
                                                kEndpointDirectionBidirectional == adapter_con_state_ptr->direction);
        adapter_con_state_ptr->can_receive = (kEndpointDirectionReceive == adapter_con_state_ptr->direction ||
                                                kEndpointDirectionBidirectional == adapter_con_state_ptr->direction);

        // The Tx "do work" signal is only created for connections that can transmit.
        if (adapter_con_state_ptr->can_transmit) {
            if (!CdiOsSignalCreate(&adapter_con_state_ptr->tx_poll_do_work_signal)) {
                rs = kCdiStatusAllocationFailed;
            }
        }
    }

    if (kCdiStatusOk == rs) {
        if (adapter_con_state_ptr->can_receive) {
            adapter_con_state_ptr->rx_state = config_data_ptr->rx_state;
        }

        adapter_con_state_ptr->port_number = config_data_ptr->port_number;

        // Set this prior to opening the endpoint. Receive packets may start flowing before Open() returns and the
        // connection must have a valid endpoint pointer set.
        *return_handle_ptr = adapter_con_state_ptr;

        // Do adapter specific open connection actions.
        rs = config_data_ptr->cdi_adapter_handle->functions_ptr->CreateConnection(adapter_con_state_ptr,
                                                                                  config_data_ptr->port_number);
    }

    // Prefix used when naming a newly created poll thread. Start with a non-NULL fallback so that an unexpected
    // direction value can never cause a NULL pointer to be passed to snprintf()'s "%s" below.
    const char* thread_name_prefix_str = "Poll";
    if (kCdiStatusOk == rs) {
        switch (config_data_ptr->direction) {
            case kEndpointDirectionSend:
                thread_name_prefix_str = "PollTx";
            break;
            case kEndpointDirectionReceive:
                thread_name_prefix_str = "PollRx";
            break;
            case kEndpointDirectionBidirectional:
                thread_name_prefix_str = "PollBx";
            break;
        }
    }

    if (kCdiStatusOk == rs) {
        PollThreadState* poll_thread_state_ptr = NULL;

        // Only share the poll thread if the ID is greater than zero.
        if (config_data_ptr->shared_thread_id > 0) {
            // Check if poll thread with this ID already exists. The loop leaves poll_thread_state_ptr pointing at the
            // matching entry, or NULL if the end of the list was reached without a match.
            CdiListIterator list_iterator;
            // NOTE: Must have acquired adapter_lock before using poll_thread_list.
            CdiListIteratorInit(&config_data_ptr->cdi_adapter_handle->poll_thread_list, &list_iterator);
            while (NULL != (poll_thread_state_ptr = (PollThreadState*)CdiListIteratorGetNext(&list_iterator))) {
                if (poll_thread_state_ptr->shared_thread_id == config_data_ptr->shared_thread_id) {
                    break;
                }
            }
        }

        if (poll_thread_state_ptr) {
            // Use poll thread from existing connection. The new connection must agree with the existing thread on
            // core number, endpoint data type and whether the adapter provides a Poll() function.
            // NOTE(review): the note at the top of this function says SDK_LOG_GLOBAL() should be used for logging
            // here, but these messages use CDI_LOG_THREAD() - confirm which is intended.
            if (poll_thread_state_ptr->thread_core_num != config_data_ptr->thread_core_num) {
                CDI_LOG_THREAD(kLogError, "Poll thread cannot use a mix of thread_core_num. Shared thread ID[%d].",
                               config_data_ptr->shared_thread_id);
                rs = kCdiStatusInvalidParameter;
            } else if (poll_thread_state_ptr->data_type != config_data_ptr->data_type) {
                CDI_LOG_THREAD(kLogError, "Poll thread cannot use a mix of endpoint types. Shared thread ID[%d].",
                               config_data_ptr->shared_thread_id);
                rs = kCdiStatusInvalidParameter;
            } else if (poll_thread_state_ptr->is_poll !=
                       (NULL != adapter_con_state_ptr->adapter_state_ptr->functions_ptr->Poll)) {
                CDI_LOG_THREAD(kLogError,
                    "Poll thread cannot use a mix of polling and non-polling adapters. Shared thread ID[%d].",
                    config_data_ptr->shared_thread_id);
                // NOTE(review): the two sibling mismatch cases return kCdiStatusInvalidParameter - confirm that
                // kCdiStatusFatal is intended here.
                rs = kCdiStatusFatal;
            } else {
                // A shared thread stops being transmit-only as soon as one receive-capable connection joins it.
                if (adapter_con_state_ptr->can_receive) {
                    poll_thread_state_ptr->only_transmit = false;
                }
                PollThreadConnectionAdd(poll_thread_state_ptr, adapter_con_state_ptr);
            }
        } else {
            // Create a new poll thread for this connection.
            char thread_name_str[CDI_MAX_THREAD_NAME];
            snprintf(thread_name_str, sizeof(thread_name_str), "%s%s%d", thread_name_prefix_str,
                    CdiUtilityKeyEnumToString(kKeyAdapterType,
                                            config_data_ptr->cdi_adapter_handle->adapter_data.adapter_type),
                    config_data_ptr->shared_thread_id);

            // Create new poll thread state data. The outer pointer is NULL in this branch, so it is reused rather
            // than declaring a second, shadowing variable.
            poll_thread_state_ptr = CdiOsMemAllocZero(sizeof(*poll_thread_state_ptr));
            if (NULL == poll_thread_state_ptr) {
                // Nothing was created for the poll thread, so only the common connection cleanup below is needed.
                rs = kCdiStatusNotEnoughMemory;
            } else {
                poll_thread_state_ptr->shared_thread_id = config_data_ptr->shared_thread_id;
                poll_thread_state_ptr->thread_core_num = config_data_ptr->thread_core_num;
                poll_thread_state_ptr->data_type = config_data_ptr->data_type;
                poll_thread_state_ptr->is_poll =
                    (NULL != adapter_con_state_ptr->adapter_state_ptr->functions_ptr->Poll);
                if (!adapter_con_state_ptr->can_receive) {
                    poll_thread_state_ptr->only_transmit = true;
                }
                CdiListInit(&poll_thread_state_ptr->connection_list);

                if (!CdiOsSignalCreate(&poll_thread_state_ptr->connection_list_changed_signal)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else if (!CdiOsSignalCreate(&poll_thread_state_ptr->connection_list_processed_signal)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else if (!CdiOsCritSectionCreate(&poll_thread_state_ptr->connection_list_lock)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else if (!CdiOsSignalCreate(&poll_thread_state_ptr->start_signal)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else {
                    // Add the connection to the poll thread state data so when the thread starts running it will
                    // have a connection to use.
                    PollThreadConnectionAdd(poll_thread_state_ptr, adapter_con_state_ptr);

                    // Create poll worker thread.
                    if (!CdiOsThreadCreatePinned(PollThread, &poll_thread_state_ptr->thread_id, thread_name_str,
                                                poll_thread_state_ptr, poll_thread_state_ptr->start_signal,
                                                config_data_ptr->thread_core_num)) {
                        rs = kCdiStatusCreateThreadFailed;
                    }
                }

                if (kCdiStatusOk == rs) {
                    // Add poll thread state data to list held by adapter.
                    // NOTE: Must have acquired adapter_lock before using poll_thread_list.
                    CdiListAddTail(&config_data_ptr->cdi_adapter_handle->poll_thread_list,
                                   &poll_thread_state_ptr->list_entry);
                } else {
                    // The state was never added to poll_thread_list, so it is torn down here.
                    PollThreadDestroy(poll_thread_state_ptr, adapter_con_state_ptr->shutdown_signal);
                    poll_thread_state_ptr = NULL;
                }
            }
        }
    }

    if (kCdiStatusOk != rs) {
        // Common failure path. adapter_con_state_ptr may be NULL (initial allocation failed) or only partially
        // initialized here; presumably CdiAdapterDestroyConnection() tolerates both - verify against its definition.
        CdiAdapterDestroyConnection(adapter_con_state_ptr);
        adapter_con_state_ptr = NULL;
        *return_handle_ptr = NULL;
    }

    CdiOsCritSectionRelease(config_data_ptr->cdi_adapter_handle->adapter_lock);

    return rs;
}