static CdiReturnStatus TxCreateConnection()

in src/cdi/internal_tx.c [383:582]


static CdiReturnStatus TxCreateConnection(CdiConnectionProtocolType protocol_type, CdiTxConfigData* config_data_ptr,
                                          CdiCallback tx_cb_ptr, CdiConnectionHandle* ret_handle_ptr)
{
    CdiReturnStatus rs = kCdiStatusOk;

    int max_tx_payloads = CDI_MAX_SIMULTANEOUS_TX_PAYLOADS_PER_CONNECTION;

    // If max_simultaneous_tx_payloads has been set use that value otherwise use
    // MAX_SIMULTANEOUS_TX_PAYLOADS_PER_CONNECTION
    if (config_data_ptr->max_simultaneous_tx_payloads) {
        max_tx_payloads = config_data_ptr->max_simultaneous_tx_payloads;
    }

    int max_tx_payload_sgl_entries = CDI_MAX_SIMULTANEOUS_TX_PAYLOAD_SGL_ENTRIES_PER_CONNECTION;

    // If max_simultaneous_tx_payload_sgl_entries has been set use that value otherwise use
    // CDI_MAX_SIMULTANEOUS_TX_PAYLOAD_SGL_ENTRIES_PER_CONNECTION
    if (config_data_ptr->max_simultaneous_tx_payload_sgl_entries) {
        max_tx_payload_sgl_entries = config_data_ptr->max_simultaneous_tx_payload_sgl_entries;
    }

    CdiConnectionState* con_state_ptr = (CdiConnectionState*)CdiOsMemAllocZero(sizeof *con_state_ptr);
    if (con_state_ptr == NULL) {
        rs = kCdiStatusNotEnoughMemory;
    }

    if (kCdiStatusOk == rs) {
        con_state_ptr->adapter_state_ptr = (CdiAdapterState*)config_data_ptr->adapter_handle;
        con_state_ptr->handle_type = kHandleTypeTx;
        con_state_ptr->protocol_type = protocol_type;
        con_state_ptr->magic = kMagicConnection;

        // Make a copy of the configuration data.
        memcpy(&con_state_ptr->tx_state.config_data, config_data_ptr, sizeof *config_data_ptr);

        // Make a copy of configuration data strings and update the copy of the config data to use them. NOTE: The
        // connection_name_str is updated in logic below (see saved_connection_name_str).
        if (config_data_ptr->dest_ip_addr_str) {
            CdiOsStrCpy(con_state_ptr->tx_state.copy_dest_ip_addr_str,
                        sizeof(con_state_ptr->tx_state.copy_dest_ip_addr_str), config_data_ptr->dest_ip_addr_str);
            con_state_ptr->tx_state.config_data.dest_ip_addr_str = con_state_ptr->tx_state.copy_dest_ip_addr_str;
        }

        // Save callback address.
        con_state_ptr->tx_state.cb_ptr = tx_cb_ptr;
    }
    // Now that we have a connection logger, we can use the CDI_LOG_HANDLE() macro to add log messages to it. Since this
    // thread is from the application, we cannot use the CDI_LOG_THEAD() macro.

    // This log will be used by all the threads created for this connection.
    if (kCdiStatusOk == rs) {
        if (kLogMethodFile == config_data_ptr->connection_log_method_data_ptr->log_method) {
            CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo, "Setting log file[%s] for SDK Tx logging.",
                             config_data_ptr->connection_log_method_data_ptr->log_filename_str);
        }
        if (!CdiLoggerCreateLog(cdi_global_context.logger_handle, con_state_ptr,
                                config_data_ptr->connection_log_method_data_ptr, &con_state_ptr->log_handle)) {
            rs = kCdiStatusCreateLogFailed;
        }
    }

    // Copy the name for the connection from the config data or generate one. NOTE: Do this here, since other logic
    // below uses the saved name.
    if ((NULL == config_data_ptr->connection_name_str) || (0 == strlen(config_data_ptr->connection_name_str))) {
        if (NULL == config_data_ptr->dest_ip_addr_str) {
            snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "%s:%d",
                     "unknown_ip", config_data_ptr->dest_port);
        } else {
            snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "%s:%d",
                     config_data_ptr->dest_ip_addr_str, config_data_ptr->dest_port);
        }

        config_data_ptr->connection_name_str = con_state_ptr->saved_connection_name_str;

        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Tx connection is unnamed. Created name[%s]",
                       con_state_ptr->saved_connection_name_str);
    } else {
        CdiOsStrCpy(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str),
                    config_data_ptr->connection_name_str);
    }
    // Update copy of config data to use the saved connection string.
    con_state_ptr->tx_state.config_data.connection_name_str = con_state_ptr->saved_connection_name_str;

    if (kCdiStatusOk == rs) {
        CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo,
                       "Creating Tx connection. Protocol[%s] Destination IP[%s] Destination Port[%d] Name[%s]",
                       CdiUtilityKeyEnumToString(kKeyConnectionProtocolType, protocol_type),
                       con_state_ptr->tx_state.config_data.dest_ip_addr_str,
                       con_state_ptr->tx_state.config_data.dest_port,
                       CdiGetEmptyStringIfNull(con_state_ptr->tx_state.config_data.connection_name_str));
    }

    if (kCdiStatusOk == rs) {
        rs = ConnectionCommonResourcesCreate(con_state_ptr, config_data_ptr->stats_cb_ptr,
                                             config_data_ptr->stats_user_cb_param, &config_data_ptr->stats_config);
    }

    if (kCdiStatusOk == rs) {
        // Create queue used to hold Tx payload messages that are sent to the TxPayloadThread() thread. Depth must be
        // less than the number of TX payloads allowed per connection to allow for proper pushback and payload state
        // data management.
        if (!CdiQueueCreate("TxPayloadState queue Pointer", max_tx_payloads-1,
                            CDI_FIXED_QUEUE_SIZE, CDI_FIXED_QUEUE_SIZE, sizeof(TxPayloadState*),
                            kQueueSignalPopWait | kQueueMultipleWritersFlag, // Can use wait signal for pops (reads),
                                                                             // thread safe for multiple writers.
                            &con_state_ptr->tx_state.payload_queue_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        // Create worker thread.
        if (!CdiOsThreadCreate(TxPayloadThread, &con_state_ptr->payload_thread_id, "TxPayload", con_state_ptr,
                                con_state_ptr->start_signal)) {
            rs = kCdiStatusFatal;
        }
    }

    // Create memory pools. NOTE: These pools do not use any resource locks and are therefore not thread-safe.
    // TxPayloadThread() is the only user of the pools, except when restarting/shutting down the connection which is
    // done by EndpointManagerThread() while TxPayloadThread() is blocked.
    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Connection Tx TxPacketWorkRequest Pool", MAX_TX_PACKET_WORK_REQUESTS_PER_CONNECTION,
                           MAX_TX_PACKET_WORK_REQUESTS_PER_CONNECTION_GROW, MAX_POOL_GROW_COUNT,
                           sizeof(TxPacketWorkRequest), false, // false= Not thread-safe (no resource locks)
                           &con_state_ptr->tx_state.work_request_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }
    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Connection Tx CdiSglEntry Pool", TX_PACKET_SGL_ENTRY_SIZE_PER_CONNECTION, NO_GROW_SIZE,
                           NO_GROW_COUNT, sizeof(CdiSglEntry), false, // false= Not thread-safe (no resource locks)
                           &con_state_ptr->tx_state.packet_sgl_entry_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }
    if (kCdiStatusOk == rs) {
        // There is a limit on the number of simultaneous Tx payloads per connection, so don't allow this pool to grow.
        if (!CdiPoolCreate("Connection Tx Payload State Pool", max_tx_payloads,
                           NO_GROW_SIZE, NO_GROW_COUNT, sizeof(TxPayloadState), true, // true= Is thread-safe.
                           &con_state_ptr->tx_state.payload_state_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }
    if (kCdiStatusOk == rs) {
        if (!CdiPoolCreate("Connection Tx Payload CdiSglEntry Pool",
                           max_tx_payload_sgl_entries, NO_GROW_SIZE, NO_GROW_COUNT,
                           sizeof(CdiSglEntry), true, // true= Is thread-safe.
                           &con_state_ptr->tx_state.payload_sgl_entry_pool_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        if (!CdiQueueCreate("Connection Tx TxPacketWorkRequest* Queue", MAX_TX_PACKETS_PER_CONNECTION,
                            TX_PACKET_POOL_SIZE_GROW, MAX_POOL_GROW_COUNT,
                            sizeof(CdiSinglyLinkedList), kQueueSignalPopWait, // Make a blockable reader.
                            &con_state_ptr->tx_state.work_req_comp_queue_handle)) {
            rs = kCdiStatusNotEnoughMemory;
        }
    }

    if (kCdiStatusOk == rs) {
        // Create a packet message thread that is used by both Tx and Rx connections.
        rs = ConnectionCommonPacketMessageThreadCreate(con_state_ptr, "Tx:PayloadMessage");
    }

    if (kCdiStatusOk == rs) {
        CdiAdapterConnectionConfigData config_data = {
            .cdi_adapter_handle = con_state_ptr->adapter_state_ptr,
            .cdi_connection_handle = con_state_ptr,
            .endpoint_manager_handle = con_state_ptr->endpoint_manager_handle,

            .connection_cb_ptr = con_state_ptr->tx_state.config_data.connection_cb_ptr,
            .connection_user_cb_param = con_state_ptr->tx_state.config_data.connection_user_cb_param,

            .log_handle = con_state_ptr->log_handle,
            .shared_thread_id = config_data_ptr->shared_thread_id,
            .thread_core_num = config_data_ptr->thread_core_num,
            .direction = kEndpointDirectionSend,
            .port_number = con_state_ptr->tx_state.config_data.dest_port,

            // This endpoint is used for normal data transmission (not used for control). This means that the Endpoint
            // Manager is used for managing threads related to the connection.
            .data_type = kEndpointTypeData,
        };
        if (kCdiStatusOk != CdiAdapterCreateConnection(&config_data, &con_state_ptr->adapter_connection_ptr)) {
            rs = kCdiStatusFatal;
        }
    }

    if (kCdiStatusOk != rs) {
        ConnectionDestroyInternal((CdiConnectionHandle)con_state_ptr);
        con_state_ptr = NULL;
    }

    *ret_handle_ptr = (CdiConnectionHandle)con_state_ptr;

    return rs;
}