src/shell/command_helper.h
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <getopt.h>
#include <fstream>
#include <functional>
#include <iomanip>
#include <memory>
#include <queue>
#include <string_view>
#include <thread>
#include <utility>
#include <boost/algorithm/string.hpp>
#include <fmt/color.h>
#include <fmt/ostream.h>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/sst_dump_tool.h>
#include <rocksdb/statistics.h>
#include <geo/lib/geo_client.h>
#include <pegasus/error.h>
#include <pegasus/git_commit.h>
#include <pegasus/version.h>
#include <rrdb/rrdb.code.definition.h>
#include <rrdb/rrdb_types.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_utils.h"
#include "base/pegasus_value_schema.h"
#include "client/replication_ddl_client.h"
#include "command_executor.h"
#include "command_utils.h"
#include "common/json_helper.h"
#include "http/http_client.h"
#include "perf_counter/perf_counter_utils.h"
#include "remote_cmd/remote_command.h"
#include "task/async_calls.h"
#include "tools/mutation_log_tool.h"
#include "utils/fmt_utils.h"
#include "utils/errors.h"
#include "utils/metrics.h"
#include "utils/ports.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
#include "utils/synchronize.h"
#include "utils/time_utils.h"
#define SHELL_PRINTLN_ERROR(msg, ...) \
fmt::print(stderr, \
fmt::emphasis::bold | fmt::fg(fmt::color::red), \
"ERROR: {}\n", \
fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINT_WARNING_BASE(msg, ...) \
fmt::print(stdout, \
fmt::emphasis::bold | fmt::fg(fmt::color::yellow), \
"WARNING: {}", \
fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINT_WARNING(msg, ...) SHELL_PRINT_WARNING_BASE(msg, ##__VA_ARGS__)
#define SHELL_PRINTLN_WARNING(msg, ...) \
SHELL_PRINT_WARNING_BASE("{}\n", fmt::format(msg, ##__VA_ARGS__))
#define SHELL_PRINT_OK_BASE(msg, ...) \
fmt::print(stdout, fmt::emphasis::bold | fmt::fg(fmt::color::green), msg, ##__VA_ARGS__)
#define SHELL_PRINT_OK(msg, ...) SHELL_PRINT_OK_BASE(msg, ##__VA_ARGS__)
#define SHELL_PRINTLN_OK(msg, ...) SHELL_PRINT_OK_BASE("{}\n", fmt::format(msg, ##__VA_ARGS__))
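// A minimal usage sketch of the print macros above (messages are hypothetical):
//
//   SHELL_PRINTLN_ERROR("parse table_id({}) failed", arg);   // bold red, stderr
//   SHELL_PRINTLN_WARNING("table {} is dropping", app_name); // bold yellow, stdout
//   SHELL_PRINTLN_OK("create table {} succeeded", app_name); // bold green, stdout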
// Print the message to stderr and return false if `exp` evaluates to false.
#define SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(exp, ...) \
do { \
if (dsn_unlikely(!(exp))) { \
SHELL_PRINTLN_ERROR(__VA_ARGS__); \
return false; \
} \
} while (0)
#define RETURN_FALSE_IF_SAMPLE_INTERVAL_MS_INVALID() \
SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(dsn::buf2uint32(optarg, sample_interval_ms), \
"parse sample_interval_ms({}) failed", \
optarg); \
SHELL_PRINT_AND_RETURN_FALSE_IF_NOT(sample_interval_ms > 0, "sample_interval_ms should be > 0")
DEFINE_TASK_CODE(LPC_SCAN_DATA, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_GET_METRICS, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
enum scan_data_operator
{
SCAN_COPY,
SCAN_CLEAR,
SCAN_COUNT,
SCAN_GEN_GEO,
SCAN_AND_MULTI_SET
};
USER_DEFINED_ENUM_FORMATTER(scan_data_operator)
class top_container
{
public:
struct top_heap_item
{
std::string hash_key;
std::string sort_key;
long row_size;
top_heap_item(std::string &&hash_key_, std::string &&sort_key_, long row_size_)
: hash_key(std::move(hash_key_)), sort_key(std::move(sort_key_)), row_size(row_size_)
{
}
};
struct top_heap_compare
{
// Order with `>` so the priority queue behaves as a min-heap: top() is the
// smallest of the kept rows and is evicted when a larger row arrives, leaving
// the N largest rows in the heap (see push() below).
bool operator()(const top_heap_item &i1, const top_heap_item &i2) const
{
return i1.row_size > i2.row_size;
}
};
typedef std::priority_queue<top_heap_item, std::vector<top_heap_item>, top_heap_compare>
top_heap;
top_container(int count) : _count(count) {}
void push(std::string &&hash_key, std::string &&sort_key, long row_size)
{
dsn::utils::auto_lock<dsn::utils::ex_lock_nr> l(_lock);
if (_heap.size() < static_cast<size_t>(_count)) {
_heap.emplace(std::move(hash_key), std::move(sort_key), row_size);
} else {
const top_heap_item &top = _heap.top();
if (top.row_size < row_size) {
_heap.pop();
_heap.emplace(std::move(hash_key), std::move(sort_key), row_size);
}
}
}
top_heap &all() { return _heap; }
private:
int _count;
top_heap _heap;
dsn::utils::ex_lock_nr _lock;
};
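// A minimal usage sketch of top_container (keys and sizes are hypothetical): keep
// the 3 largest rows seen while scanning, then drain the min-heap, smallest first.
//
//   top_container top(3);
//   top.push("h1", "s1", 100);
//   top.push("h2", "s2", 300);
//   top.push("h3", "s3", 200);
//   top.push("h4", "s4", 50); // dropped: smaller than the smallest kept row
//   for (auto &heap = top.all(); !heap.empty(); heap.pop()) {
//       const auto &item = heap.top(); // ascending row_size order
//   }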
enum class histogram_type
{
HASH_KEY_SIZE,
SORT_KEY_SIZE,
VALUE_SIZE,
ROW_SIZE
};
struct scan_data_context
{
scan_data_operator op;
int split_id;
int max_batch_count;
int timeout_ms;
bool no_overwrite; // If set to true, use check_and_set() instead of set() when
// inserting data into the destination table for copy_data, so that old data
// is not overwritten if it already exists.
pegasus::pegasus_client::filter_type sort_key_filter_type;
std::string sort_key_filter_pattern;
pegasus::pegasus_client::filter_type value_filter_type;
std::string value_filter_pattern;
pegasus::pegasus_client::pegasus_scanner_wrapper scanner;
pegasus::pegasus_client *client;
pegasus::geo::geo_client *geoclient;
std::atomic_bool *error_occurred;
std::atomic_long split_rows;
std::atomic_long split_request_count;
std::atomic_bool split_completed;
bool stat_size;
std::shared_ptr<rocksdb::Statistics> statistics;
int top_count;
top_container top_rows;
bool count_hash_key;
std::string last_hash_key;
std::atomic_long split_hash_key_count;
long data_count;
uint32_t multi_ttl_seconds;
std::unordered_map<std::string, std::map<std::string, std::string>> multi_kvs;
dsn::utils::semaphore sema;
scan_data_context(scan_data_operator op_,
int split_id_,
int max_batch_count_,
int timeout_ms_,
pegasus::pegasus_client::pegasus_scanner_wrapper scanner_,
pegasus::pegasus_client *client_,
pegasus::geo::geo_client *geoclient_,
std::atomic_bool *error_occurred_,
int max_multi_set_concurrency = 20,
bool stat_size_ = false,
std::shared_ptr<rocksdb::Statistics> statistics_ = nullptr,
int top_count_ = 0,
bool count_hash_key_ = false)
: op(op_),
split_id(split_id_),
max_batch_count(max_batch_count_),
timeout_ms(timeout_ms_),
no_overwrite(false),
sort_key_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
value_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
scanner(scanner_),
client(client_),
geoclient(geoclient_),
error_occurred(error_occurred_),
split_rows(0),
split_request_count(0),
split_completed(false),
stat_size(stat_size_),
statistics(statistics_),
top_count(top_count_),
top_rows(top_count_),
count_hash_key(count_hash_key_),
split_hash_key_count(0),
data_count(0),
multi_ttl_seconds(0),
sema(max_multi_set_concurrency)
{
// max_batch_count should be > 1 because the scan may be terminated
// when split_request_count == 1.
CHECK_GT(max_batch_count, 1);
}
void set_sort_key_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
{
sort_key_filter_type = type;
sort_key_filter_pattern = pattern;
}
void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
{
value_filter_type = type;
value_filter_pattern = pattern;
}
void set_no_overwrite() { no_overwrite = true; }
};
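// Lock-free maximum update: retry until either `value` is no longer greater than
// the stored maximum or compare_exchange_weak() installs `value`. The weak form
// may fail spuriously, which the retry loop tolerates.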
inline void update_atomic_max(std::atomic_long &max, long value)
{
while (true) {
long old = max.load();
if (value <= old || max.compare_exchange_weak(old, value)) {
break;
}
}
}
inline pegasus::pegasus_client::filter_type parse_filter_type(const std::string &name,
bool include_exact)
{
if (include_exact && name == "exact")
return pegasus::pegasus_client::FT_MATCH_EXACT;
else
return (pegasus::pegasus_client::filter_type)type_from_string(
dsn::apps::_filter_type_VALUES_TO_NAMES,
std::string("ft_match_") + name,
::dsn::apps::filter_type::FT_NO_FILTER);
}
// return true if the data is valid for the filter
inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
const std::string &filter_pattern,
const std::string &value)
{
switch (filter_type) {
case pegasus::pegasus_client::FT_NO_FILTER:
return true;
case pegasus::pegasus_client::FT_MATCH_EXACT:
return filter_pattern == value;
case pegasus::pegasus_client::FT_MATCH_ANYWHERE:
case pegasus::pegasus_client::FT_MATCH_PREFIX:
case pegasus::pegasus_client::FT_MATCH_POSTFIX: {
if (filter_pattern.length() == 0)
return true;
if (value.length() < filter_pattern.length())
return false;
if (filter_type == pegasus::pegasus_client::FT_MATCH_ANYWHERE) {
return std::string_view(value).find(filter_pattern) != std::string_view::npos;
} else if (filter_type == pegasus::pegasus_client::FT_MATCH_PREFIX) {
return dsn::utils::mequals(
value.data(), filter_pattern.data(), filter_pattern.length());
} else { // filter_type == pegasus::pegasus_client::FT_MATCH_POSTFIX
return dsn::utils::mequals(value.data() + value.length() - filter_pattern.length(),
filter_pattern.data(),
filter_pattern.length());
}
}
default:
LOG_FATAL("unsupported filter type: {}", filter_type);
}
return false;
}
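// A minimal usage sketch (patterns and values are hypothetical):
//
//   validate_filter(pegasus::pegasus_client::FT_MATCH_PREFIX, "2023", "20230101"); // true
//   validate_filter(pegasus::pegasus_client::FT_MATCH_POSTFIX, "01", "20230101");  // true
//   validate_filter(pegasus::pegasus_client::FT_MATCH_EXACT, "abc", "abcd");       // false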
// return true if the data is valid for the filter
inline bool
validate_filter(scan_data_context *context, const std::string &sort_key, const std::string &value)
{
// For the sort key we only need to check FT_MATCH_EXACT: exact match is not
// supported on the server side, so the scan runs with FT_MATCH_PREFIX there,
// which is already satisfied; here we just reject sort keys longer than the pattern.
if (context->sort_key_filter_type == pegasus::pegasus_client::FT_MATCH_EXACT &&
sort_key.length() > context->sort_key_filter_pattern.length())
return false;
return validate_filter(context->value_filter_type, context->value_filter_pattern, value);
}
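// Convert an absolute expiration timestamp (in seconds since the Pegasus epoch)
// into a relative TTL in seconds. Returns 0 when the record never expires
// (expire_ts_seconds == 0) or has already expired; `ts_expired` reports the latter.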
inline int compute_ttl_seconds(uint32_t expire_ts_seconds, bool &ts_expired)
{
auto epoch_now = pegasus::utils::epoch_now();
ts_expired = pegasus::check_if_ts_expired(epoch_now, expire_ts_seconds);
if (expire_ts_seconds > 0 && !ts_expired) {
return static_cast<int>(expire_ts_seconds - epoch_now);
}
return 0;
}
inline void batch_execute_multi_set(scan_data_context *context)
{
for (const auto &kv : context->multi_kvs) {
// Wait until in-flight multi_set calls drop below max_multi_set_concurrency.
context->sema.wait();
int multi_size = kv.second.size();
context->client->async_multi_set(
kv.first,
kv.second,
[context, multi_size](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async_multi_set set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows += multi_size;
}
context->sema.signal();
},
context->timeout_ms,
context->multi_ttl_seconds);
}
context->multi_kvs.clear();
context->data_count = 0;
}
// copy data by async_multi_set
inline void scan_multi_data_next(scan_data_context *context)
{
if (!context->split_completed.load() && !context->error_occurred->load()) {
context->scanner->async_next([context](int ret,
std::string &&hash_key,
std::string &&sort_key,
std::string &&value,
pegasus::pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
uint32_t kv_count) {
if (ret == pegasus::PERR_OK) {
if (validate_filter(context, sort_key, value)) {
bool ts_expired = false;
int ttl_seconds = 0;
ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
if (!ts_expired) {
// Rows with an empty hash key cannot be grouped for multi_set,
// so write them individually via async_set().
if (hash_key.empty()) {
// Wait until in-flight writes drop below max_multi_set_concurrency.
context->sema.wait();
auto callback = [context](
int err,
pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
}
context->sema.signal();
};
context->client->async_set(hash_key,
sort_key,
value,
std::move(callback),
context->timeout_ms,
ttl_seconds);
} else {
context->data_count++;
if (context->multi_kvs.find(hash_key) == context->multi_kvs.end()) {
context->multi_kvs.emplace(hash_key,
std::map<std::string, std::string>());
}
if (context->multi_ttl_seconds < ttl_seconds || ttl_seconds == 0) {
context->multi_ttl_seconds = ttl_seconds;
}
context->multi_kvs[hash_key].emplace(std::move(sort_key),
std::move(value));
if (context->data_count >= context->max_batch_count) {
batch_execute_multi_set(context);
}
}
}
}
scan_multi_data_next(context);
} else if (ret == pegasus::PERR_SCAN_COMPLETE) {
batch_execute_multi_set(context);
context->split_completed.store(true);
} else {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] scan next failed: %s\n",
context->split_id,
context->client->get_error_string(ret));
context->error_occurred->store(true);
}
}
});
}
}
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
context->split_request_count.load() < context->max_batch_count) {
context->split_request_count++;
context->scanner->async_next([context](int ret,
std::string &&hash_key,
std::string &&sort_key,
std::string &&value,
pegasus::pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds,
int32_t kv_count) {
if (ret == pegasus::PERR_OK) {
if (kv_count != -1 || validate_filter(context, sort_key, value)) {
bool ts_expired = false;
int ttl_seconds = 0;
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
if (ts_expired) {
scan_data_next(context);
} else if (context->no_overwrite) {
auto callback =
[context](int err,
pegasus::pegasus_client::check_and_set_results &&results,
pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(
stderr,
"ERROR: split[%d] async check and set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
if (results.set_succeed) {
context->split_rows++;
}
scan_data_next(context);
}
// "split_request_count--" must stay at the end of the scope,
// to prevent split_request_count from reaching 0 prematurely.
context->split_request_count--;
};
pegasus::pegasus_client::check_and_set_options options;
options.set_value_ttl_seconds = ttl_seconds;
context->client->async_check_and_set(
hash_key,
sort_key,
pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
"",
sort_key,
value,
options,
std::move(callback),
context->timeout_ms);
} else {
auto callback =
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// "split_request_count--" must stay at the end of the scope,
// to prevent split_request_count from reaching 0 prematurely.
context->split_request_count--;
};
context->client->async_set(hash_key,
sort_key,
value,
std::move(callback),
context->timeout_ms,
ttl_seconds);
}
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// "split_request_count--" must stay at the end of the scope,
// to prevent split_request_count from reaching 0 prematurely.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
if (kv_count != -1) {
context->split_rows += kv_count;
scan_data_next(context);
break;
}
context->split_rows++;
if (context->stat_size && context->statistics) {
long hash_key_size = hash_key.size();
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::HASH_KEY_SIZE),
hash_key_size);
long sort_key_size = sort_key.size();
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::SORT_KEY_SIZE),
sort_key_size);
long value_size = value.size();
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::VALUE_SIZE), value_size);
long row_size = hash_key_size + sort_key_size + value_size;
context->statistics->measureTime(
static_cast<uint32_t>(histogram_type::ROW_SIZE), row_size);
if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
}
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
}
}
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
ttl_seconds = compute_ttl_seconds(expire_ts_seconds, ts_expired);
if (ts_expired) {
scan_data_next(context);
} else {
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// "split_request_count--" must stay at the end of the scope,
// to prevent split_request_count from reaching 0 prematurely.
context->split_request_count--;
},
context->timeout_ms,
ttl_seconds);
}
break;
default:
LOG_FATAL("op = {}", context->op);
break;
}
} else {
scan_data_next(context);
}
} else if (ret == pegasus::PERR_SCAN_COMPLETE) {
context->split_completed.store(true);
} else {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] scan next failed: %s\n",
context->split_id,
context->client->get_error_string(ret));
context->error_occurred->store(true);
}
}
// "split_request_count--" must stay at the end of the scope,
// to prevent split_request_count from reaching 0 prematurely.
context->split_request_count--;
});
if (context->count_hash_key) {
// disable parallel scan if count_hash_key == true
break;
}
}
}
struct node_desc
{
std::string desc;
dsn::host_port hp;
node_desc(const std::string &s, const dsn::host_port &n) : desc(s), hp(n) {}
};
// type: all | replica-server | meta-server
inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector<node_desc> &nodes)
{
if (type == "all" || type == "meta-server") {
for (const auto &hp : sc->meta_list) {
nodes.emplace_back("meta-server", hp);
}
}
if (type == "all" || type == "replica-server") {
std::map<dsn::host_port, dsn::replication::node_status::type> rs_nodes;
::dsn::error_code err =
sc->ddl_client->list_nodes(dsn::replication::node_status::NS_ALIVE, rs_nodes);
if (err != ::dsn::ERR_OK) {
fprintf(stderr, "ERROR: list node failed: %s\n", err.to_string());
return false;
}
for (auto &kv : rs_nodes) {
nodes.emplace_back("replica-server", kv.first);
}
}
return true;
}
// Fetch the metrics according to `query_string` for each target node.
inline std::vector<dsn::http_result> get_metrics(const std::vector<node_desc> &nodes,
const std::string &query_string)
{
std::vector<dsn::http_result> results(nodes.size());
dsn::task_tracker tracker;
for (size_t i = 0; i < nodes.size(); ++i) {
(void)dsn::tasking::enqueue(
LPC_GET_METRICS, &tracker, [&nodes, &query_string, &results, i]() {
dsn::http_url url;
#define SET_RESULT_AND_RETURN_IF_URL_NOT_OK(name, expr) \
do { \
auto err = url.set_##name(expr); \
if (!err) { \
results[i] = dsn::http_result(std::move(err)); \
return; \
} \
} while (0)
SET_RESULT_AND_RETURN_IF_URL_NOT_OK(host, nodes[i].hp.host().c_str());
SET_RESULT_AND_RETURN_IF_URL_NOT_OK(port, nodes[i].hp.port());
SET_RESULT_AND_RETURN_IF_URL_NOT_OK(
path, dsn::metrics_http_service::kMetricsQueryPath.c_str());
SET_RESULT_AND_RETURN_IF_URL_NOT_OK(query, query_string.c_str());
results[i] = dsn::http_get(url);
#undef SET_RESULT_AND_RETURN_IF_URL_NOT_OK
});
}
tracker.wait_outstanding_tasks();
return results;
}
// Adapt the result returned by `get_metrics` into a structure that can be
// processed the same way as `remote_command` results.
template <typename... Args>
inline dsn::error_s process_get_metrics_result(const dsn::http_result &result,
const node_desc &node,
const char *what,
Args &&...args)
{
if (dsn_unlikely(!result.error())) {
return FMT_ERR(result.error().code(),
"ERROR: query {} metrics from node {} failed, msg={}",
fmt::format(what, std::forward<Args>(args)...),
node.hp,
result.error());
}
if (dsn_unlikely(result.status() != dsn::http_status_code::kOk)) {
return FMT_ERR(dsn::ERR_HTTP_ERROR,
"ERROR: query {} metrics from node {} failed, http_status={}, msg={}",
fmt::format(what, std::forward<Args>(args)...),
node.hp,
dsn::get_http_status_message(result.status()),
result.body());
}
return dsn::error_s::ok();
}
#define RETURN_SHELL_IF_GET_METRICS_FAILED(result, node, what, ...) \
do { \
const auto &res = process_get_metrics_result(result, node, what, ##__VA_ARGS__); \
if (dsn_unlikely(!res)) { \
fmt::println("{}", res.description()); \
return true; \
} \
} while (0)
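// A minimal usage sketch (`filters` is a hypothetical dsn::metric_filters built
// elsewhere, e.g. by row_data_filters() below): fetch the metric snapshot from
// every replica server and bail out on any per-node HTTP failure.
//
//   std::vector<node_desc> nodes;
//   if (!fill_nodes(sc, "replica-server", nodes)) {
//       return true;
//   }
//   const auto &results = get_metrics(nodes, filters.to_query_string());
//   for (size_t i = 0; i < nodes.size(); ++i) {
//       RETURN_SHELL_IF_GET_METRICS_FAILED(results[i], nodes[i], "table-level");
//   }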
// Adapt the result of parsing operations on the metrics returned by `get_metrics`
// into a structure that can be processed the same way as `remote_command` results.
template <typename... Args>
inline dsn::error_s process_parse_metrics_result(const dsn::error_s &result,
const node_desc &node,
const char *what,
Args &&...args)
{
if (dsn_unlikely(!result)) {
return FMT_ERR(result.code(),
"ERROR: {} metrics response from node {} failed, msg={}",
fmt::format(what, std::forward<Args>(args)...),
node.hp,
result);
}
return dsn::error_s::ok();
}
#define RETURN_SHELL_IF_PARSE_METRICS_FAILED(expr, node, what, ...) \
do { \
const auto &res = process_parse_metrics_result(expr, node, what, ##__VA_ARGS__); \
if (dsn_unlikely(!res)) { \
fmt::println("{}", res.description()); \
return true; \
} \
} while (0)
using stat_var_map = std::unordered_map<std::string, double *>;
// Abstract class used to aggregate the stats based on custom filters while iterating
// over the fetched metrics.
//
// Given the type and attributes of an entity, derived classes implement a custom filter
// that returns the selected `stat_var_map`, if any. Calculations including addition and
// subtraction are also provided for aggregating the stats. The metric name serves as a
// dimension for the aggregation.
class aggregate_stats
{
public:
aggregate_stats() = default;
virtual ~aggregate_stats() = default;
#define CALC_STAT_VARS(entities, op) \
for (const auto &entity : entities) { \
stat_var_map *stat_vars = nullptr; \
RETURN_NOT_OK(get_stat_vars(entity.type, entity.attributes, &stat_vars)); \
\
if (stat_vars == nullptr || stat_vars->empty()) { \
continue; \
} \
\
for (const auto &m : entity.metrics) { \
auto iter = stat_vars->find(m.name); \
if (iter != stat_vars->end()) { \
*iter->second op m.value; \
} \
} \
} \
return dsn::error_s::ok()
// The following interfaces provide calculations over the fetched metrics: each
// calculation is performed on every metric whose name is found in the `stat_var_map`
// returned by `get_stat_vars`.
// Assign the matched metric value directly to the selected member of `stat_var_map`
// without extra calculation.
dsn::error_s assign(const std::vector<dsn::metric_entity_brief_value_snapshot> &entities)
{
CALC_STAT_VARS(entities, =);
}
// Add and assign the matched metric value to the selected member of `stat_var_map`.
dsn::error_s add_assign(const std::vector<dsn::metric_entity_brief_value_snapshot> &entities)
{
CALC_STAT_VARS(entities, +=);
}
// Subtract and assign the matched metric value to the selected member of `stat_var_map`.
dsn::error_s sub_assign(const std::vector<dsn::metric_entity_brief_value_snapshot> &entities)
{
CALC_STAT_VARS(entities, -=);
}
void calc_rates(uint64_t timestamp_ns_start, uint64_t timestamp_ns_end)
{
calc_rates(dsn::calc_metric_sample_duration_s(timestamp_ns_start, timestamp_ns_end));
}
#undef CALC_STAT_VARS
protected:
// Given the type and attributes of an entity, decide which `stat_var_map` is selected, if any.
// Otherwise, `*stat_vars` would be set to nullptr.
virtual dsn::error_s get_stat_vars(const std::string &entity_type,
const dsn::metric_entity::attr_map &entity_attrs,
stat_var_map **stat_vars) = 0;
// Implement self-defined calculation for rates, such as QPS.
virtual void calc_rates(double duration_s) = 0;
};
// Support multiple kinds of aggregations over the fetched metrics, such as sums, increases and
// rates. Users could choose to create aggregations as needed.
class aggregate_stats_calcs
{
public:
aggregate_stats_calcs() noexcept = default;
~aggregate_stats_calcs() = default;
aggregate_stats_calcs(aggregate_stats_calcs &&) noexcept = default;
aggregate_stats_calcs &operator=(aggregate_stats_calcs &&) noexcept = default;
#define DEF_CALC_CREATOR(name) \
template <typename T, typename... Args> \
void create_##name(Args &&...args) \
{ \
_##name = std::make_unique<T>(std::forward<Args>(args)...); \
}
// Create the aggregations as needed.
DEF_CALC_CREATOR(assignments)
DEF_CALC_CREATOR(sums)
DEF_CALC_CREATOR(increases)
DEF_CALC_CREATOR(rates)
#undef DEF_CALC_CREATOR
#define CALC_ASSIGNMENT_STATS(entities) \
do { \
if (_assignments) { \
RETURN_NOT_OK(_assignments->assign(entities)); \
} \
} while (0)
#define CALC_ACCUM_STATS(entities) \
do { \
if (_sums) { \
RETURN_NOT_OK(_sums->add_assign(entities)); \
} \
} while (0)
// Perform the chosen aggregations (both assignment and accum) on the fetched metrics.
dsn::error_s aggregate_metrics(const std::string &json_string)
{
DESERIALIZE_METRIC_QUERY_BRIEF_SNAPSHOT(value, json_string, query_snapshot);
return aggregate_metrics(query_snapshot);
}
dsn::error_s aggregate_metrics(const dsn::metric_query_brief_value_snapshot &query_snapshot)
{
CALC_ASSIGNMENT_STATS(query_snapshot.entities);
CALC_ACCUM_STATS(query_snapshot.entities);
return dsn::error_s::ok();
}
// Perform the chosen aggregations (assignment, accum, delta and rate) on the fetched metrics.
dsn::error_s aggregate_metrics(const std::string &json_string_start,
const std::string &json_string_end)
{
DESERIALIZE_METRIC_QUERY_BRIEF_2_SAMPLES(
json_string_start, json_string_end, query_snapshot_start, query_snapshot_end);
return aggregate_metrics(query_snapshot_start, query_snapshot_end);
}
dsn::error_s
aggregate_metrics(const dsn::metric_query_brief_value_snapshot &query_snapshot_start,
const dsn::metric_query_brief_value_snapshot &query_snapshot_end)
{
// Apply the ending sample to the assignment and accum aggregations.
CALC_ASSIGNMENT_STATS(query_snapshot_end.entities);
CALC_ACCUM_STATS(query_snapshot_end.entities);
const std::array deltas_list = {&_increases, &_rates};
for (const auto stats : deltas_list) {
if (!(*stats)) {
continue;
}
RETURN_NOT_OK((*stats)->add_assign(query_snapshot_end.entities));
RETURN_NOT_OK((*stats)->sub_assign(query_snapshot_start.entities));
}
if (_rates) {
_rates->calc_rates(query_snapshot_start.timestamp_ns, query_snapshot_end.timestamp_ns);
}
return dsn::error_s::ok();
}
#undef CALC_ACCUM_STATS
#undef CALC_ASSIGNMENT_STATS
private:
DISALLOW_COPY_AND_ASSIGN(aggregate_stats_calcs);
std::unique_ptr<aggregate_stats> _assignments;
std::unique_ptr<aggregate_stats> _sums;
std::unique_ptr<aggregate_stats> _increases;
std::unique_ptr<aggregate_stats> _rates;
};
// Convenience macros for `get_stat_vars` to set `*stat_vars` to nullptr and return
// early under certain conditions.
#define RETURN_NULL_STAT_VARS_IF(expr) \
do { \
if (expr) { \
*stat_vars = nullptr; \
return dsn::error_s::ok(); \
} \
} while (0)
#define RETURN_NULL_STAT_VARS_IF_NOT_OK(expr) \
do { \
const auto &err = (expr); \
if (dsn_unlikely(!err)) { \
*stat_vars = nullptr; \
return err; \
} \
} while (0)
// A helper macro to parse a command argument; the result is filled into a string
// vector variable named 'container'.
#define PARSE_STRS(container) \
do { \
const auto param = cmd(param_index++).str(); \
::dsn::utils::split_args(param.c_str(), container, ','); \
if (container.empty()) { \
SHELL_PRINTLN_ERROR( \
"invalid command, '{}' should be in the form of 'val1,val2,val3' and " \
"should not be empty", \
param); \
return false; \
} \
std::set<std::string> str_set(container.begin(), container.end()); \
if (str_set.size() != container.size()) { \
SHELL_PRINTLN_ERROR("invalid command, '{}' has duplicate values", param); \
return false; \
} \
} while (false)
#define PARSE_OPT_STRS(container, def_val, ...) \
do { \
const auto param = cmd(__VA_ARGS__, (def_val)).str(); \
::dsn::utils::split_args(param.c_str(), container, ','); \
} while (false)
// A helper macro to parse a command argument; the result is filled into a uint32_t
// variable named 'value'.
#define PARSE_UINT(value) \
do { \
const auto param = cmd(param_index++).str(); \
if (!::dsn::buf2uint32(param, value)) { \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned integer", param); \
return false; \
} \
} while (false)
// A helper macro to parse an optional command argument; the result is filled into a
// uint32_t variable named 'value'.
//
// Variable arguments are `name` or `init_list` of argh::parser::operator(). See argh::parser
// for details.
#define PARSE_OPT_UINT(value, def_val, ...) \
do { \
const auto param = cmd(__VA_ARGS__, (def_val)).str(); \
if (!::dsn::buf2uint32(param, value)) { \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned integer", param); \
return false; \
} \
} while (false)
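// A minimal usage sketch (option names are hypothetical), assuming an argh::parser
// named `cmd` and a positional cursor `param_index` in the enclosing command handler:
//
//   uint32_t table_id = 0;
//   PARSE_UINT(table_id); // consumes the next positional argument
//   uint32_t sample_interval_ms = 0;
//   PARSE_OPT_UINT(sample_interval_ms, 1000u, {"-i", "--sample_interval_ms"});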
// A helper macro to parse a comma-separated command argument; the parsed values are
// inserted into a set of uint32_t named 'container'.
#define PARSE_UINTS(container) \
do { \
std::vector<std::string> strs; \
PARSE_STRS(strs); \
container.clear(); \
for (const auto &str : strs) { \
uint32_t v; \
if (!::dsn::buf2uint32(str, v)) { \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be an unsigned integer", str); \
return false; \
} \
container.insert(v); \
} \
} while (false)
// A helper macro to parse an optional command argument; the result is filled into an
// int32_t variable named 'value'.
//
// Variable arguments are `name` or `init_list` of argh::parser::operator(). See argh::parser
// for details.
#define PARSE_OPT_INT(value, def_val, ...) \
do { \
const auto param = cmd(__VA_ARGS__, (def_val)).str(); \
if (!::dsn::buf2int32(param, value)) { \
SHELL_PRINTLN_ERROR("invalid command, '{}' should be a signed integer", param); \
return false; \
} \
} while (false)
// Parse enum value from the parameters of command line.
#define PARSE_OPT_ENUM(enum_val, invalid_val, ...) \
do { \
const std::string __str(cmd(__VA_ARGS__, "").str()); \
if (!__str.empty()) { \
const auto &__val = enum_from_string(__str.c_str(), invalid_val); \
if (__val == invalid_val) { \
SHELL_PRINTLN_ERROR("invalid enum: '{}'", __str); \
return false; \
} \
enum_val = __val; \
} \
} while (false)
// Parse the provided parameter into the map by the specified delimiters.
#define PARSE_OPT_KV_MAP(map, item_splitter, kv_splitter, ...) \
do { \
const std::string __str(cmd(__VA_ARGS__, "").str()); \
if (!::dsn::utils::parse_kv_map(__str.c_str(), map, item_splitter, kv_splitter)) { \
SHELL_PRINTLN_ERROR("invalid kvs: '{}'", __str); \
return false; \
} \
} while (false)
#define RETURN_FALSE_IF_NOT(expr, ...) \
do { \
if (dsn_unlikely(!(expr))) { \
fmt::print(stderr, "{}\n", fmt::format(__VA_ARGS__)); \
return false; \
} \
} while (false)
#define RETURN_FALSE_IF_NON_OK(expr, ...) \
do { \
const auto _ec = (expr); \
if (dsn_unlikely(_ec != dsn::ERR_OK)) { \
fmt::print(stderr, "{}: {}\n", _ec, fmt::format(__VA_ARGS__)); \
return false; \
} \
} while (false)
#define RETURN_FALSE_IF_NON_RDB_OK(expr, ...) \
do { \
const auto _s = (expr); \
if (dsn_unlikely(!_s.ok())) { \
fmt::print(stderr, "{}: {}\n", _s.ToString(), fmt::format(__VA_ARGS__)); \
return false; \
} \
} while (false)
// Total aggregation over the fetched metrics. The only dimension is the metric name, which
// is also the key of `stat_var_map`.
class total_aggregate_stats : public aggregate_stats
{
public:
total_aggregate_stats(const std::string &entity_type, stat_var_map &&stat_vars)
: _my_entity_type(entity_type), _my_stat_vars(std::move(stat_vars))
{
}
~total_aggregate_stats() = default;
protected:
dsn::error_s get_stat_vars(const std::string &entity_type,
const dsn::metric_entity::attr_map &entity_attrs,
stat_var_map **stat_vars) override
{
*stat_vars = (entity_type == _my_entity_type) ? &_my_stat_vars : nullptr;
return dsn::error_s::ok();
}
void calc_rates(double duration_s) override
{
for (auto &stat_var : _my_stat_vars) {
*stat_var.second /= duration_s;
}
}
private:
DISALLOW_COPY_AND_ASSIGN(total_aggregate_stats);
const std::string _my_entity_type;
stat_var_map _my_stat_vars;
};
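// A minimal usage sketch (metric and variable names are hypothetical): sum
// "rdb_total_sst_size_mb" across all "replica" entities into `storage_mb`, then
// feed one node's deserialized snapshot into the calculator.
//
//   double storage_mb = 0;
//   aggregate_stats_calcs calcs;
//   calcs.create_sums<total_aggregate_stats>(
//       "replica", stat_var_map({{"rdb_total_sst_size_mb", &storage_mb}}));
//   RETURN_SHELL_IF_PARSE_METRICS_FAILED(
//       calcs.aggregate_metrics(results[i].body()), nodes[i], "aggregate");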
using table_stat_map = std::unordered_map<int32_t, stat_var_map>;
// Table-level aggregation over the fetched metrics. There are 2 dimensions for the aggregation:
// * the table id, from the attributes of the metric entity;
// * the metric name, which is also the key of `stat_var_map`.
//
// Note that the `partitions` argument also acts as a filter, since partition-level
// metrics from a node should be excluded under some circumstances. For example, the
// partition-level QPS we care about must come from the primary replica, so fetched
// metrics are ignored if the node is not the primary replica of the target partition.
// An empty `partitions` means there is no restriction.
class table_aggregate_stats : public aggregate_stats
{
public:
table_aggregate_stats(const std::string &entity_type,
table_stat_map &&table_stats,
std::unordered_set<dsn::gpid> partitions)
: _my_entity_type(entity_type),
_my_table_stats(std::move(table_stats)),
_my_partitions(std::move(partitions))
{
}
~table_aggregate_stats() override = default;
protected:
dsn::error_s get_stat_vars(const std::string &entity_type,
const dsn::metric_entity::attr_map &entity_attrs,
stat_var_map **stat_vars) override
{
RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);
int32_t metric_table_id;
RETURN_NULL_STAT_VARS_IF_NOT_OK(dsn::parse_metric_table_id(entity_attrs, metric_table_id));
// Empty `_my_partitions` means there is no restriction; otherwise, the partition id
// should be found in `_my_partitions`.
if (!_my_partitions.empty()) {
int32_t metric_partition_id;
RETURN_NULL_STAT_VARS_IF_NOT_OK(
dsn::parse_metric_partition_id(entity_attrs, metric_partition_id));
dsn::gpid metric_pid(metric_table_id, metric_partition_id);
RETURN_NULL_STAT_VARS_IF(_my_partitions.find(metric_pid) == _my_partitions.end());
}
const auto &table_stat = _my_table_stats.find(metric_table_id);
CHECK_TRUE(table_stat != _my_table_stats.end());
*stat_vars = &table_stat->second;
return dsn::error_s::ok();
}
void calc_rates(double duration_s) override
{
for (auto &table_stats : _my_table_stats) {
for (auto &stat_var : table_stats.second) {
*stat_var.second /= duration_s;
}
}
}
private:
DISALLOW_COPY_AND_ASSIGN(table_aggregate_stats);
const std::string _my_entity_type;
table_stat_map _my_table_stats;
std::unordered_set<dsn::gpid> _my_partitions;
};
using partition_stat_map = std::unordered_map<dsn::gpid, stat_var_map>;
// Partition-level aggregation over the fetched metrics. There are 3 dimensions for the aggregation:
// * the table id, from the attributes of the metric entity;
// * the partition id, also from the attributes of the metric entity;
// * the metric name, which is also the key of `stat_var_map`.
class partition_aggregate_stats : public aggregate_stats
{
public:
partition_aggregate_stats(const std::string &entity_type, partition_stat_map &&partition_stats)
: _my_entity_type(entity_type), _my_partition_stats(std::move(partition_stats))
{
}
~partition_aggregate_stats() override = default;
protected:
dsn::error_s get_stat_vars(const std::string &entity_type,
const dsn::metric_entity::attr_map &entity_attrs,
stat_var_map **stat_vars) override
{
RETURN_NULL_STAT_VARS_IF(entity_type != _my_entity_type);
int32_t metric_table_id;
RETURN_NULL_STAT_VARS_IF_NOT_OK(dsn::parse_metric_table_id(entity_attrs, metric_table_id));
int32_t metric_partition_id;
RETURN_NULL_STAT_VARS_IF_NOT_OK(
dsn::parse_metric_partition_id(entity_attrs, metric_partition_id));
dsn::gpid metric_pid(metric_table_id, metric_partition_id);
const auto &partition_stat = _my_partition_stats.find(metric_pid);
RETURN_NULL_STAT_VARS_IF(partition_stat == _my_partition_stats.end());
*stat_vars = &partition_stat->second;
return dsn::error_s::ok();
}
void calc_rates(double duration_s) override
{
for (auto &partition_stats : _my_partition_stats) {
for (auto &stat_var : partition_stats.second) {
*stat_var.second /= duration_s;
}
}
}
private:
DISALLOW_COPY_AND_ASSIGN(partition_aggregate_stats);
const std::string _my_entity_type;
partition_stat_map _my_partition_stats;
};
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
const std::string &cmd,
const std::vector<std::string> &arguments)
{
std::vector<std::pair<bool, std::string>> results;
std::vector<dsn::task_ptr> tasks;
tasks.resize(nodes.size());
results.resize(nodes.size());
for (size_t i = 0; i < nodes.size(); ++i) {
auto callback = [&results, i](::dsn::error_code err, const std::string &resp) {
if (err == ::dsn::ERR_OK) {
results[i].first = true;
results[i].second = resp;
} else {
results[i].first = false;
results[i].second = err.to_string();
}
};
tasks[i] = dsn::dist::cmd::async_call_remote(
dsn::dns_resolver::instance().resolve_address(nodes[i].hp),
cmd,
arguments,
callback,
std::chrono::milliseconds(5000));
}
for (size_t i = 0; i < nodes.size(); ++i) {
tasks[i]->wait();
}
return results;
}
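// A minimal usage sketch (the remote command name is hypothetical): broadcast a
// command to all replica servers and print each node's response.
//
//   std::vector<node_desc> nodes;
//   if (!fill_nodes(sc, "replica-server", nodes)) {
//       return false;
//   }
//   const auto &results = call_remote_command(sc, nodes, "query-compaction", {"all"});
//   for (size_t i = 0; i < nodes.size(); ++i) {
//       fmt::println("{}[{}]: {}", nodes[i].desc, nodes[i].hp, results[i].second);
//   }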
inline bool parse_app_pegasus_perf_counter_name(const std::string &name,
int32_t &app_id,
int32_t &partition_index,
std::string &counter_name)
{
std::string::size_type find = name.find_last_of('@');
if (find == std::string::npos)
return false;
int n = sscanf(name.c_str() + find + 1, "%d.%d", &app_id, &partition_index);
if (n != 2)
return false;
std::string::size_type find2 = name.find_last_of('*');
if (find2 == std::string::npos)
return false;
counter_name = name.substr(find2 + 1, find - find2 - 1);
return true;
}
inline bool parse_app_perf_counter_name(const std::string &name,
std::string &app_name,
std::string &counter_name)
{
/**
* name format:
* 1.{node}*{section}*{counter_name}@{app_name}.{percent_line}
* 2.{node}*{section}*{counter_name}@{app_name}
*/
std::string::size_type find = name.find_last_of('@');
if (find == std::string::npos)
return false;
std::string::size_type find2 = name.find_last_of('.');
if (find2 == std::string::npos) {
app_name = name.substr(find + 1);
} else {
app_name = name.substr(find + 1, find2 - find - 1);
}
std::string::size_type find3 = name.find_last_of('*');
if (find3 == std::string::npos)
return false;
counter_name = name.substr(find3 + 1, find - find3 - 1);
return true;
}
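// A minimal parsing sketch (the counter name is hypothetical, following format 1
// documented above):
//
//   std::string app_name, counter_name;
//   parse_app_perf_counter_name(
//       "replica*app.pegasus*get_qps@temp.p999", app_name, counter_name);
//   // app_name == "temp", counter_name == "get_qps"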
struct row_data
{
row_data() = default;
explicit row_data(const std::string &name) : row_name(name) {}
double get_total_read_qps() const { return get_qps + multi_get_qps + batch_get_qps + scan_qps; }
double get_total_write_qps() const
{
return put_qps + remove_qps + multi_put_qps + multi_remove_qps + check_and_set_qps +
check_and_mutate_qps + incr_qps + duplicate_qps;
}
double get_total_read_bytes() const
{
return get_bytes + multi_get_bytes + batch_get_bytes + scan_bytes;
}
double get_total_write_bytes() const
{
return put_bytes + multi_put_bytes + check_and_set_bytes + check_and_mutate_bytes;
}
void aggregate(const row_data &row)
{
get_qps += row.get_qps;
multi_get_qps += row.multi_get_qps;
batch_get_qps += row.batch_get_qps;
put_qps += row.put_qps;
multi_put_qps += row.multi_put_qps;
remove_qps += row.remove_qps;
multi_remove_qps += row.multi_remove_qps;
incr_qps += row.incr_qps;
check_and_set_qps += row.check_and_set_qps;
check_and_mutate_qps += row.check_and_mutate_qps;
scan_qps += row.scan_qps;
duplicate_qps += row.duplicate_qps;
dup_shipped_ops += row.dup_shipped_ops;
dup_failed_shipping_ops += row.dup_failed_shipping_ops;
dup_recent_mutation_loss_count += row.dup_recent_mutation_loss_count;
recent_read_cu += row.recent_read_cu;
recent_write_cu += row.recent_write_cu;
recent_expire_count += row.recent_expire_count;
recent_filter_count += row.recent_filter_count;
recent_abnormal_count += row.recent_abnormal_count;
recent_write_throttling_delay_count += row.recent_write_throttling_delay_count;
recent_write_throttling_reject_count += row.recent_write_throttling_reject_count;
recent_read_throttling_delay_count += row.recent_read_throttling_delay_count;
recent_read_throttling_reject_count += row.recent_read_throttling_reject_count;
recent_backup_request_throttling_delay_count +=
row.recent_backup_request_throttling_delay_count;
recent_backup_request_throttling_reject_count +=
row.recent_backup_request_throttling_reject_count;
recent_write_splitting_reject_count += row.recent_write_splitting_reject_count;
recent_read_splitting_reject_count += row.recent_read_splitting_reject_count;
recent_write_bulk_load_ingestion_reject_count +=
row.recent_write_bulk_load_ingestion_reject_count;
storage_mb += row.storage_mb;
storage_count += row.storage_count;
rdb_block_cache_hit_count += row.rdb_block_cache_hit_count;
rdb_block_cache_total_count += row.rdb_block_cache_total_count;
rdb_index_and_filter_blocks_mem_usage += row.rdb_index_and_filter_blocks_mem_usage;
rdb_memtable_mem_usage += row.rdb_memtable_mem_usage;
rdb_estimate_num_keys += row.rdb_estimate_num_keys;
rdb_bf_seek_negatives += row.rdb_bf_seek_negatives;
rdb_bf_seek_total += row.rdb_bf_seek_total;
rdb_bf_point_positive_true += row.rdb_bf_point_positive_true;
rdb_bf_point_positive_total += row.rdb_bf_point_positive_total;
rdb_bf_point_negatives += row.rdb_bf_point_negatives;
backup_request_qps += row.backup_request_qps;
backup_request_bytes += row.backup_request_bytes;
get_bytes += row.get_bytes;
multi_get_bytes += row.multi_get_bytes;
batch_get_bytes += row.batch_get_bytes;
scan_bytes += row.scan_bytes;
put_bytes += row.put_bytes;
multi_put_bytes += row.multi_put_bytes;
check_and_set_bytes += row.check_and_set_bytes;
check_and_mutate_bytes += row.check_and_mutate_bytes;
recent_rdb_compaction_input_bytes += row.recent_rdb_compaction_input_bytes;
recent_rdb_compaction_output_bytes += row.recent_rdb_compaction_output_bytes;
rdb_read_l2andup_hit_count += row.rdb_read_l2andup_hit_count;
rdb_read_l1_hit_count += row.rdb_read_l1_hit_count;
rdb_read_l0_hit_count += row.rdb_read_l0_hit_count;
rdb_read_memtable_hit_count += row.rdb_read_memtable_hit_count;
rdb_write_amplification += row.rdb_write_amplification;
rdb_read_amplification += row.rdb_read_amplification;
}
std::string row_name;
int32_t app_id = 0;
int32_t partition_count = 0;
double get_qps = 0;
double multi_get_qps = 0;
double batch_get_qps = 0;
double put_qps = 0;
double multi_put_qps = 0;
double remove_qps = 0;
double multi_remove_qps = 0;
double incr_qps = 0;
double check_and_set_qps = 0;
double check_and_mutate_qps = 0;
double scan_qps = 0;
double duplicate_qps = 0;
double dup_shipped_ops = 0;
double dup_failed_shipping_ops = 0;
double dup_recent_mutation_loss_count = 0;
double recent_read_cu = 0;
double recent_write_cu = 0;
double recent_expire_count = 0;
double recent_filter_count = 0;
double recent_abnormal_count = 0;
double recent_write_throttling_delay_count = 0;
double recent_write_throttling_reject_count = 0;
double recent_read_throttling_delay_count = 0;
double recent_read_throttling_reject_count = 0;
double recent_backup_request_throttling_delay_count = 0;
double recent_backup_request_throttling_reject_count = 0;
double recent_write_splitting_reject_count = 0;
double recent_read_splitting_reject_count = 0;
double recent_write_bulk_load_ingestion_reject_count = 0;
double storage_mb = 0;
double storage_count = 0;
double rdb_block_cache_hit_count = 0;
double rdb_block_cache_total_count = 0;
double rdb_index_and_filter_blocks_mem_usage = 0;
double rdb_memtable_mem_usage = 0;
double rdb_estimate_num_keys = 0;
double rdb_bf_seek_negatives = 0;
double rdb_bf_seek_total = 0;
double rdb_bf_point_positive_true = 0;
double rdb_bf_point_positive_total = 0;
double rdb_bf_point_negatives = 0;
double backup_request_qps = 0;
double backup_request_bytes = 0;
double get_bytes = 0;
double multi_get_bytes = 0;
double batch_get_bytes = 0;
double scan_bytes = 0;
double put_bytes = 0;
double multi_put_bytes = 0;
double check_and_set_bytes = 0;
double check_and_mutate_bytes = 0;
double recent_rdb_compaction_input_bytes = 0;
double recent_rdb_compaction_output_bytes = 0;
double rdb_read_l2andup_hit_count = 0;
double rdb_read_l1_hit_count = 0;
double rdb_read_l0_hit_count = 0;
double rdb_read_memtable_hit_count = 0;
double rdb_write_amplification = 0;
double rdb_read_amplification = 0;
};
// TODO(wangdan): there are still dozens of fields to be added to the following functions.
inline dsn::metric_filters row_data_filters()
{
dsn::metric_filters filters;
filters.with_metric_fields = {dsn::kMetricNameField, dsn::kMetricSingleValueField};
filters.entity_types = {"replica"};
filters.entity_metrics = {
"get_requests",
"multi_get_requests",
"batch_get_requests",
"put_requests",
"multi_put_requests",
"remove_requests",
"multi_remove_requests",
"incr_requests",
"check_and_set_requests",
"check_and_mutate_requests",
"scan_requests",
"dup_requests",
"dup_shipped_successful_requests",
"dup_shipped_failed_requests",
"dup_recent_lost_mutations",
"read_capacity_units",
"write_capacity_units",
"read_expired_values",
"read_filtered_values",
"abnormal_read_requests",
"throttling_delayed_write_requests",
"throttling_rejected_write_requests",
"throttling_delayed_read_requests",
"throttling_rejected_read_requests",
"throttling_delayed_backup_requests",
"throttling_rejected_backup_requests",
"splitting_rejected_write_requests",
"splitting_rejected_read_requests",
"bulk_load_ingestion_rejected_write_requests",
"rdb_total_sst_size_mb",
"rdb_total_sst_files",
"rdb_block_cache_hit_count",
"rdb_block_cache_total_count",
"rdb_index_and_filter_blocks_mem_usage_bytes",
"rdb_memtable_mem_usage_bytes",
"rdb_estimated_keys",
"rdb_bloom_filter_seek_negatives",
"rdb_bloom_filter_seek_total",
"rdb_bloom_filter_point_lookup_true_positives",
"rdb_bloom_filter_point_lookup_positives",
"rdb_bloom_filter_point_lookup_negatives",
"backup_requests",
"backup_request_bytes",
"get_bytes",
"multi_get_bytes",
"batch_get_bytes",
"scan_bytes",
"put_bytes",
"multi_put_bytes",
"check_and_set_bytes",
"check_and_mutate_bytes",
"rdb_compaction_input_bytes",
"rdb_compaction_output_bytes",
"rdb_l2_and_up_hit_count",
"rdb_l1_hit_count",
"rdb_l0_hit_count",
"rdb_memtable_hit_count",
"rdb_write_amplification",
"rdb_read_amplification",
};
return filters;
}
inline dsn::metric_filters row_data_filters(int32_t table_id)
{
auto filters = row_data_filters();
filters.entity_attrs = {"table_id", std::to_string(table_id)};
return filters;
}
#define BIND_ROW(metric_name, member) \
{ \
#metric_name, &row.member \
}
inline stat_var_map create_sums(row_data &row)
{
return stat_var_map({
BIND_ROW(dup_recent_lost_mutations, dup_recent_mutation_loss_count),
BIND_ROW(rdb_total_sst_size_mb, storage_mb),
BIND_ROW(rdb_total_sst_files, storage_count),
BIND_ROW(rdb_block_cache_hit_count, rdb_block_cache_hit_count),
BIND_ROW(rdb_block_cache_total_count, rdb_block_cache_total_count),
BIND_ROW(rdb_index_and_filter_blocks_mem_usage_bytes,
rdb_index_and_filter_blocks_mem_usage),
BIND_ROW(rdb_memtable_mem_usage_bytes, rdb_memtable_mem_usage),
BIND_ROW(rdb_estimated_keys, rdb_estimate_num_keys),
BIND_ROW(rdb_bloom_filter_seek_negatives, rdb_bf_seek_negatives),
BIND_ROW(rdb_bloom_filter_seek_total, rdb_bf_seek_total),
BIND_ROW(rdb_bloom_filter_point_lookup_true_positives, rdb_bf_point_positive_true),
BIND_ROW(rdb_bloom_filter_point_lookup_positives, rdb_bf_point_positive_total),
BIND_ROW(rdb_bloom_filter_point_lookup_negatives, rdb_bf_point_negatives),
BIND_ROW(rdb_l2_and_up_hit_count, rdb_read_l2andup_hit_count),
BIND_ROW(rdb_l1_hit_count, rdb_read_l1_hit_count),
BIND_ROW(rdb_l0_hit_count, rdb_read_l0_hit_count),
BIND_ROW(rdb_memtable_hit_count, rdb_read_memtable_hit_count),
BIND_ROW(rdb_write_amplification, rdb_write_amplification),
BIND_ROW(rdb_read_amplification, rdb_read_amplification),
});
}
inline stat_var_map create_increases(row_data &row)
{
return stat_var_map({
BIND_ROW(read_capacity_units, recent_read_cu),
BIND_ROW(write_capacity_units, recent_write_cu),
BIND_ROW(read_expired_values, recent_expire_count),
BIND_ROW(read_filtered_values, recent_filter_count),
BIND_ROW(abnormal_read_requests, recent_abnormal_count),
BIND_ROW(throttling_delayed_write_requests, recent_write_throttling_delay_count),
BIND_ROW(throttling_rejected_write_requests, recent_write_throttling_reject_count),
BIND_ROW(throttling_delayed_read_requests, recent_read_throttling_delay_count),
BIND_ROW(throttling_rejected_read_requests, recent_read_throttling_reject_count),
BIND_ROW(throttling_delayed_backup_requests, recent_backup_request_throttling_delay_count),
BIND_ROW(throttling_rejected_backup_requests,
recent_backup_request_throttling_reject_count),
BIND_ROW(splitting_rejected_write_requests, recent_write_splitting_reject_count),
BIND_ROW(splitting_rejected_read_requests, recent_read_splitting_reject_count),
BIND_ROW(bulk_load_ingestion_rejected_write_requests,
recent_write_bulk_load_ingestion_reject_count),
BIND_ROW(rdb_compaction_input_bytes, recent_rdb_compaction_input_bytes),
BIND_ROW(rdb_compaction_output_bytes, recent_rdb_compaction_output_bytes),
});
}
inline stat_var_map create_rates(row_data &row)
{
return stat_var_map({
BIND_ROW(get_requests, get_qps),
BIND_ROW(multi_get_requests, multi_get_qps),
BIND_ROW(batch_get_requests, batch_get_qps),
BIND_ROW(put_requests, put_qps),
BIND_ROW(multi_put_requests, multi_put_qps),
BIND_ROW(remove_requests, remove_qps),
BIND_ROW(multi_remove_requests, multi_remove_qps),
BIND_ROW(incr_requests, incr_qps),
BIND_ROW(check_and_set_requests, check_and_set_qps),
BIND_ROW(check_and_mutate_requests, check_and_mutate_qps),
BIND_ROW(scan_requests, scan_qps),
BIND_ROW(dup_requests, duplicate_qps),
BIND_ROW(dup_shipped_successful_requests, dup_shipped_ops),
BIND_ROW(dup_shipped_failed_requests, dup_failed_shipping_ops),
BIND_ROW(backup_requests, backup_request_qps),
BIND_ROW(backup_request_bytes, backup_request_bytes),
BIND_ROW(get_bytes, get_bytes),
BIND_ROW(multi_get_bytes, multi_get_bytes),
BIND_ROW(batch_get_bytes, batch_get_bytes),
BIND_ROW(scan_bytes, scan_bytes),
BIND_ROW(put_bytes, put_bytes),
BIND_ROW(multi_put_bytes, multi_put_bytes),
BIND_ROW(check_and_set_bytes, check_and_set_bytes),
BIND_ROW(check_and_mutate_bytes, check_and_mutate_bytes),
});
}
#undef BIND_ROW
// Given all tables, create all aggregations needed for the table-level stats. All selected
// partitions should have their primary replicas on this node.
inline std::unique_ptr<aggregate_stats_calcs> create_table_aggregate_stats_calcs(
const std::map<int32_t, std::vector<dsn::partition_configuration>> &pcs_by_appid,
const dsn::host_port &node,
const std::string &entity_type,
std::vector<row_data> &rows)
{
table_stat_map sums;
table_stat_map increases;
table_stat_map rates;
std::unordered_set<dsn::gpid> partitions;
for (auto &row : rows) {
const std::vector<std::pair<table_stat_map *, std::function<stat_var_map(row_data &)>>>
processors = {
{&sums, create_sums},
{&increases, create_increases},
{&rates, create_rates},
};
for (auto &processor : processors) {
// Put both dimensions of table id and metric name into filters for each kind of
// aggregation.
processor.first->emplace(row.app_id, processor.second(row));
}
const auto &iter = pcs_by_appid.find(row.app_id);
CHECK(iter != pcs_by_appid.end(),
"table could not be found in pcs_by_appid: table_id={}",
row.app_id);
for (const auto &pc : iter->second) {
if (pc.hp_primary != node) {
// Ignore metrics from a replica that is not the primary of the partition.
continue;
}
partitions.insert(pc.pid);
}
}
auto calcs = std::make_unique<aggregate_stats_calcs>();
calcs->create_sums<table_aggregate_stats>(entity_type, std::move(sums), partitions);
calcs->create_increases<table_aggregate_stats>(entity_type, std::move(increases), partitions);
calcs->create_rates<table_aggregate_stats>(entity_type, std::move(rates), partitions);
return calcs;
}
// Given a table and all of its partitions, create all aggregations needed for the partition-level
// stats. All selected partitions should have their primary replicas on this node.
inline std::unique_ptr<aggregate_stats_calcs>
create_partition_aggregate_stats_calcs(const int32_t table_id,
const std::vector<dsn::partition_configuration> &pcs,
const dsn::host_port &node,
const std::string &entity_type,
std::vector<row_data> &rows)
{
CHECK_EQ(rows.size(), pcs.size());
partition_stat_map sums;
partition_stat_map increases;
partition_stat_map rates;
for (size_t i = 0; i < rows.size(); ++i) {
if (pcs[i].hp_primary != node) {
// Ignore metrics from a replica that is not the primary of the partition.
continue;
}
const std::vector<std::pair<partition_stat_map *, std::function<stat_var_map(row_data &)>>>
processors = {
{&sums, create_sums},
{&increases, create_increases},
{&rates, create_rates},
};
for (auto &processor : processors) {
// Put all dimensions of table id, partition_id, and metric name into filters for
// each kind of aggregation.
processor.first->emplace(dsn::gpid(table_id, i), processor.second(rows[i]));
}
}
auto calcs = std::make_unique<aggregate_stats_calcs>();
calcs->create_sums<partition_aggregate_stats>(entity_type, std::move(sums));
calcs->create_increases<partition_aggregate_stats>(entity_type, std::move(increases));
calcs->create_rates<partition_aggregate_stats>(entity_type, std::move(rates));
return calcs;
}
inline bool
update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, double value)
{
if (counter_name == "get_qps")
row.get_qps += value;
else if (counter_name == "multi_get_qps")
row.multi_get_qps += value;
else if (counter_name == "batch_get_qps")
row.batch_get_qps += value;
else if (counter_name == "put_qps")
row.put_qps += value;
else if (counter_name == "multi_put_qps")
row.multi_put_qps += value;
else if (counter_name == "remove_qps")
row.remove_qps += value;
else if (counter_name == "multi_remove_qps")
row.multi_remove_qps += value;
else if (counter_name == "incr_qps")
row.incr_qps += value;
else if (counter_name == "check_and_set_qps")
row.check_and_set_qps += value;
else if (counter_name == "check_and_mutate_qps")
row.check_and_mutate_qps += value;
else if (counter_name == "scan_qps")
row.scan_qps += value;
else if (counter_name == "duplicate_qps")
row.duplicate_qps += value;
else if (counter_name == "dup_shipped_ops")
row.dup_shipped_ops += value;
else if (counter_name == "dup_failed_shipping_ops")
row.dup_failed_shipping_ops += value;
else if (counter_name == "dup_recent_mutation_loss_count")
row.dup_recent_mutation_loss_count += value;
else if (counter_name == "recent.read.cu")
row.recent_read_cu += value;
else if (counter_name == "recent.write.cu")
row.recent_write_cu += value;
else if (counter_name == "recent.expire.count")
row.recent_expire_count += value;
else if (counter_name == "recent.filter.count")
row.recent_filter_count += value;
else if (counter_name == "recent.abnormal.count")
row.recent_abnormal_count += value;
else if (counter_name == "recent.write.throttling.delay.count")
row.recent_write_throttling_delay_count += value;
else if (counter_name == "recent.write.throttling.reject.count")
row.recent_write_throttling_reject_count += value;
else if (counter_name == "recent.read.throttling.delay.count")
row.recent_read_throttling_delay_count += value;
else if (counter_name == "recent.read.throttling.reject.count")
row.recent_read_throttling_reject_count += value;
else if (counter_name == "recent.backup.request.throttling.delay.count")
row.recent_backup_request_throttling_delay_count += value;
else if (counter_name == "recent.backup.request.throttling.reject.count")
row.recent_backup_request_throttling_reject_count += value;
else if (counter_name == "recent.write.splitting.reject.count")
row.recent_write_splitting_reject_count += value;
else if (counter_name == "recent.read.splitting.reject.count")
row.recent_read_splitting_reject_count += value;
else if (counter_name == "recent.write.bulk.load.ingestion.reject.count")
row.recent_write_bulk_load_ingestion_reject_count += value;
else if (counter_name == "disk.storage.sst(MB)")
row.storage_mb += value;
else if (counter_name == "disk.storage.sst.count")
row.storage_count += value;
else if (counter_name == "rdb.block_cache.hit_count")
row.rdb_block_cache_hit_count += value;
else if (counter_name == "rdb.block_cache.total_count")
row.rdb_block_cache_total_count += value;
else if (counter_name == "rdb.index_and_filter_blocks.memory_usage")
row.rdb_index_and_filter_blocks_mem_usage += value;
else if (counter_name == "rdb.memtable.memory_usage")
row.rdb_memtable_mem_usage += value;
else if (counter_name == "rdb.estimate_num_keys")
row.rdb_estimate_num_keys += value;
else if (counter_name == "rdb.bf_seek_negatives")
row.rdb_bf_seek_negatives += value;
else if (counter_name == "rdb.bf_seek_total")
row.rdb_bf_seek_total += value;
else if (counter_name == "rdb.bf_point_positive_true")
row.rdb_bf_point_positive_true += value;
else if (counter_name == "rdb.bf_point_positive_total")
row.rdb_bf_point_positive_total += value;
else if (counter_name == "rdb.bf_point_negatives")
row.rdb_bf_point_negatives += value;
else if (counter_name == "backup_request_qps")
row.backup_request_qps += value;
else if (counter_name == "backup_request_bytes")
row.backup_request_bytes += value;
else if (counter_name == "get_bytes")
row.get_bytes += value;
else if (counter_name == "multi_get_bytes")
row.multi_get_bytes += value;
else if (counter_name == "batch_get_bytes")
row.batch_get_bytes += value;
else if (counter_name == "scan_bytes")
row.scan_bytes += value;
else if (counter_name == "put_bytes")
row.put_bytes += value;
else if (counter_name == "multi_put_bytes")
row.multi_put_bytes += value;
else if (counter_name == "check_and_set_bytes")
row.check_and_set_bytes += value;
else if (counter_name == "check_and_mutate_bytes")
row.check_and_mutate_bytes += value;
else if (counter_name == "recent_rdb_compaction_input_bytes")
row.recent_rdb_compaction_input_bytes += value;
else if (counter_name == "recent_rdb_compaction_output_bytes")
row.recent_rdb_compaction_output_bytes += value;
else if (counter_name == "rdb.read_l2andup_hit_count")
row.rdb_read_l2andup_hit_count += value;
else if (counter_name == "rdb.read_l1_hit_count")
row.rdb_read_l1_hit_count += value;
else if (counter_name == "rdb.read_l0_hit_count")
row.rdb_read_l0_hit_count += value;
else if (counter_name == "rdb.read_memtable_hit_count")
row.rdb_read_memtable_hit_count += value;
else if (counter_name == "rdb.write_amplification")
row.rdb_write_amplification += value;
else if (counter_name == "rdb.read_amplification")
row.rdb_read_amplification += value;
else
return false;
return true;
}
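// Illustrative usage sketch: accumulate a sampled perf-counter value into a
// row; the return value tells whether `counter_name' was recognized.
//   row_data row;
//   if (!update_app_pegasus_perf_counter(row, "get_qps", 100.0)) {
//       // Unknown counter name: nothing was accumulated.
//   }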
inline bool get_apps_and_nodes(shell_context *sc,
std::vector<::dsn::app_info> &apps,
std::vector<node_desc> &nodes)
{
const auto &result = sc->ddl_client->list_apps(dsn::app_status::AS_AVAILABLE, apps);
if (!result) {
LOG_ERROR("list apps failed, error={}", result);
return false;
}
if (!fill_nodes(sc, "replica-server", nodes)) {
LOG_ERROR("get replica server node list failed");
return false;
}
return true;
}
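// Illustrative usage sketch (mirrors the call sites in get_app_partition_stat()
// and get_table_stats() below):
//   std::vector<::dsn::app_info> apps;
//   std::vector<node_desc> nodes;
//   if (!get_apps_and_nodes(sc, apps, nodes)) {
//       return false;
//   }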
inline bool
get_app_partitions(shell_context *sc,
const std::vector<::dsn::app_info> &apps,
std::map<int32_t, std::vector<dsn::partition_configuration>> &pcs_by_appid)
{
for (const ::dsn::app_info &app : apps) {
int32_t app_id = 0;
int32_t partition_count = 0;
dsn::error_code err = sc->ddl_client->list_app(
app.app_name, app_id, partition_count, pcs_by_appid[app.app_id]);
if (err != ::dsn::ERR_OK) {
LOG_ERROR("list app {} failed, error = {}", app.app_name, err);
return false;
}
CHECK_EQ(app_id, app.app_id);
CHECK_EQ(partition_count, app.partition_count);
}
return true;
}
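// Illustrative usage sketch: fetch the partition configurations of all apps,
// keyed by app id, then iterate them.
//   std::map<int32_t, std::vector<dsn::partition_configuration>> pcs_by_appid;
//   if (get_app_partitions(sc, apps, pcs_by_appid)) {
//       for (const auto &[app_id, pcs] : pcs_by_appid) {
//           // `pcs' holds one partition_configuration per partition.
//       }
//   }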
inline bool decode_node_perf_counter_info(const dsn::host_port &hp,
const std::pair<bool, std::string> &result,
dsn::perf_counter_info &info)
{
if (!result.first) {
LOG_ERROR("query perf counter info from node {} failed", hp);
return false;
}
dsn::blob bb(result.second.data(), 0, result.second.size());
if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
LOG_ERROR("decode perf counter info from node {} failed, result = {}", hp, result.second);
return false;
}
if (info.result != "OK") {
LOG_ERROR(
"query perf counter info from node {} returns error, error = {}", hp, info.result);
return false;
}
return true;
}
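// Illustrative usage sketch: decode the per-node results of a remote
// "perf-counters" command (mirrors get_app_partition_stat() below).
//   const auto results = call_remote_command(sc, nodes, "perf-counters", {".*@.*"});
//   dsn::perf_counter_info info;
//   if (!decode_node_perf_counter_info(nodes[i].hp, results[i], info)) {
//       return false;
//   }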
// `rows': maps each app name to the perf counters of its partitions.
inline bool get_app_partition_stat(shell_context *sc,
std::map<std::string, std::vector<row_data>> &rows)
{
// get apps and nodes
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
if (!get_apps_and_nodes(sc, apps, nodes)) {
return false;
}
// get the relationship between app_id and app_name
std::map<int32_t, std::string> app_id_name;
std::map<std::string, int32_t> app_name_id;
for (::dsn::app_info &app : apps) {
app_id_name[app.app_id] = app.app_name;
app_name_id[app.app_name] = app.app_id;
rows[app.app_name].resize(app.partition_count);
}
// get app_id --> partitions
std::map<int32_t, std::vector<dsn::partition_configuration>> pcs_by_appid;
if (!get_app_partitions(sc, apps, pcs_by_appid)) {
return false;
}
// Fetch all of the perf counters whose names match the regex ".*@.*".
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc, nodes, "perf-counters", {".*@.*"});
for (size_t i = 0; i < nodes.size(); ++i) {
// decode info of perf-counters on node i
dsn::perf_counter_info info;
if (!decode_node_perf_counter_info(nodes[i].hp, results[i], info)) {
return false;
}
for (dsn::perf_counter_metric &m : info.counters) {
// get app_id/partition_id/counter_name/app_name from the name of perf-counter
int32_t app_id_x, partition_index_x;
std::string counter_name;
std::string app_name;
if (parse_app_pegasus_perf_counter_name(
m.name, app_id_x, partition_index_x, counter_name)) {
// Only the primary replica of each partition is counted.
const auto find = pcs_by_appid.find(app_id_x);
if (find != pcs_by_appid.end() &&
find->second[partition_index_x].hp_primary == nodes[i].hp) {
row_data &row = rows[app_id_name[app_id_x]][partition_index_x];
row.row_name = std::to_string(partition_index_x);
row.app_id = app_id_x;
update_app_pegasus_perf_counter(row, counter_name, m.value);
}
} else if (parse_app_perf_counter_name(m.name, app_name, counter_name)) {
// If the app_name from the perf-counter doesn't exist (maybe the app has
// been dropped), just ignore it.
if (app_name_id.find(app_name) == app_name_id.end()) {
continue;
}
// The table-level perf-counter value is accumulated into partition index 0.
row_data &row = rows[app_name][0];
row.app_id = app_name_id[app_name];
update_app_pegasus_perf_counter(row, counter_name, m.value);
}
}
}
return true;
}
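// Illustrative result shape, assuming a hypothetical table "t" with 4
// partitions: rows["t"] has 4 entries, where rows["t"][i] holds the counters
// collected from the primary replica of partition i (table-level counters
// are folded into index 0).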
// Aggregate the table-level stats for all tables since table name is not specified.
inline bool
get_table_stats(shell_context *sc, uint32_t sample_interval_ms, std::vector<row_data> &rows)
{
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
if (!get_apps_and_nodes(sc, apps, nodes)) {
return false;
}
const auto &query_string = row_data_filters().to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
std::map<int32_t, std::vector<dsn::partition_configuration>> pcs_by_appid;
if (!get_app_partitions(sc, apps, pcs_by_appid)) {
return false;
}
rows.clear();
rows.reserve(apps.size());
std::transform(
apps.begin(), apps.end(), std::back_inserter(rows), [](const dsn::app_info &app) {
row_data row;
row.row_name = app.app_name;
row.app_id = app.app_id;
row.partition_count = app.partition_count;
return row;
});
CHECK_EQ(rows.size(), pcs_by_appid.size());
for (size_t i = 0; i < nodes.size(); ++i) {
RETURN_SHELL_IF_GET_METRICS_FAILED(
results_start[i], nodes[i], "starting row data requests");
RETURN_SHELL_IF_GET_METRICS_FAILED(results_end[i], nodes[i], "ending row data requests");
auto calcs = create_table_aggregate_stats_calcs(pcs_by_appid, nodes[i].hp, "replica", rows);
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
calcs->aggregate_metrics(results_start[i].body(), results_end[i].body()),
nodes[i],
"aggregate row data requests");
}
return true;
}
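// Illustrative result shape: one row per available table, where row.row_name
// is the table name and the stats of all partitions of that table are
// aggregated into the single row.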
// Aggregate the partition-level stats for the specified table.
inline bool get_partition_stats(shell_context *sc,
const std::string &table_name,
uint32_t sample_interval_ms,
std::vector<row_data> &rows)
{
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
LOG_ERROR("get replica server node list failed");
return false;
}
int32_t table_id = 0;
int32_t partition_count = 0;
std::vector<dsn::partition_configuration> pcs;
const auto &err = sc->ddl_client->list_app(table_name, table_id, partition_count, pcs);
if (err != ::dsn::ERR_OK) {
LOG_ERROR("list app {} failed, error = {}", table_name, err);
return false;
}
CHECK_EQ(pcs.size(), partition_count);
const auto &query_string = row_data_filters(table_id).to_query_string();
const auto &results_start = get_metrics(nodes, query_string);
std::this_thread::sleep_for(std::chrono::milliseconds(sample_interval_ms));
const auto &results_end = get_metrics(nodes, query_string);
rows.clear();
rows.reserve(partition_count);
for (int32_t i = 0; i < partition_count; ++i) {
rows.emplace_back(std::to_string(i));
}
for (size_t i = 0; i < nodes.size(); ++i) {
RETURN_SHELL_IF_GET_METRICS_FAILED(
results_start[i], nodes[i], "starting row data requests for table(id={})", table_id);
RETURN_SHELL_IF_GET_METRICS_FAILED(
results_end[i], nodes[i], "ending row data requests for table(id={})", table_id);
auto calcs =
create_partition_aggregate_stats_calcs(table_id, pcs, nodes[i].hp, "replica", rows);
RETURN_SHELL_IF_PARSE_METRICS_FAILED(
calcs->aggregate_metrics(results_start[i].body(), results_end[i].body()),
nodes[i],
"aggregate row data requests for table(id={})",
table_id);
}
return true;
}
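// Illustrative result shape: one row per partition of the requested table,
// where row.row_name is the partition index ("0", "1", ...).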
inline bool get_app_stat(shell_context *sc,
const std::string &table_name,
uint32_t sample_interval_ms,
std::vector<row_data> &rows)
{
if (table_name.empty()) {
return get_table_stats(sc, sample_interval_ms, rows);
}
return get_partition_stats(sc, table_name, sample_interval_ms, rows);
}
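// Illustrative usage sketch (`sc' is an initialized shell_context; the table
// name "usertable" is hypothetical):
//   std::vector<row_data> rows;
//   // Empty table name: one aggregated row per table.
//   get_app_stat(sc, "", 1000, rows);
//   // Non-empty table name: one row per partition of that table.
//   get_app_stat(sc, "usertable", 1000, rows);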
struct node_capacity_unit_stat
{
// Timestamp when the node's perf_counter_info was last updated.
std::string timestamp;
std::string node_address;
// mapping: app_id --> (read_cu, write_cu)
std::map<int32_t, std::pair<int64_t, int64_t>> cu_value_by_app;
std::string dump_to_json() const
{
std::map<int32_t, std::vector<int64_t>> values;
for (const auto &[app_id, cu_pair] : cu_value_by_app) {
// Skip apps whose read and write CUs are both 0.
if (cu_pair.first != 0 || cu_pair.second != 0) {
values.emplace(app_id, std::vector<int64_t>{cu_pair.first, cu_pair.second});
}
}
std::stringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::JsonWriter writer(wrapper);
dsn::json::json_encode(writer, values);
return out.str();
}
};
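// Illustrative dump_to_json() output, assuming app 1 recently read 100 CUs
// and wrote 200 CUs (apps whose read and write CUs are both 0 are omitted):
//   {"1":[100,200]}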
inline bool get_capacity_unit_stat(shell_context *sc,
std::vector<node_capacity_unit_stat> &nodes_stat)
{
std::vector<node_desc> nodes;
if (!fill_nodes(sc, "replica-server", nodes)) {
LOG_ERROR("get replica server node list failed");
return false;
}
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc, nodes, "perf-counters-by-substr", {".cu@"});
nodes_stat.resize(nodes.size());
for (size_t i = 0; i < nodes.size(); ++i) {
dsn::perf_counter_info info;
if (!decode_node_perf_counter_info(nodes[i].hp, results[i], info)) {
LOG_WARNING("decode perf counter from node({}) failed, just ignore it", nodes[i].hp);
continue;
}
nodes_stat[i].timestamp = info.timestamp_str;
nodes_stat[i].node_address =
dsn::dns_resolver::instance().resolve_address(nodes[i].hp).to_string();
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id, pidx;
std::string counter_name;
bool r = parse_app_pegasus_perf_counter_name(m.name, app_id, pidx, counter_name);
CHECK(r, "name = {}", m.name);
if (counter_name == "recent.read.cu") {
nodes_stat[i].cu_value_by_app[app_id].first += (int64_t)m.value;
} else if (counter_name == "recent.write.cu") {
nodes_stat[i].cu_value_by_app[app_id].second += (int64_t)m.value;
}
}
}
return true;
}
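// Illustrative usage sketch:
//   std::vector<node_capacity_unit_stat> nodes_stat;
//   if (get_capacity_unit_stat(sc, nodes_stat)) {
//       for (const auto &stat : nodes_stat) {
//           fmt::print("{} {} {}\n", stat.timestamp, stat.node_address, stat.dump_to_json());
//       }
//   }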
struct app_storage_size_stat
{
// Timestamp when this stat was generated.
std::string timestamp;
// mapping: app_id --> [app_partition_count, stat_partition_count, storage_size_in_mb]
std::map<int32_t, std::vector<int64_t>> st_value_by_app;
std::string dump_to_json() const
{
std::stringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::JsonWriter writer(wrapper);
dsn::json::json_encode(writer, st_value_by_app);
return out.str();
}
};
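// Illustrative dump_to_json() output, assuming app 2 has 8 partitions, all of
// which were counted, with a total SST size of 1024 MB:
//   {"2":[8,8,1024]}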
inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_stat)
{
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
if (!get_apps_and_nodes(sc, apps, nodes)) {
LOG_ERROR("get apps and nodes failed");
return false;
}
std::map<int32_t, std::vector<dsn::partition_configuration>> pcs_by_appid;
if (!get_app_partitions(sc, apps, pcs_by_appid)) {
LOG_ERROR("get app partitions failed");
return false;
}
for (auto &[_, pcs] : pcs_by_appid) {
for (auto &pc : pcs) {
// Use partition_flags to record whether this partition's storage size has
// been counted. `pcs_by_appid' is a temporary variable, so it is safe to
// reuse partition_flags for this purpose.
pc.partition_flags = 0;
}
}
std::vector<std::pair<bool, std::string>> results = call_remote_command(
sc, nodes, "perf-counters-by-prefix", {"replica*app.pegasus*disk.storage.sst(MB)"});
for (size_t i = 0; i < nodes.size(); ++i) {
dsn::perf_counter_info info;
if (!decode_node_perf_counter_info(nodes[i].hp, results[i], info)) {
LOG_WARNING("decode perf counter from node({}) failed, just ignore it", nodes[i].hp);
continue;
}
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
m.name, app_id_x, partition_index_x, counter_name);
CHECK(parse_ret, "name = {}", m.name);
if (counter_name != "disk.storage.sst(MB)")
continue;
auto find = pcs_by_appid.find(app_id_x);
if (find == pcs_by_appid.end()) // app id not found
continue;
auto &pc = find->second[partition_index_x];
if (pc.hp_primary != nodes[i].hp) // not primary replica
continue;
if (pc.partition_flags != 0) // already calculated
continue;
pc.partition_flags = 1;
int64_t app_partition_count = find->second.size();
auto st_it = st_stat.st_value_by_app
.emplace(app_id_x, std::vector<int64_t>{app_partition_count, 0, 0})
.first;
st_it->second[1]++;                   // stat_partition_count
st_it->second[2] += (int64_t)m.value; // storage_size_in_mb
}
}
char buf[20];
dsn::utils::time_ms_to_date_time(dsn_now_ms(), buf, sizeof(buf));
st_stat.timestamp = buf;
return true;
}
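// Illustrative usage sketch:
//   app_storage_size_stat st_stat;
//   if (get_storage_size_stat(sc, st_stat)) {
//       fmt::print("{} {}\n", st_stat.timestamp, st_stat.dump_to_json());
//   }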