src/utils/metrics.cpp (446 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "utils/metrics.h"
#include <boost/asio/basic_deadline_timer.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/system/error_code.hpp>
#include <fmt/core.h>
#include <new>
#include "runtime/api_layer1.h"
#include "utils/flags.h"
#include "utils/rand.h"
#include "utils/shared_io_service.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
namespace dsn {
// How long (in milliseconds) a stale metric entity is kept in the registry before it is retired.
// The registry's periodic timer also uses this value as its interval, so a stale entity is
// normally retired one timer round after it has been scheduled for retirement.
DSN_DEFINE_uint64(metrics,
                  entity_retirement_delay_ms,
                  10 * 60 * 1000,
                  "The retention interval (milliseconds) for an entity after it becomes stale.");
metric_entity::metric_entity(const metric_entity_prototype *prototype,
const std::string &id,
const attr_map &attrs)
: _prototype(prototype), _id(id), _attrs(attrs), _retire_time_ms(0)
{
}
metric_entity::~metric_entity()
{
    // Block until every close operation of this entity's metrics has finished. Doing that wait
    // in the destructor of each individual metric is avoided on purpose: it may make ASAN report
    // a memory leak for dsn_utils_test, because a percentile is also referenced by
    // shared_io_service, which is still alive (not yet destructed) when the ASAN test of
    // dsn_utils_test finishes.
    close(close_option::kWait);
}
void metric_entity::close(close_option option)
{
    utils::auto_write_lock l(_lock);

    // Closing happens in two passes on purpose. The first pass issues an asynchronous close
    // request to every closeable metric. The second pass (skipped for kNoWait) waits for all of
    // them. Waiting for each metric right after closing it would serialize the close operations,
    // which is also why a metric is not closed in its own destructor.
    //
    // Only metrics whose prototype type is metric_type::kPercentile are treated as closeable.

    // Pass 1: issue the close requests.
    for (auto &entry : _metrics) {
        if (entry.second->prototype()->type() != metric_type::kPercentile) {
            continue;
        }
        down_cast<closeable_metric *>(entry.second.get())->close();
    }

    if (option == close_option::kNoWait) {
        // The caller only wants the requests to be issued; do not block.
        return;
    }

    // Pass 2: block until every close request issued above has completed.
    for (auto &entry : _metrics) {
        if (entry.second->prototype()->type() != metric_type::kPercentile) {
            continue;
        }
        down_cast<closeable_metric *>(entry.second.get())->wait();
    }
}
// Returns a copy of the entity's attributes. The copy is taken under the read lock, so it is
// consistent with concurrent set_attributes() calls, which take the write lock.
metric_entity::attr_map metric_entity::attributes() const
{
    attr_map snapshot;
    {
        utils::auto_read_lock l(_lock);
        snapshot = _attrs;
    }
    return snapshot;
}
// Returns a copy of the entity's metric map, taken under the read lock.
metric_entity::metric_map metric_entity::metrics() const
{
    metric_map snapshot;
    {
        utils::auto_read_lock l(_lock);
        snapshot = _metrics;
    }
    return snapshot;
}
void metric_entity::set_attributes(const attr_map &attrs)
{
utils::auto_write_lock l(_lock);
_attrs = attrs;
}
// Writes the entity type as a json key/value pair: the key is kMetricEntityTypeField and the
// value is the name of this entity's prototype.
void metric_entity::encode_type(metric_json_writer &writer) const
{
    const auto &key = kMetricEntityTypeField;
    writer.Key(key.c_str());
    json::json_encode(writer, _prototype->name());
}
// Writes the entity id as a json key/value pair keyed by kMetricEntityIdField.
void metric_entity::encode_id(metric_json_writer &writer) const
{
    const auto &key = kMetricEntityIdField;
    writer.Key(key.c_str());
    json::json_encode(writer, _id);
}
namespace {

// Writes `attrs` as a json object under the kMetricEntityAttrsField key. An entity without any
// attribute is still valid and is encoded as an empty object `{}`.
void encode_attrs(dsn::metric_json_writer &writer, const dsn::metric_entity::attr_map &attrs)
{
    writer.Key(dsn::kMetricEntityAttrsField.c_str());

    writer.StartObject();
    for (const auto &kv : attrs) {
        writer.Key(kv.first.c_str());
        dsn::json::json_encode(writer, kv.second);
    }
    writer.EndObject();
}

// Writes the snapshot of every metric in `metrics` as a json array under the
// kMetricEntityMetricsField key.
//
// Callers must only reach here with at least one chosen metric; an empty map is treated as a
// programming error and fails the CHECK.
void encode_metrics(dsn::metric_json_writer &writer,
                    const dsn::metric_entity::metric_map &metrics,
                    const dsn::metric_filters &filters)
{
    CHECK(!metrics.empty(),
          "this entity should not be encoded into the response since no metric is chosen");

    writer.Key(dsn::kMetricEntityMetricsField.c_str());

    writer.StartArray();
    for (const auto &entry : metrics) {
        const auto &metric = entry.second;
        metric->take_snapshot(writer, filters);
    }
    writer.EndArray();
}

} // anonymous namespace
// Encodes this entity as a json object (type, id, attributes, chosen metrics) if it passes
// `filters`. If the entity or all of its metrics are filtered out, nothing is written.
void metric_entity::take_snapshot(metric_json_writer &writer, const metric_filters &filters) const
{
    // The type and id checks are performed without holding _lock.
    if (!filters.match_entity_type(_prototype->name()) || !filters.match_entity_id(_id)) {
        return;
    }

    attr_map attrs_snapshot;
    metric_map chosen_metrics;
    {
        // Copy what is needed under the read lock; the actual json encoding happens after the
        // lock has been released.
        utils::auto_read_lock l(_lock);

        if (!filters.match_entity_attrs(_attrs)) {
            return;
        }

        filters.extract_entity_metrics(_metrics, chosen_metrics);
        if (chosen_metrics.empty()) {
            // No metric of this entity is chosen, so the entity itself is left out.
            return;
        }

        attrs_snapshot = _attrs;
    }

    writer.StartObject();
    encode_type(writer);
    encode_id(writer);
    encode_attrs(writer, attrs_snapshot);
    encode_metrics(writer, chosen_metrics, filters);
    writer.EndObject();
}
bool metric_entity::is_stale() const
{
// Since this entity itself is still being accessed, its reference count should be 1
// at least.
CHECK_GE(get_count(), 1);
// This entity is considered stale once there is only one reference for it kept in the
// registry.
return get_count() == 1;
}
// Fills `target_metrics` with the metrics from `candidates` whose name matches
// `entity_metrics`. An empty `entity_metrics` filter selects every candidate.
void metric_filters::extract_entity_metrics(const metric_entity::metric_map &candidates,
                                            metric_entity::metric_map &target_metrics) const
{
    if (!entity_metrics.empty()) {
        target_metrics.clear();
        for (const auto &entry : candidates) {
            if (!match(entry.first->name().data(), entity_metrics)) {
                continue;
            }
            target_metrics.emplace(entry.first, entry.second);
        }
        return;
    }

    target_metrics = candidates;
}
// Returns the entity registered under `id` in the global metric registry, creating it from this
// prototype if it does not exist yet. For an existing entity, its attributes are replaced by
// `attrs`.
metric_entity_ptr metric_entity_prototype::instantiate(const std::string &id,
                                                       const metric_entity::attr_map &attrs) const
{
    auto &registry = metric_registry::instance();
    return registry.find_or_create_entity(this, id, attrs);
}
// Same as instantiate(id, attrs) with an empty attribute map. Note that for an entity which is
// already registered, this replaces its attributes with the empty map.
metric_entity_ptr metric_entity_prototype::instantiate(const std::string &id) const
{
    const metric_entity::attr_map no_attrs;
    return instantiate(id, no_attrs);
}
// A prototype is identified by its name, which becomes the "type" of every entity it creates.
metric_entity_prototype::metric_entity_prototype(const char *name) : _name(name) {}

metric_entity_prototype::~metric_entity_prototype() = default;
// Registers the "metrics" http path, served by get_metrics_handler().
metrics_http_service::metrics_http_service(metric_registry *registry) : _registry(registry)
{
    auto handler = [this](const http_request &req, http_response &resp) {
        get_metrics_handler(req, resp);
    };
    register_handler("metrics", handler, "ip:port/metrics");
}
namespace {

// Splits a comma-separated query-string value into `container`.
template <typename Container>
void parse_as(const std::string &field_value, Container &container)
{
    const char separator = ',';
    utils::split_args(field_value.c_str(), container, separator);
}

// Writes a json object of the form {"error_message": <error_message>} to `writer`.
inline void encode_error(dsn::metric_json_writer &writer, const char *error_message)
{
    writer.StartObject();
    writer.Key("error_message");
    dsn::json::json_encode(writer, error_message);
    writer.EndObject();
}

// Returns the error object produced by encode_error() as a standalone json string.
inline std::string encode_error_as_json(const char *error_message)
{
    const auto encoder = [error_message](metric_json_writer &writer) {
        encode_error(writer, error_message);
    };
    return encode_as_json(encoder);
}

// The "brief" field set: metric name, single value, and every kth-percentile field.
dsn::metric_filters::metric_fields_type get_brief_metric_fields()
{
    dsn::metric_filters::metric_fields_type brief_fields;
    brief_fields.insert(kMetricNameField);
    brief_fields.insert(kMetricSingleValueField);
    for (const auto &percentile : kAllKthPercentiles) {
        brief_fields.insert(percentile.name);
    }
    return brief_fields;
}

const dsn::metric_filters::metric_fields_type kBriefMetricFields = get_brief_metric_fields();

} // anonymous namespace
// Serves GET /metrics.
//
// Query arguments are translated into metric_filters:
//   with_metric_fields, types, ids, attributes, metrics: comma-separated lists;
//   detail: a boolean.
// Any other argument, an odd number of attribute tokens, a non-boolean detail, or a non-GET
// method is answered with a json error body and status bad_request.
void metrics_http_service::get_metrics_handler(const http_request &req, http_response &resp)
{
    // Fills the response with a json error and a 400 status.
    const auto reject = [&resp](const char *error_message) {
        resp.body = encode_error_as_json(error_message);
        resp.status_code = http_status_code::bad_request;
    };

    if (req.method != http_method::HTTP_METHOD_GET) {
        reject("please use 'GET' method while querying for metrics");
        return;
    }

    metric_filters filters;
    bool with_metric_fields = false;
    bool detail = false;

    for (const auto &arg : req.query_args) {
        const auto &key = arg.first;
        const auto &value = arg.second;

        if (key == "with_metric_fields") {
            parse_as(value, filters.with_metric_fields);
            with_metric_fields = true;
            continue;
        }
        if (key == "types") {
            parse_as(value, filters.entity_types);
            continue;
        }
        if (key == "ids") {
            parse_as(value, filters.entity_ids);
            continue;
        }
        if (key == "attributes") {
            parse_as(value, filters.entity_attrs);
            // Attributes are given as a flat name,value,name,value,... list.
            if (filters.entity_attrs.size() % 2 != 0) {
                reject("the number of arguments for attributes should be even, "
                       "since each attribute name always pairs with a value");
                return;
            }
            continue;
        }
        if (key == "metrics") {
            parse_as(value, filters.entity_metrics);
            continue;
        }
        if (key == "detail") {
            if (!buf2bool(value, detail)) {
                reject("the value of detail should be a boolean value, "
                       "i.e. true or false");
                return;
            }
            continue;
        }

        const auto error_message = fmt::format("unknown field {}={}", key, value);
        reject(error_message.c_str());
        return;
    }

    // An explicit `with_metric_fields` wins over `detail`. The output is restricted to the brief
    // field set only when neither of them asked for something else.
    if (!(with_metric_fields || detail)) {
        filters.with_metric_fields = kBriefMetricFields;
    }

    resp.body = take_snapshot_as_json(_registry, filters);
    resp.status_code = http_status_code::ok;
}
metric_registry::metric_registry() : _http_service(this)
{
    // Touch the shared_io_service singleton first so that it is fully constructed before this
    // registry, and therefore destructed after it. The registry's timers (e.g. metric_timer) and
    // the percentiles it owns rely on the boost::asio::io_service held by shared_io_service.
    // Destroying that io_service first would cause heap-use-after-free while those percentiles
    // are still running.
    tools::shared_io_service::instance();

    // Start the periodic processing of stale entities.
    start_timer();
}
metric_registry::~metric_registry()
{
    // Stop the timer BEFORE acquiring _lock.
    //
    // The timer's exec callback is process_stale_entities(), which acquires _lock itself: a read
    // lock in collect_stale_entities() and a write lock in retire_stale_entities().
    // stop_timer() blocks until the timer reports that it has been closed, which only happens
    // after any in-flight callback has returned. Calling stop_timer() while holding the write
    // lock therefore deadlocks if the callback happens to be running and waiting on _lock at
    // destruction time.
    stop_timer();

    utils::auto_write_lock l(_lock);

    // Once the registry is chosen to be destructed, none of the entities and metrics it owns
    // will be needed any more.
    //
    // Each entity is closed here, rather than only in its own destructor, because
    // close(kNoWait) returns immediately without waiting for any close operation to finish.
    // Issuing the close requests for all entities first, and then waiting for them in each
    // entity's destructor (which calls close(kWait)), is more efficient than closing entities
    // one by one.
    for (auto &entity : _entities) {
        entity.second->close(metric_entity::close_option::kNoWait);
    }
}
void metric_registry::on_close()
{
    // Close callback handed to the registry's metric_timer in start_timer(). The registry has
    // nothing extra to release when its timer is closed.
}
void metric_registry::start_timer()
{
if (_timer) {
return;
}
// Once an entity is considered stale, it will be retired after the retention interval,
// namely FLAGS_entity_retirement_delay_ms milliseconds. Therefore, if the interval of
// the timer is also set to FLAGS_entity_retirement_delay_ms, in the next round, it's
// just about time to retire this entity.
_timer.reset(new metric_timer(FLAGS_entity_retirement_delay_ms,
std::bind(&metric_registry::process_stale_entities, this),
std::bind(&metric_registry::on_close, this)));
}
// Closes the timer synchronously and forgets it, so that start_timer() can create a new one.
// A no-op if no timer is running.
void metric_registry::stop_timer()
{
    if (_timer) {
        _timer->close();
        _timer->wait();
        _timer.reset();
    }
}
// Returns a copy of the id -> entity map, taken under the read lock.
metric_registry::entity_map metric_registry::entities() const
{
    entity_map snapshot;
    {
        utils::auto_read_lock l(_lock);
        snapshot = _entities;
    }
    return snapshot;
}
// Encodes the registry as a json array. Each entity appends its own object, or nothing if the
// filters reject it. The read lock keeps the set of entities stable while encoding.
void metric_registry::take_snapshot(metric_json_writer &writer, const metric_filters &filters) const
{
    utils::auto_read_lock l(_lock);

    writer.StartArray();
    for (const auto &entry : _entities) {
        const auto &entity = entry.second;
        entity->take_snapshot(writer, filters);
    }
    writer.EndArray();
}
// Returns the entity registered under `id`. If none exists, a new one is created from
// `prototype` with `attrs` and registered. If one exists, its prototype name must match
// `prototype` (checked), and its attributes are replaced by `attrs`.
metric_entity_ptr metric_registry::find_or_create_entity(const metric_entity_prototype *prototype,
                                                         const std::string &id,
                                                         const metric_entity::attr_map &attrs)
{
    utils::auto_write_lock l(_lock);

    const auto iter = _entities.find(id);
    if (iter != _entities.end()) {
        CHECK_STREQ_MSG(
            prototype->name(),
            iter->second->prototype()->name(),
            "new prototype '{}' is inconsistent with old prototype '{}' for entity '{}'",
            prototype->name(),
            iter->second->prototype()->name(),
            id);

        iter->second->set_attributes(attrs);
        return iter->second;
    }

    // Nothing registered under `id` yet: create and register a new entity.
    metric_entity_ptr created(new metric_entity(prototype, id, attrs));
    _entities[id] = created;
    return created;
}
// First phase of stale-entity processing, done under the read lock. It collects the ids that
// retire_stale_entities() must look at:
//   * stale entities whose retirement is not scheduled yet (0) or is already due (<= now);
//   * non-stale entities that still carry a scheduled retirement time (> 0), i.e. reemployed
//     ones whose schedule must be reset.
// Stale entities whose retirement lies in the future are only counted, not collected.
metric_registry::collected_entities_info metric_registry::collect_stale_entities() const
{
    collected_entities_info info;
    const auto now = dsn_now_ms();

    utils::auto_read_lock l(_lock);

    for (const auto &entry : _entities) {
        const auto &entity = entry.second;

        if (entity->is_stale()) {
            const uint64_t retire_time_ms = entity->_retire_time_ms;
            if (retire_time_ms > now) {
                // Already scheduled, but still within the retention interval.
                ++info.num_scheduled_entities;
                continue;
            }
            info.collected_entities.insert(entry.first);
            continue;
        }

        const uint64_t retire_time_ms = entity->_retire_time_ms;
        if (retire_time_ms > 0) {
            // The entity was scheduled for retirement but has been in use again since then;
            // collect it so that its scheduled time can be reset to 0.
            info.collected_entities.insert(entry.first);
        }
    }

    info.num_all_entities = _entities.size();
    return info;
}
// Second phase of stale-entity processing, done under the write lock. For each collected id
// that is still present in the registry, this function:
//   * resets the scheduled retirement time to 0 if the entity is no longer stale (reemployed);
//   * skips the entity if its scheduled retirement time still lies in the future;
//   * schedules its retirement at now + FLAGS_entity_retirement_delay_ms if it has not been
//     scheduled yet;
//   * otherwise (scheduled time reached) erases it from the registry.
// Ids that are no longer present in the registry are ignored.
//
// @param collected_entities ids collected by collect_stale_entities().
// @return counts of retired, recently scheduled and reemployed entities.
metric_registry::retired_entities_stat
metric_registry::retire_stale_entities(const collected_entity_list &collected_entities)
{
if (collected_entities.empty()) {
// Do not lock for empty list.
return retired_entities_stat();
}
retired_entities_stat retired_stat;
auto now = dsn_now_ms();
utils::auto_write_lock l(_lock);
for (const auto &collected_entity : collected_entities) {
auto iter = _entities.find(collected_entity);
if (dsn_unlikely(iter == _entities.end())) {
// The entity has been removed from the registry for some unusual reason.
continue;
}
if (!iter->second->is_stale()) {
if (iter->second->_retire_time_ms > 0) {
// For those entities which are reemployed, their scheduled time for retirement
// should be reset to 0 though previously they could have been scheduled to be
// retired.
iter->second->_retire_time_ms = 0;
++retired_stat.num_reemployed_entities;
}
continue;
}
if (dsn_unlikely(iter->second->_retire_time_ms > now)) {
// Since in collect_stale_entities() we've filtered out the entities which are still
// within the retention interval, this is unlikely to happen. However, we still
// check here.
continue;
}
if (iter->second->_retire_time_ms == 0) {
// The entity should be marked with a scheduled time for retirement, since it has
// already been considered stale.
iter->second->_retire_time_ms = now + FLAGS_entity_retirement_delay_ms;
++retired_stat.num_recently_scheduled_entities;
continue;
}
// Once the entity is outside the retention interval, retire it from the registry.
// NOTE(review): is_stale() means the registry held the only reference, so this erase
// presumably destroys the entity right here. Its destructor calls close(kWait) and waits for
// its metrics to close, all while the registry's write lock is still held.
_entities.erase(iter);
++retired_stat.num_retired_entities;
}
return retired_stat;
}
// Timer callback: collects stale entities under the read lock, retires or schedules them under
// the write lock, and logs a summary of both phases.
void metric_registry::process_stale_entities()
{
    LOG_INFO("begin to process stale metric entities");

    const auto collected_info = collect_stale_entities();
    const auto retired_stat = retire_stale_entities(collected_info.collected_entities);

    LOG_INFO("stat for metric entities: total={}, collected={}, retired={}, scheduled={}, "
             "recently_scheduled={}, reemployed={}",
             collected_info.num_all_entities,
             collected_info.collected_entities.size(),
             retired_stat.num_retired_entities,
             collected_info.num_scheduled_entities,
             retired_stat.num_recently_scheduled_entities,
             retired_stat.num_reemployed_entities);
}
// A metric prototype just keeps a copy of its construction arguments.
metric_prototype::metric_prototype(const ctor_args &args) : _args(args) {}

metric_prototype::~metric_prototype() = default;

// Every metric remembers the prototype it was instantiated from.
metric::metric(const metric_prototype *prototype) : _prototype(prototype) {}

closeable_metric::closeable_metric(const metric_prototype *prototype) : metric(prototype) {}
// Picks a random delay for the first firing of a timer with the given interval, presumably so
// that timers created together do not all fire at the same moment.
//   * interval_ms < 1000:  delay in [50, interval_ms + 49] ms;
//   * interval_ms >= 1000: delay in [1000, (interval_ms / 1000) * 1000 + 999] ms.
// interval_ms must be positive (checked).
uint64_t metric_timer::generate_initial_delay_ms(uint64_t interval_ms)
{
    CHECK_GT(interval_ms, 0);

    if (interval_ms >= 1000) {
        const uint64_t whole_seconds = interval_ms / 1000;
        const uint64_t seconds_part = (rand::next_u64() % whole_seconds + 1) * 1000;
        const uint64_t millis_part = rand::next_u64() % 1000;
        return seconds_part + millis_part;
    }

    return rand::next_u64() % interval_ms + 50;
}
// Creates a running timer on the shared io_service. The timer first fires after a randomized
// initial delay and then every `interval_ms`. It calls `on_exec` on each firing and `on_close`
// once it has been closed.
metric_timer::metric_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close)
    : _initial_delay_ms(generate_initial_delay_ms(interval_ms)),
      _interval_ms(interval_ms),
      _on_exec(on_exec),
      _on_close(on_close),
      _state(state::kRunning),
      _completed(),
      _timer(new boost::asio::deadline_timer(tools::shared_io_service::instance().ios))
{
    const auto first_delay = boost::posix_time::milliseconds(_initial_delay_ms);
    _timer->expires_from_now(first_delay);
    _timer->async_wait([this](const boost::system::error_code &ec) { on_timer(ec); });
}
// Requests the timer to stop asynchronously; use wait() to block until it has been closed.
// Only the call that moves the state from kRunning to kClosing cancels the timer. Any later
// call (state already kClosing or kClosed) does nothing.
void metric_timer::close()
{
    // cancel() alone is not enough. If the timer has already expired, its handler has either
    // already run or been queued, and it can no longer be cancelled: it will receive an error
    // code that signals successful completion. Publishing kClosing lets on_timer() notice that
    // it must close even without an operation_canceled error.
    auto expected_state = state::kRunning;
    if (!_state.compare_exchange_strong(expected_state, state::kClosing)) {
        return;
    }

    _timer->cancel();
}
// Blocks until on_close() has signalled _completed.
void metric_timer::wait()
{
    _completed.wait();
}

// Runs the user-supplied close callback, then signals _completed, which wait() blocks on.
void metric_timer::on_close()
{
    _on_close();
    _completed.notify();
}
// Completion handler for every wait on _timer.
//   * If the wait was cancelled (only close() cancels it), the state must be kClosing; it is
//     switched to kClosed and on_close() is called.
//   * Any other error fails the CHECK.
//   * On a normal expiration, _on_exec() runs, surrounded by two checks for a pending close,
//     and then the next wait is scheduled after _interval_ms.
void metric_timer::on_timer(const boost::system::error_code &ec)
{
// This macro is defined for the case that handlers for asynchronous wait operations are no
// longer cancelled. It just checks the internal state atomically (since close() can also be
// called simultaneously) for kClosing; once it's matched, it will stop the timer by not executing
// any future handler.
#define TRY_PROCESS_TIMER_CLOSING() \
do { \
auto expected_state = state::kClosing; \
if (_state.compare_exchange_strong(expected_state, state::kClosed)) { \
on_close(); \
return; \
} \
} while (0)
// Error path: the only acceptable error is a cancellation issued by close().
if (dsn_unlikely(!!ec)) {
CHECK_EQ_MSG(ec,
boost::system::errc::operation_canceled,
"failed to exec on_timer with an error that cannot be handled: {}",
ec.message());
// Cancel can only be launched by close().
auto expected_state = state::kClosing;
CHECK(_state.compare_exchange_strong(expected_state, state::kClosed),
"wrong state for metric_timer: {}, while expecting closing state",
static_cast<int>(expected_state));
on_close();
return;
}
// close() may have been called after the timer expired but before this handler ran.
TRY_PROCESS_TIMER_CLOSING();
_on_exec();
// close() may also have been called while _on_exec() was running.
TRY_PROCESS_TIMER_CLOSING();
// NOTE(review): if close() wins its kRunning -> kClosing CAS after the check above but before
// async_wait() below registers the next wait, its cancel() has no pending wait to cancel. The
// kClosing state is then presumably only observed when the newly scheduled wait expires, so
// wait() may block for up to _interval_ms. Also, close() calls _timer->cancel() from the
// caller's thread while this handler may be using _timer; Boost.Asio documents deadline_timer
// shared objects as not thread-safe. TODO confirm whether the io_service threading model in use
// makes this safe.
_timer->expires_from_now(boost::posix_time::milliseconds(_interval_ms));
_timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1));
#undef TRY_PROCESS_TIMER_CLOSING
}
} // namespace dsn