in src/cdi/internal_rx.c [548:734]
/**
 * Create an Rx connection: allocate the connection state, validate the receive buffer delay, create the connection
 * log, the resources shared with Tx connections, the Rx memory pools, the optional receive delay buffer, the packet
 * message thread and the adapter connection, then signal the connection threads to start.
 *
 * @param protocol_type   Connection protocol type; stored in the connection state and logged.
 * @param config_data_ptr Rx configuration. A copy is stored in the connection state. NOTE: If the connection is
 *                        unnamed (NULL or empty connection_name_str), a name of the form "dest<port>" is generated
 *                        and, on success, connection_name_str in the caller's structure is left pointing at the name
 *                        saved in the connection state. On failure the caller's original pointer is restored.
 * @param rx_cb_ptr       Rx callback; stored in the connection's Rx state.
 * @param ret_handle_ptr  Receives the handle of the new connection on success, or NULL on any failure.
 *
 * @return kCdiStatusOk on success. kCdiStatusNotEnoughMemory if the connection state or a pool could not be
 *         allocated, kCdiStatusInvalidParameter if buffer_delay_ms is out of range, kCdiStatusCreateLogFailed if the
 *         connection log could not be created, kCdiStatusFatal if the adapter connection could not be created.
 *         Failure codes returned by ConnectionCommonResourcesCreate(), RxBufferInit(),
 *         ConnectionCommonPacketMessageThreadCreate() and EndpointManagerRxCreateEndpoint() are passed through.
 */
CdiReturnStatus RxCreateInternal(CdiConnectionProtocolType protocol_type, CdiRxConfigData* config_data_ptr,
                                 CdiCallback rx_cb_ptr, CdiConnectionHandle* ret_handle_ptr)
{
    CdiReturnStatus rs = kCdiStatusOk;

    CdiConnectionState* con_state_ptr = (CdiConnectionState*)CdiOsMemAllocZero(sizeof *con_state_ptr);
    if (con_state_ptr == NULL) {
        // Keep the output consistent with every other failure path, which returns a NULL handle.
        *ret_handle_ptr = NULL;
        return kCdiStatusNotEnoughMemory;
    }

    // Remember the caller's name pointer. It may get redirected to storage inside con_state_ptr below and must be
    // restored if creation fails, otherwise the caller would be left holding a pointer into the destroyed state.
    const char* const caller_connection_name_str = config_data_ptr->connection_name_str;

    // A value of zero selects the default number of simultaneous Rx payloads.
    int max_rx_payloads = config_data_ptr->max_simultaneous_rx_payloads_per_connection;
    if (max_rx_payloads == 0) {
        max_rx_payloads = CDI_MAX_SIMULTANEOUS_RX_PAYLOADS_PER_CONNECTION;
    }

    con_state_ptr->adapter_state_ptr = config_data_ptr->adapter_handle;
    con_state_ptr->handle_type = kHandleTypeRx;
    con_state_ptr->protocol_type = protocol_type;
    con_state_ptr->magic = kMagicConnection;
    memcpy(&con_state_ptr->rx_state.config_data, config_data_ptr, sizeof *config_data_ptr);
    con_state_ptr->rx_state.cb_ptr = rx_cb_ptr;

    // Validate the Rx buffer delay. A value of -1 selects the default delay. The connection log does not exist yet,
    // so errors are written to the global log.
    if (-1 == con_state_ptr->rx_state.config_data.buffer_delay_ms) {
        con_state_ptr->rx_state.config_data.buffer_delay_ms = CDI_ENABLED_RX_BUFFER_DELAY_DEFAULT_MS;
    } else {
        if (config_data_ptr->buffer_delay_ms > CDI_MAXIMUM_RX_BUFFER_DELAY_MS) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogError,
                           "Buffer delay specified[%d]ms exceeds maximum allowable value[%d]ms.",
                           config_data_ptr->buffer_delay_ms, CDI_MAXIMUM_RX_BUFFER_DELAY_MS);
            rs = kCdiStatusInvalidParameter;
        } else if (config_data_ptr->buffer_delay_ms < -1) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogError,
                           "Buffer delay specified[%d]ms is a negative value.",
                           config_data_ptr->buffer_delay_ms);
            rs = kCdiStatusInvalidParameter;
        }
    }

    // This log will be used by all the threads created for this connection.
    if (kCdiStatusOk == rs) {
        if (kLogMethodFile == config_data_ptr->connection_log_method_data_ptr->log_method) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo, "Setting log file[%s] for SDK Rx logging.",
                           config_data_ptr->connection_log_method_data_ptr->log_filename_str);
        }
        if (!CdiLoggerCreateLog(cdi_global_context.logger_handle, con_state_ptr,
                                config_data_ptr->connection_log_method_data_ptr, &con_state_ptr->log_handle)) {
            rs = kCdiStatusCreateLogFailed;
        }
    }

    if (kCdiStatusOk == rs) {
        // Now that we have a connection logger, we can use the CDI_LOG_HANDLE() macro to add log messages to it.
        // Since this thread is from the application, we cannot use the CDI_LOG_THREAD() macro.
        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Creating Rx connection. Protocol[%s] Destination Port[%d] Name[%s]",
                       CdiUtilityKeyEnumToString(kKeyConnectionProtocolType, protocol_type),
                       con_state_ptr->rx_state.config_data.dest_port,
                       CdiGetEmptyStringIfNull(con_state_ptr->rx_state.config_data.connection_name_str));
        if (con_state_ptr->rx_state.config_data.buffer_delay_ms) {
            CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Using Rx buffer delay[%d]ms.",
                           con_state_ptr->rx_state.config_data.buffer_delay_ms);
        }
    }

    // Copy the name for the connection from the config data or generate one. NOTE: Do this here, since other logic
    // below uses the saved name.
    if ((NULL == config_data_ptr->connection_name_str) || (0 == strlen(config_data_ptr->connection_name_str))) {
        snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "dest%d",
                 config_data_ptr->dest_port);
        config_data_ptr->connection_name_str = con_state_ptr->saved_connection_name_str;
        // NOTE(review): if an earlier step failed, the connection log was never created and log_handle is presumably
        // still NULL here - confirm CDI_LOG_HANDLE() tolerates a NULL handle.
        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Rx connection is unnamed. Created name[%s]",
                       con_state_ptr->saved_connection_name_str);
    } else {
        CdiOsStrCpy(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str),
                    con_state_ptr->rx_state.config_data.connection_name_str);
    }

    // Update copy of config data to use the saved connection string.
    con_state_ptr->rx_state.config_data.connection_name_str = con_state_ptr->saved_connection_name_str;

    if (kCdiStatusOk == rs) {
        rs = ConnectionCommonResourcesCreate(con_state_ptr, config_data_ptr->stats_cb_ptr,
                                             config_data_ptr->stats_user_cb_param, &config_data_ptr->stats_config);
    }

    // Only scale the reservation by the buffer delay once the delay is known to be valid. The value is consumed
    // solely on the success path, and multiplying a rejected (arbitrarily large or negative) delay could overflow.
    int reserve_packet_buffers = MAX_RX_PACKETS_PER_CONNECTION;
    if (kCdiStatusOk == rs && con_state_ptr->rx_state.config_data.buffer_delay_ms) {
        // Rx buffer delay is enabled, so we need to allocate additional Rx buffers.
        reserve_packet_buffers += (MAX_RX_PACKETS_PER_CONNECTION * con_state_ptr->rx_state.config_data.buffer_delay_ms)
                                  / CDI_RX_BUFFER_DELAY_BUFFER_MS_DIVISOR;
    }

    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Connection Rx CdiSglEntry Pool", reserve_packet_buffers,
                           MAX_RX_PACKETS_PER_CONNECTION_GROW, MAX_POOL_GROW_COUNT,
                           sizeof(CdiSglEntry), true, // true= Make thread-safe
                           &con_state_ptr->rx_state.payload_sgl_entry_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Rx CdiReorderList Out of Order Pool", MAX_RX_OUT_OF_ORDER,
                           MAX_RX_OUT_OF_ORDER_GROW, MAX_POOL_GROW_COUNT,
                           sizeof(CdiReorderList), true, // true= Make thread-safe
                           &con_state_ptr->rx_state.reorder_entries_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs && kCdiLinearBuffer == config_data_ptr->rx_buffer_type) {
        // Allocate an extra couple of buffers for payloads being reassembled.
        if (!CdiPoolCreate("Rx Linear Buffer Pool", RX_LINEAR_BUFFER_COUNT + 2, NO_GROW_SIZE, NO_GROW_COUNT,
                           config_data_ptr->linear_buffer_size, true, &con_state_ptr->linear_buffer_pool)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        // Set up receive buffer handling if enabled; either way, set payload complete queue to point to the right one.
        if (0 != con_state_ptr->rx_state.config_data.buffer_delay_ms) {
            rs = RxBufferInit(con_state_ptr->log_handle, con_state_ptr->error_message_pool,
                              con_state_ptr->rx_state.config_data.buffer_delay_ms, max_rx_payloads,
                              con_state_ptr->app_payload_message_queue_handle,
                              &con_state_ptr->rx_state.receive_buffer_handle,
                              &con_state_ptr->rx_state.active_payload_complete_queue_handle);
        } else {
            // No receive buffer so send payloads directly to application callback thread's input queue.
            con_state_ptr->rx_state.active_payload_complete_queue_handle =
                con_state_ptr->app_payload_message_queue_handle;
        }
    }

    // NOTE: The pools at rx_state.rx_payload_state_pool_handle and rx_state.payload_memory_state_pool_handle are
    // created dynamically in RxEndpointCreateDynamicPools() based on the protocol version being used.

    if (kCdiStatusOk == rs) {
        // Create a packet message thread that is used by both Tx and Rx connections.
        rs = ConnectionCommonPacketMessageThreadCreate(con_state_ptr, "Rx:PayloadMessage");
    }

    if (kCdiStatusOk == rs) {
        // Open a connection to receive packets from a remote host.
        CdiAdapterConnectionConfigData config_data = {
            .cdi_adapter_handle = con_state_ptr->adapter_state_ptr,
            .cdi_connection_handle = con_state_ptr,
            .endpoint_manager_handle = con_state_ptr->endpoint_manager_handle,

            .connection_cb_ptr = config_data_ptr->connection_cb_ptr,
            .connection_user_cb_param = config_data_ptr->connection_user_cb_param,

            .log_handle = con_state_ptr->log_handle,
            .port_number = config_data_ptr->dest_port,
            .shared_thread_id = config_data_ptr->shared_thread_id,
            .thread_core_num = config_data_ptr->thread_core_num,

            .direction = kEndpointDirectionReceive,
            .rx_state.reserve_packet_buffers = reserve_packet_buffers,

            // This endpoint is used for normal data transmission (not used for control). This means that the Endpoint
            // Manager is used for managing threads related to the connection.
            .data_type = kEndpointTypeData,
        };
        // Any adapter failure is reported to the caller as kCdiStatusFatal.
        if (kCdiStatusOk != CdiAdapterCreateConnection(&config_data, &con_state_ptr->adapter_connection_ptr)) {
            rs = kCdiStatusFatal;
        }
    }

    // Socket adapter does not dynamically create Rx endpoints, so create it here.
    if (kCdiStatusOk == rs && kCdiAdapterTypeSocket == config_data_ptr->adapter_handle->adapter_data.adapter_type) {
        rs = EndpointManagerRxCreateEndpoint(con_state_ptr->endpoint_manager_handle, config_data_ptr->dest_port, NULL);
    }

    if (kCdiStatusOk == rs) {
        CdiOsSignalSet(con_state_ptr->start_signal); // Start connection threads.
        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Successfully created Rx connection. Name[%s]",
                       con_state_ptr->saved_connection_name_str);
    }

    if (kCdiStatusOk != rs) {
        // Undo the redirection of the caller's name pointer before the connection state is torn down, so the caller
        // is not left referencing storage owned by the state being destroyed.
        config_data_ptr->connection_name_str = caller_connection_name_str;
        ConnectionDestroyInternal((CdiConnectionHandle)con_state_ptr);
        con_state_ptr = NULL;
    }

    *ret_handle_ptr = (CdiConnectionHandle)con_state_ptr;

    return rs;
}