in src/cdi/endpoint_manager.c [882:990]
CdiReturnStatus EndpointManagerTxCreateEndpoint(EndpointManagerHandle handle, bool is_multi_stream,
const char* dest_ip_addr_str, int dest_port,
const char* stream_name_str,
CdiEndpointHandle* ret_endpoint_handle_ptr)
{
CdiReturnStatus rs = kCdiStatusOk;
EndpointManagerState* mgr_ptr = (EndpointManagerState*)handle;
CdiConnectionState* con_ptr = mgr_ptr->connection_state_ptr;
// Make a copy of provided stream name or copy the connection name if no stream name provided.
char temp_stream_name_str[CDI_MAX_STREAM_NAME_STRING_LENGTH];
const char* src_str = (stream_name_str && '\0' != stream_name_str[0])
? stream_name_str : con_ptr->saved_connection_name_str;
CdiOsStrCpy(temp_stream_name_str, sizeof(temp_stream_name_str), src_str);
CdiOsCritSectionReserve(mgr_ptr->endpoint_list_lock);
int stream_count = CdiListCount(&mgr_ptr->endpoint_list);
if (stream_count > CDI_MAX_ENDPOINTS_PER_CONNECTION) {
CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogError,
"[%d] streams exceeds the maximum[%d] allowed in a single connection.",
stream_count, CDI_MAX_ENDPOINTS_PER_CONNECTION);
rs = kCdiStatusInvalidParameter;
}
CdiEndpointState* endpoint_ptr = NULL;
InternalEndpointState* internal_endpoint_ptr = NULL;
if (kCdiStatusOk == rs && is_multi_stream) {
// For multi-stream endpoints, if matching destination endpoint already exists then use it.
CdiEndpointHandle found_handle = EndpointManagerGetFirstEndpoint(mgr_ptr);
while (found_handle) {
int found_dest_port = EndpointManagerEndpointRemotePortGet(found_handle);
if (0 == CdiOsStrCmp(found_handle->remote_ip_str, dest_ip_addr_str) &&
found_dest_port == dest_port) {
endpoint_ptr = found_handle;
internal_endpoint_ptr = CdiEndpointToInternalEndpoint(found_handle);
CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo,
"Using existing Tx endpoint with same remote IP[%s:%d].", dest_ip_addr_str, dest_port);
break;
}
found_handle = EndpointManagerGetNextEndpoint(found_handle);
}
}
if (NULL == endpoint_ptr) {
if (kCdiStatusOk == rs) {
rs = CreateEndpointCommonResources(mgr_ptr, &internal_endpoint_ptr);
}
if (kCdiStatusOk == rs) {
endpoint_ptr = &internal_endpoint_ptr->cdi_endpoint;
struct sockaddr_in dest_addr = {
.sin_family = AF_INET,
.sin_port = htons(dest_port), // Convert int port to network byte order
.sin_addr = { 0 },
.sin_zero = { 0 }
};
inet_pton(AF_INET, dest_ip_addr_str, &dest_addr.sin_addr);
EndpointManagerRemoteEndpointInfoSet(endpoint_ptr, &dest_addr, stream_name_str);
if (!CdiOsCritSectionCreate(&endpoint_ptr->tx_state.payload_num_lock)) {
rs = kCdiStatusNotEnoughMemory;
}
}
if (kCdiStatusOk == rs) {
// Open an endpoint to send packets to a remote host. Do this last since doing so will open the flood gates
// for callbacks to begin.
CdiAdapterEndpointConfigData config_data = {
.connection_handle = con_ptr->adapter_connection_ptr,
.cdi_endpoint_handle = endpoint_ptr,
.msg_from_endpoint_func_ptr = TxPacketWorkRequestComplete,
.msg_from_endpoint_param_ptr = endpoint_ptr,
.remote_address_str = dest_ip_addr_str,
.port_number = dest_port,
.endpoint_stats_ptr = &endpoint_ptr->transfer_stats.endpoint_stats,
};
if (kCdiStatusOk != CdiAdapterOpenEndpoint(&config_data, &endpoint_ptr->adapter_endpoint_ptr)) {
rs = kCdiStatusFatal;
}
}
if (kCdiStatusOk == rs) {
CdiOsSignalSet(con_ptr->start_signal); // Start connection threads.
CdiAdapterStartEndpoint(endpoint_ptr->adapter_endpoint_ptr); // Start adapter endpoint threads.
CDI_LOG_HANDLE(con_ptr->log_handle, kLogInfo, "Successfully created Tx remote IP[%s:%d] endpoint. Name[%s]",
dest_ip_addr_str, dest_port, con_ptr->saved_connection_name_str);
// Protect multi-threaded access to the list.
CdiOsCritSectionReserve(mgr_ptr->endpoint_list_lock);
CdiListAddTail(&mgr_ptr->endpoint_list, &internal_endpoint_ptr->list_entry);
CdiOsCritSectionRelease(mgr_ptr->endpoint_list_lock);
} else if (endpoint_ptr) {
DestroyEndpoint(endpoint_ptr);
endpoint_ptr = NULL;
internal_endpoint_ptr = NULL; // DestroyEndpoint() frees this.
}
}
if (ret_endpoint_handle_ptr) {
*ret_endpoint_handle_ptr = endpoint_ptr;
}
CdiOsCritSectionRelease(mgr_ptr->endpoint_list_lock);
return rs;
}