util/Stats.h

/**
 * Copyright (c) 2014-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */
/*
 * Stats.h
 *
 * Lockless, threadsafe, efficient semi log periodic histogram/stats.
 *
 * Originally written for wormhole but now opensource as part of WDT
 *
 * This file is performance sensitive - for any change, rerun the benchmark
 * below and make sure there is no regression (the print interval is set at
 * 1ms to check that there is no contention between the printing thread and
 * the others).
 * With an opt build, send the stats to stderr, as the benchmark now prints
 * to stdout:
 *
 * ./buck-out/gen/wdt/stats_benchmark -bm_regex="MtPer" \
 *     -print_interval=0.001 -minloglevel=3 -num_threads=6 2> /tmp/out
 *
 * 2016 results: on Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz - 6 threads:
 * MtPerHistogram 1 thread (no concurrency/best case): 12.40ns 80.64M,
 * 6 threads: 12.78ns 78.24M, 12 threads: 13.04ns 76.67M
 * so it scales linearly
 *
 * Old 2011 results:
 * gives above 45M/s (on 8 core Xeon L5410 @ 2.33GHz):
 * BM_mt_per_histogram 400000000 8.432 s 21.08 ns 45.24 M
 * and above 80M/s (on 32 htcores Xeon(R) CPU E5-2660 0 @ 2.20GHz):
 * BM_mt_per_histogram 400000000 4.644 s 11.61 ns 82.14 M
 * should also scale up to a number of threads equal to the number of real
 * cores, ie 16 for the above
 *
 * and make sure that despite resetting every millisecond we do get:
 * awk -F, '($1 ~ /[<>]/) {sum+=$4} END {print "sum is", sum}' /tmp/out
 * gives (for -num_threads=12):
 * sum is 4800000000
 * (12 threads doing 400M each -> 4.8B should be found in the histograms)
 * and /tmp/out should be large (~3 Mbytes)
 *
 * ps: could be even faster if templating the divider as this is the most
 * expensive part
 *
 * Created on: Oct 18, 2011
 *     Author: ldemailly
 */

#ifndef STATS_H_
#define STATS_H_

#include <sys/types.h>
#include <unistd.h>  // for usleep()

#include <chrono>
#include <cmath>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#include <boost/noncopyable.hpp>
#include <folly/ThreadLocal.h>

#include "glog/logging.h"

namespace facebook {
namespace wdt {

// Note that atomic operations across threads are actually quite slow,
// so we use ThreadLocal storage instead and sweep the aggregates.

typedef volatile int64_t vint64_t;

/**
 * Atomic add
 */
inline static int64_t atomic_add(vint64_t& data, int64_t v) {
  return __sync_add_and_fetch(&data, v);
}

/**
 * Atomic set - older/previous likely value is known/fetched already.
 * Safest (spins until it's set).
 */
inline static void atomic_set_old(vint64_t& dest, int64_t oldVal,
                                  int64_t val) {
  int64_t prevVal;
  while (1) {
    prevVal = __sync_val_compare_and_swap(&dest, oldVal, val);
    if (prevVal == oldVal) {
      break;  // swap/assign did work
    }
    VLOG(5) << "one more iter to set " << oldVal << " found " << prevVal;
    oldVal = prevVal;  // try again
  }
}

/**
 * Atomic set when the previous value is unknown. Safest.
 */
inline static void atomic_set(vint64_t& dest, int64_t value) {
  atomic_set_old(dest, dest, value);
}

/**
 * Atomic set when the previous value is unknown.
 * Avoids the loop and might work like the above - or not :-)
 * http://gcc.gnu.org/onlinedocs/gcc-4.6.2/gcc/Atomic-Builtins.html
 */
inline static void atomic_set2(vint64_t& dest, int64_t value) {
  __sync_lock_test_and_set(&dest, value);
}

/**
 * Atomic get
 * not a const arg because we actually +0 ...
 */
inline static int64_t atomic_get(vint64_t& data) {
  return atomic_add(data, 0);
}
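
/*
 * A minimal usage sketch for the atomic helpers above, for the (rare) cases
 * where a single shared counter is preferable to the thread-local classes
 * below. The variable name and values are hypothetical, not part of this API:
 *
 * <code>
 * static vint64_t sharedCount = 0;
 *
 * // From any thread:
 * atomic_add(sharedCount, 1);
 *
 * // From a reader thread:
 * int64_t seen = atomic_get(sharedCount);
 * atomic_set(sharedCount, 0);  // reset via the compare-and-swap loop
 * </code>
 */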
/**
 * Because of gcc strangeness in array initialization (in Hold2 below):
 * it requires the presence of a visible copy constructor - yet (thankfully!)
 * doesn't call it, so we need a replacement for boost::noncopyable.
 */
class crashifcopied {
 public:
  crashifcopied(const crashifcopied& /*c*/) {
    CHECK(false);
  }

 protected:
  crashifcopied() {
  }
  ~crashifcopied() {
  }

 private:
  const crashifcopied& operator=(const crashifcopied&);
};

/**
 * Basic counter with stats: count, min, max, avg, stddev.
 * Not atomic and not locked - use PeriodicCounters or ThreadLocalCounter for
 * a thread local and aggregated version, which is faster than atomic.
 */
class Counter : crashifcopied {
 public:
  explicit Counter(double printScale = 1.0)
      : count_(0),
        realMin_(0),
        min_(0),
        max_(0),
        sum_(0),
        sumOfSquares_(0),
        printScale_(printScale) {
  }
  virtual ~Counter() {
  }
  void record(int64_t v);
  inline int64_t getCount() const {
    return count_;
  }
  // Includes 0s
  inline int64_t getRealMin() const {
    return realMin_;
  }
  // Excludes 0
  inline int64_t getMin() const {
    return min_;
  }
  inline int64_t getMax() const {
    return max_;
  }
  inline int64_t getSum() const {
    return sum_;
  }
  inline double getAverage() const {
    if (count_ == 0) {
      return NAN;
    }
    return (double)sum_ / count_;
  }
  double getStdDev() const;
  /**
   * Prints values - data is multiplied by printScale_ -
   * as comma separated values:
   * count, avg, min, max, stddev
   * The scale argument optionally overrides the constructor passed scale.
   */
  void print(std::ostream& os, double scale = 0) const;
  /**
   * prints "count,avg,min,max,stddev"
   */
  void printCounterHeader(std::ostream& os) const;
  /**
   * resets all the data (not quite thread safe - see ThreadLocalHistogram)
   */
  inline void reset() {
    count_ = min_ = max_ = sum_ = sumOfSquares_ = 0;
  }
  /**
   * Takes one counter and flushes it into this one, as if the data for both
   * had been recorded on this one (used to merge threadlocal copies).
   */
  void merge(const Counter& c);

 private:
  int64_t count_, realMin_, min_, max_, sum_, sumOfSquares_;
  double printScale_;
};
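
/*
 * A short single-threaded usage sketch for Counter (the names and values are
 * illustrative; record() and print() are declared above and defined in the
 * corresponding .cpp):
 *
 * <code>
 * Counter latency(0.001);  // print scale: e.g. raw usec shown as ms
 * latency.record(1200);
 * latency.record(1800);
 * latency.printCounterHeader(std::cout);  // "count,avg,min,max,stddev"
 * latency.print(std::cout);               // values multiplied by 0.001
 * </code>
 */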
/**
 * Semi logarithmic, non atomic and unlocked Histogram. ThreadLocalHistogram
 * is the lock free, thread safe version/wrapper - see below.
 */
class Histogram : public Counter {
 public:
  /**
   * Data will have "offset" subtracted and be divided by "scale" before
   * being placed into a bucket.
   * The counter will have the raw data (stddev may still overflow for large
   * values - a good value for scale would be 1-1000).
   *
   * So for instance if the value is in usec and you only care about
   * 1/10th ms resolution, pass 100 as the scale.
   *
   * If all your values are between 7000 and 8000 you may want to pass
   * 7000 as the offset and use 100 as the scale.
   *
   * @param scale       divider for buckets, multiplier for printing data back
   * @param percentile1 percentile to calculate the value for during print;
   *                    85% should be close to avg + stddev (1 sigma)
   * @param percentile2 second percentile target; 99.9% should be close to
   *                    3 sigmas. Unless there is a lot of data with a
   *                    dense/stable long tail, expect the 99.9% to be
   *                    noisy/close to "max"
   * @param offset      subtracted to select a bucket, added back for printing
   */
  explicit Histogram(int32_t scale = 1, double percentile1 = 85.0,
                     double percentile2 = 99.9, int64_t offset = 0);
  ~Histogram() override;
  /**
   * record one value
   */
  void record(int64_t value);
  /**
   * Dumps the histogram data to the output stream.
   *
   * ex: Histogram of the numbers 1 to 10
   *
   * Output:
   * # count,avg,min,max,stddev,10,5.5,1,10,2.88097
   * # range, mid point, percentile, count
   * < 1 , 0 , 0, 0
   * >= 1 < 2 , 1 , 10, 1
   * >= 2 < 3 , 2 , 20, 1
   * >= 3 < 4 , 3 , 30, 1
   * >= 4 < 5 , 4 , 40, 1
   * >= 5 < 6 , 5 , 50, 1
   * >= 6 < 7 , 6 , 60, 1
   * >= 7 < 8 , 7 , 70, 1
   * >= 8 < 9 , 8 , 80, 1
   * >= 9 < 10 , 9 , 90, 1
   * >= 10 < 11 , 10 , 100, 1
   * # target 85.0%,9.5
   * # target 99.9%,10.0
   */
  void print(std::ostream& os) const;
  /**
   * clears all the values
   */
  void reset();
  /**
   * Merges the passed in histogram data into this histogram,
   * as if all the record() calls had been made on this object.
   */
  void merge(const Histogram& h);
  /**
   * Change the percentile targets for print()
   */
  void setPercentile1(double p);
  void setPercentile2(double p);
  /**
   * Calculates the value that reaches the target percentile.
   * It uses a linear fit within the first bucket whose cumulative count is
   * greater than or equal to the target - unless that would return a value
   * > getMax(), in which case it uses the maximum as the boundary.
   */
  double calcPercentile(double p) const;

  // We need the number of buckets to know in advance the
  // size of the Histogram objects (without extra new/malloc; they can fit
  // on the stack...)

  /**
   * Semi log bucket definitions covering 5 orders of magnitude (more
   * could be added), with high resolution for small numbers and a relatively
   * small total number of buckets.
   * For efficiency a lookup table is created, so the last value shouldn't
   * be too large (or it will incur a large memory overhead).
   * Values in [ bucket(i-1), bucket(i) [ go in slot i; every value
   * > bucket(last) goes in the extra last bucket and every value
   * < bucket(0) in the first.
   */
  static constexpr int32_t kHistogramBuckets[] = {
      1,     2,     3,     4,     5,     6,
      7,     8,     9,     10,    11,           // by 1 - my amp goes to 11 !
      12,    14,    16,    18,    20,           // by 2
      25,    30,    35,    40,    45,    50,    // by 5
      60,    70,    80,    90,    100,          // by 10
      120,   140,   160,   180,   200,          // line2 *10
      250,   300,   350,   400,   450,   500,   // line3 *10
      600,   700,   800,   900,   1000,         // line4 *10
      2000,  3000,  4000,  5000,  7500,  10000,
      20000, 30000, 40000, 50000, 75000, 100000};

  /** constant with the index of the last data bucket (1 more bucket than
   * the list above, as we need a bucket for values > the last entry) */
  static const size_t kLastIndex =
      sizeof(kHistogramBuckets) / sizeof(kHistogramBuckets[0]);

 private:
  /** bucket counters */
  int32_t hdata_[kLastIndex + 1];  // n+1 buckets (for the last one)
  /** value divider for the buckets */
  const int32_t divider_;
  /** value offset for the buckets */
  const int64_t offset_;
  /** target percentiles to print out by default */
  double percentile1_, percentile2_;
};
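
/*
 * A worked example for the scale/offset parameters above (the values are
 * illustrative): with offset 7000 and scale 100, a recorded value of 7345
 * becomes (7345 - 7000) / 100 = 3 and lands in the ">= 3 < 4" bucket;
 * printing multiplies by 100 and adds 7000 back.
 *
 * <code>
 * Histogram h(100, 85.0, 99.9, 7000);
 * h.record(7345);
 * h.record(7850);
 * h.print(std::cout);
 * double p99 = h.calcPercentile(99.0);  // linear fit inside the bucket
 * </code>
 */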
/**
 * To use the lock-free, thread-local counters defined below, the following
 * examples should get you started. For those interested in the details, you
 * may continue reading after this comment block.
 *
 * The following is an example of using ThreadLocalCounter/Histogram where you
 * manually read all the counts and do something with them.
 *
 * <code>
 * ThreadLocalCounter counter;
 *
 * // From any thread:
 * counter.record(100);
 *
 * // In a single-threaded manner (i.e. from a single thread), periodically:
 * Counter merged;
 * counter.readAndReset(merged);
 * LOG(INFO) << "average: " << merged.getAverage();
 * </code>
 *
 * If you want to periodically read thread-local counters and do something with
 * the counters, such as printing them, then you can use an instance of
 * PeriodicCounters that provides this common functionality. In the following
 * example we create two thread-local counters and periodically print them.
 *
 * <code>
 * // Create the counter & histogram.
 * ThreadLocalCounter counter(1.0, [](const Counter& c) {
 *   // Function will be called periodically with the fully merged counter, c.
 *   c.print(std::cout);
 * });
 * ThreadLocalHistogram histogram(1, [](const Histogram& h) {
 *   h.print(std::cout);
 * });
 *
 * // Create a PeriodicCounters to run the callbacks every second.
 * PeriodicCounters periodic({&counter, &histogram});
 * periodic.schedule(1.0);
 * </code>
 */

/**
 * A SwapableNode is a base class to allow for polymorphism for the
 * PeriodicCounters (and ThreadLocalHistogram) classes. A SwapableNode
 * represents a wrapper around a Counter-style class that can record data
 * and be merged. The idea is to double-buffer the Counter so that one can
 * be actively written into while the other is read from.
 *
 * Clients should use a sub-class, such as ThreadLocalSwapableNode.
 */
class SwapableNode : private boost::noncopyable {
 protected:
  SwapableNode() : ptr_(0) {
  }
  virtual ~SwapableNode() {
  }

  /// Swap active object with inactive by flip-flopping between 0 and 1.
  inline int swap() {
    ptr_ ^= 1;
    return ptr_;
  }

  /// @return Index of current active object (0 or 1).
  inline int getActiveIndex() {
    return ptr_;
  }

  /**
   * Abstract function that interacts with PeriodicCounters and is called
   * periodically after the node has been swapped.
   */
  virtual void process() = 0;

 private:
  int ptr_;  //!< 0 or 1; index of current active object.

  friend class PeriodicCounters;
};

/**
 * Holds 2 objects that can merge() into a result object upon destruction.
 * This object is basic and meant to be used with thread-local-storage.
 * Objects of type C are assumed to have a constructor that takes one argument
 * of type P.
 */
template <class C, class P>
class Hold2 : boost::noncopyable {
 public:
  Hold2(C* result, const P& p) : datav_{C(p), C(p)}, result_(result) {
    VLOG(100) << "new Hold2 " << this;
    CHECK(result);
  }
  ~Hold2() {
    VLOG(100) << "~Hold2 " << this;
    result_->merge(datav_[0]);
    result_->merge(datav_[1]);
  }
  inline C& get(int idx) {
    return datav_[idx];
  }

 private:
  C datav_[2];  //!< Holds 2 objects.
  C* result_;   //!< Result object to merge into upon destruction.
};

/**
 * ThreadLocalSwapableNode is a template class that implements a SwapableNode
 * counter using thread-local storage and no locking. Each thread gets its
 * own double-buffered Counter. Threads write into the active Counter. When
 * the data needs to be read, we swap the counters and merge all the data
 * together into a new Counter that can be safely read. The class C must be a
 * Counter-style class that implements reset(), merge(), and record(), while
 * class P is the type of the argument to the single-argument constructor of
 * class C.
 */
template <class C, class P>
class ThreadLocalSwapableNode : public SwapableNode {
 public:
  explicit ThreadLocalSwapableNode(const P& param,
                                   std::function<void(const C&)> func = nullptr)
      : param_(param), func_(func) {
    // Note: Logging crashes for static histograms (which should be avoided)
    VLOG(100) << "new ThreadLocalSwapableNode " << this;
  }
  ~ThreadLocalSwapableNode() override {
    VLOG(100) << "~ThreadLocalSwapableNode " << this;
  }

  /// Call the underlying C::record() on the active object.
  inline void record(int64_t value) {
    getActiveCounter().record(value);
  }

  /// @return Reference to the current active C object.
  inline C& getActiveCounter() {
    // Works without locking because data_ is thread-local.
    auto* h = data_.get();
    if (h == nullptr) {
      // Using UNLIKELY() here isn't faster.
      // ThreadLocalPtr uses a lock to maintain the list of Hold2s.
      h = new Hold2<C, P>(&aggregated_, param_);
      data_.reset(h);
    }
    return h->get(getActiveIndex());
  }

  /**
   * Reads all the counters and resets them. The values of the counters up to
   * this point will be dumped into the merged output argument.
   *
   * @param merged Object that will receive the merged counters.
   * @param delay  Delay slightly, internally, when reading values; this is
   *               used to avoid locking.
   */
  inline void readAndReset(C& merged, bool delay = false) {
    swap();
    if (delay) {
      usleep(10);
    }
    mergeAndReset(!getActiveIndex(), merged);
  }

 protected:
  /**
   * Merges the counters from all thread-local storage objects and resets
   * them.
   *
   * @param idx    Index of object to merge (active or inactive).
   * @param merged Object that will contain fully merged data.
   */
  inline void mergeAndReset(int idx, C& merged) {
    for (Hold2<C, P>& p : data_.accessAllThreads()) {
      auto& h = p.get(idx);
      aggregated_.merge(h);
      h.reset();
    }
    merged.merge(aggregated_);
    aggregated_.reset();
  }

  /// @see PeriodicCounters for details on how this is used.
  void process() override {
    // If we have a callback function, give it the fully aggregated results.
    if (func_ != nullptr) {
      C merged;
      mergeAndReset(!getActiveIndex(), merged);
      func_(merged);
    }
  }

 private:
  P param_;       //!< Value for the constructor of class C.
  C aggregated_;  //!< Object to hold aggregated results.
  std::function<void(const C&)> func_;
  /// Thread-local-storage for two swapable objects.
  folly::ThreadLocalPtr<Hold2<C, P>, SwapableNode> data_;
};

// ThreadLocal versions of Histogram and Counter.
typedef ThreadLocalSwapableNode<Histogram, int32_t> ThreadLocalHistogram;
typedef ThreadLocalSwapableNode<Counter, double> ThreadLocalCounter;
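
/*
 * A sketch of the manual (no PeriodicCounters) read path for the Histogram
 * variant; the scale of 100 and the names are illustrative. Passing
 * delay=true leaves a short window for in-flight writers still using the
 * swapped-out copy:
 *
 * <code>
 * ThreadLocalHistogram hist(100);
 *
 * // From any thread:
 * hist.record(1234);
 *
 * // From a single reader thread, periodically:
 * Histogram merged(100);
 * hist.readAndReset(merged, true);
 * merged.print(std::cout);
 * </code>
 */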
/**
 * PeriodicCounters, given a list of SwapableNode counters, will run a
 * periodic job that reads and resets all the counters. The action taken
 * after reading the contents of a counter is simply calling the optional
 * func() argument of SwapableNode. This class can be used, for example, to
 * periodically collect and print counters.
 */
class PeriodicCounters : private boost::noncopyable {
 public:
  explicit PeriodicCounters(const std::vector<SwapableNode*>& counters)
      : counters_(counters) {
  }
  virtual ~PeriodicCounters() {
    std::unique_lock<std::mutex> lk(mutex_);
    stop_ = true;
    lk.unlock();
    cv_.notify_one();
    if (thread_.joinable()) {
      thread_.join();
    }
    swapAndRead(false);
  }

  void swapAndRead(bool delay = true) {
    for (SwapableNode* n : counters_) {
      n->swap();
    }
    // Wait a tiny bit for threads still using the old value to be done.
    //
    // TODO: This is a hack, but it avoids locks, which kill performance.
    //       Look into sacrificing some performance for 100% accurate results
    //       using some sort of ultra-fast micro lock that pajoux wrote.
    if (delay) {
      usleep(100);
    }
    for (SwapableNode* n : counters_) {
      n->process();
    }
  }

  bool schedule(double intervalInSec) {
    intervalInMs_ = (1000.0 * intervalInSec + 0.5);
    thread_ = std::thread(&PeriodicCounters::run, this);
    return thread_.joinable();
  }

 private:
  void run() {
    std::unique_lock<std::mutex> lk(mutex_);
    while (!stop_) {
      if (cv_.wait_for(lk, std::chrono::milliseconds(intervalInMs_)) ==
          std::cv_status::timeout) {
        swapAndRead(true);
      }
    }
    LOG(INFO) << "Periodic stats thread ending";
  }

  std::vector<SwapableNode*> counters_;
  int64_t intervalInMs_;
  bool stop_{false};
  std::mutex mutex_;
  std::condition_variable cv_;
  std::thread thread_;
};

}  // namespace wdt
}  // namespace facebook

#endif /* STATS_H_ */