void completion_port_remove()

in linux/src/completion_port_linux.c [421:483]


/*
 * Removes a socket from the completion port's epoll set and settles every
 * tracked EPOLL_THREAD_DATA entry registered for that socket.
 *
 * completion_port: port to remove the socket from; NULL is logged and ignored.
 * socket:          socket to remove; INVALID_SOCKET is logged and ignored.
 *
 * If worker_thread_continue is non-zero the call is rejected with an error log.
 * Otherwise pending_calls is incremented for the duration of the call and
 * decremented (followed by wake_by_address_single) on the way out.
 * NOTE(review): presumably this lets teardown wait for in-flight calls to drain -
 * confirm against the code that waits on pending_calls.
 *
 * Both the event_callback invocation and the wait for a running callback happen
 * between enter_crit_section and leave_crit_section.
 */
void completion_port_remove(COMPLETION_PORT_HANDLE completion_port, SOCKET_HANDLE socket)
{
    // Codes_SRS_COMPLETION_PORT_LINUX_11_028: [ If completion_port is NULL, completion_port_remove shall return. ]
    // Codes_SRS_COMPLETION_PORT_LINUX_11_029: [ If socket is INVALID_SOCKET, completion_port_remove shall return. ]
    if ((completion_port == NULL) || (socket == INVALID_SOCKET))
    {
        LogError("Invalid arguments: COMPLETION_PORT_HANDLE completion_port=%p, SOCKET_HANDLE socket=%" PRI_SOCKET "", completion_port, socket);
        return;
    }

    // interlocked_add with 0 is used as an atomic read of the flag.
    if (interlocked_add(&completion_port->worker_thread_continue, 0) != 0)
    {
        LogError("Invalid call sequence completion port module is destroying");
        return;
    }

    // Mark this call as in flight; balanced by the decrement at the end of the function.
    (void)interlocked_increment(&completion_port->pending_calls);

    // Codes_SRS_COMPLETION_PORT_LINUX_11_030: [ completion_port_remove shall remove the underlying socket from the epoll by calling epoll_ctl with EPOLL_CTL_DEL. ]
    if (epoll_ctl(completion_port->epoll, EPOLL_CTL_DEL, socket, NULL) == -1)
    {
        // A failure here is only logged; the tracked entries are still processed below.
        LogErrorNo("Failure epoll_ctl with EPOLL_CTL_DEL %" PRI_SOCKET "", socket);
    }

    // Codes_SRS_COMPLETION_PORT_LINUX_11_037: [ completion_port_remove shall loop through the list of EPOLL_THREAD_DATA object and call the event_callback with COMPLETION_PORT_EPOLL_ABANDONED ]
    enter_crit_section(completion_port);

    for (PS_LIST_ENTRY entry = completion_port->alloc_data_list.next; entry != NULL; entry = entry->next)
    {
        EPOLL_THREAD_DATA* tracked = CONTAINING_RECORD(entry, EPOLL_THREAD_DATA, link);

        if (tracked->socket != socket)
        {
            // Entry belongs to a different socket.
            continue;
        }

        // Try to claim the callback (INIT -> EXECUTING); the returned value is the state seen before the attempt.
        COMPLETION_PORT_CALLBACK_STATE previous_state = interlocked_compare_exchange(&tracked->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTING, COMPLETION_PORT_CALLBACK_INIT);

        // Codes_SRS_COMPLETION_PORT_LINUX_11_038: [ If the event_callback has not been called, completion_port_remove shall call the event_callback. ]
        if (previous_state == COMPLETION_PORT_CALLBACK_INIT)
        {
            // The claim succeeded: report the entry as abandoned, then publish completion.
            tracked->event_callback(tracked->event_callback_ctx, COMPLETION_PORT_EPOLL_ABANDONED);
            (void)interlocked_exchange(&tracked->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTED);
        }
        // Codes_SRS_COMPLETION_PORT_LINUX_11_039: [ If the event_callback is currently executing, completion_port_remove shall wait for the event_callback function to finish before completing. ]
        else if (previous_state == COMPLETION_PORT_CALLBACK_EXECUTING)
        {
            // The callback is already running elsewhere: wait until it is marked EXECUTED.
            (void)InterlockedHL_WaitForValue(&tracked->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTED, UINT32_MAX);
        }
        // Any other state means the callback has already executed: nothing to do.
    }

    leave_crit_section(completion_port);

    // This call is no longer in flight; wake a single waiter on pending_calls.
    (void)interlocked_decrement(&completion_port->pending_calls);
    wake_by_address_single(&completion_port->pending_calls);
}