include/ylt/coro_io/detail/client_queue.hpp (70 lines of code) (raw):
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <utility>

#include "ylt/util/concurrentqueue.h"
namespace coro_io::detail {
using namespace ylt::detail;
/// Two-lane container of clients backed by a pair of concurrent queues.
///
/// `selected_index_` names the lane that `enqueue()` currently pushes into.
/// The other lane (`index ^ 1`) is treated as the "old" lane: `try_dequeue()`
/// looks there first and `clear_old()` discards elements from it.
/// `reselect()` swaps the roles of the two lanes.
///
/// Every lane has its own atomic element counter in `size_`. A counter is
/// incremented before the element is pushed and decremented only after an
/// element has actually been popped, so a counter may transiently be larger
/// than the number of elements stored in its lane.
///
/// @tparam client_t element type; elements are moved into the queue.
template <typename client_t>
class client_queue {
  moodycamel::ConcurrentQueue<client_t> queue_[2];
  // Index (0 or 1) of the lane that currently receives new elements.
  std::atomic_int_fast16_t selected_index_ = 0;
  // Per-lane element counters, see the class comment for their ordering rules.
  std::atomic<std::size_t> size_[2] = {};

 public:
  // Public counter that no member of this class reads or writes.
  // NOTE(review): presumably maintained by whoever runs the collection pass
  // over this queue -- confirm against the callers.
  std::atomic<std::size_t> collecter_cnt_ = 0;

 private:
  /// Assignment sink: accepts any value and ignores it.
  struct fake_client {
    template <typename T>
    fake_client& operator=(T&&) noexcept {
      return *this;
    }
  };

  /// Minimal output-iterator stand-in whose dereference yields a
  /// `fake_client`, so that everything written through it is dropped.
  /// Used by `clear_old()` to bulk-dequeue elements without storing them.
  struct fake_iter {
    fake_iter operator++() { return *this; }
    fake_iter operator++(int) { return *this; }
    fake_client& operator*() {
      // One shared sink is enough: fake_client carries no state.
      static fake_client c;
      return c;
    }
  };

 public:
  /// Builds both lanes.
  /// @param reserve_size forwarded unchanged to the constructor of each
  ///        underlying queue.
  client_queue(std::size_t reserve_size = 0)
      : queue_{moodycamel::ConcurrentQueue<client_t>{reserve_size},
               moodycamel::ConcurrentQueue<client_t>{reserve_size}} {}

  /// @return sum of the two per-lane counters. The two counters are loaded
  ///         separately, so the sum is not a single atomic snapshot.
  std::size_t size() const noexcept { return size_[0] + size_[1]; }

  /// Flips the selected lane (0 <-> 1); the previously selected lane becomes
  /// the "old" one.
  void reselect() noexcept { selected_index_ ^= 1; }

  /// Pushes `c` into the currently selected lane.
  /// @param c client to store; it is moved from.
  /// @return the selected lane's counter value right after this insertion was
  ///         counted, or 0 if the underlying queue rejected the element.
  std::size_t enqueue(client_t&& c) {
    // Read the selection once so counter and queue refer to the same lane
    // even if reselect() runs concurrently.
    const std::int_fast16_t index = selected_index_;
    // Count first, push second: the counter must never be smaller than the
    // number of elements a concurrent dequeue could observe.
    const auto cnt = ++size_[index];
    if (queue_[index].enqueue(std::move(c))) {
      return cnt;
    }
    // The push failed, take the reservation back.
    --size_[index];
    return 0;
  }

  /// Pops one element, preferring the old (non-selected) lane.
  /// @param c receives the dequeued client on success.
  /// @return true if an element was written to `c`, false if neither lane
  ///         yielded one.
  bool try_dequeue(client_t& c) {
    const std::int_fast16_t index = selected_index_;
    // Only touch the old lane when its counter says it may hold something.
    if (size_[index ^ 1]) {
      if (queue_[index ^ 1].try_dequeue(c)) {
        --size_[index ^ 1];
        return true;
      }
    }
    if (queue_[index].try_dequeue(c)) {
      --size_[index];
      return true;
    }
    return false;
  }

  /// Discards elements from the old (non-selected) lane.
  /// @param max_clear_cnt upper bound passed to the bulk dequeue.
  /// @return number of elements actually removed (0 if the lane's counter was
  ///         already zero).
  std::size_t clear_old(std::size_t max_clear_cnt) {
    const std::int_fast16_t index = selected_index_ ^ 1;
    if (size_[index]) {
      // Dequeue into fake_iter, which throws the elements away.
      std::size_t result =
          queue_[index].try_dequeue_bulk(fake_iter{}, max_clear_cnt);
      size_[index] -= result;
      return result;
    }
    return 0;
  }
};
}; // namespace coro_io::detail