in src/cdi/internal_tx.c [383:582]
static CdiReturnStatus TxCreateConnection(CdiConnectionProtocolType protocol_type, CdiTxConfigData* config_data_ptr,
CdiCallback tx_cb_ptr, CdiConnectionHandle* ret_handle_ptr)
{
CdiReturnStatus rs = kCdiStatusOk;
int max_tx_payloads = CDI_MAX_SIMULTANEOUS_TX_PAYLOADS_PER_CONNECTION;
// If max_simultaneous_tx_payloads has been set use that value otherwise use
// MAX_SIMULTANEOUS_TX_PAYLOADS_PER_CONNECTION
if (config_data_ptr->max_simultaneous_tx_payloads) {
max_tx_payloads = config_data_ptr->max_simultaneous_tx_payloads;
}
int max_tx_payload_sgl_entries = CDI_MAX_SIMULTANEOUS_TX_PAYLOAD_SGL_ENTRIES_PER_CONNECTION;
// If max_simultaneous_tx_payload_sgl_entries has been set use that value otherwise use
// CDI_MAX_SIMULTANEOUS_TX_PAYLOAD_SGL_ENTRIES_PER_CONNECTION
if (config_data_ptr->max_simultaneous_tx_payload_sgl_entries) {
max_tx_payload_sgl_entries = config_data_ptr->max_simultaneous_tx_payload_sgl_entries;
}
CdiConnectionState* con_state_ptr = (CdiConnectionState*)CdiOsMemAllocZero(sizeof *con_state_ptr);
if (con_state_ptr == NULL) {
rs = kCdiStatusNotEnoughMemory;
}
if (kCdiStatusOk == rs) {
con_state_ptr->adapter_state_ptr = (CdiAdapterState*)config_data_ptr->adapter_handle;
con_state_ptr->handle_type = kHandleTypeTx;
con_state_ptr->protocol_type = protocol_type;
con_state_ptr->magic = kMagicConnection;
// Make a copy of the configuration data.
memcpy(&con_state_ptr->tx_state.config_data, config_data_ptr, sizeof *config_data_ptr);
// Make a copy of configuration data strings and update the copy of the config data to use them. NOTE: The
// connection_name_str is updated in logic below (see saved_connection_name_str).
if (config_data_ptr->dest_ip_addr_str) {
CdiOsStrCpy(con_state_ptr->tx_state.copy_dest_ip_addr_str,
sizeof(con_state_ptr->tx_state.copy_dest_ip_addr_str), config_data_ptr->dest_ip_addr_str);
con_state_ptr->tx_state.config_data.dest_ip_addr_str = con_state_ptr->tx_state.copy_dest_ip_addr_str;
}
// Save callback address.
con_state_ptr->tx_state.cb_ptr = tx_cb_ptr;
}
// Now that we have a connection logger, we can use the CDI_LOG_HANDLE() macro to add log messages to it. Since this
// thread is from the application, we cannot use the CDI_LOG_THEAD() macro.
// This log will be used by all the threads created for this connection.
if (kCdiStatusOk == rs) {
if (kLogMethodFile == config_data_ptr->connection_log_method_data_ptr->log_method) {
CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo, "Setting log file[%s] for SDK Tx logging.",
config_data_ptr->connection_log_method_data_ptr->log_filename_str);
}
if (!CdiLoggerCreateLog(cdi_global_context.logger_handle, con_state_ptr,
config_data_ptr->connection_log_method_data_ptr, &con_state_ptr->log_handle)) {
rs = kCdiStatusCreateLogFailed;
}
}
// Copy the name for the connection from the config data or generate one. NOTE: Do this here, since other logic
// below uses the saved name.
if ((NULL == config_data_ptr->connection_name_str) || (0 == strlen(config_data_ptr->connection_name_str))) {
if (NULL == config_data_ptr->dest_ip_addr_str) {
snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "%s:%d",
"unknown_ip", config_data_ptr->dest_port);
} else {
snprintf(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str), "%s:%d",
config_data_ptr->dest_ip_addr_str, config_data_ptr->dest_port);
}
config_data_ptr->connection_name_str = con_state_ptr->saved_connection_name_str;
CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo, "Tx connection is unnamed. Created name[%s]",
con_state_ptr->saved_connection_name_str);
} else {
CdiOsStrCpy(con_state_ptr->saved_connection_name_str, sizeof(con_state_ptr->saved_connection_name_str),
config_data_ptr->connection_name_str);
}
// Update copy of config data to use the saved connection string.
con_state_ptr->tx_state.config_data.connection_name_str = con_state_ptr->saved_connection_name_str;
if (kCdiStatusOk == rs) {
CDI_LOG_HANDLE(con_state_ptr->log_handle, kLogInfo,
"Creating Tx connection. Protocol[%s] Destination IP[%s] Destination Port[%d] Name[%s]",
CdiUtilityKeyEnumToString(kKeyConnectionProtocolType, protocol_type),
con_state_ptr->tx_state.config_data.dest_ip_addr_str,
con_state_ptr->tx_state.config_data.dest_port,
CdiGetEmptyStringIfNull(con_state_ptr->tx_state.config_data.connection_name_str));
}
if (kCdiStatusOk == rs) {
rs = ConnectionCommonResourcesCreate(con_state_ptr, config_data_ptr->stats_cb_ptr,
config_data_ptr->stats_user_cb_param, &config_data_ptr->stats_config);
}
if (kCdiStatusOk == rs) {
// Create queue used to hold Tx payload messages that are sent to the TxPayloadThread() thread. Depth must be
// less than the number of TX payloads allowed per connection to allow for proper pushback and payload state
// data management.
if (!CdiQueueCreate("TxPayloadState queue Pointer", max_tx_payloads-1,
CDI_FIXED_QUEUE_SIZE, CDI_FIXED_QUEUE_SIZE, sizeof(TxPayloadState*),
kQueueSignalPopWait | kQueueMultipleWritersFlag, // Can use wait signal for pops (reads),
// thread safe for multiple writers.
&con_state_ptr->tx_state.payload_queue_handle)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
// Create worker thread.
if (!CdiOsThreadCreate(TxPayloadThread, &con_state_ptr->payload_thread_id, "TxPayload", con_state_ptr,
con_state_ptr->start_signal)) {
rs = kCdiStatusFatal;
}
}
// Create memory pools. NOTE: These pools do not use any resource locks and are therefore not thread-safe.
// TxPayloadThread() is the only user of the pools, except when restarting/shutting down the connection which is
// done by EndpointManagerThread() while TxPayloadThread() is blocked.
if (kCdiStatusOk == rs) {
if (!CdiPoolCreate("Connection Tx TxPacketWorkRequest Pool", MAX_TX_PACKET_WORK_REQUESTS_PER_CONNECTION,
MAX_TX_PACKET_WORK_REQUESTS_PER_CONNECTION_GROW, MAX_POOL_GROW_COUNT,
sizeof(TxPacketWorkRequest), false, // false= Not thread-safe (no resource locks)
&con_state_ptr->tx_state.work_request_pool_handle)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
if (!CdiPoolCreate("Connection Tx CdiSglEntry Pool", TX_PACKET_SGL_ENTRY_SIZE_PER_CONNECTION, NO_GROW_SIZE,
NO_GROW_COUNT, sizeof(CdiSglEntry), false, // false= Not thread-safe (no resource locks)
&con_state_ptr->tx_state.packet_sgl_entry_pool_handle)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
// There is a limit on the number of simultaneous Tx payloads per connection, so don't allow this pool to grow.
if (!CdiPoolCreate("Connection Tx Payload State Pool", max_tx_payloads,
NO_GROW_SIZE, NO_GROW_COUNT, sizeof(TxPayloadState), true, // true= Is thread-safe.
&con_state_ptr->tx_state.payload_state_pool_handle)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
if (!CdiPoolCreate("Connection Tx Payload CdiSglEntry Pool",
max_tx_payload_sgl_entries, NO_GROW_SIZE, NO_GROW_COUNT,
sizeof(CdiSglEntry), true, // true= Is thread-safe.
&con_state_ptr->tx_state.payload_sgl_entry_pool_handle)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
if (!CdiQueueCreate("Connection Tx TxPacketWorkRequest* Queue", MAX_TX_PACKETS_PER_CONNECTION,
TX_PACKET_POOL_SIZE_GROW, MAX_POOL_GROW_COUNT,
sizeof(CdiSinglyLinkedList), kQueueSignalPopWait, // Make a blockable reader.
&con_state_ptr->tx_state.work_req_comp_queue_handle)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
// Create a packet message thread that is used by both Tx and Rx connections.
rs = ConnectionCommonPacketMessageThreadCreate(con_state_ptr, "Tx:PayloadMessage");
}
if (kCdiStatusOk == rs) {
CdiAdapterConnectionConfigData config_data = {
.cdi_adapter_handle = con_state_ptr->adapter_state_ptr,
.cdi_connection_handle = con_state_ptr,
.endpoint_manager_handle = con_state_ptr->endpoint_manager_handle,
.connection_cb_ptr = con_state_ptr->tx_state.config_data.connection_cb_ptr,
.connection_user_cb_param = con_state_ptr->tx_state.config_data.connection_user_cb_param,
.log_handle = con_state_ptr->log_handle,
.shared_thread_id = config_data_ptr->shared_thread_id,
.thread_core_num = config_data_ptr->thread_core_num,
.direction = kEndpointDirectionSend,
.port_number = con_state_ptr->tx_state.config_data.dest_port,
// This endpoint is used for normal data transmission (not used for control). This means that the Endpoint
// Manager is used for managing threads related to the connection.
.data_type = kEndpointTypeData,
};
if (kCdiStatusOk != CdiAdapterCreateConnection(&config_data, &con_state_ptr->adapter_connection_ptr)) {
rs = kCdiStatusFatal;
}
}
if (kCdiStatusOk != rs) {
ConnectionDestroyInternal((CdiConnectionHandle)con_state_ptr);
con_state_ptr = NULL;
}
*ret_handle_ptr = (CdiConnectionHandle)con_state_ptr;
return rs;
}