src/util/BlockingQueue.h (56 lines of code) (raw):

// Rkernel is an execution kernel for R interpreter // Copyright (C) 2019 JetBrains s.r.o. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. #ifndef RWRAPPER_BLOCKING_QUEUE_H #define RWRAPPER_BLOCKING_QUEUE_H #include <condition_variable> #include <mutex> #include <deque> template <typename T> class BlockingQueue { public: explicit BlockingQueue(size_t maxSize = 0): maxSize(maxSize) { } void push(T const& value) { std::unique_lock<std::mutex> lock(mutex); if (maxSize != 0) { condVar.wait(lock, [&] { return queue.size() < maxSize; }); } queue.push_front(value); condVar.notify_one(); } T pop() { std::unique_lock<std::mutex> lock(mutex); condVar.wait(lock, [&] { return !queue.empty(); }); T result(std::move(queue.back())); queue.pop_back(); condVar.notify_one(); return result; } bool poll(T &value) { std::unique_lock<std::mutex> lock(mutex); if (queue.empty()) return false; value = std::move(queue.back()); queue.pop_back(); condVar.notify_one(); return true; } template<class TimePoint> bool popWithDeadline(TimePoint const& deadline, T &value) { std::unique_lock<std::mutex> lock(mutex); condVar.wait_until(lock, deadline, [&] { return !queue.empty(); }); if (queue.empty()) return false; value = std::move(queue.back()); queue.pop_back(); condVar.notify_one(); return true; } void setMaxSize(size_t newSize) { std::unique_lock<std::mutex> lock(mutex); maxSize = newSize; condVar.notify_one(); } private: std::deque<T> queue; std::mutex mutex; std::condition_variable condVar; size_t maxSize; }; #endif //RWRAPPER_BLOCKING_QUEUE_H