src/utils/metrics.h (1,155 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include <boost/asio/deadline_timer.hpp> #include <boost/asio/detail/impl/epoll_reactor.hpp> #include <boost/asio/detail/impl/timer_queue_ptime.ipp> #include <rapidjson/ostreamwrapper.h> #include <stddef.h> #include <algorithm> #include <atomic> #include <bitset> #include <chrono> #include <cstdint> #include <functional> #include <memory> #include <new> #include <ratio> #include <set> #include <sstream> #include <string> #include <type_traits> #include <unordered_map> #include <unordered_set> #include <utility> #include <vector> #include <string_view> #include "common/json_helper.h" #include "gutil/map_util.h" #include "http/http_server.h" #include "utils/alloc.h" #include "utils/autoref_ptr.h" #include "utils/blob.h" #include "utils/casts.h" #include "utils/enum_helper.h" #include "utils/error_code.h" #include "utils/errors.h" #include "utils/fmt_logging.h" #include "utils/long_adder.h" #include "utils/macros.h" #include "utils/nth_element.h" #include "utils/ports.h" #include "utils/singleton.h" #include "utils/string_conv.h" #include "utils/synchronize.h" #include "utils/time_utils.h" #include "utils/utils.h" namespace boost { namespace system { class error_code; } // namespace system } // namespace boost // A metric library (for details pls see https://github.com/apache/incubator-pegasus/issues/922) // inspired by Kudu metrics (https://github.com/apache/kudu/blob/master/src/kudu/util/metrics.h). // // // Example of defining and instantiating a metric entity // ----------------------------------------------------- // Define an entity type at the top of your .cpp file (not within any namespace): // METRIC_DEFINE_entity(my_entity); // // To use the entity type, declare it at the top of any .h/.cpp file (not within any namespace): // METRIC_DECLARE_entity(my_entity); // // Instantiating the entity in whatever class represents it: // entity_instance = METRIC_ENTITY_my_entity.instantiate(my_entity_id, ...); // // // Example of defining and instantiating a metric // ----------------------------------------------------- // Define an entity type at the top of your .cpp file (not within any namespace): // METRIC_DEFINE_gauge_int64(my_entity, // my_gauge_name, // dsn::metric_unit::kMilliSeconds, // "the description for my gauge"); // // To use the metric prototype, declare it at the top of any .h/.cpp file (not within any // namespace): // METRIC_DECLARE_gauge_int64(my_gauge_name); // // Instantiating the metric in whatever class represents it with some initial arguments, if any: // metric_instance = METRIC_my_gauge_name.instantiate(entity_instance, ...); // The following are convenient macros provided to define entity types and metric prototypes. #define METRIC_DEFINE_entity(name) ::dsn::metric_entity_prototype METRIC_ENTITY_##name(#name) #define METRIC_DEFINE_gauge_int64(entity_type, name, unit, desc, ...) \ ::dsn::gauge_prototype<int64_t> METRIC_##name( \ {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__}) #define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...) \ ::dsn::gauge_prototype<double> METRIC_##name( \ {#entity_type, dsn::metric_type::kGauge, #name, unit, desc, ##__VA_ARGS__}) // There are 2 kinds of counters: // * `counter` is the general type of counter that is implemented by striped_long_adder, which can // achieve high performance while consuming less memory if it's not updated very frequently. // * `concurrent_counter` uses concurrent_long_adder as the underlying implementation. It has // higher performance while consuming more memory if it's updated very frequently. // See also include/dsn/utility/long_adder.h for details. #define METRIC_DEFINE_counter(entity_type, name, unit, desc, ...) \ dsn::counter_prototype<dsn::striped_long_adder, false> METRIC_##name( \ {#entity_type, dsn::metric_type::kCounter, #name, unit, desc, ##__VA_ARGS__}) #define METRIC_DEFINE_concurrent_counter(entity_type, name, unit, desc, ...) \ dsn::counter_prototype<dsn::concurrent_long_adder, false> METRIC_##name( \ {#entity_type, dsn::metric_type::kCounter, #name, unit, desc, ##__VA_ARGS__}) #define METRIC_DEFINE_volatile_counter(entity_type, name, unit, desc, ...) \ dsn::counter_prototype<dsn::striped_long_adder, true> METRIC_##name( \ {#entity_type, dsn::metric_type::kVolatileCounter, #name, unit, desc, ##__VA_ARGS__}) #define METRIC_DEFINE_concurrent_volatile_counter(entity_type, name, unit, desc, ...) \ dsn::counter_prototype<dsn::concurrent_long_adder, true> METRIC_##name( \ {#entity_type, dsn::metric_type::kVolatileCounter, #name, unit, desc, ##__VA_ARGS__}) // The percentile supports both integral and floating types. #define METRIC_DEFINE_percentile_int64(entity_type, name, unit, desc, ...) \ dsn::percentile_prototype<int64_t> METRIC_##name( \ {#entity_type, dsn::metric_type::kPercentile, #name, unit, desc, ##__VA_ARGS__}) #define METRIC_DEFINE_percentile_double(entity_type, name, unit, desc, ...) \ dsn::floating_percentile_prototype<double> METRIC_##name( \ {#entity_type, dsn::metric_type::kPercentile, #name, unit, desc, ##__VA_ARGS__}) // The following macros act as forward declarations for entity types and metric prototypes. #define METRIC_DECLARE_entity(name) extern ::dsn::metric_entity_prototype METRIC_ENTITY_##name #define METRIC_DECLARE_gauge_int64(name) extern ::dsn::gauge_prototype<int64_t> METRIC_##name #define METRIC_DECLARE_gauge_double(name) extern ::dsn::gauge_prototype<double> METRIC_##name #define METRIC_DECLARE_counter(name) \ extern dsn::counter_prototype<dsn::striped_long_adder, false> METRIC_##name #define METRIC_DECLARE_concurrent_counter(name) \ extern dsn::counter_prototype<dsn::concurrent_long_adder, false> METRIC_##name #define METRIC_DECLARE_volatile_counter(name) \ extern dsn::counter_prototype<dsn::striped_long_adder, true> METRIC_##name #define METRIC_DECLARE_concurrent_volatile_counter(name) \ extern dsn::counter_prototype<dsn::concurrent_long_adder, true> METRIC_##name #define METRIC_DECLARE_percentile_int64(name) \ extern dsn::percentile_prototype<int64_t> METRIC_##name #define METRIC_DECLARE_percentile_double(name) \ extern dsn::floating_percentile_prototype<double> METRIC_##name // Following METRIC_VAR* macros are introduced so that: // * only need to use prototype name to operate each metric variable; // * uniformly name each variable in user class; // * differentiate operations on metrics significantly from main logic, improving code readability. // Declare a metric variable in user class. // // Since a type tends to be a class template where there might be commas, use variadic arguments // instead of a single fixed argument to represent a type. #define METRIC_VAR_NAME(name) _metric_##name #define METRIC_VAR_DECLARE(name, ...) __VA_ARGS__ METRIC_VAR_NAME(name) // Variadic arguments are possible qualifiers for the variable, such as `static`. #define METRIC_VAR_DECLARE_gauge_int64(name, ...) \ METRIC_VAR_DECLARE(name, __VA_ARGS__ dsn::gauge_ptr<int64_t>) #define METRIC_VAR_DECLARE_counter(name, ...) \ METRIC_VAR_DECLARE(name, __VA_ARGS__ dsn::counter_ptr<dsn::striped_long_adder, false>) #define METRIC_VAR_DECLARE_percentile_int64(name, ...) \ METRIC_VAR_DECLARE(name, __VA_ARGS__ dsn::percentile_ptr<int64_t>) // Macro METRIC_VAR_DEFINE* are used for the metric that is a static member of a class: // * `clazz` is the name of the class; // * variadic arguments are possible qualifiers for the variable. #define METRIC_VAR_DEFINE(name, clazz, ...) __VA_ARGS__ clazz::METRIC_VAR_NAME(name) #define METRIC_VAR_DEFINE_gauge_int64(name, clazz, ...) \ METRIC_VAR_DEFINE(name, clazz, __VA_ARGS__ dsn::gauge_ptr<int64_t>) #define METRIC_VAR_DEFINE_counter(name, clazz, ...) \ METRIC_VAR_DEFINE(name, clazz, __VA_ARGS__ dsn::counter_ptr<dsn::striped_long_adder, false>) #define METRIC_VAR_DEFINE_percentile_int64(name, clazz, ...) \ METRIC_VAR_DEFINE(name, clazz, __VA_ARGS__ dsn::percentile_ptr<int64_t>) // Initialize a metric variable in user class: // * macros METRIC_VAR_INIT* could be used to initialize metric variables in member initializer // lists of the constructor of user class; // * macros METRIC_VAR_ASSIGN* could be used to initialize metric variables by assignment operator // (=). #define METRIC_VAR_INSTANTIATE(name, entity, op, ...) \ METRIC_VAR_NAME(name) op(METRIC_##name.instantiate(entity##_metric_entity(), ##__VA_ARGS__)) #define METRIC_VAR_ASSIGN(name, entity, ...) METRIC_VAR_INSTANTIATE(name, entity, =, ##__VA_ARGS__) #define METRIC_VAR_INIT(name, entity, ...) METRIC_VAR_INSTANTIATE(name, entity, , ##__VA_ARGS__) #define METRIC_VAR_INIT_replica(name, ...) METRIC_VAR_INIT(name, replica, ##__VA_ARGS__) #define METRIC_VAR_ASSIGN_server(name, ...) METRIC_VAR_ASSIGN(name, server, ##__VA_ARGS__) #define METRIC_VAR_INIT_server(name, ...) METRIC_VAR_INIT(name, server, ##__VA_ARGS__) #define METRIC_VAR_INIT_disk(name, ...) METRIC_VAR_INIT(name, disk, ##__VA_ARGS__) #define METRIC_VAR_INIT_table(name, ...) METRIC_VAR_INIT(name, table, ##__VA_ARGS__) #define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__) #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__) #define METRIC_VAR_INIT_queue(name, ...) METRIC_VAR_INIT(name, queue, ##__VA_ARGS__) #define METRIC_VAR_ASSIGN_profiler(name, ...) METRIC_VAR_ASSIGN(name, profiler, ##__VA_ARGS__) #define METRIC_VAR_INIT_latency_tracer(name, ...) \ METRIC_VAR_INIT(name, latency_tracer, ##__VA_ARGS__) // Perform increment_by() operations on gauges and counters. #define METRIC_VAR_INCREMENT_BY(name, x) \ do { \ const auto v = (x); \ if (v != 0) { \ METRIC_VAR_NAME(name)->increment_by(v); \ } \ } while (0) // Perform increment() operations on gauges and counters. #define METRIC_VAR_INCREMENT(name) METRIC_VAR_NAME(name)->increment() // Perform decrement_by() operations on gauges. #define METRIC_VAR_DECREMENT_BY(name, x) \ do { \ const auto v = (x); \ if (v != 0) { \ METRIC_VAR_NAME(name)->decrement_by(v); \ } \ } while (0) // Perform decrement() operations on gauges. #define METRIC_VAR_DECREMENT(name) METRIC_VAR_NAME(name)->decrement() // Perform set() operations on gauges and percentiles. // // There are 2 kinds of invocations of set() for a metric: // * set(val): set a single value for a metric, such as gauge, percentile; // * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric, // such as percentile. #define METRIC_VAR_SET(name, ...) METRIC_VAR_NAME(name)->set(__VA_ARGS__) // Read the current measurement of gauges and counters. #define METRIC_VAR_VALUE(name) METRIC_VAR_NAME(name)->value() // Convenient macro that is used to compute latency automatically, which is dedicated to percentile. #define METRIC_VAR_AUTO_LATENCY(name, ...) \ dsn::auto_latency __##name##_auto_latency(METRIC_VAR_NAME(name), ##__VA_ARGS__) #define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns() // Convenient macro that is used to increment/decrement gauge automatically in current scope. #define METRIC_VAR_AUTO_COUNT(name, ...) \ dsn::auto_count __##name##_auto_count(METRIC_VAR_NAME(name), ##__VA_ARGS__) // Implement a member function that runs `method` on the metric variable, without any argument. #define METRIC_DEFINE_NO_ARG(method, name) \ void METRIC_FUNC_NAME_##method(name)() { METRIC_VAR_##method(name); } // Implement a member function that runs `method` on the metric variable if NOT NULL, // without any argument. #define METRIC_DEFINE_NO_ARG_NOTNULL(method, name) \ void METRIC_FUNC_NAME_##method(name)() \ { \ if (METRIC_VAR_NAME(name) != nullptr) { \ METRIC_VAR_##method(name); \ } \ } // Implement a member function that runs `method` on the metric variable and return `ret_type`, // without any argument. #define METRIC_DEFINE_RET_AND_NO_ARG(ret_type, method, name) \ ret_type METRIC_FUNC_NAME_##method(name)() { return METRIC_VAR_##method(name); } // Implement a member function that runs `method` on the metric variable, with an argument. #define METRIC_DEFINE_ONE_ARG(method, name, arg_type) \ void METRIC_FUNC_NAME_##method(name)(arg_type arg) { METRIC_VAR_##method(name, arg); } // Implement a member function that runs `method` on the metric variable if NOT NULL, // with an argument. #define METRIC_DEFINE_ONE_ARG_NOTNULL(method, name, arg_type) \ void METRIC_FUNC_NAME_##method(name)(arg_type arg) \ { \ if (METRIC_VAR_NAME(name) != nullptr) { \ METRIC_VAR_##method(name, arg); \ } \ } // Call the member function of `obj` to run `method` on the metric variable. #define METRIC_CALL(obj, method, name, ...) (obj).METRIC_FUNC_NAME_##method(name)(__VA_ARGS__) // The name of the member function that increments the metric variable by some value. #define METRIC_FUNC_NAME_INCREMENT_BY(name) increment_##name##_by // Implement a member function that increments the metric variable by some value. #define METRIC_DEFINE_INCREMENT_BY(name) METRIC_DEFINE_ONE_ARG(INCREMENT_BY, name, int64_t) // To be adaptive to self-defined `increment_by` methods, arguments are declared as variadic. #define METRIC_INCREMENT_BY(obj, name, ...) METRIC_CALL(obj, INCREMENT_BY, name, ##__VA_ARGS__) // The name of the member function that increments the metric variable by one. #define METRIC_FUNC_NAME_INCREMENT(name) increment_##name // Implement a member function that increments the metric variable by one. #define METRIC_DEFINE_INCREMENT(name) METRIC_DEFINE_NO_ARG(INCREMENT, name) // Implement a member function that increments the metric variable by one if NOT NULL. #define METRIC_DEFINE_INCREMENT_NOTNULL(name) METRIC_DEFINE_NO_ARG_NOTNULL(INCREMENT, name) // To be adaptive to self-defined `increment` methods, arguments are declared as variadic. #define METRIC_INCREMENT(obj, name, ...) METRIC_CALL(obj, INCREMENT, name, ##__VA_ARGS__) // The name of the member function that decrements the metric variable by one. #define METRIC_FUNC_NAME_DECREMENT(name) decrement_##name // Implement a member function that decrements the metric variable by one. #define METRIC_DEFINE_DECREMENT(name) METRIC_DEFINE_NO_ARG(DECREMENT, name) // Implement a member function that decrements the metric variable by one if NOT NULL. #define METRIC_DEFINE_DECREMENT_NOTNULL(name) METRIC_DEFINE_NO_ARG_NOTNULL(DECREMENT, name) // To be adaptive to self-defined `decrement` methods, arguments are declared as variadic. #define METRIC_DECREMENT(obj, name, ...) METRIC_CALL(obj, DECREMENT, name, ##__VA_ARGS__) // The name of the member function that sets the metric variable with some value. #define METRIC_FUNC_NAME_SET(name) set_##name // Implement a member function that sets the metric variable with some value. #define METRIC_DEFINE_SET(name, value_type) METRIC_DEFINE_ONE_ARG(SET, name, value_type) // Implement a member function that sets the metric variable with some value if NOT NULL. #define METRIC_DEFINE_SET_NOTNULL(name, value_type) \ METRIC_DEFINE_ONE_ARG_NOTNULL(SET, name, value_type) // To be adaptive to self-defined `set` methods, arguments are declared as variadic. #define METRIC_SET(obj, name, ...) METRIC_CALL(obj, SET, name, ##__VA_ARGS__) // The name of the member function that gets the value of the metric variable. #define METRIC_FUNC_NAME_VALUE(name) get_##name // Implement a member function that gets the value of the metric variable. #define METRIC_DEFINE_VALUE(name, value_type) METRIC_DEFINE_RET_AND_NO_ARG(value_type, VALUE, name) // To be adaptive to self-defined `value` methods, arguments are declared as variadic. #define METRIC_VALUE(obj, name, ...) METRIC_CALL(obj, VALUE, name, ##__VA_ARGS__) namespace dsn { class metric; // IWYU pragma: keep class metric_entity_prototype; // IWYU pragma: keep class metric_prototype; // IWYU pragma: keep struct metric_filters; // IWYU pragma: keep using metric_ptr = ref_ptr<metric>; using metric_json_writer = dsn::json::PrettyJsonWriter; const std::string kMetricEntityTypeField = "type"; const std::string kMetricEntityIdField = "id"; const std::string kMetricEntityAttrsField = "attributes"; const std::string kMetricEntityMetricsField = "metrics"; const std::string kMetricClusterField = "cluster"; const std::string kMetricRoleField = "role"; const std::string kMetricHostField = "host"; const std::string kMetricPortField = "port"; const std::string kMetricTimestampNsField = "timestamp_ns"; const std::string kMetricEntitiesField = "entities"; class metric_entity : public ref_counter { public: using attr_map = std::unordered_map<std::string, std::string>; using metric_map = std::unordered_map<const metric_prototype *, metric_ptr>; const metric_entity_prototype *prototype() const { return _prototype; } const std::string &id() const { return _id; } attr_map attributes() const; metric_map metrics() const; // `args` are the parameters that are used to construct the object of MetricType. template <typename MetricType, typename... Args> ref_ptr<MetricType> find_or_create(const metric_prototype *prototype, Args &&...args); void take_snapshot(metric_json_writer &writer, const metric_filters &filters) const; private: friend class metric_registry; friend class ref_ptr<metric_entity>; friend class scoped_entity; metric_entity(const metric_entity_prototype *prototype, const std::string &id, const attr_map &attrs); ~metric_entity(); // Close all "closeable" metrics owned by this entity. // // `option` is used to control how the close operations are performed: // * kWait: close() will be blocked until all of the close operations are finished. // * kNoWait: once the close requests are issued, close() will return immediately without // waiting for any close operation to be finished. enum class close_option : int { kWait, kNoWait, }; void close(close_option option); void set_attributes(const attr_map &attrs); void encode_type(metric_json_writer &writer) const; void encode_id(metric_json_writer &writer) const; // Decide if an entity is stale. An entity becomes stale if it is no longer used by any other // object. // // An entity could be bound to one or multiple objects. Once all of these objects are // destroyed, this entity will become stale, which means all of the metrics held by this // entity are also stale. // // For example, once a replica is removed, the replica entity (and all metrics it holds) will // become stale; then, this entity is scheduled to be retired after a configurable retention // interval; finally, this entity will be removed from the registry with all metrics it holds. bool is_stale() const; const metric_entity_prototype *const _prototype; const std::string _id; mutable utils::rw_lock_nr _lock; attr_map _attrs; metric_map _metrics; // The timestamp when this entity should be retired: // * default value is 0, which means this entity has not been scheduled to be retired; // * otherwise, non-zero value means this entity has been scheduled to be retired, and will // be retired at any time once current time has reached or exceeded this timestamp. uint64_t _retire_time_ms; DISALLOW_COPY_AND_ASSIGN(metric_entity); }; using metric_entity_ptr = ref_ptr<metric_entity>; // This struct includes a set of filters for both entities and metrics requested by client. struct metric_filters { using metric_fields_type = std::unordered_set<std::string>; using entity_types_type = std::vector<std::string>; using entity_ids_type = std::unordered_set<std::string>; using entity_attrs_type = std::vector<std::string>; using entity_metrics_type = std::vector<std::string>; // NOTICE: empty `white_list` means every field is required by client. #define RETURN_MATCHED_WITH_EMPTY_WHITE_LIST(white_list) \ do { \ if (white_list.empty()) { \ return true; \ } \ } while (0) #define DEFINE_SIMPLE_MATCHER(name) \ template <typename T> \ inline bool match_##name(const T &candidate) const \ { \ return match(candidate, name##s); \ } static inline bool match(const char *candidate, const std::vector<std::string> &white_list) { RETURN_MATCHED_WITH_EMPTY_WHITE_LIST(white_list); // Will use `bool operator==(const string &lhs, const char *rhs);` to compare each element // in `white_list` with `candidate`. return utils::contains(white_list, candidate); } static inline bool match(const std::string &candidate, const std::unordered_set<std::string> &white_list) { RETURN_MATCHED_WITH_EMPTY_WHITE_LIST(white_list); return white_list.find(candidate) != white_list.end(); } // According to the parameters requested by client, this function will filter metric // fields that will be put in the response. DEFINE_SIMPLE_MATCHER(with_metric_field) DEFINE_SIMPLE_MATCHER(entity_type) DEFINE_SIMPLE_MATCHER(entity_id) bool match_entity_attrs(const metric_entity::attr_map &candidates) const { RETURN_MATCHED_WITH_EMPTY_WHITE_LIST(entity_attrs); // The size of container must be divisible by 2, since attribute name always pairs // with value in it. CHECK_EQ(entity_attrs.size() & 1, 0); for (entity_attrs_type::size_type i = 0; i < entity_attrs.size(); i += 2) { const auto &iter = candidates.find(entity_attrs[i]); if (iter == candidates.end()) { continue; } if (iter->second == entity_attrs[i + 1]) { // It will be considered as matched once any attribute is matched // for both name and value. return true; } } return false; } #undef DEFINE_SIMPLE_MATCHER #undef RETURN_MATCHED_WITH_EMPTY_WHITE_LIST void extract_entity_metrics(const metric_entity::metric_map &candidates, metric_entity::metric_map &target_metrics) const; // Build the http query string based on metric filters. This is useful when an http request // is performed for metrics query: firstly, set metric filters with what you want; then, // get query string by this function conveniently and put it into the http request. std::string to_query_string() const; // `with_metric_fields` includes all the metric fields that are wanted by client. If it // is empty, there will be no restriction: in other words, all fields owned by the metric // will be put in the response. metric_fields_type with_metric_fields; entity_types_type entity_types; entity_ids_type entity_ids; entity_attrs_type entity_attrs; entity_metrics_type entity_metrics; }; inline std::string encode_as_json(std::function<void(metric_json_writer &)> encoder) { std::ostringstream out; rapidjson::OStreamWrapper wrapper(out); metric_json_writer writer(wrapper); encoder(writer); return out.str(); } template <typename T> inline std::string take_snapshot_as_json(T *m, const metric_filters &filters) { return encode_as_json( [m, &filters](metric_json_writer &writer) { m->take_snapshot(writer, filters); }); } class metric_entity_prototype { public: explicit metric_entity_prototype(const char *name); ~metric_entity_prototype(); const char *name() const { return _name; } // Create an entity with the given ID and attributes, if any. metric_entity_ptr instantiate(const std::string &id, const metric_entity::attr_map &attrs) const; metric_entity_ptr instantiate(const std::string &id) const; private: const char *const _name; DISALLOW_COPY_AND_ASSIGN(metric_entity_prototype); }; class metric_registry; // IWYU pragma: keep class metrics_http_service : public http_server_base { public: static const std::string kMetricsRootPath; static const std::string kMetricsQuerySubPath; static const std::string kMetricsQueryPath; explicit metrics_http_service(metric_registry *registry); ~metrics_http_service() = default; // There is only one API now whose URI is "/metrics", thus just make // this URI as sub path while leaving the root path empty. std::string path() const override { return kMetricsRootPath; } private: friend void test_get_metrics_handler(const http_request &req, http_response &resp); void get_metrics_handler(const http_request &req, http_response &resp); metric_registry *_registry; DISALLOW_COPY_AND_ASSIGN(metrics_http_service); }; // `metric_timer` is a timer class that runs metric-related computations periodically, such as // calculating percentile, checking if there are stale entities. It accepts `on_exec` and // `on_close` as the callbacks for execution and close. // // In case that all metrics (such as percentiles) are computed at the same time and lead to very // high load, first calculation will be delayed at a random interval. class metric_timer { public: enum class state : int { kRunning, kClosing, kClosed, }; using on_exec_fn = std::function<void()>; using on_close_fn = std::function<void()>; metric_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close); ~metric_timer() = default; void close(); void wait(); // Get the initial delay that is randomly generated by `generate_initial_delay_ms()`. uint64_t get_initial_delay_ms() const { return _initial_delay_ms; } private: // Generate an initial delay randomly in case that all percentiles are computed at the // same time. static uint64_t generate_initial_delay_ms(uint64_t interval_ms); void on_close(); void on_timer(const boost::system::error_code &ec); const uint64_t _initial_delay_ms; const uint64_t _interval_ms; const on_exec_fn _on_exec; const on_close_fn _on_close; std::atomic<state> _state; utils::notify_event _completed; std::unique_ptr<boost::asio::deadline_timer> _timer; DISALLOW_COPY_AND_ASSIGN(metric_timer); }; class metric_registry : public utils::singleton<metric_registry> { public: using entity_map = std::unordered_map<std::string, metric_entity_ptr>; using collected_entity_list = std::unordered_set<std::string>; struct collected_entities_info { // The collected entities that will be processed by retire_stale_entities(). Following // kinds of entities will be collected: // * entities that should be retired immediately. The entities that are still within // the retention interval will not be collected. // * entities that were previously considered stale however have already been reemployed, // which means its retirement should be cancelled by retire_stale_entities(). collected_entity_list collected_entities; // The number of all entities in the registry. size_t num_all_entities = 0; // The number of the entities that have been scheduled to be retired. size_t num_scheduled_entities = 0; collected_entities_info() = default; }; struct retired_entities_stat { // The number of retired entities. size_t num_retired_entities = 0; // The number of entities that were recently considered stale and scheduled to be // retired. size_t num_recently_scheduled_entities = 0; // The number of the entities that had previously been scheduled to be retired and // were recently reemployed. size_t num_reemployed_entities = 0; retired_entities_stat() = default; }; entity_map entities() const; void take_snapshot(metric_json_writer &writer, const metric_filters &filters) const; private: friend class metric_entity_prototype; friend class utils::singleton<metric_registry>; friend void test_get_metrics_handler(const http_request &req, http_response &resp); friend class scoped_entity; friend class MetricsRetirementTest; metric_registry(); ~metric_registry(); void on_close(); void start_timer(); void stop_timer(); metric_entity_ptr find_or_create_entity(const metric_entity_prototype *prototype, const std::string &id, const metric_entity::attr_map &attrs); void encode_entities(metric_json_writer &writer, const metric_filters &filters) const; // These functions are used to retire stale entities. // // Since retirement is infrequent, there tend to be no entity that should be retired. // Therefore, the whole retirement process is divided into two phases: "collect" and // "retire". // // At the first phase "collect", we just check if there are entities that: // * has become stale, but has not been scheduled to be retired, or // * should be retired immediately, or // * previously were scheduled to be retired, now has been reemployed. // // All operations in the first phase are read-only, needing just read lock which is more // lightweight. If some entities were found following above conditions, albeit infrequenly, // they would be collected to be processed at the next phase. // // Collected entities, if any, will be processed at the second phase "retire": // * stale entities will be schedule to be retired; // * the expired entities will be retired; // * reset the retirement timestamp to 0 for reemployed entities. collected_entities_info collect_stale_entities() const; retired_entities_stat retire_stale_entities(const collected_entity_list &collected_entities); void process_stale_entities(); mutable utils::rw_lock_nr _lock; entity_map _entities; metrics_http_service _http_service; std::unique_ptr<metric_timer> _timer; DISALLOW_COPY_AND_ASSIGN(metric_registry); }; // metric_type is needed while metrics are collected to monitoring systems. Generally // each monitoring system has its own types of metrics: firstly we should know which // type our metric belongs to; then we can know how to "translate" it to the specific // monitoring system. // // On the other hand, it is also needed when some special operation should be done // for a metric type. For example, percentile should be closed while it's no longer // used. #define ENUM_FOREACH_METRIC_TYPE(DEF) \ DEF(Gauge) \ DEF(Counter) \ DEF(VolatileCounter) \ DEF(Percentile) enum class metric_type { ENUM_FOREACH_METRIC_TYPE(ENUM_CONST_DEF) kInvalidType, }; #define ENUM_CONST_REG_STR_METRIC_TYPE(str) ENUM_CONST_REG_STR(metric_type, str) ENUM_BEGIN(metric_type, metric_type::kInvalidType) ENUM_FOREACH_METRIC_TYPE(ENUM_CONST_REG_STR_METRIC_TYPE) ENUM_END(metric_type) #define ENUM_FOREACH_METRIC_UNIT(DEF) \ DEF(NanoSeconds) \ DEF(MicroSeconds) \ DEF(MilliSeconds) \ DEF(Seconds) \ DEF(Bytes) \ DEF(MegaBytes) \ DEF(BytesPerSec) \ DEF(CapacityUnits) \ DEF(Percent) \ DEF(Replicas) \ DEF(Partitions) \ DEF(PartitionSplittings) \ DEF(Servers) \ DEF(Requests) \ DEF(Responses) \ DEF(Seeks) \ DEF(PointLookups) \ DEF(Values) \ DEF(Keys) \ DEF(Files) \ DEF(Dirs) \ DEF(Amplification) \ DEF(Checkpoints) \ DEF(Flushes) \ DEF(Compactions) \ DEF(Mutations) \ DEF(Writes) \ DEF(Changes) \ DEF(Operations) \ DEF(Tasks) \ DEF(Disconnections) \ DEF(Sessions) \ DEF(Learns) \ DEF(Rounds) \ DEF(Resets) \ DEF(Backups) \ DEF(FileLoads) \ DEF(FileUploads) \ DEF(BulkLoads) \ DEF(Beacons) enum class metric_unit : size_t { ENUM_FOREACH_METRIC_UNIT(ENUM_CONST_DEF) kInvalidUnit, }; #define METRIC_ASSERT_UNIT_LATENCY(unit, index) \ static_assert(static_cast<size_t>(metric_unit::unit) == index, \ #unit " should be at index " #index) METRIC_ASSERT_UNIT_LATENCY(kNanoSeconds, 0); METRIC_ASSERT_UNIT_LATENCY(kMicroSeconds, 1); METRIC_ASSERT_UNIT_LATENCY(kMilliSeconds, 2); METRIC_ASSERT_UNIT_LATENCY(kSeconds, 3); const std::vector<uint64_t> kMetricLatencyConverterFromNS = { 1, 1000, 1000 * 1000, 1000 * 1000 * 1000}; inline uint64_t convert_metric_latency_from_ns(uint64_t latency_ns, metric_unit target_unit) { if (dsn_likely(target_unit == metric_unit::kNanoSeconds)) { // Since nanoseconds are used as the latency unit more frequently, eliminate unnecessary // conversion by branch prediction. return latency_ns; } auto index = static_cast<size_t>(target_unit); CHECK_LT(index, kMetricLatencyConverterFromNS.size()); return latency_ns / kMetricLatencyConverterFromNS[index]; } #define ENUM_CONST_REG_STR_METRIC_UNIT(str) ENUM_CONST_REG_STR(metric_unit, str) ENUM_BEGIN(metric_unit, metric_unit::kInvalidUnit) ENUM_FOREACH_METRIC_UNIT(ENUM_CONST_REG_STR_METRIC_UNIT) ENUM_END(metric_unit) class metric_prototype { public: struct ctor_args { const std::string_view entity_type; const metric_type type; const std::string_view name; const metric_unit unit; const std::string_view desc; }; std::string_view entity_type() const { return _args.entity_type; } metric_type type() const { return _args.type; } std::string_view name() const { return _args.name; } metric_unit unit() const { return _args.unit; } std::string_view description() const { return _args.desc; } protected: explicit metric_prototype(const ctor_args &args); virtual ~metric_prototype(); private: const ctor_args _args; DISALLOW_COPY_AND_ASSIGN(metric_prototype); }; // metric_prototype_with<MetricType> can help to implement the prototype of each type of metric // to construct a metric object conveniently. template <typename MetricType> class metric_prototype_with : public metric_prototype { public: explicit metric_prototype_with(const ctor_args &args) : metric_prototype(args) {} virtual ~metric_prototype_with() = default; // Construct a metric object based on the instance of metric_entity. template <typename... Args> ref_ptr<MetricType> instantiate(const metric_entity_ptr &entity, Args &&...args) const { return entity->find_or_create<MetricType>(this, std::forward<Args>(args)...); } private: DISALLOW_COPY_AND_ASSIGN(metric_prototype_with); }; template <typename MetricType, typename... Args> ref_ptr<MetricType> metric_entity::find_or_create(const metric_prototype *prototype, Args &&...args) { CHECK_STREQ_MSG(prototype->entity_type().data(), _prototype->name(), "the entity type '{}' of the metric '{}' is inconsistent with the prototype " "'{}' of the attached entity '{}'", prototype->entity_type().data(), prototype->name().data(), _prototype->name(), _id); utils::auto_write_lock l(_lock); metric_map::const_iterator iter = _metrics.find(prototype); if (iter != _metrics.end()) { auto raw_ptr = down_cast<MetricType *>(iter->second.get()); return raw_ptr; } ref_ptr<MetricType> ptr(new MetricType(prototype, std::forward<Args>(args)...)); _metrics[prototype] = ptr; return ptr; } const std::string kMetricTypeField = "type"; const std::string kMetricNameField = "name"; const std::string kMetricUnitField = "unit"; const std::string kMetricDescField = "desc"; const std::string kMetricSingleValueField = "value"; // Base class for each type of metric. // Every metric class should inherit from this class. // // User object should hold a ref_ptr of a metric, while the entity will hold another ref_ptr. // The ref count of a metric may becomes 1, which means the metric is only held by the entity: // After a period of configurable time, if the ref count is still 1, the metric will be dropped // in that it's considered to be useless. During the period when the metric is retained, once // the same one is instantiated again, it will not be removed; whether the metric is instantiated, // however, its lastest value is visible. class metric : public ref_counter { public: const metric_prototype *prototype() const { return _prototype; } // Take snapshot of each metric to collect current values as json format with fields chosen // by `filters`. virtual void take_snapshot(metric_json_writer &writer, const metric_filters &filters) = 0; protected: explicit metric(const metric_prototype *prototype); virtual ~metric() = default; // Encode a metric field specified by `field_name` as json format. However, once the field // are not chosen by `filters`, this function will do nothing. template <typename T> static inline void encode(metric_json_writer &writer, const std::string &field_name, const T &value, const metric_filters &filters) { if (!filters.match_with_metric_field(field_name)) { return; } writer.Key(field_name.c_str()); json::json_encode(writer, value); } // Encode the metric type as json format, if it is chosen by `filters`. inline void encode_type(metric_json_writer &writer, const metric_filters &filters) const { encode(writer, kMetricTypeField, enum_to_string(prototype()->type()), filters); } // Encode the metric name as json format, if it is chosen by `filters`. inline void encode_name(metric_json_writer &writer, const metric_filters &filters) const { encode(writer, kMetricNameField, prototype()->name().data(), filters); } // Encode the metric unit as json format, if it is chosen by `filters`. inline void encode_unit(metric_json_writer &writer, const metric_filters &filters) const { encode(writer, kMetricUnitField, enum_to_string(prototype()->unit()), filters); } // Encode the metric description as json format, if it is chosen by `filters`. inline void encode_desc(metric_json_writer &writer, const metric_filters &filters) const { encode(writer, kMetricDescField, prototype()->description().data(), filters); } // Encode the metric prototype as json format, if some attributes in it are chosen by `filters`. inline void encode_prototype(metric_json_writer &writer, const metric_filters &filters) const { encode_type(writer, filters); encode_name(writer, filters); encode_unit(writer, filters); encode_desc(writer, filters); } // Encode the unique value of a metric as json format, if it is chosen by `filters`. Notice // that the metric should have only one value. like gauge and counter. template <typename T> static inline void encode_single_value(metric_json_writer &writer, const T &value, const metric_filters &filters) { encode(writer, kMetricSingleValueField, value, filters); } const metric_prototype *const _prototype; private: friend class metric_entity; DISALLOW_COPY_AND_ASSIGN(metric); }; // closeable_metric is a metric that implements close() method to execute some necessary close // operations before the destructor is invoked. close() will return immediately without waiting // for any close operation to be finished, while wait() is used to wait for all of the close // operations to be finished. // // It's guaranteed that close() for each metric will be called before it is destructed. Generally // both of close() and wait() are invoked by its manager, namely metric_entity. class closeable_metric : public metric { public: virtual void close() = 0; virtual void wait() = 0; protected: explicit closeable_metric(const metric_prototype *prototype); virtual ~closeable_metric() = default; private: DISALLOW_COPY_AND_ASSIGN(closeable_metric); }; // A gauge is a metric that represents a single numerical value that can arbitrarily go up and // down. Usually there are 2 scenarios for a guage. // // Firstly, a gauge can be used as an instantaneous measurement of a discrete value. Typical // usages in this scenario are current memory usage, the total capacity and available ratio of // a disk, etc. // // Secondly, a gauge can be used as a counter that increases and decreases. In this scenario only // integral types are supported, and its typical usages are the number of tasks in queues, current // number of running manual compacts, etc. template <typename T, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type> class gauge : public metric { public: using value_type = T; value_type value() const { return _value.load(std::memory_order_relaxed); } // The snapshot collected has following json format: // { // "name": "<metric_name>", // "value": ... // } // where "name" is the name of the gauge in string type, and "value" is just current value // of the gauge fetched by `value()`, in numeric types (i.e. integral or floating-point type, // determined by `value_type`). void take_snapshot(metric_json_writer &writer, const metric_filters &filters) override { writer.StartObject(); encode_prototype(writer, filters); encode_single_value(writer, value(), filters); writer.EndObject(); } void set(const value_type &val) { _value.store(val, std::memory_order_relaxed); } template <typename Int = value_type, typename = typename std::enable_if<std::is_integral<Int>::value>::type> void increment_by(Int x) { _value.fetch_add(x, std::memory_order_relaxed); } template <typename Int = value_type, typename = typename std::enable_if<std::is_integral<Int>::value>::type> void decrement_by(Int x) { increment_by(-x); } template <typename Int = value_type, typename = typename std::enable_if<std::is_integral<Int>::value>::type> void increment() { increment_by(1); } template <typename Int = value_type, typename = typename std::enable_if<std::is_integral<Int>::value>::type> void decrement() { increment_by(-1); } protected: gauge(const metric_prototype *prototype, const value_type &initial_val) : metric(prototype), _value(initial_val) { } gauge(const metric_prototype *prototype) : gauge(prototype, value_type()) {} virtual ~gauge() = default; private: friend class metric_entity; friend class ref_ptr<gauge<value_type>>; std::atomic<value_type> _value; DISALLOW_COPY_AND_ASSIGN(gauge); }; template <typename T> using gauge_ptr = ref_ptr<gauge<T>>; template <typename T> using gauge_prototype = metric_prototype_with<gauge<T>>; // A counter in essence is a 64-bit integer that increases monotonically. It should be noted that // the counter does not support to decrease. If decrease is needed, please consider to use the // gauge instead. // // The counter can be typically used to measure the number of processed requests, which in the // future can be help to compute the QPS. All counters start out at 0, and are non-negative // since they are monotonic. // // `IsVolatile` is false by default. Once it's specified as true, the counter will be volatile. // The value() function of a volatile counter will reset the counter atomically after its value // is fetched. A volatile counter can also be called as a "recent" counter. // // Sometimes "recent" counters are needed, such as the number of recent failed beacons sent from // replica server, the count of updating configurations of partitions recently, etc. The "recent" // count can be considered to be the accumulated count since it has been fetched last by value(). // // In most cases, a general (i.e. non-volatile) counter is enough, which means it can also work // for "recent" counters. For example, in Prometheus, delta() can be used to compute "recent" // count for a general counter. Therefore, declare a counter as volatile only when necessary. template <typename Adder = striped_long_adder, bool IsVolatile = false> class counter : public metric { public: // To decide which member function should be called by template parameter, the parameter // should be one of the class template parameters in case that the parameter is needed to // be written each time the member function is called. // // Using class template parameter to decide which member function should be called, another // function template parameter with the same meaning should be introduced, since the class // template parameter cannot be used as a function template parameter again and will lead // to compilation error. template <bool Volatile = IsVolatile, typename = typename std::enable_if<!Volatile && !IsVolatile>::type> int64_t value() const { return _adder.value(); } template <bool Volatile = IsVolatile, typename = typename std::enable_if<Volatile && IsVolatile>::type> int64_t value() { return _adder.fetch_and_reset(); } // The snapshot collected has following json format: // { // "name": "<metric_name>", // "value": ... // } // where "name" is the name of the counter in string type, and "value" is just current value // of the counter fetched by `value()`, in integral type (namely int64_t). void take_snapshot(metric_json_writer &writer, const metric_filters &filters) override { writer.StartObject(); encode_prototype(writer, filters); encode_single_value(writer, value(), filters); writer.EndObject(); } // NOTICE: x MUST be a non-negative integer. void increment_by(int64_t x) { CHECK_GE_MSG(x, 0, "delta({}) by increment for counter must be a non-negative integer", x); _adder.increment_by(x); } void increment() { _adder.increment(); } void reset() { _adder.reset(); } protected: counter(const metric_prototype *prototype) : metric(prototype) {} virtual ~counter() = default; private: friend class metric_entity; friend class ref_ptr<counter<Adder, IsVolatile>>; long_adder_wrapper<Adder> _adder; DISALLOW_COPY_AND_ASSIGN(counter); }; template <typename Adder = striped_long_adder, bool IsVolatile = false> using counter_ptr = ref_ptr<counter<Adder, IsVolatile>>; template <bool IsVolatile = false> using concurrent_counter_ptr = counter_ptr<concurrent_long_adder, IsVolatile>; template <typename Adder = striped_long_adder, bool IsVolatile = false> using counter_prototype = metric_prototype_with<counter<Adder, IsVolatile>>; template <typename Adder = striped_long_adder> using volatile_counter_ptr = ref_ptr<counter<Adder, true>>; using concurrent_volatile_counter_ptr = counter_ptr<concurrent_long_adder, true>; template <typename Adder = striped_long_adder> using volatile_counter_prototype = metric_prototype_with<counter<Adder, true>>; #define KTH_PERCENTILE(prefix, kth) prefix##kth #define KTH_PERCENTILE_TYPE(kth) KTH_PERCENTILE(P, kth) #define ENUM_KTH_PERCENTILE_TYPE(qualifier, kth) qualifier KTH_PERCENTILE_TYPE(kth) #define KTH_PERCENTILE_NAME(kth) KTH_PERCENTILE(p, kth) #define ENUM_REG_WITH_KTH_PERCENTILE_TYPE(kth) \ ENUM_REG_WITH_CUSTOM_NAME(ENUM_KTH_PERCENTILE_TYPE(kth_percentile_type::, kth), \ KTH_PERCENTILE_NAME(kth)) struct kth_percentile_property { std::string name; double decimal; }; #define STRINGIFY_KTH_PERCENTILE_NAME(kth) STRINGIFY(KTH_PERCENTILE_NAME(kth)) #define KTH_TO_DECIMAL(kth) 0.##kth #define KTH_PERCENTILE_PROPERTY_LIST(kth) \ { \ STRINGIFY_KTH_PERCENTILE_NAME(kth), KTH_TO_DECIMAL(kth) \ } // All supported kinds of kth percentiles. User can configure required kth percentiles for // each percentile. Only configured kth percentiles will be computed. This can reduce CPU // consumption. #define ALL_KTH_PERCENTILE_TYPES(qualifier) \ ENUM_KTH_PERCENTILE_TYPE(qualifier, 50) \ , ENUM_KTH_PERCENTILE_TYPE(qualifier, 90), ENUM_KTH_PERCENTILE_TYPE(qualifier, 95), \ ENUM_KTH_PERCENTILE_TYPE(qualifier, 99), ENUM_KTH_PERCENTILE_TYPE(qualifier, 999) enum class kth_percentile_type : size_t { ALL_KTH_PERCENTILE_TYPES(), COUNT, INVALID, }; // Support to load from configuration files for percentiles. ENUM_BEGIN(kth_percentile_type, kth_percentile_type::INVALID) ENUM_REG_WITH_KTH_PERCENTILE_TYPE(50) ENUM_REG_WITH_KTH_PERCENTILE_TYPE(90) ENUM_REG_WITH_KTH_PERCENTILE_TYPE(95) ENUM_REG_WITH_KTH_PERCENTILE_TYPE(99) ENUM_REG_WITH_KTH_PERCENTILE_TYPE(999) ENUM_END(kth_percentile_type) // Generate decimals from kth percentiles. const std::vector<kth_percentile_property> kAllKthPercentiles = {KTH_PERCENTILE_PROPERTY_LIST(50), KTH_PERCENTILE_PROPERTY_LIST(90), KTH_PERCENTILE_PROPERTY_LIST(95), KTH_PERCENTILE_PROPERTY_LIST(99), KTH_PERCENTILE_PROPERTY_LIST(999)}; const std::set<kth_percentile_type> kAllKthPercentileTypes = { ALL_KTH_PERCENTILE_TYPES(kth_percentile_type::)}; inline std::string kth_percentile_to_name(const kth_percentile_type &type) { auto index = static_cast<size_t>(type); CHECK_LT(index, kAllKthPercentiles.size()); return kAllKthPercentiles[index].name; } inline size_t kth_percentile_to_nth_index(size_t size, size_t kth_index) { CHECK_LT(kth_index, kAllKthPercentiles.size()); auto decimal = kAllKthPercentiles[kth_index].decimal; // Since the kth percentile is the value that is greater than k percent of the data values after // ranking them (https://people.richland.edu/james/ictcm/2001/descriptive/helpposition.html), // compute the nth index by size * decimal rather than size * decimal - 1. return static_cast<size_t>(size * decimal); } inline size_t kth_percentile_to_nth_index(size_t size, kth_percentile_type type) { return kth_percentile_to_nth_index(size, static_cast<size_t>(type)); } // The percentile is a metric type that samples observations. The size of samples has an upper // bound. Once the maximum size is reached, the earliest observations will be overwritten. // // On the other hand, kth percentiles, such as P50, P90, P95, P99, P999, will be calculated // periodically over all samples. The kth percentiles which are calculated are configurable // provided that they are of valid kth_percentile_type (i.e. in kAllKthPercentileTypes). // // The most common usage of percentile is latency, such as server-level and replica-level // latencies. For example, if P99 latency is 10 ms, it means the latencies of 99% requests // are less than 10 ms. // // The percentile is implemented by the finder for nth elements. Each kth percentile is firstly // converted to nth index; then, find the element corresponding to the nth index. template <typename T, typename NthElementFinder = stl_nth_element_finder<T>, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type> class percentile : public closeable_metric { public: using value_type = T; using size_type = typename NthElementFinder::size_type; void set(const value_type &val) { const auto index = _tail.fetch_add(1, std::memory_order_relaxed); _samples.get()[index & (_sample_size - 1)] = val; } // Set the same value for n times, used to treat a single value as the result of multiple // observations, e.g. taking the latency of executing the entire batch as the latency for // processing each request within it (see pegasus_write_service::batch_finish()). void set(size_t n, const value_type &val) { for (size_t i = 0; i < n; ++i) { set(val); } } // If `type` is not configured, it will return false with zero value stored in `val`; // otherwise, it will always return true with the value corresponding to `type`. bool get(kth_percentile_type type, value_type &val) const { const auto index = static_cast<size_t>(type); CHECK_LT(index, static_cast<size_t>(kth_percentile_type::COUNT)); val = value(index); return _kth_percentile_bitset.test(index); } // The snapshot collected has following json format: // { // "name": "<metric_name>", // "p50": ..., // "p90": ..., // "p95": ..., // ... // } // where "name" is the name of the percentile in string type, with each configured kth // percentile followed, such as "p50", "p90", "p95", etc. All of them are in numeric types // (i.e. integral or floating-point type, determined by `value_type`). void take_snapshot(metric_json_writer &writer, const metric_filters &filters) override { writer.StartObject(); encode_prototype(writer, filters); for (size_t i = 0; i < static_cast<size_t>(kth_percentile_type::COUNT); ++i) { if (!_kth_percentile_bitset.test(i)) { continue; } encode(writer, kAllKthPercentiles[i].name, value(i), filters); } writer.EndObject(); } bool timer_enabled() const { return !!_timer; } uint64_t get_initial_delay_ms() const { return timer_enabled() ? _timer->get_initial_delay_ms() : 0; } static const size_type kDefaultSampleSize = 4096; protected: // interval_ms is the interval between the computations for percentiles. Its unit is // milliseconds. It's suggested that interval_ms should be near the period between pulls // from or pushes to the monitoring system. // TODO(wangdan): we can also support constructing percentiles from the parameters in // the configuration file. percentile(const metric_prototype *prototype, uint64_t interval_ms = 10000, const std::set<kth_percentile_type> &kth_percentiles = kAllKthPercentileTypes, size_type sample_size = kDefaultSampleSize) : closeable_metric(prototype), _sample_size(sample_size), _last_real_sample_size(0), _samples(cacheline_aligned_alloc_array<value_type>(sample_size, value_type{})), _tail(0), _kth_percentile_bitset(), _full_nth_elements(static_cast<size_t>(kth_percentile_type::COUNT)), _nth_element_finder(), _timer() { CHECK(_sample_size > 0 && (_sample_size & (_sample_size - 1)) == 0, "sample_sizes should be > 0 and power of 2"); CHECK(_samples, ""); for (const auto &kth : kth_percentiles) { _kth_percentile_bitset.set(static_cast<size_t>(kth)); } for (size_type i = 0; i < _full_nth_elements.size(); ++i) { _full_nth_elements[i].store(value_type{}, std::memory_order_relaxed); } #ifdef MOCK_TEST if (interval_ms == 0) { // Timer is disabled. return; } #else CHECK_GT(interval_ms, 0); #endif // Increment ref count of percentile, since it will be referenced by timer. // This will extend the lifetime of percentile and prevent from heap-use-after-free // error. // // The ref count will be decremented at the moment when the percentile will // never be used by timer, which means the percentile can be destructed safely. // See on_close() for details which is registered in timer and will be called // back once close() is invoked. add_ref(); _timer.reset(new metric_timer( interval_ms, std::bind(&percentile<value_type, NthElementFinder>::find_nth_elements, this), std::bind(&percentile<value_type, NthElementFinder>::on_close, this))); } virtual ~percentile() = default; private: using nth_container_type = typename NthElementFinder::nth_container_type; friend class metric_entity; friend class ref_ptr<percentile<value_type, NthElementFinder>>; friend class MetricVarTest; virtual void close() override { if (_timer) { _timer->close(); } } virtual void wait() override { if (_timer) { _timer->wait(); } } void on_close() { // This will be called back after timer is closed, which means the percentile is // no longer needed by timer and can be destructed safely. release_ref(); } std::vector<value_type> samples_for_test() { size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size); if (real_sample_size == 0) { return std::vector<value_type>(); } std::vector<value_type> real_samples(real_sample_size); std::copy(_samples.get(), _samples.get() + real_sample_size, real_samples.begin()); return real_samples; } void reset_tail_for_test() { _tail.store(0); } value_type value(size_t index) const { return _full_nth_elements[index].load(std::memory_order_relaxed); } void find_nth_elements() { size_type real_sample_size = std::min(static_cast<size_type>(_tail.load()), _sample_size); if (real_sample_size == 0) { // No need to find since there has not been any sample yet. return; } // If the size of samples changes, the nth indexs should be updated. if (real_sample_size != _last_real_sample_size) { set_real_nths(real_sample_size); _last_real_sample_size = real_sample_size; } // Find nth elements. std::vector<value_type> array(real_sample_size); std::copy(_samples.get(), _samples.get() + real_sample_size, array.begin()); _nth_element_finder(array.begin(), array.begin(), array.end()); // Store nth elements. const auto &elements = _nth_element_finder.elements(); for (size_t i = 0, next = 0; i < static_cast<size_t>(kth_percentile_type::COUNT); ++i) { if (!_kth_percentile_bitset.test(i)) { continue; } _full_nth_elements[i].store(elements[next++], std::memory_order_relaxed); } } void set_real_nths(size_type real_sample_size) { nth_container_type nths; for (size_t i = 0; i < static_cast<size_t>(kth_percentile_type::COUNT); ++i) { if (!_kth_percentile_bitset.test(i)) { continue; } auto size = static_cast<size_t>(real_sample_size); auto nth = static_cast<size_type>(kth_percentile_to_nth_index(size, i)); nths.push_back(nth); } _nth_element_finder.set_nths(nths); } const size_type _sample_size; size_type _last_real_sample_size; cacheline_aligned_ptr<value_type> _samples; std::atomic<uint64_t> _tail; // use unsigned int to avoid running out of bound std::bitset<static_cast<size_t>(kth_percentile_type::COUNT)> _kth_percentile_bitset; std::vector<std::atomic<value_type>> _full_nth_elements; NthElementFinder _nth_element_finder; std::unique_ptr<metric_timer> _timer; DISALLOW_COPY_AND_ASSIGN(percentile); }; template <typename T, typename NthElementFinder = stl_nth_element_finder<T>, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type> using percentile_ptr = ref_ptr<percentile<T, NthElementFinder>>; template <typename T, typename NthElementFinder = stl_nth_element_finder<T>, typename = typename std::enable_if<std::is_arithmetic<T>::value>::type> using percentile_prototype = metric_prototype_with<percentile<T, NthElementFinder>>; template <typename T, typename NthElementFinder = floating_stl_nth_element_finder<T>, typename = typename std::enable_if<std::is_floating_point<T>::value>::type> using floating_percentile = percentile<T, NthElementFinder>; template <typename T, typename NthElementFinder = floating_stl_nth_element_finder<T>, typename = typename std::enable_if<std::is_floating_point<T>::value>::type> using floating_percentile_ptr = ref_ptr<floating_percentile<T, NthElementFinder>>; template <typename T, typename NthElementFinder = floating_stl_nth_element_finder<T>, typename = typename std::enable_if<std::is_floating_point<T>::value>::type> using floating_percentile_prototype = metric_prototype_with<floating_percentile<T, NthElementFinder>>; // Compute latency automatically at the end of the scope, which is set to percentile which it has // bound to. class auto_latency { public: auto_latency(const percentile_ptr<int64_t> &p) : _percentile(p) {} auto_latency(const percentile_ptr<int64_t> &p, std::function<void(uint64_t)> callback) : _percentile(p), _callback(std::move(callback)) { } auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns) : _percentile(p), _chrono(start_time_ns) { } auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns, std::function<void(uint64_t)> callback) : _percentile(p), _chrono(start_time_ns), _callback(std::move(callback)) { } ~auto_latency() { auto latency = convert_metric_latency_from_ns(_chrono.duration_ns(), _percentile->prototype()->unit()); _percentile->set(static_cast<int64_t>(latency)); if (_callback) { _callback(latency); } } inline uint64_t duration_ns() const { return _chrono.duration_ns(); } private: percentile_ptr<int64_t> _percentile; utils::chronograph _chrono; std::function<void(uint64_t)> _callback; DISALLOW_COPY_AND_ASSIGN(auto_latency); }; // Increment gauge and decrement it automatically at the end of the scope. class auto_count { public: auto_count(const gauge_ptr<int64_t> &g) : _gauge(g) { _gauge->increment(); } auto_count(const gauge_ptr<int64_t> &g, std::function<void()> callback) : _gauge(g), _callback(std::move(callback)) { _gauge->increment(); } ~auto_count() { if (_callback) { _callback(); } _gauge->decrement(); } private: gauge_ptr<int64_t> _gauge; std::function<void()> _callback; DISALLOW_COPY_AND_ASSIGN(auto_count); }; #define DEF_METRIC_BRIEF_SNAPSHOT(field) \ struct metric_brief_##field##_snapshot \ { \ std::string name; \ double field = 0.0; \ \ DEFINE_JSON_SERIALIZATION(name, field) \ } #define DEF_METRIC_ENTITY_BRIEF_SNAPSHOT(field) \ struct metric_entity_brief_##field##_snapshot \ { \ std::string type; \ std::string id; \ metric_entity::attr_map attributes; \ std::vector<metric_brief_##field##_snapshot> metrics; \ \ DEFINE_JSON_SERIALIZATION(type, id, attributes, metrics) \ } #define DEF_METRIC_QUERY_BRIEF_SNAPSHOT(field) \ struct metric_query_brief_##field##_snapshot \ { \ std::string cluster; \ std::string role; \ std::string host; \ uint16_t port; \ uint64_t timestamp_ns; \ std::vector<metric_entity_brief_##field##_snapshot> entities; \ \ DEFINE_JSON_SERIALIZATION(cluster, role, host, port, timestamp_ns, entities) \ } #define DEF_ALL_METRIC_BRIEF_SNAPSHOTS(field) \ DEF_METRIC_BRIEF_SNAPSHOT(field); \ DEF_METRIC_ENTITY_BRIEF_SNAPSHOT(field); \ DEF_METRIC_QUERY_BRIEF_SNAPSHOT(field) DEF_ALL_METRIC_BRIEF_SNAPSHOTS(value); DEF_ALL_METRIC_BRIEF_SNAPSHOTS(p99); // Deserialize the json string into the snapshot. template <typename TMetricSnapshot> inline error_s deserialize_metric_snapshot(const std::string &json_string, TMetricSnapshot &snapshot) { dsn::blob bb(json_string.data(), 0, json_string.size()); if (dsn_unlikely(!dsn::json::json_forwarder<TMetricSnapshot>::decode(bb, snapshot))) { return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid json string: {}", json_string); } return error_s::ok(); } #define DESERIALIZE_METRIC_SNAPSHOT(json_string, query_snapshot) \ do { \ const auto &res = deserialize_metric_snapshot(json_string, query_snapshot); \ if (dsn_unlikely(!res)) { \ return res; \ } \ } while (0) // Deserialize the json string into the snapshot specially for metric query which is declared // internally. #define DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(field, json_string, query_snapshot) \ dsn::metric_query_brief_##field##_snapshot query_snapshot; \ DESERIALIZE_METRIC_SNAPSHOT(json_string, query_snapshot) // Deserialize both json string samples into respective snapshots. template <typename TMetricSnapshot> inline error_s deserialize_metric_2_samples(const std::string &json_string_start, const std::string &json_string_end, TMetricSnapshot &snapshot_start, TMetricSnapshot &snapshot_end) { DESERIALIZE_METRIC_SNAPSHOT(json_string_start, snapshot_start); DESERIALIZE_METRIC_SNAPSHOT(json_string_end, snapshot_end); return error_s::ok(); } // Deserialize both json string samples into respective snapshots specially for metric queries. template <typename TMetricQuerySnapshot> inline error_s deserialize_metric_query_2_samples(const std::string &json_string_start, const std::string &json_string_end, TMetricQuerySnapshot &snapshot_start, TMetricQuerySnapshot &snapshot_end) { const auto &res = deserialize_metric_2_samples( json_string_start, json_string_end, snapshot_start, snapshot_end); if (!res) { return res; } if (snapshot_end.timestamp_ns <= snapshot_start.timestamp_ns) { return FMT_ERR(dsn::ERR_INVALID_DATA, "duration for metric samples should be > 0: timestamp_ns_start={}, " "timestamp_ns_end={}", snapshot_start.timestamp_ns, snapshot_end.timestamp_ns); } return error_s::ok(); } // Deserialize both json string samples into respective snapshots specially for metric queries // which are declared internally. // // Currently only Gauge and Counter are considered to have "increase" and "rate", which means // samples are needed. Thus brief `value` field is enough. #define DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES( \ json_string_start, json_string_end, query_snapshot_start, query_snapshot_end) \ dsn::metric_query_brief_value_snapshot query_snapshot_start; \ dsn::metric_query_brief_value_snapshot query_snapshot_end; \ \ do { \ const auto &res = deserialize_metric_query_2_samples( \ json_string_start, json_string_end, query_snapshot_start, query_snapshot_end); \ if (dsn_unlikely(!res)) { \ return res; \ } \ } while (0) // Find the duration between the 2 timestamps, generally used for calculate the rates over the // metrics, such as QPS. inline double calc_metric_sample_duration_s(uint64_t timestamp_ns_start, uint64_t timestamp_ns_end) { CHECK_LT(timestamp_ns_start, timestamp_ns_end); const std::chrono::duration<double, std::nano> duration_ns( static_cast<double>(timestamp_ns_end - timestamp_ns_start)); const std::chrono::duration<double> duration_s = duration_ns; return duration_s.count(); } // Parse the attributes as their original types. template <typename TAttrValue, typename = typename std::enable_if<std::is_arithmetic<TAttrValue>::value>::type> inline error_s parse_metric_attribute(const metric_entity::attr_map &attrs, const std::string &name, TAttrValue &value) { const auto *value_ptr = gutil::FindOrNull(attrs, name); if (dsn_unlikely(value_ptr == nullptr)) { return FMT_ERR(dsn::ERR_INVALID_DATA, "{} field was not found", name); } if (dsn_unlikely(!dsn::buf2numeric(*value_ptr, value))) { return FMT_ERR(dsn::ERR_INVALID_DATA, "invalid {}: {}", name, *value_ptr); } return error_s::ok(); } inline error_s parse_metric_table_id(const metric_entity::attr_map &attrs, int32_t &table_id) { return parse_metric_attribute(attrs, "table_id", table_id); } inline error_s parse_metric_partition_id(const metric_entity::attr_map &attrs, int32_t &partition_id) { return parse_metric_attribute(attrs, "partition_id", partition_id); } } // namespace dsn // Since server_metric_entity() will be called in macros such as METRIC_VAR_INIT_server(), its // declaration should be put outside any namespace (for example dsn). server_metric_entity() // will not be qualified with any namespace. Once it was qualified with some namespace, its name // would not be resolved in any other namespace. dsn::metric_entity_ptr server_metric_entity();