syncd/ConcurrentQueue.h (77 lines of code) (raw):

#pragma once #include <mutex> #include <queue> #include "swss/logger.h" #include "swss/sal.h" namespace syncd { template <class T> class ConcurrentQueue { public: explicit ConcurrentQueue( _In_ size_t queueSizeLimit = UNLIMITED); virtual ~ConcurrentQueue() = default; bool enqueue( _In_ const T& val); bool dequeue( _Out_ T* valOut); size_t size(); bool empty(); private: // Queue size = 0 means there is no limit on queue size. static constexpr size_t UNLIMITED = 0; std::mutex m_mutex; std::queue<T> m_queue; size_t m_queueSizeLimit; ConcurrentQueue<T>(const ConcurrentQueue<T>&) = delete; ConcurrentQueue<T>& operator=(const ConcurrentQueue<T>&) = delete; }; template <class T> ConcurrentQueue<T>::ConcurrentQueue( _In_ size_t queueSizeLimit) : m_queueSizeLimit(queueSizeLimit) { SWSS_LOG_ENTER(); } template <class T> bool ConcurrentQueue<T>::enqueue( _In_ const T& val) { SWSS_LOG_ENTER(); std::lock_guard<std::mutex> mutex_lock(m_mutex); // If the queue exceeds the limit, return false. if ((m_queueSizeLimit == UNLIMITED) || (m_queue.size() < m_queueSizeLimit)) { m_queue.push(val); return true; } return false; } template <class T> bool ConcurrentQueue<T>::dequeue( _Out_ T* valOut) { SWSS_LOG_ENTER(); std::lock_guard<std::mutex> mutex_lock(m_mutex); if (m_queue.empty()) { return false; } *valOut = m_queue.front(); m_queue.pop(); return true; } template <class T> size_t ConcurrentQueue<T>::size() { SWSS_LOG_ENTER(); std::lock_guard<std::mutex> mutex_lock(m_mutex); return m_queue.size(); } template <class T> bool ConcurrentQueue<T>::empty() { SWSS_LOG_ENTER(); std::lock_guard<std::mutex> mutex_lock(m_mutex); return m_queue.empty(); } } // namespace syncd