spectator/stateful_meters.h (229 lines of code) (raw):

#pragma once #include "id.h" #include "measurement.h" #include "meter_type.h" namespace spectator { namespace detail { /// Atomically add a delta to an atomic double /// equivalent to fetch_add for integer types inline void add_double(std::atomic<double>* n, double delta) { double current; do { current = n->load(std::memory_order_relaxed); } while (!n->compare_exchange_weak( current, n->load(std::memory_order_relaxed) + delta)); } /// Atomically set the max value of an atomic number template <typename T> inline void update_max(std::atomic<T>* n, T value) { T current; do { current = n->load(std::memory_order_relaxed); } while (value > current && !n->compare_exchange_weak(current, value)); } } // namespace detail class StatefulMeter { public: explicit StatefulMeter(IdPtr id) : id_{std::move(id)} {} StatefulMeter(const StatefulMeter&) = default; virtual ~StatefulMeter() = default; virtual void Measure(std::vector<Measurement>* measurements) = 0; [[nodiscard]] virtual MeterType GetType() const = 0; [[nodiscard]] IdPtr MeterId() const { return id_; } protected: IdPtr id_; }; template <typename DistType> class TestDistribution : public StatefulMeter { public: explicit TestDistribution(IdPtr id) : StatefulMeter(std::move(id)) {} int64_t Count() const { return count_; } double TotalAmount() const { return total_; } MeterType GetType() const override { return DistType::meter_type; } void Measure(std::vector<Measurement>* measurements) override { auto cnt = count_.exchange(0); if (cnt == 0) { return; } auto total = total_.exchange(0); auto t_sq = totalSq_.exchange(0); auto mx = max_.exchange(0); measurements->emplace_back(id_->WithStat(DistType::total_name), total); measurements->emplace_back(id_->WithStat("totalOfSquares"), t_sq); measurements->emplace_back(id_->WithStat("max"), mx); measurements->emplace_back(id_->WithStat("count"), cnt); } protected: void record(double amount) { if (amount >= 0) { count_.fetch_add(1); detail::add_double(&total_, amount); detail::add_double(&totalSq_, amount * amount); detail::update_max(&max_, amount); } } private: std::atomic<int64_t> count_ = 0; std::atomic<double> total_ = 0; std::atomic<double> totalSq_ = 0; std::atomic<double> max_ = 0; }; struct timer_distribution { static constexpr auto meter_type = MeterType::Timer; static constexpr auto total_name = "totalTime"; }; struct summary_distribution { static constexpr auto meter_type = MeterType::DistSummary; static constexpr auto total_name = "totalAmount"; }; class StatefulAgeGauge : public StatefulMeter { public: explicit StatefulAgeGauge(IdPtr id) : StatefulMeter(std::move(id)) {} [[nodiscard]] double Get() const { return value_; } MeterType GetType() const override { return MeterType::AgeGauge; } void Set(double amount) { value_ = amount; } void Measure(std::vector<Measurement>* measurements) override { auto v = value_.exchange(kNaN); if (std::isnan(v)) { return; } measurements->emplace_back(Id::WithDefaultStat(id_, "gauge"), v); } private: static constexpr auto kNaN = std::numeric_limits<double>::quiet_NaN(); std::atomic<double> value_ = kNaN; }; class StatefulCounter : public StatefulMeter { public: explicit StatefulCounter(IdPtr id) : StatefulMeter(std::move(id)) {} [[nodiscard]] double Count() const { return count_; }; MeterType GetType() const override { return MeterType::Counter; } void Add(double delta) { if (delta > 0) { detail::add_double(&count_, delta); } } void Increment() { Add(1); } void Measure(std::vector<Measurement>* measurements) override { auto count = count_.exchange(0.0); if (count > 0) { measurements->emplace_back(Id::WithDefaultStat(id_, "count"), count); } } private: std::atomic<double> count_ = 0.0; }; class StatefulDistSum : public TestDistribution<summary_distribution> { public: explicit StatefulDistSum(IdPtr id): TestDistribution<summary_distribution>(std::move(id)) {} void Record(double amount) { record(amount); } }; class StatefulGauge : public StatefulMeter { public: explicit StatefulGauge(IdPtr id) : StatefulMeter(std::move(id)) {} [[nodiscard]] double Get() const { return value_; } MeterType GetType() const override { return MeterType::Gauge; } void Set(double amount) { value_ = amount; } void Measure(std::vector<Measurement>* measurements) override { auto v = value_.exchange(kNaN); if (std::isnan(v)) { return; } measurements->emplace_back(Id::WithDefaultStat(id_, "gauge"), v); } private: static constexpr auto kNaN = std::numeric_limits<double>::quiet_NaN(); std::atomic<double> value_ = kNaN; }; class StatefulMaxGauge : public StatefulMeter { public: explicit StatefulMaxGauge(IdPtr id) : StatefulMeter(std::move(id)) {} [[nodiscard]] double Get() const { return value_; } MeterType GetType() const override { return MeterType::MaxGauge; } void Set(double amount) { detail::update_max(&value_, amount); } void Update(double amount) { Set(amount); } void Measure(std::vector<Measurement>* measurements) override { auto v = value_.exchange(kMinValue); if (v == kMinValue) { return; } measurements->emplace_back(Id::WithDefaultStat(id_, "max"), v); } private: static constexpr auto kMinValue = std::numeric_limits<double>::lowest(); std::atomic<double> value_ = kMinValue; }; class StatefulMonoCounter : public StatefulMeter { public: explicit StatefulMonoCounter(IdPtr id) : StatefulMeter(std::move(id)) {} MeterType GetType() const override { return MeterType::MonotonicCounter; } [[nodiscard]] double Delta() const { return value_ - prev_value_; } void Set(double amount) { value_ = amount; } void Measure(std::vector<Measurement>* measurements) override { auto delta = Delta(); prev_value_ = value_.load(); if (delta > 0) { measurements->emplace_back(id_->WithStat("count"), delta); } } private: static constexpr auto kNaN = std::numeric_limits<double>::quiet_NaN(); std::atomic<double> value_ = kNaN; std::atomic<double> prev_value_ = kNaN; }; class StatefulMonoCounterUint : public StatefulMeter { public: explicit StatefulMonoCounterUint(IdPtr id) : StatefulMeter(std::move(id)) {} MeterType GetType() const override { return MeterType::MonotonicCounterUint; } [[nodiscard]] double Delta() const { if (value_ < prev_value_) { return kMax - prev_value_ + value_ + 1; } else { return value_ - prev_value_; } } void Set(uint64_t amount) { value_ = amount; } void Measure(std::vector<Measurement>* measurements) override { auto delta = Delta(); prev_value_ = value_.load(); if (delta > 0) { measurements->emplace_back(id_->WithStat("count"), delta); } } private: static constexpr auto kMax = std::numeric_limits<uint64_t>::max(); std::atomic<uint64_t> value_ = 0; std::atomic<uint64_t> prev_value_ = 0; }; class StatefulPercTimer : public StatefulMeter { public: StatefulPercTimer(IdPtr id, std::chrono::nanoseconds, std::chrono::nanoseconds) : StatefulMeter(std::move(id)) {} [[nodiscard]] MeterType GetType() const override { return MeterType::PercentileTimer; } void Measure(std::vector<Measurement>*) override {} private: }; class StatefulPercDistSum : public StatefulMeter { public: StatefulPercDistSum(IdPtr id, int64_t, int64_t): StatefulMeter(std::move(id)) {} [[nodiscard]] MeterType GetType() const override { return MeterType::PercentileDistSummary; } void Measure(std::vector<Measurement>*) override {} }; class StatefulTimer : public TestDistribution<timer_distribution> { public: explicit StatefulTimer(IdPtr id): TestDistribution<timer_distribution>(std::move(id)) {} void Record(absl::Duration amount) { record(absl::ToDoubleSeconds(amount)); } void Record(std::chrono::nanoseconds amount) { Record(absl::FromChrono(amount)); } }; struct stateful_meters { using counter_t = StatefulCounter; using ds_t = StatefulDistSum; using gauge_t = StatefulGauge; using max_gauge_t = StatefulMaxGauge; using age_gauge_t = StatefulAgeGauge; using monotonic_counter_t = StatefulMonoCounter; using monotonic_counter_uint_t = StatefulMonoCounterUint; using perc_timer_t = StatefulPercTimer; using perc_ds_t = StatefulPercDistSum; using timer_t = StatefulTimer; }; } // namespace spectator