common/select.cpp (166 lines of code) (raw):

#include "common/selectable.h" #include "common/logger.h" #include "common/select.h" #include <algorithm> #include <stdio.h> #include <sys/time.h> #include <sys/types.h> #include <sys/epoll.h> #include <unistd.h> #include <string.h> using namespace std; namespace swss { Select::Select() { m_epoll_fd = ::epoll_create1(0); if (m_epoll_fd == -1) { std::string error = std::string("Select::constructor:epoll_create1: error=(" + std::to_string(errno) + "}:" + strerror(errno)); throw std::runtime_error(error); } } Select::~Select() { (void)::close(m_epoll_fd); } void Select::addSelectable(Selectable *selectable) { const int fd = selectable->getFd(); if(m_objects.find(fd) != m_objects.end()) { SWSS_LOG_WARN("Selectable is already added to the list, ignoring."); return; } m_objects[fd] = selectable; if (selectable->initializedWithData()) { m_ready.insert(selectable); } struct epoll_event ev = { .events = EPOLLIN, .data = { .fd = fd, }, }; int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (res == -1) { std::string error = std::string("Select::add_fd:epoll_ctl: error=(" + std::to_string(errno) + "}:" + strerror(errno)); throw std::runtime_error(error); } } void Select::removeSelectable(Selectable *selectable) { const int fd = selectable->getFd(); m_objects.erase(fd); m_ready.erase(selectable); int res = ::epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, NULL); if (res == -1) { std::string error = std::string("Select::del_fd:epoll_ctl: error=(" + std::to_string(errno) + "}:" + strerror(errno)); throw std::runtime_error(error); } } void Select::addSelectables(vector<Selectable *> selectables) { for(auto it : selectables) { addSelectable(it); } } int Select::poll_descriptors(Selectable **c, unsigned int timeout, bool interrupt_on_signal = false) { int sz_selectables = static_cast<int>(m_objects.size()); std::vector<struct epoll_event> events(sz_selectables); int ret; while(true) { ret = ::epoll_wait(m_epoll_fd, events.data(), sz_selectables, timeout); // on signal interrupt check if we need to return if (ret == -1 && errno == EINTR) { if (interrupt_on_signal) { return Select::SIGNALINT; } } // on all other errors break the loop else { break; } } if (ret < 0) { return Select::ERROR; } for (int i = 0; i < ret; ++i) { int fd = events[i].data.fd; Selectable* sel = m_objects[fd]; try { sel->readData(); } catch (const std::runtime_error& ex) { SWSS_LOG_ERROR("readData error: %s", ex.what()); return Select::ERROR; } m_ready.insert(sel); } while (!m_ready.empty()) { auto sel = *m_ready.begin(); m_ready.erase(sel); // we must update clock only when the selector out of the m_ready // otherwise we break invariant of the m_ready sel->updateLastUsedTime(); if (!sel->hasData()) { continue; } *c = sel; if (sel->hasCachedData()) { // reinsert Selectable back to the m_ready set, when there're more messages in the cache m_ready.insert(sel); } sel->updateAfterRead(); return Select::OBJECT; } return Select::TIMEOUT; } int Select::select(Selectable **c, int timeout, bool interrupt_on_signal) { SWSS_LOG_ENTER(); int ret; *c = NULL; /* check if we have some data */ ret = poll_descriptors(c, 0); /* return if we have data, we have an error or desired timeout was 0 */ if (ret != Select::TIMEOUT || timeout == 0) return ret; /* wait for data */ ret = poll_descriptors(c, timeout, interrupt_on_signal); return ret; } bool Select::isQueueEmpty() { return m_ready.empty(); } std::string Select::resultToString(int result) { SWSS_LOG_ENTER(); switch (result) { case swss::Select::OBJECT: return "OBJECT"; case swss::Select::ERROR: return "ERROR"; case swss::Select::TIMEOUT: return "TIMEOUT"; case swss::Select::SIGNALINT: return "SIGNALINT"; default: SWSS_LOG_WARN("unknown select result: %d", result); return "UNKNOWN"; } } };