in linux/src/completion_port_linux.c [319:419]
int completion_port_add(COMPLETION_PORT_HANDLE completion_port, int epoll_op, SOCKET_HANDLE socket,
ON_COMPLETION_PORT_EVENT_COMPLETE event_callback, void* event_callback_ctx)
{
int result;
if (
// Codes_SRS_COMPLETION_PORT_LINUX_11_016: [ If completion_port is NULL, completion_port_add shall return a non-NULL value. ]
completion_port == NULL ||
// Codes_SRS_COMPLETION_PORT_LINUX_11_017: [ If socket is INVALID_SOCKET, completion_port_add shall return a non-NULL value. ]
socket == INVALID_SOCKET ||
// Codes_SRS_COMPLETION_PORT_LINUX_11_018: [ If event_callback is NULL, completion_port_add shall return a non-NULL value. ]
event_callback == NULL)
{
LogError("Invalid arguments: COMPLETION_PORT_HANDLE completion_port=%p, epoll_op=%d, SOCKET_HANDLE socket=%" PRI_SOCKET ", ON_COMPLETION_PORT_EVENT_COMPLETE event_callback=%p, void* event_callback_ctx=%p",
completion_port, epoll_op, socket, event_callback, event_callback_ctx);
result = MU_FAILURE;
}
else
{
// Codes_SRS_COMPLETION_PORT_LINUX_11_019: [ completion_port_add shall ensure the thread completion flag is not set. ]
if (interlocked_add(&completion_port->worker_thread_continue, 0) != 0)
{
LogError("Invalid call sequence completion port module is destroying");
result = MU_FAILURE;
}
else
{
// Codes_SRS_COMPLETION_PORT_LINUX_11_020: [ completion_port_add shall increment the ongoing call count value to prevent close. ]
(void)interlocked_increment(&completion_port->pending_calls);
// Codes_SRS_COMPLETION_PORT_LINUX_11_021: [ completion_port_add shall allocate a EPOLL_THREAD_DATA object to store thread data. ]
EPOLL_THREAD_DATA* epoll_thread_data = malloc(sizeof(EPOLL_THREAD_DATA));
if (epoll_thread_data == NULL)
{
LogError("failure allocating epoll thread data size: %zu", sizeof(EPOLL_THREAD_DATA));
result = MU_FAILURE;
}
else
{
epoll_thread_data->event_callback = event_callback;
epoll_thread_data->event_callback_ctx = event_callback_ctx;
(void)interlocked_exchange(&epoll_thread_data->event_callback_called, COMPLETION_PORT_CALLBACK_INIT);
epoll_thread_data->socket = socket;
epoll_thread_data->link.next = NULL;
struct epoll_event ev = {0};
ev.events = epoll_op;
ev.data.ptr = (void*)epoll_thread_data;
#ifdef USE_VALGRIND
ANNOTATE_HAPPENS_BEFORE(epoll_thread_data);
#endif
// Codes_SRS_COMPLETION_PORT_LINUX_11_022: [ completion_port_add shall add the EPOLL_THREAD_DATA object to a list for later removal. ]
if (add_thread_data_to_list(completion_port, epoll_thread_data) != 0)
{
LogError("failure adding data to list");
}
else
{
// Codes_SRS_COMPLETION_PORT_LINUX_11_023: [ completion_port_add shall add the socket in the epoll system by calling epoll_ctl with EPOLL_CTL_MOD along with the epoll_op variable. ]
if (epoll_ctl(completion_port->epoll, EPOLL_CTL_MOD, epoll_thread_data->socket, &ev) < 0)
{
if (errno == ENOENT)
{
// Codes_SRS_COMPLETION_PORT_LINUX_11_024: [ If the epoll_ctl call fails with ENOENT, completion_port_add shall call epoll_ctl again with EPOLL_CTL_ADD. ]
if (epoll_ctl(completion_port->epoll, EPOLL_CTL_ADD, epoll_thread_data->socket, &ev) < 0)
{
LogErrorNo("failure with epoll_ctl EPOLL_CTL_ADD");
}
else
{
result = 0;
// (void)interlocked_decrement(&completion_port->pending_calls);
// wake_by_address_single(&completion_port->pending_calls);
goto all_ok;
}
}
else
{
LogErrorNo("failure with epoll_ctl EPOLL_CTL_MOD");
}
}
else
{
// Codes_SRS_COMPLETION_PORT_LINUX_11_026: [ On success, completion_port_add shall return 0. ]
result = 0;
goto all_ok;
}
// Remove
remove_thread_data_from_list(completion_port, epoll_thread_data);
}
free(epoll_thread_data);
}
// Codes_SRS_COMPLETION_PORT_LINUX_11_027: [ If any error occurs, completion_port_add shall fail and return a non-zero value. ]
result = MU_FAILURE;
all_ok:
// Codes_SRS_COMPLETION_PORT_LINUX_11_025: [ completion_port_add shall decrement the ongoing call count value to unblock close. ]
(void)interlocked_decrement(&completion_port->pending_calls);
wake_by_address_single(&completion_port->pending_calls);
}
}
return result;
}