CdiReturnStatus RxCreateInternal()

in src/cdi/internal_rx.c [548:734]


CdiReturnStatus RxCreateInternal(CdiConnectionProtocolType protocol_type, CdiRxConfigData* config_data_ptr,
                                 CdiCallback rx_cb_ptr, CdiConnectionHandle* ret_handle_ptr)
{
    CdiReturnStatus rs = kCdiStatusOk;
    CdiConnectionState* con_state_ptr = (CdiConnectionState*)CdiOsMemAllocZero(sizeof *con_state_ptr);
    if (con_state_ptr == NULL) {
        return kCdiStatusNotEnoughMemory;
    }
    int max_rx_payloads = config_data_ptr->max_simultaneous_rx_payloads_per_connection;
    if (max_rx_payloads == 0) {
        max_rx_payloads = CDI_MAX_SIMULTANEOUS_RX_PAYLOADS_PER_CONNECTION;
    }

    con_state_ptr->adapter_state_ptr = config_data_ptr->adapter_handle;
    con_state_ptr->handle_type = kHandleTypeRx;
    con_state_ptr->protocol_type = protocol_type;
    con_state_ptr->magic = kMagicConnection;
    memcpy(&con_state_ptr->rx_state.config_data, config_data_ptr, sizeof *config_data_ptr);
    con_state_ptr->rx_state.cb_ptr = rx_cb_ptr;
    // Now that we have a connection logger, we can use the CDI_LOG_HANDLE() macro to add log messages to it. Since this
    // thread is from the application, we cannot use the CDI_LOG_THREAD() macro.

    if (-1 == con_state_ptr->rx_state.config_data.buffer_delay_ms) {
        con_state_ptr->rx_state.config_data.buffer_delay_ms = CDI_ENABLED_RX_BUFFER_DELAY_DEFAULT_MS;
    } else {
        if (config_data_ptr->buffer_delay_ms > CDI_MAXIMUM_RX_BUFFER_DELAY_MS) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogError,
                           "Buffer delay specified[%d]ms exceeds maximum allowable value[%d]ms.",
                           config_data_ptr->buffer_delay_ms, CDI_MAXIMUM_RX_BUFFER_DELAY_MS);
            rs = kCdiStatusInvalidParameter;
        } else if (config_data_ptr->buffer_delay_ms < -1) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogError,
                           "Buffer delay specified[%d]ms is a negative value.",
                           config_data_ptr->buffer_delay_ms);
            rs = kCdiStatusInvalidParameter;
        }
    }

    // This log will be used by all the threads created for this connection.
    if (kCdiStatusOk == rs) {
        if (kLogMethodFile == config_data_ptr->connection_log_method_data_ptr->log_method) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo, "Setting log file[%s] for SDK Rx logging.",
                           config_data_ptr->connection_log_method_data_ptr->log_filename_str);
        }
        if (!CdiLoggerCreateLog(cdi_global_context.logger_handle, con_state_ptr,
                                config_data_ptr->connection_log_method_data_ptr, &con_state_ptr->log_handle)) {
            rs = kCdiStatusCreateLogFailed;
        }
    }

    if (kCdiStatusOk == rs) {
        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Creating Rx connection. Protocol[%s] Destination Port[%d] Name[%s]",
                       CdiUtilityKeyEnumToString(kKeyConnectionProtocolType, protocol_type),
                       con_state_ptr->rx_state.config_data.dest_port,
                       CdiGetEmptyStringIfNull(con_state_ptr->rx_state.config_data.connection_name_str));
        if (con_state_ptr->rx_state.config_data.buffer_delay_ms) {
            CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Using Rx buffer delay[%d]ms.",
                           con_state_ptr->rx_state.config_data.buffer_delay_ms);
        }
    }

    // Copy the name for the connection from the config data or generate one. NOTE: Do this here, since other logic
    // below uses the saved name.
    if ((NULL == config_data_ptr->connection_name_str) || (0 == strlen(config_data_ptr->connection_name_str))) {
        snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "dest%d",
                 config_data_ptr->dest_port);

        config_data_ptr->connection_name_str = con_state_ptr->saved_connection_name_str;

        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Rx connection is unnamed. Created name[%s]",
                       con_state_ptr->saved_connection_name_str);
    } else {
        CdiOsStrCpy(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str),
                    con_state_ptr->rx_state.config_data.connection_name_str);
    }

    // Update copy of config data to use the saved connection string.
    con_state_ptr->rx_state.config_data.connection_name_str = con_state_ptr->saved_connection_name_str;

    if (kCdiStatusOk == rs) {
        rs = ConnectionCommonResourcesCreate(con_state_ptr, config_data_ptr->stats_cb_ptr,
                                             config_data_ptr->stats_user_cb_param, &config_data_ptr->stats_config);
    }

    int reserve_packet_buffers = MAX_RX_PACKETS_PER_CONNECTION;
    if (con_state_ptr->rx_state.config_data.buffer_delay_ms) {
        // Rx buffer delay is enabled, so we need to allocate additional Rx buffers.
        reserve_packet_buffers += (MAX_RX_PACKETS_PER_CONNECTION * con_state_ptr->rx_state.config_data.buffer_delay_ms)
                                  / CDI_RX_BUFFER_DELAY_BUFFER_MS_DIVISOR;
    }

    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Connection Rx CdiSglEntry Pool", reserve_packet_buffers,
                           MAX_RX_PACKETS_PER_CONNECTION_GROW, MAX_POOL_GROW_COUNT,
                           sizeof(CdiSglEntry), true, // true= Make thread-safe
                           &con_state_ptr->rx_state.payload_sgl_entry_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Rx CdiReorderList Out of Order Pool", MAX_RX_OUT_OF_ORDER,
                           MAX_RX_OUT_OF_ORDER_GROW, MAX_POOL_GROW_COUNT,
                           sizeof(CdiReorderList), true, // true= Make thread-safe
                           &con_state_ptr->rx_state.reorder_entries_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs && kCdiLinearBuffer == config_data_ptr->rx_buffer_type) {
        // Allocate an extra couple of buffers for payloads being reassembled.
        if (!CdiPoolCreate("Rx Linear Buffer Pool", RX_LINEAR_BUFFER_COUNT + 2, NO_GROW_SIZE, NO_GROW_COUNT,
                           config_data_ptr->linear_buffer_size, true, &con_state_ptr->linear_buffer_pool)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        // Set up receive buffer handling if enabled; either way, set payload complete queue to point to the right one.
        if (0 != con_state_ptr->rx_state.config_data.buffer_delay_ms) {
            rs = RxBufferInit(con_state_ptr->log_handle, con_state_ptr->error_message_pool,
                              con_state_ptr->rx_state.config_data.buffer_delay_ms, max_rx_payloads,
                              con_state_ptr->app_payload_message_queue_handle,
                              &con_state_ptr->rx_state.receive_buffer_handle,
                              &con_state_ptr->rx_state.active_payload_complete_queue_handle);
        } else {
            // No receive buffer so send payloads directly to application callback thread's input queue.
            con_state_ptr->rx_state.active_payload_complete_queue_handle =
                con_state_ptr->app_payload_message_queue_handle;
        }
    }

    // NOTE: The pools at rx_state.rx_payload_state_pool_handle and rx_state.payload_memory_state_pool_handle are
    // created dynamically in RxEndpointCreateDynamicPools() based on the protocol version being used.

    if (kCdiStatusOk == rs) {
        // Create a packet message thread that is used by both Tx and Rx connections.
        rs = ConnectionCommonPacketMessageThreadCreate(con_state_ptr, "Rx:PayloadMessage");
    }

    if (kCdiStatusOk == rs) {
        // Open an connection to receive packets from a remote host.
        CdiAdapterConnectionConfigData config_data = {
            .cdi_adapter_handle = con_state_ptr->adapter_state_ptr,
            .cdi_connection_handle = con_state_ptr,
            .endpoint_manager_handle = con_state_ptr->endpoint_manager_handle,

            .connection_cb_ptr = config_data_ptr->connection_cb_ptr,
            .connection_user_cb_param = config_data_ptr->connection_user_cb_param,

            .log_handle = con_state_ptr->log_handle,
            .port_number = config_data_ptr->dest_port,
            .shared_thread_id = config_data_ptr->shared_thread_id,
            .thread_core_num = config_data_ptr->thread_core_num,

            .direction = kEndpointDirectionReceive,
            .rx_state.reserve_packet_buffers = reserve_packet_buffers,

            // This endpoint is used for normal data transmission (not used for control). This means that the Endpoint
            // Manager is used for managing threads related to the connection.
            .data_type = kEndpointTypeData,
        };
        if (kCdiStatusOk != CdiAdapterCreateConnection(&config_data, &con_state_ptr->adapter_connection_ptr)) {
            rs = kCdiStatusFatal;
        }
    }

    // Socket adapter does not dynamically create Rx endpoints, so create it here.
    if (kCdiStatusOk == rs && kCdiAdapterTypeSocket == config_data_ptr->adapter_handle->adapter_data.adapter_type) {
        rs = EndpointManagerRxCreateEndpoint(con_state_ptr->endpoint_manager_handle, config_data_ptr->dest_port, NULL);
    }

    if (kCdiStatusOk == rs) {
        CdiOsSignalSet(con_state_ptr->start_signal); // Start connection threads.
        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Successfully created Rx connection. Name[%s]",
                       con_state_ptr->saved_connection_name_str);
    }

    if (kCdiStatusOk != rs) {
        ConnectionDestroyInternal((CdiConnectionHandle)con_state_ptr);
        con_state_ptr = NULL;
    }

    *ret_handle_ptr = (CdiConnectionHandle)con_state_ptr;

    return rs;
}