int completion_port_add()

in linux/src/completion_port_linux.c [319:419]


/*
 * Registers `socket` with the completion port's epoll instance so that
 * `event_callback` (with `event_callback_ctx`) is associated with events
 * described by `epoll_op`.
 *
 * A heap-allocated EPOLL_THREAD_DATA record carrying the callback, its context
 * and the socket is stored in the port's tracking list and handed to epoll
 * through `ev.data.ptr`. Registration is attempted with EPOLL_CTL_MOD first and
 * retried with EPOLL_CTL_ADD only when the kernel reports ENOENT.
 *
 * Returns 0 when the socket was registered, MU_FAILURE otherwise. On every
 * failure after allocation the record is freed here (and removed from the list
 * if it had been inserted). On success the record is NOT freed by this
 * function; it stays referenced by the list and by the epoll registration.
 * NOTE(review): which code path eventually releases it is not visible here.
 *
 * `pending_calls` is incremented for the duration of the work and decremented
 * (followed by wake_by_address_single) before returning, on success and on
 * failure alike.
 */
int completion_port_add(COMPLETION_PORT_HANDLE completion_port, int epoll_op, SOCKET_HANDLE socket,
    ON_COMPLETION_PORT_EVENT_COMPLETE event_callback, void* event_callback_ctx)
{
    // Codes_SRS_COMPLETION_PORT_LINUX_11_027: [ If any error occurs, completion_port_add shall fail and return a non-zero value. ]
    // Failure is the default; only a successful epoll registration overwrites it.
    int result = MU_FAILURE;

    if (
        // Codes_SRS_COMPLETION_PORT_LINUX_11_016: [ If completion_port is NULL, completion_port_add shall return a non-NULL value. ]
        completion_port == NULL ||
        // Codes_SRS_COMPLETION_PORT_LINUX_11_017: [ If socket is INVALID_SOCKET, completion_port_add shall return a non-NULL value. ]
        socket == INVALID_SOCKET ||
        // Codes_SRS_COMPLETION_PORT_LINUX_11_018: [ If event_callback is NULL, completion_port_add shall return a non-NULL value. ]
        event_callback == NULL)
    {
        LogError("Invalid arguments: COMPLETION_PORT_HANDLE completion_port=%p, epoll_op=%d, SOCKET_HANDLE socket=%" PRI_SOCKET ", ON_COMPLETION_PORT_EVENT_COMPLETE event_callback=%p, void* event_callback_ctx=%p",
            completion_port, epoll_op, socket, event_callback, event_callback_ctx);
    }
    // Codes_SRS_COMPLETION_PORT_LINUX_11_019: [ completion_port_add shall ensure the thread completion flag is not set. ]
    // Adding 0 is used purely as an atomic read of the flag.
    else if (interlocked_add(&completion_port->worker_thread_continue, 0) != 0)
    {
        LogError("Invalid call sequence completion port module is destroying");
    }
    else
    {
        // Codes_SRS_COMPLETION_PORT_LINUX_11_020: [ completion_port_add shall increment the ongoing call count value to prevent close. ]
        (void)interlocked_increment(&completion_port->pending_calls);

        // Codes_SRS_COMPLETION_PORT_LINUX_11_021: [ completion_port_add shall allocate a EPOLL_THREAD_DATA object to store thread data. ]
        EPOLL_THREAD_DATA* thread_data = malloc(sizeof(EPOLL_THREAD_DATA));
        if (thread_data == NULL)
        {
            LogError("failure allocating epoll thread data size: %zu", sizeof(EPOLL_THREAD_DATA));
        }
        else
        {
            thread_data->event_callback = event_callback;
            thread_data->event_callback_ctx = event_callback_ctx;
            (void)interlocked_exchange(&thread_data->event_callback_called, COMPLETION_PORT_CALLBACK_INIT);
            thread_data->socket = socket;
            thread_data->link.next = NULL;

            // The record pointer travels with the epoll event so whoever receives
            // the event can recover the callback and its context.
            struct epoll_event ev = {0};
            ev.events = epoll_op;
            ev.data.ptr = thread_data;
            #ifdef USE_VALGRIND
                ANNOTATE_HAPPENS_BEFORE(thread_data);
            #endif

            // Codes_SRS_COMPLETION_PORT_LINUX_11_022: [ completion_port_add shall add the EPOLL_THREAD_DATA object to a list for later removal. ]
            if (add_thread_data_to_list(completion_port, thread_data) != 0)
            {
                LogError("failure adding data to list");
                // Never made it into the list, so there is nothing to unlink.
                free(thread_data);
            }
            else
            {
                int registered = 0;

                // Codes_SRS_COMPLETION_PORT_LINUX_11_023: [ completion_port_add shall add the socket in the epoll system by calling epoll_ctl with EPOLL_CTL_MOD along with the epoll_op variable. ]
                if (epoll_ctl(completion_port->epoll, EPOLL_CTL_MOD, thread_data->socket, &ev) >= 0)
                {
                    registered = 1;
                }
                else if (errno != ENOENT)
                {
                    // Any error other than "not registered yet" is final.
                    LogErrorNo("failure with epoll_ctl EPOLL_CTL_MOD");
                }
                // Codes_SRS_COMPLETION_PORT_LINUX_11_024: [ If the epoll_ctl call fails with ENOENT, completion_port_add shall call epoll_ctl again with EPOLL_CTL_ADD. ]
                else if (epoll_ctl(completion_port->epoll, EPOLL_CTL_ADD, thread_data->socket, &ev) >= 0)
                {
                    registered = 1;
                }
                else
                {
                    LogErrorNo("failure with epoll_ctl EPOLL_CTL_ADD");
                }

                if (registered)
                {
                    // Codes_SRS_COMPLETION_PORT_LINUX_11_026: [ On success, completion_port_add shall return 0. ]
                    result = 0;
                }
                else
                {
                    // Undo the list insertion before releasing the record.
                    remove_thread_data_from_list(completion_port, thread_data);
                    free(thread_data);
                }
            }
        }

        // Codes_SRS_COMPLETION_PORT_LINUX_11_025: [ completion_port_add shall decrement the ongoing call count value to unblock close. ]
        (void)interlocked_decrement(&completion_port->pending_calls);
        wake_by_address_single(&completion_port->pending_calls);
    }

    return result;
}