vslib/NetMsgRegistrar.cpp (109 lines of code) (raw):

#include "NetMsgRegistrar.h" #include "swss/logger.h" #include "swss/netdispatcher.h" #include "swss/netlink.h" #include "swss/select.h" using namespace saivs; #define MUTEX std::lock_guard<std::mutex> _lock(m_mutex); NetMsgRegistrar::NetMsgRegistrar(): m_index(0) { SWSS_LOG_ENTER(); m_run = true; m_thread = std::make_shared<std::thread>(&NetMsgRegistrar::run, this); } NetMsgRegistrar::~NetMsgRegistrar() { SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("begin"); m_run = false; m_link_thread_event.notify(); m_thread->join(); MUTEX; // this mutex makes sure that m_run flag is false when possible late // message arrive on callback SWSS_LOG_NOTICE("thread joined"); SWSS_LOG_NOTICE("end"); } NetMsgRegistrar& NetMsgRegistrar::getInstance() { SWSS_LOG_ENTER(); static NetMsgRegistrar instance; return instance; } uint64_t NetMsgRegistrar::registerCallback( _In_ Callback callback) { SWSS_LOG_ENTER(); MUTEX; m_map[m_index] = callback; return m_index++; } void NetMsgRegistrar::unregisterCallback( _In_ uint64_t index) { SWSS_LOG_ENTER(); MUTEX; auto it = m_map.find(index); if (it != m_map.end()) { m_map.erase(it); } } void NetMsgRegistrar::unregisterAll() { SWSS_LOG_ENTER(); MUTEX; m_map.clear(); } void NetMsgRegistrar::resetIndex() { SWSS_LOG_ENTER(); MUTEX; m_index = 0; } void NetMsgRegistrar::run() { SWSS_LOG_ENTER(); swss::NetDispatcher::getInstance().registerMessageHandler(RTM_NEWLINK, this); swss::NetDispatcher::getInstance().registerMessageHandler(RTM_DELLINK, this); SWSS_LOG_NOTICE("netlink msg listener started"); while (m_run) { try { swss::NetLink netlink; swss::Select s; netlink.registerGroup(RTNLGRP_LINK); netlink.dumpRequest(RTM_GETLINK); s.addSelectable(&netlink); s.addSelectable(&m_link_thread_event); while (m_run) { swss::Selectable *sel = NULL; int result = s.select(&sel); SWSS_LOG_INFO("select ended: %d", result); } } catch (const std::exception& e) { SWSS_LOG_ERROR("NetMsgRegistrar::run: exception: %s", e.what()); break; } } SWSS_LOG_NOTICE("netlink msg listener ended"); } void NetMsgRegistrar::onMsg( _In_ int nlmsg_type, _In_ struct nl_object *obj) { SWSS_LOG_ENTER(); // this is async method MUTEX; if (!m_run) { SWSS_LOG_WARN("received message after stopping thread"); return; } // since this message is received async it may happen that in this place // destructor was called and thread already joined, so we place MUTEX in // destructor ending to make sure that m_run is false if this happens // execute all callbacks under mutex for (auto& kvp: m_map) { kvp.second(nlmsg_type, obj); } }