CdiReturnStatus EndpointManagerTxCreateEndpoint()

in src/cdi/endpoint_manager.c [882:990]


CdiReturnStatus EndpointManagerTxCreateEndpoint(EndpointManagerHandle handle, bool is_multi_stream,
                                                const char* dest_ip_addr_str, int dest_port,
                                                const char* stream_name_str,
                                                CdiEndpointHandle* ret_endpoint_handle_ptr)
{
    CdiReturnStatus rs = kCdiStatusOk;
    EndpointManagerState* mgr_ptr = (EndpointManagerState*)handle;
    CdiConnectionState* con_ptr = mgr_ptr->connection_state_ptr;

    // Make a copy of provided stream name or copy the connection name if no stream name provided.
    char temp_stream_name_str[CDI_MAX_STREAM_NAME_STRING_LENGTH];
    const char* src_str = (stream_name_str && '\0' != stream_name_str[0])
                          ? stream_name_str : con_ptr->saved_connection_name_str;
    CdiOsStrCpy(temp_stream_name_str, sizeof(temp_stream_name_str), src_str);

    CdiOsCritSectionReserve(mgr_ptr->endpoint_list_lock);

    int stream_count = CdiListCount(&mgr_ptr->endpoint_list);
    if (stream_count > CDI_MAX_ENDPOINTS_PER_CONNECTION) {
        CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogError,
                       "[%d] streams exceeds the maximum[%d] allowed in a single connection.",
                       stream_count, CDI_MAX_ENDPOINTS_PER_CONNECTION);
        rs = kCdiStatusInvalidParameter;
    }

    CdiEndpointState* endpoint_ptr = NULL;
    InternalEndpointState* internal_endpoint_ptr = NULL;
    if (kCdiStatusOk == rs && is_multi_stream) {
        // For multi-stream endpoints, if matching destination endpoint already exists then use it.
        CdiEndpointHandle found_handle = EndpointManagerGetFirstEndpoint(mgr_ptr);
        while (found_handle) {
            int found_dest_port = EndpointManagerEndpointRemotePortGet(found_handle);
            if (0 == CdiOsStrCmp(found_handle->remote_ip_str, dest_ip_addr_str) &&
                found_dest_port == dest_port) {
                endpoint_ptr = found_handle;
                internal_endpoint_ptr = CdiEndpointToInternalEndpoint(found_handle);
                CDI_LOG_HANDLE(cdi_global_context.global_log_handle, kLogInfo,
                               "Using existing Tx endpoint with same remote IP[%s:%d].", dest_ip_addr_str, dest_port);
                break;
            }
            found_handle = EndpointManagerGetNextEndpoint(found_handle);
        }
    }

    if (NULL == endpoint_ptr) {
        if (kCdiStatusOk == rs) {
            rs = CreateEndpointCommonResources(mgr_ptr, &internal_endpoint_ptr);
        }

        if (kCdiStatusOk == rs) {
            endpoint_ptr = &internal_endpoint_ptr->cdi_endpoint;
            struct sockaddr_in dest_addr = {
                .sin_family = AF_INET,
                .sin_port = htons(dest_port), // Convert int port to network byte order
                .sin_addr = { 0 },
                .sin_zero = { 0 }
            };
            inet_pton(AF_INET, dest_ip_addr_str, &dest_addr.sin_addr);
            EndpointManagerRemoteEndpointInfoSet(endpoint_ptr, &dest_addr, stream_name_str);

            if (!CdiOsCritSectionCreate(&endpoint_ptr->tx_state.payload_num_lock)) {
                rs = kCdiStatusNotEnoughMemory;
            }
        }

        if (kCdiStatusOk == rs) {
            // Open an endpoint to send packets to a remote host. Do this last since doing so will open the flood gates
            // for callbacks to begin.
            CdiAdapterEndpointConfigData config_data = {
                .connection_handle = con_ptr->adapter_connection_ptr,
                .cdi_endpoint_handle = endpoint_ptr,

                .msg_from_endpoint_func_ptr = TxPacketWorkRequestComplete,
                .msg_from_endpoint_param_ptr = endpoint_ptr,

                .remote_address_str = dest_ip_addr_str,
                .port_number = dest_port,
                .endpoint_stats_ptr = &endpoint_ptr->transfer_stats.endpoint_stats,
            };
            if (kCdiStatusOk != CdiAdapterOpenEndpoint(&config_data, &endpoint_ptr->adapter_endpoint_ptr)) {
                rs = kCdiStatusFatal;
            }
        }

        if (kCdiStatusOk == rs) {
            CdiOsSignalSet(con_ptr->start_signal); // Start connection threads.
            CdiAdapterStartEndpoint(endpoint_ptr->adapter_endpoint_ptr); // Start adapter endpoint threads.
            CDI_LOG_HANDLE(con_ptr->log_handle, kLogInfo, "Successfully created Tx remote IP[%s:%d] endpoint. Name[%s]",
                        dest_ip_addr_str, dest_port, con_ptr->saved_connection_name_str);

            // Protect multi-threaded access to the list.
            CdiOsCritSectionReserve(mgr_ptr->endpoint_list_lock);
            CdiListAddTail(&mgr_ptr->endpoint_list, &internal_endpoint_ptr->list_entry);
            CdiOsCritSectionRelease(mgr_ptr->endpoint_list_lock);
        } else if (endpoint_ptr) {
            DestroyEndpoint(endpoint_ptr);
            endpoint_ptr = NULL;
            internal_endpoint_ptr = NULL; // DestroyEndpoint() frees this.
        }
    }

    if (ret_endpoint_handle_ptr) {
        *ret_endpoint_handle_ptr = endpoint_ptr;
    }

    CdiOsCritSectionRelease(mgr_ptr->endpoint_list_lock);

    return rs;
}