platform/statistic/stats.cpp (506 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/statistic/stats.h"
#include <glog/logging.h>
#include <ctime>
#include "common/utils/utils.h"
#include "proto/kv/kv.pb.h"
namespace asio = boost::asio;
namespace beast = boost::beast;
using tcp = asio::ip::tcp;
namespace resdb {
// Held for the duration of every Stats::GetGlobalStats() call.
std::mutex g_mutex;

// Returns the process-wide singleton Stats instance.
//
// The instance is a function-local static, so it is constructed on the first
// call only; `seconds` (forwarded to the Stats constructor as the monitor
// sleep time) is ignored on every later call.
Stats* Stats::GetGlobalStats(int seconds) {
  const std::lock_guard<std::mutex> guard(g_mutex);
  static Stats instance(seconds);
  return &instance;
}
// Constructs a Stats object and launches the background monitor thread.
//
// sleep_time: number of seconds MonitorGlobal() sleeps between two reports.
//             Forced to 1 when the file is built with TEST_MODE defined.
//
// The monitor thread is started as the very last step so that it can never
// observe a partially initialized object.
Stats::Stats(int sleep_time) {
  monitor_sleep_time_ = sleep_time;
#ifdef TEST_MODE
  monitor_sleep_time_ = 1;
#endif
  num_call_ = 0;
  num_commit_ = 0;
  run_time_ = 0;
  run_call_ = 0;
  run_call_time_ = 0;
  server_call_ = 0;
  server_process_ = 0;
  run_req_num_ = 0;
  run_req_run_time_ = 0;
  seq_gap_ = 0;
  total_request_ = 0;
  total_geo_request_ = 0;
  geo_request_ = 0;
  stop_ = false;
  begin_ = false;
  socket_recv_ = 0;
  broad_cast_msg_ = 0;
  send_broad_cast_msg_ = 0;

  // These counters are sampled by MonitorGlobal() but were never assigned
  // here. Zero them explicitly so the first reported delta is well defined
  // even if the class declaration gives them no initializer.
  seq_fail_ = 0;
  client_call_ = 0;
  num_client_req_ = 0;
  num_propose_ = 0;
  num_prepare_ = 0;
  pending_execute_ = 0;
  execute_ = 0;
  execute_done_ = 0;
  send_broad_cast_msg_per_rep_ = 0;

  prometheus_ = nullptr;

  // -1 marks the port as "not configured yet"; SetProps() supplies the real
  // value.
  transaction_summary_.port = -1;
  make_faulty_.store(false);

  // time_point::min() is the "not recorded" marker; SendSummary() resets the
  // timestamps to the same value after every summary.
  transaction_summary_.request_pre_prepare_state_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.prepare_state_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.commit_state_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.execution_time =
      std::chrono::system_clock::time_point::min();
  transaction_summary_.txn_number = 0;

  // Start the monitor only after every member above has been set.
  global_thread_ = std::thread(&Stats::MonitorGlobal, this);
}
// Requests shutdown by raising stop_, the flag polled by the loops in
// MonitorGlobal() and CrowRoute(). Does not wait for either thread.
void Stats::Stop() {
  stop_ = true;
}
// Raises stop_ and waits for the background threads to finish.
//
// NOTE(review): CrowRoute() only re-checks stop_ after app.run() returns and
// nothing visible in this file stops the Crow app from outside, so the join
// on crow_thread_ may block while the HTTP server is running -- confirm.
Stats::~Stats() {
  stop_ = true;

  if (global_thread_.joinable()) {
    global_thread_.join();
  }

  // The Crow thread is only joined when ResView is enabled.
  if (!enable_resview) {
    return;
  }
  if (crow_thread_.joinable()) {
    crow_thread_.join();
  }
}
// Returns the resident set size of the current process in MB (integer
// division, so rounded down), computed from the second field of
// /proc/self/statm multiplied by the system page size.
//
// Returns 0 when the file cannot be opened, when fewer than seven fields can
// be parsed, or when the page size cannot be determined.
int64_t GetRSS() {
  FILE* fp = fopen("/proc/self/statm", "r");
  if (fp == nullptr) {
    return 0;
  }

  // `unsigned long` is exactly what "%lu" expects. Scanning into uint64_t is
  // a format/type mismatch on platforms where uint64_t is not unsigned long.
  unsigned long size = 0, resident = 0, share = 0, text = 0, lib = 0,
                data = 0, dt = 0;
  const int parsed = fscanf(fp, "%lu %lu %lu %lu %lu %lu %lu", &size,
                            &resident, &share, &text, &lib, &data, &dt);
  fclose(fp);
  if (parsed != 7) {
    return 0;
  }

  // sysconf() reports failure with -1; do not turn that into a bogus size.
  const long page_size = sysconf(_SC_PAGESIZE);
  if (page_size <= 0) {
    return 0;
  }

  const int64_t rss_bytes =
      static_cast<int64_t>(resident) * static_cast<int64_t>(page_size);
  // Convert to MB
  return rss_bytes / (1024 * 1024);
}
void Stats::CrowRoute() {
crow::SimpleApp app;
while (!stop_) {
try {
CROW_ROUTE(app, "/consensus_data")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
LOG(ERROR) << "API 1";
res.set_header("Access-Control-Allow-Origin",
"*"); // Allow requests from any origin
res.set_header("Access-Control-Allow-Methods",
"GET, POST, OPTIONS"); // Specify allowed methods
res.set_header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
// Send your response
res.body = consensus_history_.dump();
res.end();
});
CROW_ROUTE(app, "/get_status")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
LOG(ERROR) << "API 2";
res.set_header("Access-Control-Allow-Origin",
"*"); // Allow requests from any origin
res.set_header("Access-Control-Allow-Methods",
"GET, POST, OPTIONS"); // Specify allowed methods
res.set_header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
// Send your response
res.body = IsFaulty() ? "Faulty" : "Not Faulty";
res.end();
});
CROW_ROUTE(app, "/make_faulty")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
LOG(ERROR) << "API 3";
res.set_header("Access-Control-Allow-Origin",
"*"); // Allow requests from any origin
res.set_header("Access-Control-Allow-Methods",
"GET, POST, OPTIONS"); // Specify allowed methods
res.set_header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
// Send your response
if (enable_faulty_switch_) {
make_faulty_.store(!make_faulty_.load());
}
res.body = "Success";
res.end();
});
CROW_ROUTE(app, "/transaction_data")
.methods("GET"_method)([this](const crow::request& req,
crow::response& res) {
LOG(ERROR) << "API 4";
res.set_header("Access-Control-Allow-Origin",
"*"); // Allow requests from any origin
res.set_header("Access-Control-Allow-Methods",
"GET, POST, OPTIONS"); // Specify allowed methods
res.set_header(
"Access-Control-Allow-Headers",
"Content-Type, Authorization"); // Specify allowed headers
nlohmann::json mem_view_json;
int status =
getrusage(RUSAGE_SELF, &transaction_summary_.process_stats_);
if (status == 0) {
mem_view_json["resident_set_size"] = GetRSS();
mem_view_json["max_resident_set_size"] =
transaction_summary_.process_stats_.ru_maxrss;
mem_view_json["num_reads"] =
transaction_summary_.process_stats_.ru_inblock;
mem_view_json["num_writes"] =
transaction_summary_.process_stats_.ru_oublock;
}
mem_view_json["ext_cache_hit_ratio"] =
transaction_summary_.ext_cache_hit_ratio_;
mem_view_json["level_db_stats"] =
transaction_summary_.level_db_stats_;
mem_view_json["level_db_approx_mem_size"] =
transaction_summary_.level_db_approx_mem_size_;
res.body = mem_view_json.dump();
mem_view_json.clear();
res.end();
});
app.port(8500 + transaction_summary_.port).multithreaded().run();
sleep(1);
} catch (const std::exception& e) {
}
}
app.stop();
}
// Returns the current value of the make_faulty_ flag.
bool Stats::IsFaulty() {
  const bool faulty = make_faulty_.load();
  return faulty;
}
// Records `primary_id` as the new primary in the transaction summary and
// clears the make_faulty_ flag.
void Stats::ChangePrimary(int primary_id) {
  auto& summary = transaction_summary_;
  summary.primary_id = primary_id;
  make_faulty_.store(false);
}
// Stores the replica identity and the ResView feature flags, and starts the
// ResView HTTP thread when requested.
//
// replica_id, ip, port: copied into transaction_summary_; CrowRoute() listens
//                       on 8500 + port.
// resview_flag:         enables summary collection and the HTTP server.
// faulty_flag:          lets the /make_faulty endpoint toggle make_faulty_.
void Stats::SetProps(int replica_id, std::string ip, int port,
                     bool resview_flag, bool faulty_flag) {
  transaction_summary_.replica_id = replica_id;
  transaction_summary_.ip = ip;
  transaction_summary_.port = port;
  enable_resview = resview_flag;
  enable_faulty_switch_ = faulty_flag;

  // Assigning to a std::thread that is still joinable calls std::terminate,
  // so the HTTP thread is launched at most once even if SetProps() is called
  // again with resview_flag set.
  if (resview_flag && !crow_thread_.joinable()) {
    crow_thread_ = std::thread(&Stats::CrowRoute, this);
  }
}
// Records `primary_id` in the transaction summary. Unlike ChangePrimary(),
// the make_faulty_ flag is left untouched.
void Stats::SetPrimaryId(int primary_id) {
  auto& summary = transaction_summary_;
  summary.primary_id = primary_id;
}
// Copies the three storage engine metrics into transaction_summary_, from
// where SendSummary() and the /transaction_data endpoint read them.
void Stats::SetStorageEngineMetrics(double ext_cache_hit_ratio,
                                    std::string level_db_stats,
                                    std::string level_db_approx_mem_size) {
  auto& summary = transaction_summary_;
  summary.ext_cache_hit_ratio_ = ext_cache_hit_ratio;
  summary.level_db_stats_ = level_db_stats;
  summary.level_db_approx_mem_size_ = level_db_approx_mem_size;
}
// Stamps the current system time on the summary field that belongs to
// `state`:
//   "request" / "pre-prepare" -> request_pre_prepare_state_time
//   "prepare"                 -> prepare_state_time
//   "commit"                  -> commit_state_time
// Any other value is ignored. Does nothing when ResView is disabled.
void Stats::RecordStateTime(std::string state) {
  if (!enable_resview) {
    return;
  }

  const bool is_pre_prepare = (state == "request") || (state == "pre-prepare");
  if (is_pre_prepare) {
    transaction_summary_.request_pre_prepare_state_time =
        std::chrono::system_clock::now();
    return;
  }
  if (state == "prepare") {
    transaction_summary_.prepare_state_time = std::chrono::system_clock::now();
    return;
  }
  if (state == "commit") {
    transaction_summary_.commit_state_time = std::chrono::system_clock::now();
  }
}
// Captures the contents of `batch_request` in transaction_summary_ for the
// next summary: the batch sequence number plus, for each sub-request that
// parses as a KVRequest with a recognised command, one entry in each of the
// parallel txn_command / txn_key / txn_value lists.
//
// SET and GETRANGE store the request value; GET and GETALLVALUES store an
// empty value. Requests with any other command add nothing. Processing stops
// at the first sub-request that fails to parse. Does nothing when ResView is
// disabled.
void Stats::GetTransactionDetails(BatchUserRequest batch_request) {
  if (!enable_resview) {
    return;
  }

  auto& summary = transaction_summary_;
  summary.txn_number = batch_request.seq();
  summary.txn_command.clear();
  summary.txn_key.clear();
  summary.txn_value.clear();

  for (auto& sub_request : batch_request.user_requests()) {
    KVRequest kv_request;
    if (!kv_request.ParseFromString(sub_request.request().data())) {
      break;
    }

    // Map the command to its label and decide whether the value is kept.
    const char* label = nullptr;
    bool keep_value = false;
    switch (kv_request.cmd()) {
      case KVRequest::SET:
        label = "SET";
        keep_value = true;
        break;
      case KVRequest::GET:
        label = "GET";
        break;
      case KVRequest::GETALLVALUES:
        label = "GETALLVALUES";
        break;
      case KVRequest::GETRANGE:
        label = "GETRANGE";
        keep_value = true;
        break;
      default:
        break;
    }
    if (label == nullptr) {
      continue;  // Unrecognised command: nothing is recorded.
    }

    summary.txn_command.push_back(label);
    summary.txn_key.push_back(kv_request.key());
    if (keep_value) {
      summary.txn_value.push_back(kv_request.value());
    } else {
      summary.txn_value.push_back("");
    }
  }
}
// Finalises the summary of the current transaction: stamps the execution
// time, serialises transaction_summary_ into summary_json_, stores that JSON
// in consensus_history_ under the transaction number, logs it, and then
// resets the per-transaction timestamps and timestamp lists.
//
// The txn_command / txn_key / txn_value lists are not cleared here;
// GetTransactionDetails() clears them. Does nothing when ResView is disabled.
void Stats::SendSummary() {
  if (!enable_resview) {
    return;
  }

  auto& summary = transaction_summary_;
  summary.execution_time = std::chrono::system_clock::now();

  // Convert Transaction Summary to JSON
  summary_json_["replica_id"] = summary.replica_id;
  summary_json_["ip"] = summary.ip;
  summary_json_["port"] = summary.port;
  summary_json_["primary_id"] = summary.primary_id;
  summary_json_["propose_pre_prepare_time"] =
      summary.request_pre_prepare_state_time.time_since_epoch().count();
  summary_json_["prepare_time"] =
      summary.prepare_state_time.time_since_epoch().count();
  summary_json_["commit_time"] =
      summary.commit_state_time.time_since_epoch().count();
  summary_json_["execution_time"] =
      summary.execution_time.time_since_epoch().count();

  for (const auto& stamp : summary.prepare_message_count_times_list) {
    summary_json_["prepare_message_timestamps"].push_back(
        stamp.time_since_epoch().count());
  }
  for (const auto& stamp : summary.commit_message_count_times_list) {
    summary_json_["commit_message_timestamps"].push_back(
        stamp.time_since_epoch().count());
  }

  summary_json_["txn_number"] = summary.txn_number;
  for (const auto& command : summary.txn_command) {
    summary_json_["txn_commands"].push_back(command);
  }
  for (const auto& key : summary.txn_key) {
    summary_json_["txn_keys"].push_back(key);
  }
  for (const auto& value : summary.txn_value) {
    summary_json_["txn_values"].push_back(value);
  }
  summary_json_["ext_cache_hit_ratio"] = summary.ext_cache_hit_ratio_;

  consensus_history_[std::to_string(summary.txn_number)] = summary_json_;
  LOG(ERROR) << summary_json_.dump();

  // Reset Transaction Summary Parameters
  const auto not_recorded = std::chrono::system_clock::time_point::min();
  summary.request_pre_prepare_state_time = not_recorded;
  summary.prepare_state_time = not_recorded;
  summary.commit_state_time = not_recorded;
  summary.execution_time = not_recorded;
  summary.prepare_message_count_times_list.clear();
  summary.commit_message_count_times_list.clear();
  summary_json_.clear();
}
void Stats::MonitorGlobal() {
LOG(ERROR) << "monitor:" << name_ << " sleep time:" << monitor_sleep_time_;
uint64_t seq_fail = 0;
uint64_t client_call = 0, socket_recv = 0;
uint64_t num_client_req = 0, num_propose = 0, num_prepare = 0, num_commit = 0,
pending_execute = 0, execute = 0, execute_done = 0;
uint64_t broad_cast_msg = 0, send_broad_cast_msg = 0;
uint64_t send_broad_cast_msg_per_rep = 0;
uint64_t server_call = 0, server_process = 0;
uint64_t seq_gap = 0;
uint64_t total_request = 0, total_geo_request = 0, geo_request = 0;
// ====== for client proxy ======
uint64_t run_req_num = 0, run_req_run_time = 0;
uint64_t last_run_req_num = 0, last_run_req_run_time = 0;
// =============================
uint64_t last_seq_fail = 0;
uint64_t last_num_client_req = 0, last_num_propose = 0, last_num_prepare = 0,
last_num_commit = 0;
uint64_t last_pending_execute = 0, last_execute = 0, last_execute_done = 0;
uint64_t last_client_call = 0, last_socket_recv = 0;
uint64_t last_broad_cast_msg = 0, last_send_broad_cast_msg = 0;
uint64_t last_send_broad_cast_msg_per_rep = 0;
uint64_t last_server_call = 0, last_server_process = 0;
uint64_t last_total_request = 0, last_total_geo_request = 0,
last_geo_request = 0;
uint64_t time = 0;
while (!stop_) {
sleep(monitor_sleep_time_);
time += monitor_sleep_time_;
seq_fail = seq_fail_;
socket_recv = socket_recv_;
client_call = client_call_;
num_client_req = num_client_req_;
num_propose = num_propose_;
num_prepare = num_prepare_;
num_commit = num_commit_;
pending_execute = pending_execute_;
execute = execute_;
execute_done = execute_done_;
broad_cast_msg = broad_cast_msg_;
send_broad_cast_msg = send_broad_cast_msg_;
send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep_;
server_call = server_call_;
server_process = server_process_;
seq_gap = seq_gap_;
total_request = total_request_;
total_geo_request = total_geo_request_;
geo_request = geo_request_;
run_req_num = run_req_num_;
run_req_run_time = run_req_run_time_;
LOG(ERROR) << "=========== monitor =========\n"
<< "server call:" << server_call - last_server_call
<< " server process:" << server_process - last_server_process
<< " socket recv:" << socket_recv - last_socket_recv
<< " "
"client call:"
<< client_call - last_client_call
<< " "
"client req:"
<< num_client_req - last_num_client_req
<< " "
"broad_cast:"
<< broad_cast_msg - last_broad_cast_msg
<< " "
"send broad_cast:"
<< send_broad_cast_msg - last_send_broad_cast_msg
<< " "
"per send broad_cast:"
<< send_broad_cast_msg_per_rep - last_send_broad_cast_msg_per_rep
<< " "
"propose:"
<< num_propose - last_num_propose
<< " "
"prepare:"
<< (num_prepare - last_num_prepare)
<< " "
"commit:"
<< (num_commit - last_num_commit)
<< " "
"pending execute:"
<< pending_execute - last_pending_execute
<< " "
"execute:"
<< execute - last_execute
<< " "
"execute done:"
<< execute_done - last_execute_done << " seq gap:" << seq_gap
<< " total request:" << total_request - last_total_request
<< " txn:" << (total_request - last_total_request) / 5
<< " total geo request:"
<< total_geo_request - last_total_geo_request
<< " total geo request per:"
<< (total_geo_request - last_total_geo_request) / 5
<< " geo request:" << (geo_request - last_geo_request)
<< " "
"seq fail:"
<< seq_fail - last_seq_fail << " time:" << time
<< " "
"\n--------------- monitor ------------";
if (run_req_num - last_run_req_num > 0) {
LOG(ERROR) << " req client latency:"
<< static_cast<double>(run_req_run_time -
last_run_req_run_time) /
(run_req_num - last_run_req_num) / 1000000000.0;
}
last_seq_fail = seq_fail;
last_socket_recv = socket_recv;
last_client_call = client_call;
last_num_client_req = num_client_req;
last_num_propose = num_propose;
last_num_prepare = num_prepare;
last_num_commit = num_commit;
last_pending_execute = pending_execute;
last_execute = execute;
last_execute_done = execute_done;
last_broad_cast_msg = broad_cast_msg;
last_send_broad_cast_msg = send_broad_cast_msg;
last_send_broad_cast_msg_per_rep = send_broad_cast_msg_per_rep;
last_server_call = server_call;
last_server_process = server_process;
last_run_req_num = run_req_num;
last_run_req_run_time = run_req_run_time;
last_total_request = total_request;
last_total_geo_request = total_geo_request;
last_geo_request = geo_request;
}
}
// Counts one client call: reported to Prometheus as CLIENT_CALL when a
// handler is configured, and added to client_call_.
void Stats::IncClientCall() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(CLIENT_CALL, 1);
  }
  ++client_call_;
}
// Counts one client request: reported to Prometheus as CLIENT_REQ when a
// handler is configured, and added to num_client_req_.
void Stats::IncClientRequest() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(CLIENT_REQ, 1);
  }
  ++num_client_req_;
}
// Counts one propose message: reported to Prometheus as PROPOSE when a
// handler is configured, and added to num_propose_.
void Stats::IncPropose() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(PROPOSE, 1);
  }
  ++num_propose_;
}
void Stats::IncPrepare() {
if (prometheus_) {
prometheus_->Inc(PREPARE, 1);
}
num_prepare_++;
transaction_summary_.prepare_message_count_times_list.push_back(
std::chrono::system_clock::now());
}
void Stats::IncCommit() {
if (prometheus_) {
prometheus_->Inc(COMMIT, 1);
}
num_commit_++;
transaction_summary_.commit_message_count_times_list.push_back(
std::chrono::system_clock::now());
}
// Adds one to pending_execute_.
void Stats::IncPendingExecute() {
  ++pending_execute_;
}
// Adds one to execute_.
void Stats::IncExecute() {
  ++execute_;
}
// Counts one finished execution: reported to Prometheus as EXECUTE when a
// handler is configured, and added to execute_done_.
void Stats::IncExecuteDone() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(EXECUTE, 1);
  }
  ++execute_done_;
}
// Counts one broadcast message: reported to Prometheus as BROAD_CAST when a
// handler is configured, and added to broad_cast_msg_.
void Stats::BroadCastMsg() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(BROAD_CAST, 1);
  }
  ++broad_cast_msg_;
}
// Adds `num` to send_broad_cast_msg_.
void Stats::SendBroadCastMsg(uint32_t num) {
  send_broad_cast_msg_ += num;
}
// Adds one to send_broad_cast_msg_per_rep_.
void Stats::SendBroadCastMsgPerRep() {
  ++send_broad_cast_msg_per_rep_;
}
// Adds one to seq_fail_.
void Stats::SeqFail() {
  ++seq_fail_;
}
// Counts `num` requests: reported to Prometheus as NUM_EXECUTE_TX when a
// handler is configured, and added to total_request_.
void Stats::IncTotalRequest(uint32_t num) {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(NUM_EXECUTE_TX, num);
  }
  total_request_ += num;
}
// Adds `num` to total_geo_request_.
void Stats::IncTotalGeoRequest(uint32_t num) {
  total_geo_request_ += num;
}
// Adds one to geo_request_.
void Stats::IncGeoRequest() {
  ++geo_request_;
}
// Counts one server call: reported to Prometheus as SERVER_CALL_NAME when a
// handler is configured, and added to server_call_.
void Stats::ServerCall() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(SERVER_CALL_NAME, 1);
  }
  ++server_call_;
}
// Counts one processed server message: reported to Prometheus as
// SERVER_PROCESS when a handler is configured, and added to server_process_.
void Stats::ServerProcess() {
  if (prometheus_ != nullptr) {
    prometheus_->Inc(SERVER_PROCESS, 1);
  }
  ++server_process_;
}
// Overwrites seq_gap_ with `seq_gap`; MonitorGlobal() logs this value as-is
// rather than as a delta.
void Stats::SeqGap(uint64_t seq_gap) {
  seq_gap_ = seq_gap;
}
// Records one latency sample: bumps the sample count (run_req_num_) and adds
// `run_time` to the running total (run_req_run_time_). MonitorGlobal() turns
// the two into an average per reporting interval.
void Stats::AddLatency(uint64_t run_time) {
  ++run_req_num_;
  run_req_run_time_ += run_time;
}
// Creates a PrometheusHandler for `prometheus_address` and stores it in
// prometheus_, replacing any handler set earlier. Once a handler is set, the
// counter methods in this file that test prometheus_ also report their
// increments through it.
void Stats::SetPrometheus(const std::string& prometheus_address) {
prometheus_ = std::make_unique<PrometheusHandler>(prometheus_address);
}
} // namespace resdb