// src/runtime/profiler.cpp
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/*
HELP GRAPH
      CALL ==== net(call) ====> ENQUEUE ==== queue(server) ====> START
        ^                          ^                              ||
        |                          |                              ||
        |                          |                              ||
        |                          |                              ||
        |                          |                              ||
 Client Latency             Server Latency                   exec(server)
        |                          |                              ||
        |                          |                              ||
        |                          |                              ||
        |                          |                              ||
        V                          V                              ||
    START <== queue(client) === ENQUEUE <==== net(reply) ===== REPLY
      ||
      ||
 exec(client)
      ||
      ||
      \/
     END
*/
#include "runtime/profiler.h"
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include "aio/aio_task.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "profiler_header.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "utils/config_api.h"
#include "utils/extensible_object.h"
#include "utils/fmt_logging.h"
#include "utils/flags.h"
#include "utils/join_point.h"
namespace dsn {
struct service_spec;
namespace tools {
DSN_DEFINE_bool(task..default, is_profile, false, "whether to profile this kind of task");
DSN_DEFINE_bool(task..default,
collect_call_count,
true,
"whether to collect how many time this kind of tasks invoke each of other kinds "
"tasks");
typedef uint64_extension_helper<task_spec_profiler, task> task_ext_for_profiler;
typedef uint64_extension_helper<task_spec_profiler, message_ex> message_ext_for_profiler;
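// One task_spec_profiler per task code (indexed by the code value), holding that
// code's perf counters and per-callee call-count table; s_task_code_max caches
// dsn::task_code::max() for the bounds checks below.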
std::unique_ptr<task_spec_profiler[]> s_spec_profilers;
int s_task_code_max = 0;
// Generic task lifecycle hooks, registered below for every task kind.
// On creation: record the creation time in the task's extension.
static void profiler_on_task_create(task *caller, task *callee)
{
task_ext_for_profiler::get(callee) = dsn_now_ns();
}
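// On enqueue: bump the caller->callee call count (if enabled for the caller),
// refresh the stored timestamp to the enqueue time, and count the task as
// in-queue; delayed tasks are skipped here because they are not yet queued.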
static void profiler_on_task_enqueue(task *caller, task *callee)
{
auto callee_code = callee->spec().code;
CHECK(callee_code >= 0 && callee_code <= s_task_code_max, "code = {}", callee_code.code());
if (caller != nullptr) {
auto caller_code = caller->spec().code;
CHECK(caller_code >= 0 && caller_code <= s_task_code_max, "code = {}", caller_code.code());
auto &prof = s_spec_profilers[caller_code];
if (prof.collect_call_count) {
prof.call_counts[callee_code]++;
}
}
task_ext_for_profiler::get(callee) = dsn_now_ns();
if (callee->delay_milliseconds() == 0) {
auto ptr = s_spec_profilers[callee_code].ptr[TASK_IN_QUEUE].get();
if (ptr != nullptr)
ptr->increment();
}
}
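// On begin: report queueing latency (time since enqueue), reset the stored
// timestamp to the start time, and decrement the in-queue gauge.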
static void profiler_on_task_begin(task *this_)
{
auto code = this_->spec().code;
// TODO(yingchun): duplicate checks, should refactor later
CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
uint64_t &qts = task_ext_for_profiler::get(this_);
uint64_t now = dsn_now_ns();
auto ptr = s_spec_profilers[code].ptr[TASK_QUEUEING_TIME_NS].get();
if (ptr != nullptr)
ptr->set(now - qts);
qts = now;
ptr = s_spec_profilers[code].ptr[TASK_IN_QUEUE].get();
if (ptr != nullptr)
ptr->decrement();
}
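// On end: report execution latency (time since begin) and count one executed
// task towards throughput.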
static void profiler_on_task_end(task *this_)
{
auto code = this_->spec().code;
CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
uint64_t qts = task_ext_for_profiler::get(this_);
uint64_t now = dsn_now_ns();
auto ptr = s_spec_profilers[code].ptr[TASK_EXEC_TIME_NS].get();
if (ptr != nullptr)
ptr->set(now - qts);
ptr = s_spec_profilers[code].ptr[TASK_THROUGHPUT].get();
if (ptr != nullptr)
ptr->increment();
}
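// On cancellation: count the cancelled task.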
static void profiler_on_task_cancelled(task *this_)
{
auto code = this_->spec().code;
CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
auto ptr = s_spec_profilers[code].ptr[TASK_CANCELLED].get();
if (ptr != nullptr)
ptr->increment();
}
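// Wait/cancel hooks are registered but currently collect nothing.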
static void profiler_on_task_wait_pre(task *caller, task *callee, uint32_t timeout_ms) {}
static void profiler_on_task_wait_post(task *caller, task *callee, bool succ) {}
static void profiler_on_task_cancel_post(task *caller, task *callee, bool succ) {}
// On AIO submission: bump the caller->callee call count (if enabled) and record
// the submission time in the callee's extension.
static void profiler_on_aio_call(task *caller, aio_task *callee)
{
if (nullptr != caller) {
auto caller_code = caller->spec().code;
CHECK(caller_code >= 0 && caller_code <= s_task_code_max, "code = {}", caller_code.code());
auto &prof = s_spec_profilers[caller_code];
if (prof.collect_call_count) {
auto callee_code = callee->spec().code;
CHECK(callee_code >= 0 && callee_code <= s_task_code_max,
"code = {}",
callee_code.code());
prof.call_counts[callee_code]++;
}
}
// time disk io starts
task_ext_for_profiler::get(callee) = dsn_now_ns();
}
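// When the AIO completion callback is enqueued: report AIO latency (from
// submission to completion enqueue), reset the stored timestamp, and count the
// task as in-queue.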
static void profiler_on_aio_enqueue(aio_task *this_)
{
auto code = this_->spec().code;
CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
uint64_t &ats = task_ext_for_profiler::get(this_);
uint64_t now = dsn_now_ns();
auto ptr = s_spec_profilers[code].ptr[AIO_LATENCY_NS].get();
if (ptr != nullptr)
ptr->set(now - ats);
ats = now;
ptr = s_spec_profilers[code].ptr[TASK_IN_QUEUE].get();
if (ptr != nullptr)
ptr->increment();
}
// On client-side RPC call: bump the caller->callee call count (if enabled) and
// record the call time in the response task's extension (when a response is expected).
static void profiler_on_rpc_call(task *caller, message_ex *req, rpc_response_task *callee)
{
if (nullptr != caller) {
auto caller_code = caller->spec().code;
CHECK(caller_code >= 0 && caller_code <= s_task_code_max, "code = {}", caller_code.code());
auto &prof = s_spec_profilers[caller_code];
if (prof.collect_call_count) {
CHECK(req->local_rpc_code >= 0 && req->local_rpc_code <= s_task_code_max,
"code = {}",
req->local_rpc_code.code());
prof.call_counts[req->local_rpc_code]++;
}
}
// time rpc starts
if (nullptr != callee) {
task_ext_for_profiler::get(callee) = dsn_now_ns();
}
}
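// Server side, when a request task is enqueued: stamp both the task and the
// request message with the enqueue time, count the task as in-queue, and report
// the request body size.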
static void profiler_on_rpc_request_enqueue(rpc_request_task *callee)
{
auto callee_code = callee->spec().code;
CHECK(callee_code >= 0 && callee_code <= s_task_code_max, "code = {}", callee_code.code());
uint64_t now = dsn_now_ns();
task_ext_for_profiler::get(callee) = now;
message_ext_for_profiler::get(callee->get_request()) = now;
auto ptr = s_spec_profilers[callee_code].ptr[TASK_IN_QUEUE].get();
if (ptr != nullptr) {
ptr->increment();
}
ptr = s_spec_profilers[callee_code].ptr[RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES].get();
if (ptr != nullptr) {
ptr->set(callee->get_request()->header->body_length);
}
}
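// Server side: count requests dropped before execution because the client
// timeout has already expired.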
static void profile_on_rpc_task_dropped(rpc_request_task *callee)
{
auto code = callee->spec().code;
auto ptr = s_spec_profilers[code].ptr[RPC_DROPPED_IF_TIMEOUT].get();
if (ptr != nullptr) {
ptr->increment();
}
}
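// Copy the request's enqueue timestamp onto the response message so that server
// latency can be computed when the reply is sent.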
static void profiler_on_rpc_create_response(message_ex *req, message_ex *resp)
{
message_ext_for_profiler::get(resp) = message_ext_for_profiler::get(req);
}
// Server side, at reply time: bump the caller->callee call count (if enabled),
// then report server latency (request enqueue to reply) and response body size
// under the paired request code.
static void profiler_on_rpc_reply(task *caller, message_ex *msg)
{
auto caller_code = caller->spec().code;
CHECK(caller_code >= 0 && caller_code <= s_task_code_max, "code = {}", caller_code.code());
auto &prof = s_spec_profilers[caller_code];
if (prof.collect_call_count) {
CHECK(msg->local_rpc_code >= 0 && msg->local_rpc_code <= s_task_code_max,
"code = {}",
msg->local_rpc_code.code());
prof.call_counts[msg->local_rpc_code]++;
}
uint64_t qts = message_ext_for_profiler::get(msg);
uint64_t now = dsn_now_ns();
task_spec *spec = task_spec::get(msg->local_rpc_code);
CHECK_NOTNULL(spec, "task_spec cannot be null, code = {}", msg->local_rpc_code.code());
auto code = spec->rpc_paired_code;
CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
auto ptr = s_spec_profilers[code].ptr[RPC_SERVER_LATENCY_NS].get();
if (ptr != nullptr) {
ptr->set(now - qts);
}
ptr = s_spec_profilers[code].ptr[RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES].get();
if (ptr != nullptr) {
ptr->set(msg->header->body_length);
}
}
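// Client side, when the response task is enqueued: report client latency (from
// call to response enqueue) if a response arrived, otherwise count a timeout;
// then reset the stored timestamp and count the task as in-queue.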
static void profiler_on_rpc_response_enqueue(rpc_response_task *resp)
{
auto resp_code = resp->spec().code;
CHECK(resp_code >= 0 && resp_code <= s_task_code_max, "code = {}", resp_code.code());
uint64_t &cts = task_ext_for_profiler::get(resp);
uint64_t now = dsn_now_ns();
if (resp->get_response() != nullptr) {
auto ptr = s_spec_profilers[resp_code].ptr[RPC_CLIENT_NON_TIMEOUT_LATENCY_NS].get();
if (ptr != nullptr)
ptr->set(now - cts);
} else {
auto ptr = s_spec_profilers[resp_code].ptr[RPC_CLIENT_TIMEOUT_THROUGHPUT].get();
if (ptr != nullptr)
ptr->increment();
}
cts = now;
auto ptr = s_spec_profilers[resp_code].ptr[TASK_IN_QUEUE].get();
if (ptr != nullptr)
ptr->increment();
}
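// install() allocates one task_spec_profiler per task code, reads per-task
// settings from that code's "[task.<CODE_NAME>]" config section (is_profile and
// collect_call_count default to the [task..default] flags above), creates the
// requested counters under "zion"/"profiler", and appends the hooks defined
// above to the task spec's join points.
//
// A minimal sketch of such a section, assuming a task code named RPC_EXAMPLE
// (the name is only an illustration; the keys are the ones read below):
//
//   [task.RPC_EXAMPLE]
//     is_profile = true
//     collect_call_count = true
//     profiler::inqueue = true
//     profiler::queue = true
//     profiler::exec = true
//     profiler::qps = true
//     profiler::cancelled = true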
void profiler::install(service_spec &)
{
s_task_code_max = dsn::task_code::max();
s_spec_profilers.reset(new task_spec_profiler[s_task_code_max + 1]);
task_ext_for_profiler::register_ext();
message_ext_for_profiler::register_ext();
for (int i = 0; i <= s_task_code_max; i++) {
if (i == TASK_CODE_INVALID)
continue;
std::string name(dsn::task_code(i).to_string());
std::string section_name = std::string("task.") + name;
task_spec *spec = task_spec::get(i);
CHECK_NOTNULL(spec, "");
s_spec_profilers[i].collect_call_count = dsn_config_get_value_bool(
section_name.c_str(),
"collect_call_count",
FLAGS_collect_call_count,
"whether to collect how many time this kind of tasks invoke each of other kinds tasks");
s_spec_profilers[i].call_counts = new std::atomic<int64_t>[ s_task_code_max + 1 ];
std::fill(s_spec_profilers[i].call_counts,
s_spec_profilers[i].call_counts + s_task_code_max + 1,
0);
s_spec_profilers[i].is_profile =
dsn_config_get_value_bool(section_name.c_str(),
"is_profile",
FLAGS_is_profile,
"whether to profile this kind of task");
if (!s_spec_profilers[i].is_profile)
continue;
if (dsn_config_get_value_bool(
section_name.c_str(),
"profiler::inqueue",
true,
"whether to profile the number of this kind of tasks in all queues"))
s_spec_profilers[i].ptr[TASK_IN_QUEUE].init_global_counter(
"zion",
"profiler",
(name + std::string(".inqueue")).c_str(),
COUNTER_TYPE_NUMBER,
"task number in all queues");
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::queue",
true,
"whether to profile the queuing time of a task"))
s_spec_profilers[i].ptr[TASK_QUEUEING_TIME_NS].init_global_counter(
"zion",
"profiler",
(name + std::string(".queue(ns)")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"latency due to waiting in the queue");
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::exec",
true,
"whether to profile the executing time of a task"))
s_spec_profilers[i].ptr[TASK_EXEC_TIME_NS].init_global_counter(
"zion",
"profiler",
(name + std::string(".exec(ns)")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"latency due to executing tasks");
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::qps",
true,
"whether to profile the qps of a task"))
s_spec_profilers[i].ptr[TASK_THROUGHPUT].init_global_counter(
"zion",
"profiler",
(name + std::string(".qps")).c_str(),
COUNTER_TYPE_RATE,
"task numbers per second");
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::cancelled",
true,
"whether to profile the cancelled times of a task"))
s_spec_profilers[i].ptr[TASK_CANCELLED].init_global_counter(
"zion",
"profiler",
(name + std::string(".cancelled")).c_str(),
COUNTER_TYPE_NUMBER,
"cancelled times of a specific task type");
if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_REQUEST) {
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::latency.server",
true,
"whether to profile the server latency of a task")) {
s_spec_profilers[i].ptr[RPC_SERVER_LATENCY_NS].init_global_counter(
"zion",
"profiler",
(name + std::string(".latency.server")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"latency from enqueue point to reply point on the server side for RPC "
"tasks");
}
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::size.request.server",
false,
"whether to profile the size per request")) {
s_spec_profilers[i].ptr[RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES].init_global_counter(
"zion",
"profiler",
(name + std::string(".size.request.server")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"");
}
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::size.response.server",
false,
"whether to profile the size per response")) {
s_spec_profilers[i].ptr[RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES].init_global_counter(
"zion",
"profiler",
(name + std::string(".size.response.server")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"");
}
if (dsn_config_get_value_bool(
section_name.c_str(),
"rpc_request_dropped_before_execution_when_timeout",
false,
"whether to profile the number of rpc dropped for timeout"))
s_spec_profilers[i].ptr[RPC_DROPPED_IF_TIMEOUT].init_global_counter(
"zion",
"profiler",
(name + std::string(".rpc.dropped")).c_str(),
COUNTER_TYPE_VOLATILE_NUMBER,
"rpc dropped if queue time exceed client timeout");
} else if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) {
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::latency.client",
true,
"whether to profile the client latency of a task"))
s_spec_profilers[i].ptr[RPC_CLIENT_NON_TIMEOUT_LATENCY_NS].init_global_counter(
"zion",
"profiler",
(name + std::string(".latency.client(ns)")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"latency from call point to enqueue point on the client side for RPC "
"tasks");
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::timeout.qps",
true,
"whether to profile the timeout qps of a task"))
s_spec_profilers[i].ptr[RPC_CLIENT_TIMEOUT_THROUGHPUT].init_global_counter(
"zion",
"profiler",
(name + std::string(".timeout.qps")).c_str(),
COUNTER_TYPE_RATE,
"time-out task numbers per second for RPC tasks");
} else if (spec->type == dsn_task_type_t::TASK_TYPE_AIO) {
if (dsn_config_get_value_bool(section_name.c_str(),
"profiler::latency",
true,
"whether to profile the latency of an AIO task"))
s_spec_profilers[i].ptr[AIO_LATENCY_NS].init_global_counter(
"zion",
"profiler",
(name + std::string(".latency(ns)")).c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"latency from call point to enqueue point for AIO tasks");
}
// we don't use perf_counter_ptr but perf_counter* in ptr[xxx] to avoid unnecessary memory
// access cost
// we need to add a reference so that the counters won't go away
// release_ref should be done when the profiler exits (which never happens right now so we
// omit that for the time being)
for (size_t j = 0; j < sizeof(s_spec_profilers[i].ptr) / sizeof(perf_counter *); j++) {
if (s_spec_profilers[i].ptr[j].get() != nullptr) {
s_spec_profilers[i].ptr[j]->add_ref();
}
}
spec->on_task_create.put_back(profiler_on_task_create, "profiler");
spec->on_task_enqueue.put_back(profiler_on_task_enqueue, "profiler");
spec->on_task_begin.put_back(profiler_on_task_begin, "profiler");
spec->on_task_end.put_back(profiler_on_task_end, "profiler");
spec->on_task_cancelled.put_back(profiler_on_task_cancelled, "profiler");
spec->on_task_wait_pre.put_back(profiler_on_task_wait_pre, "profiler");
spec->on_task_wait_post.put_back(profiler_on_task_wait_post, "profiler");
spec->on_task_cancel_post.put_back(profiler_on_task_cancel_post, "profiler");
spec->on_aio_call.put_back(profiler_on_aio_call, "profiler");
spec->on_aio_enqueue.put_back(profiler_on_aio_enqueue, "profiler");
spec->on_rpc_call.put_back(profiler_on_rpc_call, "profiler");
spec->on_rpc_request_enqueue.put_back(profiler_on_rpc_request_enqueue, "profiler");
spec->on_rpc_task_dropped.put_back(profile_on_rpc_task_dropped, "profiler");
spec->on_rpc_create_response.put_back(profiler_on_rpc_create_response, "profiler");
spec->on_rpc_reply.put_back(profiler_on_rpc_reply, "profiler");
spec->on_rpc_response_enqueue.put_back(profiler_on_rpc_response_enqueue, "profiler");
}
}
profiler::profiler(const char *name) : toollet(name) {}
} // namespace tools
} // namespace dsn