maga_transformer/cpp/metrics/RtpLLMMetrics.h (394 lines of code) (raw):
#pragma once
#include "autil/Log.h"
#include "kmonitor/client/MetricsReporter.h"
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <unistd.h>
#include <vector>
// Forward declarations of kmonitor types that this header only names in
// declarations (pointer members, pointer parameters and a function return type).
namespace kmonitor {
class MetricsTags;
class MutableMetric;
} // namespace kmonitor
namespace rtp_llm {
// Value holder for one sample of RPC-layer metrics. Every field starts at
// zero / false; it is the collector type accepted by RpcMetrics::report().
class RpcMetricsCollector final {
public:
    // --- rpc server ---
    bool    qps{false};
    bool    cancel_qps{false};
    bool    error_qps{false};
    int64_t onflight_request{0};
    int64_t total_rt_us{0};

    // --- pd-sep: shared by prefill and decode ---
    int retry_times{0};
    int loading_cache_request{0};

    // --- pd-sep: prefill ---
    int64_t get_rpc_connection_rt_us{0};
    int64_t multimodal_process_rt_us{0};
    int64_t remote_allocate_resource_rt_us{0};
    int64_t enqueue_request_rt_us{0};
    int64_t remote_load_cache_start_rt_us{0};
    int64_t poll_local_output_rt_us{0};
    int64_t remote_load_cache_end_rt_us{0};
    int64_t remote_generate_rt_us{0};
    int64_t poll_remote_output_rt_us{0};

    // --- pd-sep: decode stage ---
    int64_t prepare_generate_context_rt_us{0};
    int64_t allocate_resource_rt_us{0};
    int64_t load_cache_from_prefill_rt_us{0};
    int64_t local_generate_rt_us{0};

    // --- decode tp ---
    int64_t load_cache_min_rt_us{0};
    int64_t load_cache_max_rt_us{0};
    int64_t load_cache_polling_cost_us{0};
};
// kmonitor metrics group whose report() consumes an RpcMetricsCollector.
// init() and report() are defined outside this header.
class RpcMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RpcMetricsCollector* collector);

    // Metric handles, one per RpcMetricsCollector field. They are null here;
    // whatever assigns them lives outside this header.
    kmonitor::MutableMetric* qps_metric{nullptr};
    kmonitor::MutableMetric* cancel_qps_metric{nullptr};
    kmonitor::MutableMetric* error_qps_metric{nullptr};
    kmonitor::MutableMetric* onflight_request_metric{nullptr};
    kmonitor::MutableMetric* total_rt_us_metric{nullptr};
    kmonitor::MutableMetric* retry_times_metric{nullptr};
    kmonitor::MutableMetric* loading_cache_request_metric{nullptr};
    kmonitor::MutableMetric* get_rpc_connection_rt_us_metric{nullptr};
    kmonitor::MutableMetric* multimodal_process_rt_us_metric{nullptr};
    kmonitor::MutableMetric* remote_allocate_resource_rt_us_metric{nullptr};
    kmonitor::MutableMetric* enqueue_request_rt_us_metric{nullptr};
    kmonitor::MutableMetric* remote_load_cache_start_rt_us_metric{nullptr};
    kmonitor::MutableMetric* poll_local_output_rt_us_metric{nullptr};
    kmonitor::MutableMetric* remote_load_cache_end_rt_us_metric{nullptr};
    kmonitor::MutableMetric* remote_generate_rt_us_metric{nullptr};
    kmonitor::MutableMetric* poll_remote_output_rt_us_metric{nullptr};
    kmonitor::MutableMetric* prepare_generate_context_rt_us_metric{nullptr};
    kmonitor::MutableMetric* allocate_resource_rt_us_metric{nullptr};
    kmonitor::MutableMetric* load_cache_from_prefill_rt_us_metric{nullptr};
    kmonitor::MutableMetric* local_generate_rt_us_metric{nullptr};
    kmonitor::MutableMetric* load_cache_min_rt_us_metric{nullptr};
    kmonitor::MutableMetric* load_cache_max_rt_us_metric{nullptr};
    kmonitor::MutableMetric* load_cache_polling_cost_us_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for per-stream metrics; the collector type accepted by
// RtpLLMStreamMetrics::report(). All fields default to zero / false except
// not_streaming_qps, which defaults to true.
class RtpLLMStreamMetricsCollector final {
public:
    // request counters
    bool qps{false};
    bool cancel_qps{false};
    bool error_qps{false};
    bool is_streaming_qps{false};
    bool not_streaming_qps{true};

    // latencies (microseconds, per the _us suffix) and sizes
    int64_t total_latency_us{0};
    int64_t first_token_latency_us{0};
    int64_t wait_latency_us{0};
    int64_t pause_latency_us{0};
    int64_t iterate_count{0};
    int64_t reuse_length{0};
    int64_t input_token_length{0};
    int64_t output_token_length{0};

    // for timeout
    int64_t timeout_latency_us{0};

    int64_t query_batch_size{0};
    int64_t fallback_tokens{0};
    int64_t fallback_times{0};
    int32_t batch_with_prefill_times{0};
    int32_t batch_with_prefill_len{0};
    int32_t malloc_failed_times{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMStreamMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMStreamMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMStreamMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* qps_metric{nullptr};
    kmonitor::MutableMetric* cancel_qps_metric{nullptr};
    kmonitor::MutableMetric* error_qps_metric{nullptr};
    kmonitor::MutableMetric* is_streaming_qps_metric{nullptr};
    kmonitor::MutableMetric* not_streaming_qps_metric{nullptr};
    kmonitor::MutableMetric* total_latency_us_metric{nullptr};
    kmonitor::MutableMetric* first_token_latency_us_metric{nullptr};
    kmonitor::MutableMetric* wait_latency_us_metric{nullptr};
    kmonitor::MutableMetric* pause_latency_us_metric{nullptr};
    kmonitor::MutableMetric* iterate_count_metric{nullptr};
    kmonitor::MutableMetric* reuse_length_metric{nullptr};
    kmonitor::MutableMetric* input_token_length_metric{nullptr};
    kmonitor::MutableMetric* output_token_length_metric{nullptr};
    kmonitor::MutableMetric* query_batch_size_metric{nullptr};
    kmonitor::MutableMetric* fallback_tokens_metric{nullptr};
    kmonitor::MutableMetric* fallback_times_metric{nullptr};
    kmonitor::MutableMetric* batch_with_prefill_times_metric{nullptr};
    kmonitor::MutableMetric* batch_with_prefill_len_metric{nullptr};
    kmonitor::MutableMetric* timeout_latency_us_metric{nullptr};
    kmonitor::MutableMetric* malloc_failed_times_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// corresponding to python metrics
// Value holder for one embedding request outcome: an error flag and the total
// latency (stored as a double, unlike the integer latencies elsewhere in this file).
class RtpEmbeddingGlobalMetricsCollector final {
public:
    bool   error{false};
    double total_latency_us{0.0};
};
// kmonitor metrics group whose report() consumes an RtpEmbeddingGlobalMetricsCollector.
// init() and report() are defined outside this header.
class RtpEmbeddingGlobalMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpEmbeddingGlobalMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* qps_metric{nullptr};
    kmonitor::MutableMetric* success_qps_metric{nullptr};
    kmonitor::MutableMetric* error_qps_metric{nullptr};
    kmonitor::MutableMetric* total_latency_us_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for per-stream embedding metrics; all fields start at zero.
class RtpEmbeddingStreamMetricsCollector final {
public:
    int64_t total_latency_us{0};
    int64_t wait_latency_us{0};
    int64_t input_token_length{0};
};
// kmonitor metrics group whose report() consumes an RtpEmbeddingStreamMetricsCollector.
// init() and report() are defined outside this header.
class RtpEmbeddingStreamMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpEmbeddingStreamMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* total_latency_us_metric{nullptr};
    kmonitor::MutableMetric* wait_latency_us_metric{nullptr};
    kmonitor::MutableMetric* input_token_length_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for scheduler stream counts; all fields start at zero.
class RtpLLMSchedulerMetricsCollector final {
public:
    int64_t wait_stream_size{0};
    int64_t running_stream_size{0};
    int64_t remote_running_stream_size{0};
    int64_t fallback_stream_size{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMSchedulerMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMSchedulerMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMSchedulerMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* wait_stream_size_metric{nullptr};
    kmonitor::MutableMetric* running_stream_size_metric{nullptr};
    kmonitor::MutableMetric* remote_running_stream_size_metric{nullptr};
    kmonitor::MutableMetric* fallback_stream_size_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for engine-level metrics: two lora-update flags and the step latency.
// All fields start at false / zero.
class RtpLLMEngineMetricsCollector final {
public:
    bool    update_lora_qps{false};
    bool    error_update_lora_qps{false};
    int64_t step_latency_us{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMEngineMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMEngineMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMEngineMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* step_latency_us_metric{nullptr};
    kmonitor::MutableMetric* update_lora_qps_metric{nullptr};
    kmonitor::MutableMetric* error_update_lora_qps_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Accumulator for token counts. merge() adds another collector's counts into
// this one, which is the operation MetricsLoopReporter::report() relies on.
class RtpLLMTokenPSMetricsCollector final {
public:
    // Adds `collector`'s three counters to this object's; a null pointer is a no-op.
    void merge(const RtpLLMTokenPSMetricsCollector* collector) {
        if (collector == nullptr) {
            return;
        }
        const RtpLLMTokenPSMetricsCollector& other = *collector;
        context_tps  = context_tps + other.context_tps;
        generate_tps = generate_tps + other.generate_tps;
        total_tps    = total_tps + other.total_tps;
    }

    int64_t context_tps{0};
    int64_t generate_tps{0};
    int64_t total_tps{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMTokenPSMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMTokenPSMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMTokenPSMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* context_tps_metric{nullptr};
    kmonitor::MutableMetric* generate_tps_metric{nullptr};
    kmonitor::MutableMetric* total_tps_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Periodic aggregating reporter.
//
// Callers fold samples into an internal collector through report(); a dedicated
// thread hands that collector to the kmonitor reporter every `interval_ms`
// milliseconds and then resets it to a default-constructed CollectType.
//
// Requirements visible from the code below:
//   * CollectType is default-constructible, assignable, and provides
//     `merge(const CollectType*)`.
//   * MetricsType / CollectType are forwarded to
//     `metrics_reporter_->report<MetricsType, CollectType>(tags, collector)`.
//
// Thread-safety: collector_ and stop_ are only touched with mutex_ held, so
// report() may be called concurrently with the background thread.
template<typename MetricsType, typename CollectType>
class MetricsLoopReporter {
public:
    // Spawns the reporting thread only when `metrics_reporter` is non-null.
    // With a null reporter, report() still merges but nothing is ever flushed.
    explicit MetricsLoopReporter(const kmonitor::MetricsReporterPtr metrics_reporter, int interval_ms = 1000)
        : collector_(CollectType()),
          interval_ms_(interval_ms),
          metrics_reporter_(metrics_reporter) {
        if (metrics_reporter_) {
            metrics_reporter_thread_ = std::thread(&MetricsLoopReporter<MetricsType, CollectType>::reportLoop, this);
        }
    }

    // Requests shutdown and joins the reporting thread. The flag is published
    // under the mutex and the waiter is woken, so destruction does not have to
    // sit out the remainder of the reporting interval.
    ~MetricsLoopReporter() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        stop_cv_.notify_all();
        if (metrics_reporter_thread_.joinable()) {
            metrics_reporter_thread_.join();
        }
    }

    // The background thread captures `this`; the type was already non-copyable
    // through its mutex/thread members, this only makes that explicit.
    MetricsLoopReporter(const MetricsLoopReporter&)            = delete;
    MetricsLoopReporter& operator=(const MetricsLoopReporter&) = delete;

    // Merges one sample into the pending collector (delegates to CollectType::merge).
    void report(const CollectType* collector) {
        std::lock_guard<std::mutex> lock(mutex_);
        collector_.merge(collector);
    }

private:
    // Thread body: flush + reset under the lock, then wait one interval with the
    // lock released (wait_for unlocks while blocked) or until stop is requested.
    void reportLoop() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (metrics_reporter_ && !stop_) {
            metrics_reporter_->report<MetricsType, CollectType>(nullptr, &collector_);
            collector_ = CollectType();
            stop_cv_.wait_for(lock, std::chrono::milliseconds(interval_ms_), [this] { return stop_; });
        }
    }

private:
    std::mutex                   mutex_;    // guards collector_ and stop_
    std::condition_variable      stop_cv_;  // signalled by the destructor
    bool                         stop_ = false;
    CollectType                  collector_;
    int                          interval_ms_ = 1000;
    std::thread                  metrics_reporter_thread_;
    kmonitor::MetricsReporterPtr metrics_reporter_ = nullptr;
};
// Value holder for executor metrics (batch sizes, token counts, per-phase
// timings). All fields start at zero.
class RtpLLMExecutorMetricsCollector final {
public:
    // batch / token sizes
    int64_t context_batch_size{0};
    int64_t generate_batch_size{0};
    int64_t context_batch_size_when_has_context{0};
    int64_t generate_batch_size_when_has_context{0};
    int64_t execute_token_size_when_has_context{0};
    int64_t max_seq_len_when_has_context{0};
    int64_t execute_token_size{0};
    int64_t max_seq_len{0};

    // per-phase timings (microseconds, per the _us suffix)
    int64_t gather_model_input_us{0};
    int64_t tp_sync_input_us{0};
    int64_t model_forward_us{0};
    int64_t sample_input_us{0};
    int64_t dispatch_output_us{0};

    // eplb metrics
    int64_t eplb_step_latency_us{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMExecutorMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMExecutorMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMExecutorMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* context_batch_size_metric{nullptr};
    kmonitor::MutableMetric* generate_batch_size_metric{nullptr};
    kmonitor::MutableMetric* context_batch_size_when_has_context_metric{nullptr};
    kmonitor::MutableMetric* generate_batch_size_when_has_context_metric{nullptr};
    kmonitor::MutableMetric* execute_token_size_when_has_context_metric{nullptr};
    kmonitor::MutableMetric* max_seq_len_when_has_context_metric{nullptr};
    kmonitor::MutableMetric* execute_token_size_metric{nullptr};
    kmonitor::MutableMetric* max_seq_len_metric{nullptr};
    kmonitor::MutableMetric* gather_model_input_us_metric{nullptr};
    kmonitor::MutableMetric* tp_sync_input_us_metric{nullptr};
    kmonitor::MutableMetric* model_forward_us_metric{nullptr};
    kmonitor::MutableMetric* sample_input_us_metric{nullptr};
    kmonitor::MutableMetric* dispatch_output_us_metric{nullptr};
    // eplb metrics
    kmonitor::MutableMetric* eplb_step_latency_us_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for KV-cache metrics; all fields start at zero. The used ratio
// is the only floating-point field.
class RtpLLMCacheMetricsCollector final {
public:
    int64_t kv_cache_item_num{0};
    int64_t kv_cache_free_blocks{0};
    int64_t kv_cache_available_blocks{0};
    int64_t kv_cache_left_seq{0};
    float   kv_cache_used_ratio{0.0f};
    int64_t mr_cost_time_ms{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMCacheMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMCacheMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMCacheMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* kv_cache_item_num_metric{nullptr};
    kmonitor::MutableMetric* kv_cache_free_blocks_metric{nullptr};
    kmonitor::MutableMetric* kv_cache_available_blocks_metric{nullptr};
    kmonitor::MutableMetric* kv_cache_left_seq_metric{nullptr};
    kmonitor::MutableMetric* kv_cache_used_ratio_metric{nullptr};
    kmonitor::MutableMetric* mr_cost_time_ms_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for cache-reuse metrics: match cost and reused length, both zero by default.
class RtpLLMCacheReuseMetricsCollector final {
public:
    int64_t match_cost_time_us{0};
    int64_t kv_cache_reuse_length{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMCacheReuseMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMCacheReuseMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMCacheReuseMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    // NOTE: unlike the other groups in this file these carry no `_metric`
    // suffix, so they share their names with the collector's fields.
    kmonitor::MutableMetric* match_cost_time_us{nullptr};
    kmonitor::MutableMetric* kv_cache_reuse_length{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for a single kernel execution time (float, zero by default).
// NOTE(review): the time unit is not visible in this header — confirm at the call site.
class RtpLLMKernelMetricsCollector final {
public:
    float kernel_exec_time{0.0f};
};
// kmonitor metrics group whose report() consumes an RtpLLMKernelMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMKernelMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMKernelMetricsCollector* collector);

    // Metric handle; null here, assigned by code outside this header.
    kmonitor::MutableMetric* kernel_exec_time_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for speculative-engine metrics: per-phase step latencies and
// proposed / accepted token totals. All fields start at zero.
class RtpLLMSpeculativeEngineMetricsCollector final {
public:
    // latencies (microseconds, per the _us suffix)
    int64_t step_latency_us{0};
    int64_t propose_step_latency_us{0};
    int64_t score_step_latency_us{0};
    int64_t speculative_sampler_latency_us{0};
    int64_t updater_step_latency_us{0};

    // token totals
    int64_t total_propose_token_num{0};
    int64_t total_accepted_token_num{0};
};
// kmonitor metrics group whose report() consumes an RtpLLMSpeculativeEngineMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLMSpeculativeEngineMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLMSpeculativeEngineMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* step_latency_us_metric{nullptr};
    kmonitor::MutableMetric* propose_step_latency_us_metric{nullptr};
    kmonitor::MutableMetric* score_step_latency_us_metric{nullptr};
    kmonitor::MutableMetric* speculative_sampler_latency_us_metric{nullptr};
    kmonitor::MutableMetric* updater_step_latency_us_metric{nullptr};
    kmonitor::MutableMetric* total_propose_token_num_metric{nullptr};
    kmonitor::MutableMetric* total_accepted_token_num_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Value holder for EPLB metrics; the collector type accepted by
// RtpLLmEplbMetrics::report().
//
// Every scalar carries a default member initializer, matching the other
// collectors in this file, so a default-constructed instance never exposes
// indeterminate values to report().
class RtpLLmEplbMetricsCollector final {
public:
    int64_t ep_rank                   = 0;
    int64_t update_layer_id           = 0;
    int64_t update_weights_latency_ms = 0;
    bool    update_weights_qps        = false;
    // NOTE(review): presumably one load value per GPU / expert rank — confirm against the producer.
    std::vector<int64_t> gpu_loads;
};
// kmonitor metrics group whose report() consumes an RtpLLmEplbMetricsCollector.
// init() and report() are defined outside this header.
class RtpLLmEplbMetrics: public kmonitor::MetricsGroup {
public:
    bool init(kmonitor::MetricsGroupManager* manager) override;
    void report(const kmonitor::MetricsTags* tags, RtpLLmEplbMetricsCollector* collector);

    // Metric handles; null here, assigned by code outside this header.
    kmonitor::MutableMetric* update_weights_qps_metric{nullptr};
    kmonitor::MutableMetric* update_layer_weights_qps_metric{nullptr};
    kmonitor::MutableMetric* update_weights_latency_ms_metric{nullptr};
    kmonitor::MutableMetric* gpu_loads_metric{nullptr};

private:
    AUTIL_LOG_DECLARE();
};
// Process-level kmonitor helpers; their definitions are outside this header.
// NOTE(review): presumably returns true when the factory was initialised successfully — confirm against the definition.
bool initKmonitorFactory();
// NOTE(review): presumably the counterpart of initKmonitorFactory() — confirm against the definition.
void stopKmonitorFactory();
// Returns a MetricsTags object by value.
// NOTE(review): the name suggests tags describing the Hippo deployment — confirm what it contains.
kmonitor::MetricsTags getHippoTags();
} // namespace rtp_llm