#pragma once
// include/ylt/metric/summary_impl.hpp
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <iterator>
#include <limits>
#include <memory>
#include <type_traits>
#include <vector>
#ifdef SUMMARY_DEBUG_STABLE_TEST
#include "ylt/easylog.hpp"
#endif
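// summary_impl: lock-free quantile estimation. Samples are mapped by a
// custom low-precision float16 encoding into a fixed set of atomic bucket
// counters; two counter buffers are rotated over time so old samples expire.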
namespace ylt::metric::detail {
template <typename uint_type, std::size_t frac_bit = 6>
class summary_impl {
static_assert(sizeof(uint_type) >= 4);
static_assert(std::is_unsigned_v<uint_type>);
constexpr static uint32_t decode_impl(uint16_t float16_value) {
float16_value <<= (8 - frac_bit);
uint32_t sign = float16_value >> 15;
uint32_t exponent = (float16_value >> 8) & 0x7F;
uint32_t fraction = (float16_value & 0xFF);
uint32_t float32_value;
if (exponent == 0) {
      /* discard denormals: encode may not produce them correctly, so we
       * decode them as zero */
float32_value = (sign << 31);
}
else if (exponent == 0x7F) {
      /* Inf or NaN: decode it as +-2^64 */
float32_value = (sign << 31) | ((127 + (127 - 63)) << 23);
}
else {
/* ordinary number */
float32_value =
(sign << 31) | ((exponent + (127 - 63)) << 23) | (fraction << 15);
}
return float32_value;
}
constexpr static auto generate_decode_table() {
constexpr size_t bucket_size =
1 << (frac_bit + 1 /*sign bit*/ + 7 /*exp bit*/);
std::array<uint32_t, bucket_size> table{};
for (uint16_t i = 0; i < bucket_size; ++i) {
table[i] = decode_impl(i);
}
return table;
  }
static auto& get_decode_table() {
static constexpr auto table = generate_decode_table();
return table;
  }
  /* our float16: | 1 bit sign | 7 bit exp | frac_bit bit frac | */
static_assert(frac_bit < 8);
static constexpr float float16_max = (1ull << 63) * 2.0f; // 2^64
static uint16_t encode(float flt) {
static_assert(sizeof(float) == 4);
uint32_t& fltInt32 = *(uint32_t*)&flt;
if (std::abs(flt) >= float16_max || std::isnan(flt)) {
flt = (fltInt32 & 0x8000'0000) ? (-float16_max) : (float16_max);
}
unsigned short fltInt16;
fltInt16 = (fltInt32 >> 31) << 7; /*float32 flag: 1bit*/
    unsigned short tmp = (fltInt32 >> 23) & 0xff; /*float32 exp: 8bit*/
    /* rebias the exponent from 127 to 63; the branchless mask clamps
     * exponents that would underflow to 0 */
    tmp = (tmp - 0x40) & ((unsigned int)((int)(0x40 - tmp) >> 6) >> 25);
    fltInt16 = (fltInt16 | tmp) << 8;
    // this step produces bogus denormals for tiny |flt| (roughly below
    // 2^-62), but decode treats exponent 0 as zero later
    fltInt16 |= (fltInt32 >> 15) & 0xff;
auto i = fltInt16 >> (8 - frac_bit);
return i;
}
static float decode(uint16_t float16_value) {
static_assert(frac_bit < 8);
return *(float*)&(get_decode_table()[float16_value]);
}
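  // e.g. with frac_bit == 6, encode(1.0f) == 4032 (sign 0, exp 63, frac 0)
  // and decode(4032) == 1.0f; in general values only round-trip up to the
  // precision of frac_bit fraction bits.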
static constexpr inline size_t bucket_size =
1 << (frac_bit + 1 /*sign bit*/ + 7 /*exp bit*/);
static constexpr size_t piece_cnt = 1 << 7;
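  // with frac_bit == 6: bucket_size == 16384 counters, split into 128 pieces
  // of 128 counters each, so untouched value ranges cost no memory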
struct data_t {
static constexpr size_t piece_size = bucket_size / piece_cnt;
using piece_t = std::array<std::atomic<uint_type>, piece_size>;
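    // lazily allocate the piece holding `index` on first touch; if two
    // threads race, the CAS loser frees its allocation and uses the winner's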
std::atomic<uint_type>& operator[](std::size_t index) {
piece_t* piece = arr[index / piece_size];
if (piece == nullptr) {
auto ptr = new piece_t{};
if (!arr[index / piece_size].compare_exchange_strong(piece, ptr)) {
delete ptr;
}
return (*arr[index / piece_size].load())[index % piece_size];
}
else {
return (*piece)[index % piece_size];
}
}
void refresh() {
for (auto& piece_ptr : arr) {
if (piece_ptr) {
for (auto& e : *piece_ptr) {
e.store(0, std::memory_order::relaxed);
}
}
}
}
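    // raw indices carry the sign bit on top, so they don't sort numerically:
    // raw 0..bucket_size/2-1 are non-negative floats in ascending order, and
    // raw bucket_size/2..bucket_size-1 are negative floats whose ordered
    // index maps to -1..-bucket_size/2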
    static int16_t get_ordered_index(uint16_t raw_index) {
      return static_cast<int16_t>((raw_index >= bucket_size / 2)
                                      ? (bucket_size / 2 - 1 - raw_index)
                                      : (raw_index));
    }
    static uint16_t get_raw_index(int16_t ordered_index) {
      return static_cast<uint16_t>((ordered_index < 0)
                                       ? (bucket_size / 2 - 1 - ordered_index)
                                       : (ordered_index));
    }
template <bool inc_order>
void stat_impl(uint64_t& count,
std::vector<std::pair<int16_t, uint_type>>& result, int i) {
auto piece = arr[i].load(std::memory_order_acquire);
if (piece) {
if constexpr (inc_order) {
        for (size_t j = 0; j < piece->size(); ++j) {
          // TSan may flag a data race here; that's expected. stat() doesn't
          // need to be strict, stale values are acceptable.
          auto value = (*piece)[j].load(std::memory_order_relaxed);
if (value) {
result.emplace_back(get_ordered_index(i * piece_size + j), value);
count += value;
}
}
}
else {
for (int j = piece->size() - 1; j >= 0; --j) {
auto value = (*piece)[j].load(std::memory_order_relaxed);
if (value) {
result.emplace_back(get_ordered_index(i * piece_size + j), value);
count += value;
}
}
}
}
}
void stat(uint64_t& count,
std::vector<std::pair<int16_t, uint_type>>& result) {
for (int i = piece_cnt - 1; i >= piece_cnt / 2; --i) {
stat_impl<false>(count, result, i);
}
for (int i = 0; i < piece_cnt / 2; ++i) {
stat_impl<true>(count, result, i);
}
}
~data_t() {
for (auto& e : arr) {
delete e;
}
}
std::array<std::atomic<piece_t*>, piece_cnt> arr;
};
  data_t& get_data() {
    // read the frontend index once, so a concurrent buffer swap cannot make
    // us CAS one slot and then return another
    auto index = frontend_data_index_.load(std::memory_order_relaxed);
    data_t* data = data_[index];
    if (data == nullptr) [[unlikely]] {
      auto pointer = new data_t{};
      if (!data_[index].compare_exchange_strong(data, pointer)) {
        delete pointer;
      }
      return *data_[index].load();
    }
    else {
      return *data;
    }
  }
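  // number of steady_clock ticks per millisecond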
static inline const unsigned long ms_count =
std::chrono::steady_clock::duration{std::chrono::milliseconds{1}}.count();
constexpr static uint32_t near_uint32_max = 4290000000U;
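  // bump the target bucket; if a uint32_t counter is close to overflowing,
  // undo the increment and move it to the nearest bucket (within the same
  // sign half) that still has headroom, trading a little accuracy for safety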
void increase(data_t& arr, uint16_t pos) {
auto res = arr[pos].fetch_add(1, std::memory_order::relaxed);
if constexpr (std::is_same_v<uint_type, uint32_t>) {
      if (res > near_uint32_max) /*counter near overflow*/ [[unlikely]] {
arr[pos].fetch_sub(1, std::memory_order::relaxed);
int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size);
int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2);
for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1);
delta < lim; ++delta) {
if (pos + delta < upper) {
if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
arr[pos + delta].fetch_sub(1, std::memory_order::relaxed);
}
if (pos - delta >= lower) {
if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
arr[pos - delta].fetch_sub(1, std::memory_order::relaxed);
}
}
}
}
}
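  // two-way merge cursor over the per-buffer bucket lists produced by
  // data_t::stat(), which are sorted by ordered index; the sentinel entry
  // (bucket_size / 2) appended by the public stat() marks the end of a list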
struct data_copy_t {
std::vector<std::pair<int16_t, uint_type>> arr[2];
int index[2] = {}, smaller_one;
void init() {
if (arr[0][0] <= arr[1][0]) {
smaller_one = 0;
}
else {
smaller_one = 1;
}
}
void inc() {
index[smaller_one]++;
if (arr[0][index[0]] <= arr[1][index[1]]) {
smaller_one = 0;
}
else {
smaller_one = 1;
}
}
int16_t value() { return arr[smaller_one][index[smaller_one]].first; }
uint_type count() { return arr[smaller_one][index[smaller_one]].second; }
};
public:
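  // double-buffered expiry: refresh_time_ is half of the configured max age.
  // Each elapsed half-window the frontend flips to the freshly cleared other
  // buffer, so a sample survives one to two half-windows; if a full window
  // passed with no refresh at all, both buffers are stale and are cleared.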
void refresh() {
if (refresh_time_.count() <= 0) {
return;
}
uint64_t old_tp = tp_;
auto new_tp = std::chrono::steady_clock::now().time_since_epoch().count();
auto ms = (new_tp - old_tp) / ms_count;
    if (ms >= refresh_time_.count()) [[unlikely]] {
if (tp_.compare_exchange_strong(old_tp, new_tp)) {
if (ms >= 2 * refresh_time_.count()) {
for (auto& data : data_) {
if (data != nullptr) {
data.load()->refresh();
}
}
}
else {
auto pos = frontend_data_index_ ^ 1;
if (auto data = data_[pos].load(); data != nullptr) {
data->refresh();
}
frontend_data_index_ = pos;
}
}
}
}
void insert(float value) {
refresh();
auto& data = get_data();
increase(data, encode(value));
}
std::vector<float> stat(double& sum, uint64_t& count) {
refresh();
count = 0;
sum = 0;
data_copy_t data_copy;
{
data_t* ar[2] = {data_[0], data_[1]};
if (ar[0] == nullptr && ar[1] == nullptr) [[unlikely]] {
return std::vector<float>(rate_.size(), 0.0f);
}
if (ar[0]) {
ar[0]->stat(count, data_copy.arr[0]);
}
if (ar[1]) {
ar[1]->stat(count, data_copy.arr[1]);
}
}
if (count == 0) {
      return std::vector<float>(rate_.size(), 0.0f);
}
uint64_t count_now = 0;
data_copy.arr[0].emplace_back(bucket_size / 2, 0);
data_copy.arr[1].emplace_back(bucket_size / 2, 0);
data_copy.init();
std::vector<float> result;
result.reserve(rate_.size());
float v = -float16_max;
for (double e : rate_) {
if (std::isnan(e) || e < 0) {
result.push_back(v);
continue;
}
else if (e > 1) [[unlikely]] {
e = 1;
}
auto target_count = std::min<double>(e * count, count);
if (e == 0) {
target_count = std::min(uint64_t{1}, count);
}
while (true) {
if (target_count <= count_now) [[unlikely]] {
result.push_back(v);
break;
}
auto tmp = data_copy.count();
count_now += tmp;
v = decode(data_t::get_raw_index(data_copy.value()));
sum += v * tmp;
data_copy.inc();
}
}
while (data_copy.value() < bucket_size / 2) {
sum +=
decode(data_t::get_raw_index(data_copy.value())) * data_copy.count();
data_copy.inc();
}
return result;
  }
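  // Usage sketch (illustrative only; note that `rate` is held by reference
  // and must outlive the summary):
  //   std::vector<double> quantiles{0.5, 0.9, 0.99};
  //   summary_impl<uint32_t> impl(quantiles, std::chrono::seconds{60});
  //   impl.insert(1.0f);
  //   double sum = 0;
  //   uint64_t count = 0;
  //   auto q = impl.stat(sum, count);  // q[i] ~ the quantiles[i] quantile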
#ifdef SUMMARY_DEBUG_STABLE_TEST
static inline constexpr size_t ms_per_second = 1;
#else
static inline constexpr size_t ms_per_second = 1000;
#endif
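  // note: the stored refresh window is half the requested max age, matching
  // the two-buffer rotation in refresh()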
  summary_impl(std::vector<double>& rate,
               std::chrono::seconds refresh_time = std::chrono::seconds{0})
      : refresh_time_(refresh_time.count() * ms_per_second / 2),
        tp_(std::chrono::steady_clock::now().time_since_epoch().count()),
        rate_(rate) {
#ifdef SUMMARY_DEBUG_STABLE_TEST
    ELOG_WARN << "summary max_age is in ms now! Don't use it in production; "
                 "it's just for testing.";
#endif
  }
~summary_impl() {
for (auto& data : data_) {
delete data;
}
}
private:
const std::chrono::milliseconds refresh_time_;
std::atomic<uint64_t> tp_;
std::vector<double>& rate_;
  // value-initialize so the atomic slots start as nullptr / 0
  std::array<std::atomic<data_t*>, 2> data_{};
  std::atomic<int> frontend_data_index_{0};
};
} // namespace ylt::metric::detail