platform/statistic/stats.h (116 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #pragma once #include <crow.h> #include <chrono> #include <future> #include <nlohmann/json.hpp> #include "boost/asio.hpp" #include "boost/beast.hpp" #include "platform/common/network/tcp_socket.h" #include "platform/proto/resdb.pb.h" #include "platform/statistic/prometheus_handler.h" #include "proto/kv/kv.pb.h" #include "sys/resource.h" namespace asio = boost::asio; namespace beast = boost::beast; using tcp = asio::ip::tcp; namespace resdb { struct VisualData { // Set when initializing int replica_id; int primary_id; std::string ip; int port; // Set when new txn is received int txn_number; std::vector<std::string> txn_command; std::vector<std::string> txn_key; std::vector<std::string> txn_value; // Request state if primary_id==replica_id, pre_prepare state otherwise std::chrono::system_clock::time_point request_pre_prepare_state_time; std::chrono::system_clock::time_point prepare_state_time; std::vector<std::chrono::system_clock::time_point> prepare_message_count_times_list; std::chrono::system_clock::time_point commit_state_time; std::vector<std::chrono::system_clock::time_point> commit_message_count_times_list; std::chrono::system_clock::time_point execution_time; // Storage Engine Stats double ext_cache_hit_ratio_; std::string level_db_stats_; std::string level_db_approx_mem_size_; // process stats struct rusage process_stats_; }; class Stats { public: static Stats* GetGlobalStats(int sleep_seconds = 5); void Stop(); void RetrieveProgress(); void SetProps(int replica_id, std::string ip, int port, bool resview_flag, bool faulty_flag); void SetPrimaryId(int primary_id); void SetStorageEngineMetrics(double ext_cache_hit_ratio, std::string level_db_stats, std::string level_db_approx_mem_size); void RecordStateTime(std::string state); void GetTransactionDetails(BatchUserRequest batch_request); void SendSummary(); void CrowRoute(); bool IsFaulty(); void ChangePrimary(int primary_id); void AddLatency(uint64_t run_time); void Monitor(); void MonitorGlobal(); void IncSocketRecv(); void IncClientCall(); void IncClientRequest(); void IncPropose(); void IncPrepare(); void IncCommit(); void IncPendingExecute(); void IncExecute(); void IncExecuteDone(); void BroadCastMsg(); void SendBroadCastMsg(uint32_t num); void SendBroadCastMsgPerRep(); void SeqFail(); void IncTotalRequest(uint32_t num); void IncTotalGeoRequest(uint32_t num); void IncGeoRequest(); void SeqGap(uint64_t seq_gap); // Network in->worker void ServerCall(); void ServerProcess(); void SetPrometheus(const std::string& prometheus_address); protected: Stats(int sleep_time = 5); ~Stats(); private: std::string monitor_port_ = "default"; std::string name_; std::atomic<int> num_call_, run_call_; std::atomic<uint64_t> last_time_, run_time_, run_call_time_; std::thread thread_; std::atomic<bool> begin_; std::atomic<bool> stop_; std::condition_variable cv_; std::mutex mutex_; std::thread global_thread_; std::atomic<uint64_t> num_client_req_, num_propose_, num_prepare_, num_commit_, pending_execute_, execute_, execute_done_; std::atomic<uint64_t> client_call_, socket_recv_; std::atomic<uint64_t> broad_cast_msg_, send_broad_cast_msg_, send_broad_cast_msg_per_rep_; std::atomic<uint64_t> seq_fail_; std::atomic<uint64_t> server_call_, server_process_; std::atomic<uint64_t> run_req_num_; std::atomic<uint64_t> run_req_run_time_; std::atomic<uint64_t> seq_gap_; std::atomic<uint64_t> total_request_, total_geo_request_, geo_request_; int monitor_sleep_time_ = 5; // default 5s. std::thread crow_thread_; bool enable_resview; bool enable_faulty_switch_; VisualData transaction_summary_; std::atomic<bool> make_faulty_; std::atomic<uint64_t> prev_num_prepare_; std::atomic<uint64_t> prev_num_commit_; nlohmann::json summary_json_; nlohmann::json consensus_history_; std::unique_ptr<PrometheusHandler> prometheus_; }; } // namespace resdb