src/server/info_collector.h (201 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
// IWYU pragma: no_include <bits/std_abs.h>
#include <s2/third_party/absl/base/port.h>
#include <stdint.h>
#include <stdlib.h>
#include <map>
#include <memory>
#include <string>
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/task/task.h"
#include "runtime/task/task_tracker.h"
#include "shell/command_helper.h"
#include "utils/synchronize.h"
struct shell_context;
namespace pegasus {
class pegasus_client;
namespace server {
class hotspot_partition_calculator;
class result_writer;
class info_collector
{
public:
struct app_stat_counters
{
double convert_to_1M_ratio(double hit, double total)
{
return std::abs(total) < 1e-6 ? 0 : hit / total * 1e6;
}
void set(const row_data &row_stats)
{
get_qps->set(row_stats.get_qps);
multi_get_qps->set(row_stats.multi_get_qps);
put_qps->set(row_stats.put_qps);
multi_put_qps->set(row_stats.multi_put_qps);
remove_qps->set(row_stats.remove_qps);
multi_remove_qps->set(row_stats.multi_remove_qps);
incr_qps->set(row_stats.incr_qps);
check_and_set_qps->set(row_stats.check_and_set_qps);
check_and_mutate_qps->set(row_stats.check_and_mutate_qps);
scan_qps->set(row_stats.scan_qps);
duplicate_qps->set(row_stats.duplicate_qps);
dup_shipped_ops->set(row_stats.dup_shipped_ops);
dup_failed_shipping_ops->set(row_stats.dup_failed_shipping_ops);
dup_recent_mutation_loss_count->set(row_stats.dup_recent_mutation_loss_count);
recent_read_cu->set(row_stats.recent_read_cu);
recent_write_cu->set(row_stats.recent_write_cu);
recent_expire_count->set(row_stats.recent_expire_count);
recent_filter_count->set(row_stats.recent_filter_count);
recent_abnormal_count->set(row_stats.recent_abnormal_count);
recent_write_throttling_delay_count->set(row_stats.recent_write_throttling_delay_count);
recent_write_throttling_reject_count->set(
row_stats.recent_write_throttling_reject_count);
recent_read_throttling_delay_count->set(row_stats.recent_read_throttling_delay_count);
recent_read_throttling_reject_count->set(row_stats.recent_read_throttling_reject_count);
recent_backup_request_throttling_delay_count->set(
row_stats.recent_backup_request_throttling_delay_count);
recent_backup_request_throttling_reject_count->set(
row_stats.recent_backup_request_throttling_reject_count);
recent_write_splitting_reject_count->set(row_stats.recent_write_splitting_reject_count);
recent_read_splitting_reject_count->set(row_stats.recent_read_splitting_reject_count);
recent_write_bulk_load_ingestion_reject_count->set(
row_stats.recent_write_bulk_load_ingestion_reject_count);
storage_mb->set(row_stats.storage_mb);
storage_count->set(row_stats.storage_count);
rdb_block_cache_hit_rate->set(convert_to_1M_ratio(
row_stats.rdb_block_cache_hit_count, row_stats.rdb_block_cache_total_count));
rdb_index_and_filter_blocks_mem_usage->set(
row_stats.rdb_index_and_filter_blocks_mem_usage);
rdb_memtable_mem_usage->set(row_stats.rdb_memtable_mem_usage);
rdb_estimate_num_keys->set(row_stats.rdb_estimate_num_keys);
rdb_bf_seek_negatives_rate->set(
convert_to_1M_ratio(row_stats.rdb_bf_seek_negatives, row_stats.rdb_bf_seek_total));
rdb_bf_point_negatives_rate->set(convert_to_1M_ratio(
row_stats.rdb_bf_point_negatives,
row_stats.rdb_bf_point_negatives + row_stats.rdb_bf_point_positive_total));
rdb_bf_point_false_positive_rate->set(convert_to_1M_ratio(
row_stats.rdb_bf_point_positive_total - row_stats.rdb_bf_point_positive_true,
(row_stats.rdb_bf_point_positive_total - row_stats.rdb_bf_point_positive_true) +
row_stats.rdb_bf_point_negatives));
read_qps->set(row_stats.get_total_read_qps());
write_qps->set(row_stats.get_total_write_qps());
backup_request_qps->set(row_stats.backup_request_qps);
backup_request_bytes->set(row_stats.backup_request_bytes);
get_bytes->set(row_stats.get_bytes);
multi_get_bytes->set(row_stats.multi_get_bytes);
scan_bytes->set(row_stats.scan_bytes);
put_bytes->set(row_stats.put_bytes);
multi_put_bytes->set(row_stats.multi_put_bytes);
check_and_set_bytes->set(row_stats.check_and_set_bytes);
check_and_mutate_bytes->set(row_stats.check_and_mutate_bytes);
read_bytes->set(row_stats.get_total_read_bytes());
write_bytes->set(row_stats.get_total_write_bytes());
recent_rdb_compaction_input_bytes->set(row_stats.recent_rdb_compaction_input_bytes);
recent_rdb_compaction_output_bytes->set(row_stats.recent_rdb_compaction_output_bytes);
rdb_read_l2andup_hit_rate->set(convert_to_1M_ratio(
row_stats.rdb_read_l2andup_hit_count, row_stats.rdb_block_cache_total_count));
rdb_read_l1_hit_rate->set(convert_to_1M_ratio(row_stats.rdb_read_l1_hit_count,
row_stats.rdb_block_cache_total_count));
rdb_read_l0_hit_rate->set(convert_to_1M_ratio(row_stats.rdb_read_l0_hit_count,
row_stats.rdb_block_cache_total_count));
rdb_read_memtable_hit_rate->set(convert_to_1M_ratio(
row_stats.rdb_read_memtable_hit_count, row_stats.rdb_block_cache_total_count));
rdb_write_amplification->set(row_stats.rdb_write_amplification /
row_stats.partition_count);
rdb_read_amplification->set(row_stats.rdb_read_amplification /
row_stats.partition_count);
}
::dsn::perf_counter_wrapper get_qps;
::dsn::perf_counter_wrapper multi_get_qps;
::dsn::perf_counter_wrapper put_qps;
::dsn::perf_counter_wrapper multi_put_qps;
::dsn::perf_counter_wrapper remove_qps;
::dsn::perf_counter_wrapper multi_remove_qps;
::dsn::perf_counter_wrapper incr_qps;
::dsn::perf_counter_wrapper check_and_set_qps;
::dsn::perf_counter_wrapper check_and_mutate_qps;
::dsn::perf_counter_wrapper scan_qps;
::dsn::perf_counter_wrapper duplicate_qps;
::dsn::perf_counter_wrapper dup_shipped_ops;
::dsn::perf_counter_wrapper dup_failed_shipping_ops;
::dsn::perf_counter_wrapper dup_recent_mutation_loss_count;
::dsn::perf_counter_wrapper recent_read_cu;
::dsn::perf_counter_wrapper recent_write_cu;
::dsn::perf_counter_wrapper recent_expire_count;
::dsn::perf_counter_wrapper recent_filter_count;
::dsn::perf_counter_wrapper recent_abnormal_count;
::dsn::perf_counter_wrapper recent_write_throttling_delay_count;
::dsn::perf_counter_wrapper recent_write_throttling_reject_count;
::dsn::perf_counter_wrapper recent_read_throttling_delay_count;
::dsn::perf_counter_wrapper recent_read_throttling_reject_count;
::dsn::perf_counter_wrapper recent_backup_request_throttling_delay_count;
::dsn::perf_counter_wrapper recent_backup_request_throttling_reject_count;
::dsn::perf_counter_wrapper recent_write_splitting_reject_count;
::dsn::perf_counter_wrapper recent_read_splitting_reject_count;
::dsn::perf_counter_wrapper recent_write_bulk_load_ingestion_reject_count;
::dsn::perf_counter_wrapper storage_mb;
::dsn::perf_counter_wrapper storage_count;
::dsn::perf_counter_wrapper rdb_block_cache_hit_rate;
::dsn::perf_counter_wrapper rdb_block_cache_mem_usage;
::dsn::perf_counter_wrapper rdb_index_and_filter_blocks_mem_usage;
::dsn::perf_counter_wrapper rdb_memtable_mem_usage;
::dsn::perf_counter_wrapper rdb_estimate_num_keys;
::dsn::perf_counter_wrapper rdb_bf_seek_negatives_rate;
::dsn::perf_counter_wrapper rdb_bf_point_negatives_rate;
::dsn::perf_counter_wrapper rdb_bf_point_false_positive_rate;
::dsn::perf_counter_wrapper read_qps;
::dsn::perf_counter_wrapper write_qps;
::dsn::perf_counter_wrapper backup_request_qps;
::dsn::perf_counter_wrapper backup_request_bytes;
::dsn::perf_counter_wrapper get_bytes;
::dsn::perf_counter_wrapper multi_get_bytes;
::dsn::perf_counter_wrapper scan_bytes;
::dsn::perf_counter_wrapper put_bytes;
::dsn::perf_counter_wrapper multi_put_bytes;
::dsn::perf_counter_wrapper check_and_set_bytes;
::dsn::perf_counter_wrapper check_and_mutate_bytes;
::dsn::perf_counter_wrapper read_bytes;
::dsn::perf_counter_wrapper write_bytes;
::dsn::perf_counter_wrapper recent_rdb_compaction_input_bytes;
::dsn::perf_counter_wrapper recent_rdb_compaction_output_bytes;
::dsn::perf_counter_wrapper rdb_read_l2andup_hit_rate;
::dsn::perf_counter_wrapper rdb_read_l1_hit_rate;
::dsn::perf_counter_wrapper rdb_read_l0_hit_rate;
::dsn::perf_counter_wrapper rdb_read_memtable_hit_rate;
::dsn::perf_counter_wrapper rdb_write_amplification;
::dsn::perf_counter_wrapper rdb_read_amplification;
};
info_collector();
~info_collector();
void start();
void stop();
void on_app_stat();
app_stat_counters *get_app_counters(const std::string &app_name);
void on_capacity_unit_stat(int remaining_retry_count);
bool has_capacity_unit_updated(const std::string &node_address, const std::string ×tamp);
void on_storage_size_stat(int remaining_retry_count);
private:
dsn::task_tracker _tracker;
::dsn::rpc_address _meta_servers;
std::string _cluster_name;
std::shared_ptr<shell_context> _shell_context;
::dsn::task_ptr _app_stat_timer_task;
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
std::map<std::string, app_stat_counters *> _app_stat_counters;
// client to access server.
pegasus_client *_client;
// for writing cu stat result
std::unique_ptr<result_writer> _result_writer;
uint32_t _capacity_unit_retry_wait_seconds;
uint32_t _capacity_unit_retry_max_count;
::dsn::task_ptr _capacity_unit_stat_timer_task;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
// _hotspot_calculator_store is to save hotspot_partition_calculator for each table, a
// hotspot_partition_calculator saves historical hotspot data and alert perf_counters of
// corresponding table
std::map<std::string, std::shared_ptr<hotspot_partition_calculator>> _hotspot_calculator_store;
std::shared_ptr<hotspot_partition_calculator>
get_hotspot_calculator(const std::string &app_name, const int partition_count);
};
} // namespace server
} // namespace pegasus