static int epoll_worker_func()

in linux/src/completion_port_linux.c [128:199]


static int epoll_worker_func(void* parameter)
{
    COMPLETION_PORT_HANDLE completion_port = (COMPLETION_PORT_HANDLE)parameter;
    // Codes_SRS_COMPLETION_PORT_LINUX_11_031: [ If parameter is NULL, epoll_worker_func shall do nothing. ]
    if (completion_port == NULL)
    {
        LogCritical("Invalid arguement epoll_worker_function COMPLETION_PORT_HANDLE completion_port=%p", completion_port);
    }
    else
    {
        int log_only_once = 0;
        // Loop while true
        do
        {
            struct epoll_event events[MAX_EVENTS_NUM];
            // Codes_SRS_COMPLETION_PORT_LINUX_11_032: [ epoll_worker_func shall call epoll_wait to wait for an epoll event to become signaled with a timeout of 2 Seconds. ]
            int num_ready = epoll_wait(completion_port->epoll, events, MAX_EVENTS_NUM, EVENTS_TIMEOUT_MS);
            if (num_ready == -1)
            {
                if (log_only_once < 1)
                {
                    LogErrorNo("Failure epoll_wait, MAX_EVENTS_NUM: %d, EVENTS_TIMEOUT_MS: %d event: %p", MAX_EVENTS_NUM, EVENTS_TIMEOUT_MS, events);
                    log_only_once++;
                }
            }

            // Codes_SRS_COMPLETION_PORT_LINUX_11_034: [ epoll_worker_func shall loop through the num of descriptors that was returned. ]
            for (int index = 0; index < num_ready; index++)
            {
                COMPLETION_PORT_EPOLL_ACTION epoll_action;
                EPOLL_THREAD_DATA* epoll_data = events[index].data.ptr;
                if (events[index].events & EPOLLRDHUP)
                {
                    epoll_action = COMPLETION_PORT_EPOLL_EPOLLRDHUP;
                }
                else if (events[index].events & EPOLLIN)
                {
                    epoll_action = COMPLETION_PORT_EPOLL_EPOLLIN;
                }
                else if (events[index].events & EPOLLOUT)
                {
                    epoll_action = COMPLETION_PORT_EPOLL_EPOLLOUT;
                }
                else
                {
                    LogWarning("Unexpected epoll event %d", events[index].events);
                    continue;
                }
                #ifdef USE_VALGRIND
                    ANNOTATE_HAPPENS_AFTER(epoll_data);
                #endif
                // If we haven't called into event_callback yet
                if (interlocked_compare_exchange(&epoll_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTING, COMPLETION_PORT_CALLBACK_INIT) == COMPLETION_PORT_CALLBACK_INIT)
                {
                    // Codes_SRS_COMPLETION_PORT_LINUX_11_035: [ epoll_worker_func shall call the event_callback with the specified COMPLETION_PORT_EPOLL_ACTION that was returned. ]
                    epoll_data->event_callback(epoll_data->event_callback_ctx, epoll_action);

                    (void)interlocked_exchange(&epoll_data->event_callback_called, COMPLETION_PORT_CALLBACK_EXECUTED);
                    wake_by_address_single(&epoll_data->event_callback_called);
                }
                // Codes_SRS_COMPLETION_PORT_LINUX_11_036: [ Then epoll_worker_func shall remove the EPOLL_THREAD_DATA from the list and free the object. ]
                if (remove_thread_data_from_list(completion_port, epoll_data) != 0)
                {
                    LogWarning("failure removing receive data from list %p, action: %" PRI_MU_ENUM "", epoll_data, MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, epoll_action));
                }
                free(epoll_data);
            }
            // Codes_SRS_COMPLETION_PORT_LINUX_11_033: [ On a epoll_wait timeout epoll_worker_func shall ensure it should not exit and issue another epoll_wait. ]
        } while (interlocked_add(&completion_port->worker_thread_continue, 0) == 0);
    }
    return 0;
}