in src/cdi/adapter.c [582:754]
// Create an adapter connection and attach it to a poll thread.
//
// Allocates and initializes an AdapterConnectionState from the supplied configuration, invokes the adapter's
// CreateConnection() function and then either joins an existing poll thread whose shared_thread_id matches the
// (greater than zero) ID in the configuration, or creates a new poll thread for the connection. The adapter's
// adapter_lock is held for the duration of the call.
//
// @param config_data_ptr   Pointer to the connection configuration data. Must reference a valid adapter handle.
// @param return_handle_ptr Address where the new connection handle is written. It is written before the adapter's
//                          CreateConnection() function is called and is set to NULL if this function fails.
//
// @return kCdiStatusOk on success. On failure one of kCdiStatusNotEnoughMemory, kCdiStatusAllocationFailed,
//         kCdiStatusInvalidParameter, kCdiStatusFatal, kCdiStatusCreateThreadFailed, or the status returned by the
//         adapter's CreateConnection() function.
CdiReturnStatus CdiAdapterCreateConnection(CdiAdapterConnectionConfigData* config_data_ptr,
                                           AdapterConnectionHandle* return_handle_ptr)
{
    // NOTE: Since the caller is the application's thread, use SDK_LOG_GLOBAL() for any logging in this function.
    // NOTE(review): the error paths below log through CDI_LOG_THREAD(), which appears to contradict the note above.
    // Confirm which macro is intended.
    CdiReturnStatus rs = kCdiStatusOk;

    CdiOsCritSectionReserve(config_data_ptr->cdi_adapter_handle->adapter_lock);

    // Allocate a generic connection state structure.
    AdapterConnectionState* adapter_con_state_ptr = CdiOsMemAllocZero(sizeof(*adapter_con_state_ptr));
    if (adapter_con_state_ptr == NULL) {
        rs = kCdiStatusNotEnoughMemory;
    }

    if (kCdiStatusOk == rs) {
        if (!CdiOsSignalCreate(&adapter_con_state_ptr->shutdown_signal)) {
            rs = kCdiStatusAllocationFailed;
        }
    }

    if (kCdiStatusOk == rs) {
        // Link endpoint to its adapter, queue message function and log.
        adapter_con_state_ptr->adapter_state_ptr = config_data_ptr->cdi_adapter_handle;
        adapter_con_state_ptr->data_state.cdi_connection_handle = config_data_ptr->cdi_connection_handle;
        adapter_con_state_ptr->log_handle = config_data_ptr->log_handle;
        adapter_con_state_ptr->data_state.connection_cb_ptr = config_data_ptr->connection_cb_ptr;
        adapter_con_state_ptr->data_state.connection_user_cb_param = config_data_ptr->connection_user_cb_param;

        // Remember what kind of endpoint this is.
        adapter_con_state_ptr->direction = config_data_ptr->direction;
        adapter_con_state_ptr->can_transmit = (kEndpointDirectionSend == adapter_con_state_ptr->direction ||
                                               kEndpointDirectionBidirectional == adapter_con_state_ptr->direction);
        adapter_con_state_ptr->can_receive = (kEndpointDirectionReceive == adapter_con_state_ptr->direction ||
                                              kEndpointDirectionBidirectional == adapter_con_state_ptr->direction);

        if (adapter_con_state_ptr->can_transmit) {
            if (!CdiOsSignalCreate(&adapter_con_state_ptr->tx_poll_do_work_signal)) {
                rs = kCdiStatusAllocationFailed;
            }
        }
    }

    if (kCdiStatusOk == rs) {
        if (adapter_con_state_ptr->can_receive) {
            adapter_con_state_ptr->rx_state = config_data_ptr->rx_state;
        }
        adapter_con_state_ptr->port_number = config_data_ptr->port_number;

        // Set this prior to opening the endpoint. Receive packets may start flowing before Open() returns and the
        // connection must have a valid endpoint pointer set.
        *return_handle_ptr = adapter_con_state_ptr;

        // Do adapter specific open connection actions.
        rs = config_data_ptr->cdi_adapter_handle->functions_ptr->CreateConnection(adapter_con_state_ptr,
                                                                                  config_data_ptr->port_number);
    }

    // Start with a generic prefix so the thread name formatting below never receives a NULL string, even if the
    // direction is not one of the values handled by the switch.
    const char* thread_name_prefix_str = "Poll";
    if (kCdiStatusOk == rs) {
        switch (config_data_ptr->direction) {
            case kEndpointDirectionSend:
                thread_name_prefix_str = "PollTx";
                break;
            case kEndpointDirectionReceive:
                thread_name_prefix_str = "PollRx";
                break;
            case kEndpointDirectionBidirectional:
                thread_name_prefix_str = "PollBx";
                break;
        }
    }

    if (kCdiStatusOk == rs) {
        PollThreadState* poll_thread_state_ptr = NULL;

        // Only share the poll thread if the ID is greater than zero.
        if (config_data_ptr->shared_thread_id > 0) {
            // Check if poll thread with this ID already exists.
            CdiListIterator list_iterator;
            // NOTE: Must have acquired adapter_lock before using poll_thread_list.
            CdiListIteratorInit(&config_data_ptr->cdi_adapter_handle->poll_thread_list, &list_iterator);
            while (NULL != (poll_thread_state_ptr = (PollThreadState*)CdiListIteratorGetNext(&list_iterator))) {
                if (poll_thread_state_ptr->shared_thread_id == config_data_ptr->shared_thread_id) {
                    break;
                }
            }
        }

        if (poll_thread_state_ptr) {
            // Use poll thread from existing connection. All connections sharing a poll thread must agree on the core
            // number, the data type and whether the adapter provides a Poll() function.
            if (poll_thread_state_ptr->thread_core_num != config_data_ptr->thread_core_num) {
                CDI_LOG_THREAD(kLogError, "Poll thread cannot use a mix of thread_core_num. Shared thread ID[%d].",
                               config_data_ptr->shared_thread_id);
                rs = kCdiStatusInvalidParameter;
            } else if (poll_thread_state_ptr->data_type != config_data_ptr->data_type) {
                CDI_LOG_THREAD(kLogError, "Poll thread cannot use a mix of endpoint types. Shared thread ID[%d].",
                               config_data_ptr->shared_thread_id);
                rs = kCdiStatusInvalidParameter;
            } else if (poll_thread_state_ptr->is_poll !=
                       (NULL != adapter_con_state_ptr->adapter_state_ptr->functions_ptr->Poll)) {
                CDI_LOG_THREAD(kLogError,
                               "Poll thread cannot use a mix of polling and non-polling adapters. Shared thread ID[%d].",
                               config_data_ptr->shared_thread_id);
                rs = kCdiStatusFatal;
            } else {
                // A receive-capable connection means the shared thread is no longer transmit-only.
                if (adapter_con_state_ptr->can_receive) {
                    poll_thread_state_ptr->only_transmit = false;
                }
                PollThreadConnectionAdd(poll_thread_state_ptr, adapter_con_state_ptr);
            }
        } else {
            // Create a new poll thread for this connection.
            char thread_name_str[CDI_MAX_THREAD_NAME];
            snprintf(thread_name_str, sizeof(thread_name_str), "%s%s%d", thread_name_prefix_str,
                     CdiUtilityKeyEnumToString(kKeyAdapterType,
                                               config_data_ptr->cdi_adapter_handle->adapter_data.adapter_type),
                     config_data_ptr->shared_thread_id);

            // Create new poll thread state data. The allocation must be checked before the state is populated.
            poll_thread_state_ptr = CdiOsMemAllocZero(sizeof(*poll_thread_state_ptr));
            if (NULL == poll_thread_state_ptr) {
                rs = kCdiStatusNotEnoughMemory;
            } else {
                poll_thread_state_ptr->shared_thread_id = config_data_ptr->shared_thread_id;
                poll_thread_state_ptr->thread_core_num = config_data_ptr->thread_core_num;
                poll_thread_state_ptr->data_type = config_data_ptr->data_type;
                poll_thread_state_ptr->is_poll =
                    (NULL != adapter_con_state_ptr->adapter_state_ptr->functions_ptr->Poll);
                if (!adapter_con_state_ptr->can_receive) {
                    poll_thread_state_ptr->only_transmit = true;
                }
                CdiListInit(&poll_thread_state_ptr->connection_list);

                if (!CdiOsSignalCreate(&poll_thread_state_ptr->connection_list_changed_signal)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else if (!CdiOsSignalCreate(&poll_thread_state_ptr->connection_list_processed_signal)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else if (!CdiOsCritSectionCreate(&poll_thread_state_ptr->connection_list_lock)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else if (!CdiOsSignalCreate(&poll_thread_state_ptr->start_signal)) {
                    rs = kCdiStatusNotEnoughMemory;
                } else {
                    // Add the connection to the poll thread state data so when the thread starts running it will
                    // have a connection to use.
                    PollThreadConnectionAdd(poll_thread_state_ptr, adapter_con_state_ptr);

                    // Create poll worker thread.
                    if (!CdiOsThreadCreatePinned(PollThread, &poll_thread_state_ptr->thread_id, thread_name_str,
                                                 poll_thread_state_ptr, poll_thread_state_ptr->start_signal,
                                                 config_data_ptr->thread_core_num)) {
                        rs = kCdiStatusCreateThreadFailed;
                    }
                }

                if (kCdiStatusOk == rs) {
                    // Add poll thread state data to list held by adapter.
                    // NOTE: Must have acquired adapter_lock before using poll_thread_list.
                    CdiListAddTail(&config_data_ptr->cdi_adapter_handle->poll_thread_list,
                                   &poll_thread_state_ptr->list_entry);
                } else {
                    PollThreadDestroy(poll_thread_state_ptr, adapter_con_state_ptr->shutdown_signal);
                    poll_thread_state_ptr = NULL;
                }
            }
        }
    }

    if (kCdiStatusOk != rs) {
        // Nothing to destroy if the connection state allocation itself failed.
        if (adapter_con_state_ptr) {
            CdiAdapterDestroyConnection(adapter_con_state_ptr);
            adapter_con_state_ptr = NULL;
        }
        *return_handle_ptr = NULL;
    }

    CdiOsCritSectionRelease(config_data_ptr->cdi_adapter_handle->adapter_lock);

    return rs;
}