in linux/src/completion_port_linux.c [421:483]
/*
 * Detaches `socket` from the completion port's epoll instance and settles every
 * tracked EPOLL_THREAD_DATA entry registered for that socket.
 *
 * Sequence:
 *   1. Validate arguments; on failure log and return.
 *   2. If worker_thread_continue reads non-zero, log the "module is destroying"
 *      error and return without touching the epoll or the list.
 *   3. Bump pending_calls for the duration of the call.
 *   4. Issue epoll_ctl(EPOLL_CTL_DEL); a failure is logged but does not stop
 *      the list walk below.
 *   5. Under the completion port's critical section, walk alloc_data_list and,
 *      for each entry whose socket matches:
 *        - if its callback has not started, invoke it with
 *          COMPLETION_PORT_EPOLL_ABANDONED and mark it executed;
 *        - if its callback is executing elsewhere, wait for it to be marked
 *          executed;
 *        - otherwise leave it alone.
 *   6. Drop pending_calls and wake a single waiter on it
 *      (NOTE(review): presumably the teardown path waits on this - confirm).
 *
 * NOTE(review): the event callback and the wait in step 5 both run while the
 * critical section is held.
 */
void completion_port_remove(COMPLETION_PORT_HANDLE completion_port, SOCKET_HANDLE socket)
{
    // Codes_SRS_COMPLETION_PORT_LINUX_11_028: [ If completion_port is NULL, completion_port_remove shall return. ]
    // Codes_SRS_COMPLETION_PORT_LINUX_11_029: [ If socket is INVALID_SOCKET, completion_port_remove shall return. ]
    if (completion_port == NULL || socket == INVALID_SOCKET)
    {
        LogError("Invalid arguments: COMPLETION_PORT_HANDLE completion_port=%p, SOCKET_HANDLE socket=%" PRI_SOCKET "", completion_port, socket);
        return;
    }

    // Adding 0 is used as an atomic read of the flag.
    if (interlocked_add(&completion_port->worker_thread_continue, 0) != 0)
    {
        LogError("Invalid call sequence completion port module is destroying");
        return;
    }

    // Register this call as in flight; balanced by the decrement at the end.
    (void)interlocked_increment(&completion_port->pending_calls);

    // Codes_SRS_COMPLETION_PORT_LINUX_11_030: [ completion_port_remove shall remove the underlying socket from the epoll by calling epoll_ctl with EPOLL_CTL_DEL. ]
    if (epoll_ctl(completion_port->epoll, EPOLL_CTL_DEL, socket, NULL) == -1)
    {
        // Best effort: the entries for this socket are still settled below.
        LogErrorNo("Failure epoll_ctl with EPOLL_CTL_DEL %" PRI_SOCKET "", socket);
    }

    // Codes_SRS_COMPLETION_PORT_LINUX_11_037: [ completion_port_remove shall loop through the list of EPOLL_THREAD_DATA object and call the event_callback with COMPLETION_PORT_EPOLL_ABANDONED ]
    enter_crit_section(completion_port);
    for (PS_LIST_ENTRY entry = completion_port->alloc_data_list.next; entry != NULL; entry = entry->next)
    {
        EPOLL_THREAD_DATA* thread_data = CONTAINING_RECORD(entry, EPOLL_THREAD_DATA, link);
        if (thread_data->socket != socket)
        {
            // Entry belongs to some other socket.
            continue;
        }

        // Try to claim the callback: INIT -> EXECUTING. The returned value is the
        // state observed before the exchange, which tells us who owns the callback.
        COMPLETION_PORT_CALLBACK_STATE prior_state = interlocked_compare_exchange(&thread_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTING, COMPLETION_PORT_CALLBACK_INIT);

        // Codes_SRS_COMPLETION_PORT_LINUX_11_038: [ If the event_callback has not been called, completion_port_remove shall call the event_callback. ]
        if (prior_state == COMPLETION_PORT_CALLBACK_INIT)
        {
            // We won the claim, so the callback is ours to deliver.
            thread_data->event_callback(thread_data->event_callback_ctx, COMPLETION_PORT_EPOLL_ABANDONED);
            (void)interlocked_exchange(&thread_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTED);
        }
        // Codes_SRS_COMPLETION_PORT_LINUX_11_039: [ If the event_callback is currently executing, completion_port_remove shall wait for the event_callback function to finish before completing. ]
        else if (prior_state == COMPLETION_PORT_CALLBACK_EXECUTING)
        {
            // Someone else is mid-callback: block until it is marked executed.
            (void)InterlockedHL_WaitForValue(&thread_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTED, UINT32_MAX);
        }
        // Any other prior state means the callback already ran: nothing to do.
    }
    leave_crit_section(completion_port);

    (void)interlocked_decrement(&completion_port->pending_calls);
    wake_by_address_single(&completion_port->pending_calls);
}