src/perf_counter/perf_counter_atomic.h (356 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <boost/asio/deadline_timer.hpp>
#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>
#include "boost/asio/basic_deadline_timer.hpp"
#include "perf_counter/perf_counter.h"
#include "utils/fmt_logging.h"
#include "utils/process_utils.h"
#include "utils/time_utils.h"
// Forward declaration of boost::system::error_code. This header only names the type as a
// reference parameter (in the on_timer() declaration further down), so a declaration is enough.
namespace boost {
namespace system {
class error_code;
} // namespace system
} // namespace boost
namespace dsn {
#pragma pack(push)
#pragma pack(8)
// ----------- NUMBER perf counter ---------------------------------
// Number of atomic slots kept by each of the sharded counters below. A caller updates the slot
// at index (current thread id % DIVIDE_CONTAINER); readers combine all the slots.
#define DIVIDE_CONTAINER 107
// Counter whose value is the sum of DIVIDE_CONTAINER atomic slots.
//
// Every update goes to the slot selected by the calling thread's id, and every read adds all
// the slots together. All slot accesses after construction use relaxed memory ordering.
class perf_counter_number_atomic : public perf_counter
{
public:
    perf_counter_number_atomic(const char *app,
                               const char *section,
                               const char *name,
                               dsn_perf_counter_type_t type,
                               const char *dsptr)
        : perf_counter(app, section, name, type, dsptr)
    {
        // Start every slot from zero.
        for (auto &slot : _val) {
            slot.store(0);
        }
    }

    ~perf_counter_number_atomic() {}

    // Adds one to the calling thread's slot.
    virtual void increment() { current_slot().fetch_add(1, std::memory_order_relaxed); }

    // Subtracts one from the calling thread's slot.
    virtual void decrement() { current_slot().fetch_sub(1, std::memory_order_relaxed); }

    // Adds `val` to the calling thread's slot.
    virtual void add(int64_t val) { current_slot().fetch_add(val, std::memory_order_relaxed); }

    // Makes the counter read as `val`: every slot is cleared and `val` is then stored in
    // slot 0. The slots are written one after another without any lock, so updates that race
    // with this call may be lost or survive; add a lock if that ever matters.
    virtual void set(int64_t val)
    {
        for (auto &slot : _val) {
            slot.store(0, std::memory_order_relaxed);
        }
        _val[0].store(val, std::memory_order_relaxed);
    }

    // Returns the sum of all slots, accumulated as a double.
    virtual double get_value()
    {
        double total = 0;
        for (const auto &slot : _val) {
            total += static_cast<double>(slot.load(std::memory_order_relaxed));
        }
        return total;
    }

    // Returns the sum of all slots, accumulated as a 64-bit integer.
    virtual int64_t get_integer_value()
    {
        int64_t total = 0;
        for (const auto &slot : _val) {
            total += slot.load(std::memory_order_relaxed);
        }
        return total;
    }

    // Percentiles are not supported by this counter type.
    virtual double get_percentile(dsn_perf_counter_percentile_type_t type)
    {
        CHECK(false, "invalid execution flow");
        return 0.0;
    }

protected:
    std::atomic<int64_t> _val[DIVIDE_CONTAINER];

private:
    // Returns the slot that belongs to the calling thread: its id modulo DIVIDE_CONTAINER.
    std::atomic<int64_t> &current_slot()
    {
        const uint64_t tid = static_cast<int>(utils::get_current_tid());
        return _val[tid % DIVIDE_CONTAINER];
    }
};
// ----------- VOLATILE_NUMBER perf counter ---------------------------------
class perf_counter_volatile_number_atomic : public perf_counter_number_atomic
{
public:
perf_counter_volatile_number_atomic(const char *app,
const char *section,
const char *name,
dsn_perf_counter_type_t type,
const char *dsptr)
: perf_counter_number_atomic(app, section, name, type, dsptr)
{
}
~perf_counter_volatile_number_atomic(void) {}
virtual double get_value()
{
double val = 0;
for (int i = 0; i < DIVIDE_CONTAINER; i++) {
val += static_cast<double>(_val[i].exchange(0, std::memory_order_relaxed));
}
return val;
}
virtual int64_t get_integer_value()
{
int64_t val = 0;
for (int i = 0; i < DIVIDE_CONTAINER; i++) {
val += _val[i].exchange(0, std::memory_order_relaxed);
}
return val;
}
};
// ----------- RATE perf counter ---------------------------------
// Counter that reports a per-second rate.
//
// Updates are accumulated in DIVIDE_CONTAINER atomic slots (one chosen per calling thread id).
// get_value() drains the slots and divides the drained total by the seconds elapsed since the
// previous drain; the result is cached in _rate.
class perf_counter_rate_atomic : public perf_counter
{
public:
    perf_counter_rate_atomic(const char *app,
                             const char *section,
                             const char *name,
                             dsn_perf_counter_type_t type,
                             const char *dsptr)
        : perf_counter(app, section, name, type, dsptr), _rate(0)
    {
        _last_time.store(utils::get_current_physical_time_ns());
        for (auto &slot : _val) {
            slot.store(0, std::memory_order_relaxed);
        }
    }

    ~perf_counter_rate_atomic() {}

    // Adds one to the calling thread's slot.
    virtual void increment() { current_slot().fetch_add(1, std::memory_order_relaxed); }

    // Subtracts one from the calling thread's slot.
    virtual void decrement() { current_slot().fetch_sub(1, std::memory_order_relaxed); }

    // Adds `val` to the calling thread's slot.
    virtual void add(int64_t val) { current_slot().fetch_add(val, std::memory_order_relaxed); }

    // A rate cannot be assigned directly.
    virtual void set(int64_t val) { CHECK(false, "invalid execution flow"); }

    // Returns the current rate in units per second.
    //
    // When no more than 0.1 seconds have passed since the last recomputation, the cached rate
    // is returned and the slots are left untouched. Otherwise every slot is reset to zero, the
    // drained total is divided by the elapsed seconds, and the cache and timestamp are updated.
    virtual double get_value()
    {
        const uint64_t now = utils::get_current_physical_time_ns();
        const double elapsed_seconds = (now - _last_time.load()) / 1e9;
        if (elapsed_seconds <= 0.1) {
            return _rate.load();
        }

        double drained = 0;
        for (auto &slot : _val) {
            drained += slot.exchange(0, std::memory_order_relaxed);
        }

        _rate.store(drained / elapsed_seconds);
        _last_time.store(now);
        return _rate.load();
    }

    // Same as get_value(), truncated to an integer.
    virtual int64_t get_integer_value() { return static_cast<int64_t>(get_value()); }

    // Percentiles are not supported by this counter type.
    virtual double get_percentile(dsn_perf_counter_percentile_type_t type)
    {
        CHECK(false, "invalid execution flow");
        return 0.0;
    }

private:
    // Returns the slot that belongs to the calling thread: its id modulo DIVIDE_CONTAINER.
    std::atomic<int64_t> &current_slot()
    {
        const uint64_t tid = static_cast<int>(utils::get_current_tid());
        return _val[tid % DIVIDE_CONTAINER];
    }

    std::atomic<double> _rate;         // rate computed by the last drain
    std::atomic<uint64_t> _last_time;  // time of the last drain, in nanoseconds
    std::atomic<int64_t> _val[DIVIDE_CONTAINER];
};
// ----------- NUMBER_PERCENTILE perf counter ---------------------------------
// Capacity of the sample ring buffer (and of the scratch arrays) of the percentile counter.
#define MAX_QUEUE_LENGTH 5000
// Column indices of one calc_queue entry of the percentile counter:
// [_LEFT, _RIGHT] are the inclusive bounds of a range of samples, and [_QLEFT, _QRIGHT] are the
// inclusive bounds of the percentile queries that have to be answered inside that range.
#define _LEFT 0
#define _RIGHT 1
#define _QLEFT 2
#define _QRIGHT 3
// Counter that keeps the most recent MAX_QUEUE_LENGTH samples in a ring buffer and serves
// percentiles out of _results, which calc() fills in.
//
// Supported operations are set() (record one sample), get_percentile() and the
// get_latest_sample*() accessors; every other counter operation trips CHECK(false, ...).
//
// NOTE(review): the constructor and on_timer() are defined outside this header; presumably the
// timer periodically drives calc() -- confirm against the implementation file.
class perf_counter_number_percentile_atomic : public perf_counter
{
public:
    perf_counter_number_percentile_atomic(const char *app,
                                          const char *section,
                                          const char *name,
                                          dsn_perf_counter_type_t type,
                                          const char *dsptr,
                                          bool use_timer = true);

    // Cancels the timer if one is set.
    ~perf_counter_number_percentile_atomic(void)
    {
        if (_timer) {
            _timer->cancel();
        }
    }

    virtual void increment() { CHECK(false, "invalid execution flow"); }
    virtual void decrement() { CHECK(false, "invalid execution flow"); }
    virtual void add(int64_t val) { CHECK(false, "invalid execution flow"); }

    // Records one sample: reserves the next position by atomically incrementing _tail and
    // writes `val` at that position modulo MAX_QUEUE_LENGTH, overwriting the oldest sample
    // once the buffer has wrapped.
    virtual void set(int64_t val)
    {
        uint64_t idx = _tail.fetch_add(1, std::memory_order_relaxed);
        _samples[idx % MAX_QUEUE_LENGTH] = val;
    }

    // A percentile counter has no single value.
    virtual double get_value()
    {
        CHECK(false, "invalid execution flow");
        return 0.0;
    }

    virtual int64_t get_integer_value() { return static_cast<int64_t>(get_value()); }

    // Returns the value stored in _results for `type`. An out-of-range `type` trips the CHECK.
    virtual double get_percentile(dsn_perf_counter_percentile_type_t type)
    {
        if ((type < 0) || (type >= COUNTER_PERCENTILE_COUNT)) {
            CHECK(false, "send a wrong counter percentile type");
            return 0.0;
        }
        return static_cast<double>(_results[type]);
    }

    // Exposes the most recently recorded samples without copying them.
    //
    // `samples` is cleared and then filled with one or two (pointer, length) ranges that point
    // into _samples, in ring-buffer order; two ranges are produced when the requested window
    // wraps around the end of the buffer.
    //
    // required_sample_count: how many samples the caller wants; must not exceed
    //                        MAX_QUEUE_LENGTH (checked) and is expected to be non-negative.
    // Returns the number of samples exposed: `required_sample_count`, or the number of samples
    // recorded so far when fewer than that have been recorded.
    virtual int get_latest_samples(int required_sample_count,
                                   /*out*/ samples_t &samples) const override
    {
        CHECK_LE(required_sample_count, MAX_QUEUE_LENGTH);

        const uint64_t count = _tail.load();
        const int return_count = count >= static_cast<uint64_t>(required_sample_count)
                                     ? required_sample_count
                                     : static_cast<int>(count);

        samples.clear();

        // Position of the newest sample (the one written for tail value count - 1).
        const int end_index = (count + MAX_QUEUE_LENGTH - 1) % MAX_QUEUE_LENGTH;
        // The window [start_index, end_index] is inclusive on both ends, hence the "+ 1".
        // Without it the window is shifted one slot back: it drops the newest sample, and for
        // a full-size request it degenerates into a single range running past the array end.
        const int start_index =
            (end_index + MAX_QUEUE_LENGTH - return_count + 1) % MAX_QUEUE_LENGTH;

        // samples_t stores mutable pointers although this accessor is const.
        int64_t *const base = const_cast<int64_t *>(&_samples[0]);

        // Number of slots available between start_index and the end of the array.
        const int first_len = MAX_QUEUE_LENGTH - start_index;
        if (return_count <= first_len) {
            // The window is contiguous (this also covers return_count == 0).
            samples.push_back(std::make_pair(base + start_index, return_count));
        } else {
            // The window wraps: tail part of the array first, then the part from index 0.
            samples.push_back(std::make_pair(base + start_index, first_len));
            samples.push_back(std::make_pair(base, return_count - first_len));
        }

        return return_count;
    }

    // Returns the sample at the position just before _tail (modulo MAX_QUEUE_LENGTH).
    virtual int64_t get_latest_sample() const override
    {
        int idx = (_tail.load() + MAX_QUEUE_LENGTH - 1) % MAX_QUEUE_LENGTH;
        return _samples[idx];
    }

private:
    friend class perf_counter_nth_element_finder;

    // Scratch state of one percentile computation.
    struct compute_context
    {
        // 1-based rank looked for by each percentile, relative to the first element of the
        // range that currently holds it (select() rebases the ranks as it narrows ranges).
        int64_t ask[COUNTER_PERCENTILE_COUNT];
        // Working copy of the samples; partitioned in place by select().
        int64_t tmp[MAX_QUEUE_LENGTH];
        // Scratch copy used by find_mid() to pick a pivot.
        int64_t mid_tmp[MAX_QUEUE_LENGTH];
        // Work queue of pending select() calls; columns are _LEFT, _RIGHT, _QLEFT, _QRIGHT.
        // Entries start at index 1 because insert_calc_queue() advances the tail first.
        int calc_queue[MAX_QUEUE_LENGTH][4];
    };

    // Appends the work item (left, right, qleft, qright) to ctx->calc_queue and advances
    // `calc_tail` to the index of the new entry.
    void insert_calc_queue(const std::shared_ptr<compute_context> &ctx,
                           int left,
                           int right,
                           int qleft,
                           int qright,
                           int &calc_tail)
    {
        calc_tail++;
        ctx->calc_queue[calc_tail][_LEFT] = left;
        ctx->calc_queue[calc_tail][_RIGHT] = right;
        ctx->calc_queue[calc_tail][_QLEFT] = qleft;
        ctx->calc_queue[calc_tail][_QRIGHT] = qright;
        return;
    }

    // Picks a pivot for ctx->mid_tmp[left..right] (inclusive) by repeatedly taking medians of
    // small groups. mid_tmp is reordered and partly overwritten in the process.
    int64_t find_mid(const std::shared_ptr<compute_context> &ctx, int left, int right)
    {
        if (left == right)
            return ctx->mid_tmp[left];

        for (int index = left; index < right; index += 5) {
            // Groups hold 5 elements, except the last one, which takes everything that is
            // left up to `right` (between 2 and 6 elements).
            int remain_num = index + 5 >= right ? right - index + 1 : 5;

            // Insertion sort of the group.
            for (int i = index; i < index + remain_num; i++) {
                int j;
                int64_t k = ctx->mid_tmp[i];
                for (j = i - 1; (j >= index) && (ctx->mid_tmp[j] > k); j--)
                    ctx->mid_tmp[j + 1] = ctx->mid_tmp[j];
                ctx->mid_tmp[j + 1] = k;
            }

            // Store the group's median at the front of mid_tmp, packed from index 0.
            ctx->mid_tmp[(index - left) / 5] = ctx->mid_tmp[index + remain_num / 2];
        }

        // Recurse on the packed group medians.
        return find_mid(ctx, 0, (right - left - 1) / 5);
    }

    // Processes one work item: answers, or narrows down, the percentile queries
    // ask[qleft..qright] inside ctx->tmp[left..right] (both ranges inclusive).
    //
    // The range is partitioned around a pivot chosen by find_mid(); the queries are then split
    // into those that fall before the pivot, on the pivot, and after it, and one follow-up
    // work item is queued for each of the three parts.
    void select(const std::shared_ptr<compute_context> &ctx,
                int left,
                int right,
                int qleft,
                int qright,
                int &calc_tail)
    {
        int i, j, index, now;
        int64_t mid;

        // No query falls into this range.
        if (qleft > qright)
            return;

        // A single sample is left: it answers every remaining query, each of which must be
        // asking for rank 1 at this point.
        if (left == right) {
            for (i = qleft; i <= qright; i++) {
                if (ctx->ask[i] == 1) {
                    _results[i] = ctx->tmp[left];
                } else {
                    CHECK(false, "select percentail wrong!!!");
                }
            }
            return;
        }

        // Choose the pivot on a scratch copy, since find_mid() scrambles its input.
        for (i = left; i <= right; i++)
            ctx->mid_tmp[i] = ctx->tmp[i];
        mid = find_mid(ctx, left, right);

        // Locate one occurrence of the pivot, overwrite it with tmp[left], and treat tmp[left]
        // as the initial hole of the partition.
        for (index = left; index <= right; index++)
            if (ctx->tmp[index] == mid)
                break;
        ctx->tmp[index] = ctx->tmp[left];
        index = left;

        // Partition: alternately move an element that is not greater than the pivot from the
        // right end, and an element that is not smaller than the pivot from the left end, into
        // the hole; the hole moves to where the element came from.
        for (i = left, j = right; i <= j;) {
            while ((i <= j) && (ctx->tmp[j] > mid))
                j--;
            if (i <= j) {
                ctx->tmp[index] = ctx->tmp[j];
                index = j--;
            }
            while ((i <= j) && (ctx->tmp[i] < mid))
                i++;
            if (i <= j) {
                ctx->tmp[index] = ctx->tmp[i];
                index = i++;
            }
        }
        // The pivot goes into the final hole: nothing larger precedes it in the range and
        // nothing smaller follows it.
        ctx->tmp[index] = mid;

        // 1-based rank of the pivot inside [left, right].
        now = index - left + 1;

        // Queries with a rank below the pivot's stay in the left part: [qleft, i - 1].
        for (i = qleft; (i <= qright) && (ctx->ask[i] < now); i++)
            ;
        // Rebase the remaining ranks so that they count from the element after the pivot.
        for (j = i; j <= qright; j++)
            ctx->ask[j] -= now;
        // A rebased rank of 0 means "the pivot itself": give it rank 1 in the single-element
        // range [index, index]. These are the queries [i, j - 1].
        for (j = i; (j <= qright) && (ctx->ask[j] == 0); j++)
            ctx->ask[j]++;

        insert_calc_queue(ctx, left, index - 1, qleft, i - 1, calc_tail);
        insert_calc_queue(ctx, index, index, i, j - 1, calc_tail);
        insert_calc_queue(ctx, index + 1, right, j, qright, calc_tail);
        return;
    }

    // Recomputes _results from the samples currently held in _samples.
    //
    // Copies the first min(_tail, MAX_QUEUE_LENGTH) slots of _samples into ctx->tmp, derives the
    // rank requested by each percentile, and processes the work queue until it is empty. Does
    // nothing when no sample has been recorded yet.
    void calc(const std::shared_ptr<compute_context> &ctx)
    {
        const uint64_t tail = _tail.load();
        const int num = tail > MAX_QUEUE_LENGTH ? MAX_QUEUE_LENGTH : static_cast<int>(tail);
        if (num == 0)
            return;

        for (int i = 0; i < num; i++)
            ctx->tmp[i] = _samples[i];

        // 1-based rank of the sample to report for each percentile.
        ctx->ask[COUNTER_PERCENTILE_50] = static_cast<int>(num * 0.5) + 1;
        ctx->ask[COUNTER_PERCENTILE_90] = static_cast<int>(num * 0.90) + 1;
        ctx->ask[COUNTER_PERCENTILE_95] = static_cast<int>(num * 0.95) + 1;
        ctx->ask[COUNTER_PERCENTILE_99] = static_cast<int>(num * 0.99) + 1;
        ctx->ask[COUNTER_PERCENTILE_999] = static_cast<int>(num * 0.999) + 1;
        // select() needs the ranks in ascending index order; no sort is done here.
        // NOTE(review): this relies on the COUNTER_PERCENTILE_* values being declared in
        // increasing percentile order -- confirm against the enum definition.
        // std::sort(ctx->ask, ctx->ask + MAX_TYPE_NUMBER);

        // `r` is the index of the last queued item; select() appends to the queue while the
        // loop consumes it.
        int l, r = 0;
        insert_calc_queue(ctx, 0, num - 1, 0, COUNTER_PERCENTILE_COUNT - 1, r);
        for (l = 1; l <= r; l++)
            select(ctx,
                   ctx->calc_queue[l][_LEFT],
                   ctx->calc_queue[l][_RIGHT],
                   ctx->calc_queue[l][_QLEFT],
                   ctx->calc_queue[l][_QRIGHT],
                   r);
        return;
    }

    // Timer callback; defined outside this header.
    void on_timer(std::shared_ptr<boost::asio::deadline_timer> timer,
                  const boost::system::error_code &ec);

    std::shared_ptr<boost::asio::deadline_timer> _timer;
    // Total number of samples recorded so far. Kept unsigned so that the ring-buffer index
    // derived from it can never be negative.
    std::atomic<uint64_t> _tail;
    // Ring buffer of samples, indexed by (_tail value % MAX_QUEUE_LENGTH).
    int64_t _samples[MAX_QUEUE_LENGTH];
    // Latest computed value of each percentile, indexed by percentile type.
    int64_t _results[COUNTER_PERCENTILE_COUNT];
};
#pragma pack(pop)
} // namespace dsn