in linux/src/completion_port_linux.c [128:199]
/*
 * Worker routine that services the epoll instance owned by a completion port.
 *
 * parameter: the COMPLETION_PORT_HANDLE to service, passed as void*. When it is NULL
 *            the function only logs a critical error.
 *
 * The routine repeatedly calls epoll_wait on completion_port->epoll (at most
 * MAX_EVENTS_NUM events per call, EVENTS_TIMEOUT_MS timeout). For every returned event:
 *   - the events mask is mapped to one COMPLETION_PORT_EPOLL_ACTION, checking
 *     EPOLLRDHUP first, then EPOLLIN, then EPOLLOUT;
 *   - the event_callback stored in the EPOLL_THREAD_DATA is invoked only if
 *     event_callback_called could be moved from COMPLETION_PORT_CALLBACK_INIT to
 *     COMPLETION_PORT_CALLBACK_EXECUTING by this thread;
 *   - the EPOLL_THREAD_DATA is removed from the completion port list and freed.
 * The loop keeps running while worker_thread_continue reads as 0.
 *
 * Returns 0 in all cases.
 */
static int epoll_worker_func(void* parameter)
{
    COMPLETION_PORT_HANDLE completion_port = (COMPLETION_PORT_HANDLE)parameter;
    // Codes_SRS_COMPLETION_PORT_LINUX_11_031: [ If parameter is NULL, epoll_worker_func shall do nothing. ]
    if (completion_port == NULL)
    {
        // %p requires a void* argument, hence the explicit cast
        LogCritical("Invalid arguement epoll_worker_function COMPLETION_PORT_HANDLE completion_port=%p", (void*)completion_port);
    }
    else
    {
        // Counts epoll_wait failures that were logged, so the failure is reported only once per thread
        int log_only_once = 0;
        // Keep polling until worker_thread_continue becomes non-zero (checked at the bottom of the loop)
        do
        {
            struct epoll_event events[MAX_EVENTS_NUM];
            // Codes_SRS_COMPLETION_PORT_LINUX_11_032: [ epoll_worker_func shall call epoll_wait to wait for an epoll event to become signaled with a timeout of 2 Seconds. ]
            int num_ready = epoll_wait(completion_port->epoll, events, MAX_EVENTS_NUM, EVENTS_TIMEOUT_MS);
            if (num_ready == -1)
            {
                // NOTE(review): a persistent failure makes epoll_wait return immediately, so this
                // loop may spin until worker_thread_continue changes - confirm this is acceptable.
                if (log_only_once < 1)
                {
                    LogErrorNo("Failure epoll_wait, MAX_EVENTS_NUM: %d, EVENTS_TIMEOUT_MS: %d event: %p", MAX_EVENTS_NUM, EVENTS_TIMEOUT_MS, (void*)events);
                    log_only_once++;
                }
            }
            // Codes_SRS_COMPLETION_PORT_LINUX_11_034: [ epoll_worker_func shall loop through the num of descriptors that was returned. ]
            // When num_ready is -1 or 0 the loop body below is not entered.
            for (int index = 0; index < num_ready; index++)
            {
                COMPLETION_PORT_EPOLL_ACTION epoll_action;
                EPOLL_THREAD_DATA* epoll_data = events[index].data.ptr;
                // Exactly one action is reported per event; EPOLLRDHUP wins over EPOLLIN, which wins over EPOLLOUT
                if (events[index].events & EPOLLRDHUP)
                {
                    epoll_action = COMPLETION_PORT_EPOLL_EPOLLRDHUP;
                }
                else if (events[index].events & EPOLLIN)
                {
                    epoll_action = COMPLETION_PORT_EPOLL_EPOLLIN;
                }
                else if (events[index].events & EPOLLOUT)
                {
                    epoll_action = COMPLETION_PORT_EPOLL_EPOLLOUT;
                }
                else
                {
                    // The events member is an unsigned 32 bit mask, so it is printed as unsigned
                    // NOTE(review): epoll_data is neither removed from the list nor freed on this path - confirm intended.
                    LogWarning("Unexpected epoll event %u", (unsigned int)events[index].events);
                    continue;
                }
#ifdef USE_VALGRIND
                ANNOTATE_HAPPENS_AFTER(epoll_data);
#endif
                // If we haven't called into event_callback yet
                if (interlocked_compare_exchange(&epoll_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTING, COMPLETION_PORT_CALLBACK_INIT) == COMPLETION_PORT_CALLBACK_INIT)
                {
                    // Codes_SRS_COMPLETION_PORT_LINUX_11_035: [ epoll_worker_func shall call the event_callback with the specified COMPLETION_PORT_EPOLL_ACTION that was returned. ]
                    epoll_data->event_callback(epoll_data->event_callback_ctx, epoll_action);
                    // Publish that the callback has finished and wake one waiter on event_callback_called
                    (void)interlocked_exchange(&epoll_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTED);
                    wake_by_address_single(&epoll_data->event_callback_called);
                }
                // Codes_SRS_COMPLETION_PORT_LINUX_11_036: [ Then epoll_worker_func shall remove the EPOLL_THREAD_DATA from the list and free the object. ]
                if (remove_thread_data_from_list(completion_port, epoll_data) != 0)
                {
                    LogWarning("failure removing receive data from list %p, action: %" PRI_MU_ENUM "", (void*)epoll_data, MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, epoll_action));
                }
                // The object is freed even when removal from the list reported a failure
                free(epoll_data);
            }
            // Codes_SRS_COMPLETION_PORT_LINUX_11_033: [ On a epoll_wait timeout epoll_worker_func shall ensure it should not exit and issue another epoll_wait. ]
            // interlocked_add with 0 is used as an atomic read of worker_thread_continue
        } while (interlocked_add(&completion_port->worker_thread_continue, 0) == 0);
    }
    return 0;
}