// syncd/NotificationQueue.cpp

#include "NotificationQueue.h" #include "sairediscommon.h" #define NOTIFICATION_QUEUE_DROP_COUNT_INDICATOR (1000) using namespace syncd; #define MUTEX std::lock_guard<std::mutex> _lock(m_mutex); NotificationQueue::NotificationQueue( _In_ size_t queueLimit, _In_ size_t consecutiveThresholdLimit): m_queueSizeLimit(queueLimit), m_thresholdLimit(consecutiveThresholdLimit), m_dropCount(0), m_lastEventCount(0), m_lastEvent(SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT) { SWSS_LOG_ENTER(); m_queue = std::make_shared<std::queue<swss::KeyOpFieldsValuesTuple>>(); } NotificationQueue::~NotificationQueue() { SWSS_LOG_ENTER(); // empty } bool NotificationQueue::enqueue( _In_ const swss::KeyOpFieldsValuesTuple& item) { MUTEX; SWSS_LOG_ENTER(); bool candidateToDrop = false; std::string currentEvent; /* * If the queue exceeds the limit, then drop all further FDB events This is * a temporary solution to handle high memory usage by syncd and the * notification queue keeps growing. The permanent solution would be to * make this stateful so that only the *latest* event is published. * * We have also seen other notification storms that can also cause this queue issue * So the new scheme is to keep the last notification event and its consecutive count * If threshold limit reached and the consecutive count also reached then this notification * will also be dropped regardless of its event type to protect the device from crashing due to * running out of memory */ auto queueSize = m_queue->size(); currentEvent = kfvKey(item); if (currentEvent == m_lastEvent) { m_lastEventCount++; } else { m_lastEventCount = 1; m_lastEvent = currentEvent; } if (queueSize >= m_queueSizeLimit) { /* * Too many queued up already check if notification fits condition to e dropped * 1. All FDB events should be dropped at this point. * 2. All other notification events will start to drop if it reached the consecutive threshold limit */ if (currentEvent == SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT) { candidateToDrop = true; } else { if (m_lastEventCount >= m_thresholdLimit) { candidateToDrop = true; } } } if (!candidateToDrop) { m_queue->push(item); return true; } m_dropCount++; if (!(m_dropCount % NOTIFICATION_QUEUE_DROP_COUNT_INDICATOR)) { SWSS_LOG_NOTICE( "Too many messages in queue (%zu), dropped (%zu), lastEventCount (%zu) Dropping %s !", queueSize, m_dropCount, m_lastEventCount, m_lastEvent.c_str()); } return false; } bool NotificationQueue::tryDequeue( _Out_ swss::KeyOpFieldsValuesTuple& item) { MUTEX; SWSS_LOG_ENTER(); if (m_queue->empty()) { return false; } item = m_queue->front(); m_queue->pop(); if (m_queue->empty()) { /* * Since there could be burst of notifications, that allocated memory * can be over 2GB, but when queue will be drained that memory will not * be automatically released. Underlying deque container contains * function shrink_to_fit but that is just a request, and usually this * function does nothing. * * Make sure we will destroy queue and allocate new one. Assignment * operator is not enough here, since internal deque container will not * release memory under assignment. While making sure queue is deleted * all memory will be released. * * Downside of this approach is that even if we will have steady stream * of single notifications, each time we will allocate new queue. * Partial solution for this could allocating new queue only when * previous queue exceeded some size limit, for example 128 items. 
*/ m_queue = nullptr; m_queue = std::make_shared<std::queue<swss::KeyOpFieldsValuesTuple>>(); } return true; } size_t NotificationQueue::getQueueSize() { MUTEX; SWSS_LOG_ENTER(); return m_queue->size(); }
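/*
 * Usage sketch for the API above (illustrative only; the "#if 0" guard keeps
 * it out of the build). The constructor arguments here (a queue limit of 3
 * and a consecutive threshold of 2) are made-up values chosen to trigger the
 * drop policy quickly, not the limits syncd actually configures.
 */
#if 0
#include <iostream>

static void notificationQueueUsageExample()
{
    syncd::NotificationQueue queue(3, 2);

    // Producer side: enqueue() returns false once the drop policy engages,
    // i.e. when the queue is full and the event is an FDB event or has
    // repeated past the consecutive threshold.
    swss::KeyOpFieldsValuesTuple fdbEvent(
            SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT,
            "SET",
            std::vector<swss::FieldValueTuple>{});

    for (int i = 0; i < 10; i++)
    {
        if (!queue.enqueue(fdbEvent))
        {
            std::cout << "dropped event " << i << std::endl;
        }
    }

    // Consumer side: drain until tryDequeue() reports an empty queue.
    swss::KeyOpFieldsValuesTuple item;

    while (queue.tryDequeue(item))
    {
        std::cout << "dequeued " << kfvKey(item) << std::endl;
    }
}
#endif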
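/*
 * Standalone sketch of the memory-release idiom used in tryDequeue() above
 * (illustrative only, also kept out of the build). It uses a plain
 * std::queue<int> in place of the notification tuple; the comments restate
 * the deque behavior that the tryDequeue() comment relies on.
 */
#if 0
#include <memory>
#include <queue>

static void recreateQueueExample()
{
    using Queue = std::queue<int>;

    auto q = std::make_shared<Queue>();

    for (int i = 0; i < 1000000; i++)
    {
        q->push(i);
    }

    while (!q->empty())
    {
        q->pop();
    }

    // The underlying std::deque typically still holds its internal blocks
    // here, and shrink_to_fit() on a deque is only a non-binding request.
    // Assigning an empty queue (*q = Queue();) is likewise not guaranteed
    // to deallocate, since the deque may keep its storage across assignment.

    // Destroying the object and allocating a fresh one is the portable way
    // to guarantee the old storage is freed:
    q = nullptr;                      // old queue destroyed, memory released
    q = std::make_shared<Queue>();    // fresh empty queue
}
#endif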