maga_transformer/cpp/metrics/RtpLLMMetrics.cc (360 lines of code) (raw):

#include "maga_transformer/cpp/metrics/RtpLLMMetrics.h" #include "autil/EnvUtil.h" #include "maga_transformer/cpp/utils/Logger.h" #include "kmonitor/client/KMonitorFactory.h" #include "maga_transformer/cpp/metrics/KmonParam.h" namespace rtp_llm { AUTIL_LOG_SETUP(rtp_llm, RpcMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMStreamMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpEmbeddingGlobalMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpEmbeddingStreamMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMSchedulerMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMCacheMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMCacheReuseMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMExecutorMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMTokenPSMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMEngineMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMKernelMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLMSpeculativeEngineMetrics); AUTIL_LOG_SETUP(rtp_llm, RtpLLmEplbMetrics); #define REPORT_QPS(name) \ if (collector->name) { \ REPORT_MUTABLE_QPS(name##_metric); \ } #define REPORT_GAUGE(name) \ if (collector->name) { \ REPORT_MUTABLE_METRIC(name##_metric, collector->name); \ } bool RpcMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_QPS_MUTABLE_METRIC(qps_metric, "rtp_llm_rpc_qps"); REGISTER_QPS_MUTABLE_METRIC(error_qps_metric, "rtp_llm_rpc_error_qps"); REGISTER_QPS_MUTABLE_METRIC(cancel_qps_metric, "rtp_llm_rpc_cancel_qps"); REGISTER_GAUGE_MUTABLE_METRIC(onflight_request_metric, "rtp_llm_rpc_onflight_request"); REGISTER_GAUGE_MUTABLE_METRIC(total_rt_us_metric, "rtp_llm_rpc_total_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(retry_times_metric, "rtp_llm_rpc_retry_times"); REGISTER_GAUGE_MUTABLE_METRIC(loading_cache_request_metric, "rtp_llm_rpc_loading_cache_request"); REGISTER_GAUGE_MUTABLE_METRIC(get_rpc_connection_rt_us_metric, "rtp_llm_rpc_get_rpc_connection_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(multimodal_process_rt_us_metric, "rtp_llm_rpc_multimodal_process_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(remote_allocate_resource_rt_us_metric, "rtp_llm_rpc_remote_allocate_resource_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(enqueue_request_rt_us_metric, "rtp_llm_rpc_enqueue_request_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(remote_load_cache_start_rt_us_metric, "rtp_llm_rpc_remote_load_cache_start_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(poll_local_output_rt_us_metric, "rtp_llm_rpc_poll_local_output_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(remote_load_cache_end_rt_us_metric, "rtp_llm_rpc_remote_load_cache_end_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(remote_generate_rt_us_metric, "rtp_llm_rpc_remote_generate_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(poll_remote_output_rt_us_metric, "rtp_llm_rpc_poll_remote_output_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(prepare_generate_context_rt_us_metric, "rtp_llm_rpc_prepare_generate_context_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(allocate_resource_rt_us_metric, "rtp_llm_rpc_allocate_resource_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(load_cache_from_prefill_rt_us_metric, "rtp_llm_rpc_load_cache_from_prefill_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(local_generate_rt_us_metric, "rtp_llm_rpc_local_generate_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(load_cache_min_rt_us_metric, "rtp_llm_rpc_load_cache_min_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(load_cache_max_rt_us_metric, "rtp_llm_rpc_load_cache_max_rt_us"); REGISTER_GAUGE_MUTABLE_METRIC(load_cache_polling_cost_us_metric, "rtp_llm_rpc_load_cache_polling_cost_us"); return true; } void RpcMetrics::report(const kmonitor::MetricsTags* tags, RpcMetricsCollector* collector) { REPORT_QPS(qps); REPORT_QPS(cancel_qps); REPORT_QPS(error_qps); REPORT_GAUGE(onflight_request); REPORT_GAUGE(total_rt_us); REPORT_GAUGE(retry_times); REPORT_GAUGE(loading_cache_request); REPORT_GAUGE(get_rpc_connection_rt_us); REPORT_GAUGE(multimodal_process_rt_us); REPORT_GAUGE(remote_allocate_resource_rt_us); REPORT_GAUGE(enqueue_request_rt_us); REPORT_GAUGE(remote_load_cache_start_rt_us); REPORT_GAUGE(poll_local_output_rt_us); REPORT_GAUGE(remote_load_cache_end_rt_us); REPORT_GAUGE(remote_generate_rt_us); REPORT_GAUGE(poll_remote_output_rt_us); REPORT_GAUGE(prepare_generate_context_rt_us); REPORT_GAUGE(allocate_resource_rt_us); REPORT_GAUGE(load_cache_from_prefill_rt_us); REPORT_GAUGE(local_generate_rt_us); REPORT_GAUGE(load_cache_min_rt_us); REPORT_GAUGE(load_cache_max_rt_us); REPORT_GAUGE(load_cache_polling_cost_us); } bool RtpLLMStreamMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_QPS_MUTABLE_METRIC(qps_metric, "rtp_llm_framework_qps"); REGISTER_QPS_MUTABLE_METRIC(error_qps_metric, "rtp_llm_framework_error_qps"); REGISTER_QPS_MUTABLE_METRIC(cancel_qps_metric, "rtp_llm_cancel_qps"); REGISTER_QPS_MUTABLE_METRIC(is_streaming_qps_metric, "rtp_llm_is_streaming_qps"); REGISTER_QPS_MUTABLE_METRIC(not_streaming_qps_metric, "rtp_llm_not_streaming_qps"); REGISTER_GAUGE_MUTABLE_METRIC(total_latency_us_metric, "rtp_llm_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(first_token_latency_us_metric, "rtp_llm_first_token_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(wait_latency_us_metric, "rtp_llm_wait_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(pause_latency_us_metric, "rtp_llm_pause_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(iterate_count_metric, "rtp_llm_iterate_count"); REGISTER_GAUGE_MUTABLE_METRIC(reuse_length_metric, "rtp_llm_reuse_length"); REGISTER_GAUGE_MUTABLE_METRIC(input_token_length_metric, "rtp_llm_input_token_length"); REGISTER_GAUGE_MUTABLE_METRIC(output_token_length_metric, "rtp_llm_output_token_length"); REGISTER_GAUGE_MUTABLE_METRIC(timeout_latency_us_metric, "rtp_llm_timeout_lantency_us"); REGISTER_GAUGE_MUTABLE_METRIC(query_batch_size_metric, "rtp_llm_query_batch_size"); REGISTER_GAUGE_MUTABLE_METRIC(fallback_tokens_metric, "rtp_llm_fallback_tokens"); REGISTER_GAUGE_MUTABLE_METRIC(fallback_times_metric, "rtp_llm_fallback_times"); REGISTER_GAUGE_MUTABLE_METRIC(batch_with_prefill_times_metric, "rtp_llm_batch_with_prefill_times"); REGISTER_GAUGE_MUTABLE_METRIC(batch_with_prefill_len_metric, "rtp_llm_batch_with_prefill_len"); REGISTER_GAUGE_MUTABLE_METRIC(malloc_failed_times_metric, "rtp_llm_malloc_failed_times"); return true; } void RtpLLMStreamMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMStreamMetricsCollector* collector) { REPORT_QPS(qps); REPORT_QPS(cancel_qps); REPORT_QPS(error_qps); REPORT_QPS(is_streaming_qps); REPORT_QPS(not_streaming_qps); REPORT_GAUGE(total_latency_us); REPORT_GAUGE(first_token_latency_us); REPORT_GAUGE(wait_latency_us); REPORT_GAUGE(pause_latency_us); REPORT_GAUGE(iterate_count); REPORT_GAUGE(reuse_length); REPORT_GAUGE(input_token_length); REPORT_GAUGE(output_token_length); REPORT_GAUGE(timeout_latency_us); REPORT_GAUGE(query_batch_size); REPORT_GAUGE(fallback_tokens); REPORT_GAUGE(fallback_times); REPORT_GAUGE(batch_with_prefill_times); REPORT_GAUGE(batch_with_prefill_len); REPORT_GAUGE(malloc_failed_times); } // for rpc request bool RtpEmbeddingGlobalMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(total_latency_us_metric, "py_rtp_framework_rt"); REGISTER_QPS_MUTABLE_METRIC(error_qps_metric, "py_rtp_framework_error_qps"); REGISTER_QPS_MUTABLE_METRIC(qps_metric, "py_rtp_framework_qps"); REGISTER_QPS_MUTABLE_METRIC(success_qps_metric, "py_rtp_success_qps_metric"); return true; } void RtpEmbeddingGlobalMetrics::report(const kmonitor::MetricsTags* tags, RtpEmbeddingGlobalMetricsCollector* collector) { REPORT_MUTABLE_QPS(qps_metric); if (collector->error) { REPORT_MUTABLE_QPS(error_qps_metric); } else { REPORT_MUTABLE_QPS(success_qps_metric); REPORT_GAUGE(total_latency_us); } } bool RtpEmbeddingStreamMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(total_latency_us_metric, "rtp_llm_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(wait_latency_us_metric, "rtp_llm_wait_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(input_token_length_metric, "rtp_llm_input_token_length"); return true; } void RtpEmbeddingStreamMetrics::report(const kmonitor::MetricsTags* tags, RtpEmbeddingStreamMetricsCollector* collector) { REPORT_GAUGE(total_latency_us); REPORT_GAUGE(wait_latency_us); REPORT_GAUGE(input_token_length); } bool RtpLLMSchedulerMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(wait_stream_size_metric, "rtp_llm_wait_stream_size"); REGISTER_GAUGE_MUTABLE_METRIC(running_stream_size_metric, "rtp_llm_running_stream_size"); REGISTER_GAUGE_MUTABLE_METRIC(remote_running_stream_size_metric, "rtp_llm_remote_running_stream_size"); REGISTER_GAUGE_MUTABLE_METRIC(fallback_stream_size_metric, "rtp_llm_fallback_stream_size"); return true; } void RtpLLMSchedulerMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMSchedulerMetricsCollector* collector) { REPORT_MUTABLE_METRIC(wait_stream_size_metric, collector->wait_stream_size); REPORT_MUTABLE_METRIC(running_stream_size_metric, collector->running_stream_size); REPORT_MUTABLE_METRIC(remote_running_stream_size_metric, collector->remote_running_stream_size); REPORT_MUTABLE_METRIC(fallback_stream_size_metric, collector->fallback_stream_size); } bool RtpLLMEngineMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_QPS_MUTABLE_METRIC(update_lora_qps_metric, "rtp_llm_update_lora_qps"); REGISTER_QPS_MUTABLE_METRIC(error_update_lora_qps_metric, "rtp_llm_error_update_lora_qps"); REGISTER_GAUGE_MUTABLE_METRIC(step_latency_us_metric, "rtp_llm_step_latency_us"); return true; } void RtpLLMEngineMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMEngineMetricsCollector* collector) { REPORT_QPS(update_lora_qps); REPORT_QPS(error_update_lora_qps); REPORT_GAUGE(step_latency_us); } bool RtpLLMExecutorMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(context_batch_size_metric, "rtp_llm_context_batch_size"); REGISTER_GAUGE_MUTABLE_METRIC(generate_batch_size_metric, "rtp_llm_generate_batch_size"); REGISTER_GAUGE_MUTABLE_METRIC(context_batch_size_when_has_context_metric, "rtp_llm_context_batch_size_when_has_context"); REGISTER_GAUGE_MUTABLE_METRIC(generate_batch_size_when_has_context_metric, "rtp_llm_generate_batch_size_when_has_context"); REGISTER_GAUGE_MUTABLE_METRIC(execute_token_size_when_has_context_metric, "rtp_llm_execute_token_size_when_has_context"); REGISTER_GAUGE_MUTABLE_METRIC(max_seq_len_when_has_context_metric, "rtp_llm_max_seq_len_when_has_context"); REGISTER_GAUGE_MUTABLE_METRIC(execute_token_size_metric, "rtp_llm_execute_token_size"); REGISTER_GAUGE_MUTABLE_METRIC(max_seq_len_metric, "rtp_llm_max_seq_len"); REGISTER_GAUGE_MUTABLE_METRIC(gather_model_input_us_metric, "rtp_llm_gather_model_input_us"); REGISTER_GAUGE_MUTABLE_METRIC(tp_sync_input_us_metric, "rtp_llm_tp_sync_input_us"); REGISTER_GAUGE_MUTABLE_METRIC(model_forward_us_metric, "rtp_llm_model_forward_us"); REGISTER_GAUGE_MUTABLE_METRIC(sample_input_us_metric, "rtp_llm_sample_input_us"); REGISTER_GAUGE_MUTABLE_METRIC(dispatch_output_us_metric, "rtp_llm_dispatch_output_us_metric"); REGISTER_GAUGE_MUTABLE_METRIC(eplb_step_latency_us_metric, "rtp_llm_eplb_step_latency_us"); return true; } void RtpLLMExecutorMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMExecutorMetricsCollector* collector) { REPORT_GAUGE(context_batch_size); REPORT_GAUGE(generate_batch_size); if (collector->context_batch_size != 0) { REPORT_GAUGE(context_batch_size_when_has_context); REPORT_GAUGE(generate_batch_size_when_has_context); REPORT_GAUGE(execute_token_size_when_has_context); REPORT_GAUGE(max_seq_len_when_has_context); } REPORT_GAUGE(execute_token_size); REPORT_GAUGE(max_seq_len); REPORT_GAUGE(gather_model_input_us); REPORT_GAUGE(tp_sync_input_us); REPORT_GAUGE(model_forward_us); REPORT_GAUGE(sample_input_us); REPORT_GAUGE(dispatch_output_us); REPORT_GAUGE(eplb_step_latency_us); } bool RtpLLMSpeculativeEngineMetrics::init(kmonitor::MetricsGroupManager *manager) { REGISTER_GAUGE_MUTABLE_METRIC(step_latency_us_metric, "rtp_llm_sp_step_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(propose_step_latency_us_metric, "rtp_llm_sp_propose_step_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(score_step_latency_us_metric, "rtp_llm_sp_score_step_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(speculative_sampler_latency_us_metric, "rtp_llm_sp_speculative_sampler_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(updater_step_latency_us_metric, "rtp_llm_sp_updater_step_latency_us"); REGISTER_GAUGE_MUTABLE_METRIC(total_propose_token_num_metric, "rtp_llm_sp_total_propose_token_num"); REGISTER_GAUGE_MUTABLE_METRIC(total_accepted_token_num_metric, "rtp_llm_sp_total_accepted_token_num"); return true; } void RtpLLMSpeculativeEngineMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMSpeculativeEngineMetricsCollector* collector) { REPORT_MUTABLE_METRIC(step_latency_us_metric, collector->step_latency_us); REPORT_MUTABLE_METRIC(propose_step_latency_us_metric, collector->propose_step_latency_us); REPORT_MUTABLE_METRIC(score_step_latency_us_metric, collector->score_step_latency_us); REPORT_MUTABLE_METRIC(speculative_sampler_latency_us_metric, collector->speculative_sampler_latency_us); REPORT_MUTABLE_METRIC(updater_step_latency_us_metric, collector->updater_step_latency_us); REPORT_MUTABLE_METRIC(total_propose_token_num_metric, collector->total_propose_token_num); REPORT_MUTABLE_METRIC(total_accepted_token_num_metric, collector->total_accepted_token_num); } bool RtpLLMTokenPSMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(context_tps_metric, "rtp_llm_context_tps"); REGISTER_GAUGE_MUTABLE_METRIC(generate_tps_metric, "rtp_llm_generate_tps"); REGISTER_GAUGE_MUTABLE_METRIC(total_tps_metric, "rtp_llm_total_tps"); return true; } void RtpLLMTokenPSMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMTokenPSMetricsCollector* collector) { REPORT_MUTABLE_METRIC(context_tps_metric, collector->context_tps); REPORT_MUTABLE_METRIC(generate_tps_metric, collector->generate_tps); REPORT_MUTABLE_METRIC(total_tps_metric, collector->total_tps); } bool RtpLLMCacheMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(kv_cache_item_num_metric, "rtp_llm_kv_cache_item_num"); REGISTER_GAUGE_MUTABLE_METRIC(kv_cache_free_blocks_metric, "rtp_llm_kv_cache_free_blocks"); REGISTER_GAUGE_MUTABLE_METRIC(kv_cache_available_blocks_metric, "rtp_llm_kv_cache_available_blocks"); REGISTER_GAUGE_MUTABLE_METRIC(kv_cache_left_seq_metric, "rtp_llm_kv_cache_left_seq"); REGISTER_GAUGE_MUTABLE_METRIC(kv_cache_used_ratio_metric, "rtp_llm_kv_cache_used_ratio"); REGISTER_GAUGE_MUTABLE_METRIC(mr_cost_time_ms_metric, "rtp_llm_mr_cost_time_ms"); return true; } void RtpLLMCacheMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMCacheMetricsCollector* collector) { REPORT_MUTABLE_METRIC(kv_cache_item_num_metric, collector->kv_cache_item_num); REPORT_MUTABLE_METRIC(kv_cache_free_blocks_metric, collector->kv_cache_free_blocks); REPORT_MUTABLE_METRIC(kv_cache_available_blocks_metric, collector->kv_cache_available_blocks); REPORT_MUTABLE_METRIC(kv_cache_left_seq_metric, collector->kv_cache_left_seq); REPORT_MUTABLE_METRIC(kv_cache_used_ratio_metric, collector->kv_cache_used_ratio); REPORT_MUTABLE_METRIC(mr_cost_time_ms_metric, collector->mr_cost_time_ms); } bool RtpLLMCacheReuseMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(match_cost_time_us, "rtp_llm_match_cost_time_us"); REGISTER_GAUGE_MUTABLE_METRIC(kv_cache_reuse_length, "rtp_llm_kv_cache_reuse_length"); return true; } void RtpLLMCacheReuseMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMCacheReuseMetricsCollector* collector) { REPORT_MUTABLE_METRIC(match_cost_time_us, collector->match_cost_time_us); REPORT_MUTABLE_METRIC(kv_cache_reuse_length, collector->kv_cache_reuse_length); } bool RtpLLMKernelMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_GAUGE_MUTABLE_METRIC(kernel_exec_time_metric, "rtp_llm_kenrel_exec_time"); return true; } void RtpLLMKernelMetrics::report(const kmonitor::MetricsTags* tags, RtpLLMKernelMetricsCollector* collector) { REPORT_MUTABLE_METRIC(kernel_exec_time_metric, collector->kernel_exec_time); } bool RtpLLmEplbMetrics::init(kmonitor::MetricsGroupManager* manager) { REGISTER_QPS_MUTABLE_METRIC(update_weights_qps_metric, "rtp_llm_update_weights_qps"); REGISTER_QPS_MUTABLE_METRIC(update_layer_weights_qps_metric, "rtp_llm_update_layer_weights_qps"); REGISTER_GAUGE_MUTABLE_METRIC(update_weights_latency_ms_metric, "rtp_llm_update_weights_latency_ms"); REGISTER_GAUGE_MUTABLE_METRIC(gpu_loads_metric, "rtp_llm_gpu_loads"); return true; } void RtpLLmEplbMetrics::report(const kmonitor::MetricsTags* tags, RtpLLmEplbMetricsCollector* collector) { // ep stats metrics int num_layer = collector->gpu_loads.size(); auto ep_tag = kmonitor::MetricsTags("ep_rank", std::to_string(collector->ep_rank)); tags->MergeTags(&ep_tag); for (int i = 0; i < num_layer; ++i) { auto layer_tag = kmonitor::MetricsTags("layer", std::to_string(i)); ep_tag.MergeTags(&layer_tag); if (gpu_loads_metric) { gpu_loads_metric->Report(&layer_tag, collector->gpu_loads[i]); } } // update weights metrics if (collector->update_weights_qps) { REPORT_MUTABLE_QPS(update_weights_qps_metric); REPORT_MUTABLE_METRIC(update_weights_latency_ms_metric, collector->update_weights_latency_ms); // report layer qps auto layer_tag = kmonitor::MetricsTags("layer", std::to_string(collector->update_layer_id)); tags->MergeTags(&layer_tag); if (update_layer_weights_qps_metric) { update_layer_weights_qps_metric->Report(&layer_tag, 1); } collector->update_weights_qps = false; } } #undef REPORT_QPS #undef REPORT_GAUGE bool initKmonitorFactory() { KmonParam param; param.init(); if (!param.kmonitorMetricsReporterCacheLimit.empty()) { size_t limit = 0; if (autil::StringUtil::fromString<size_t>(param.kmonitorMetricsReporterCacheLimit, limit) || limit > 0) { kmonitor::MetricsReporter::setMetricsReporterCacheLimit(limit); RTP_LLM_LOG_INFO("set metrics reporter cache limit [%lu].", limit); } } if (param.kmonitorNormalSamplePeriod > 0) { RTP_LLM_LOG_INFO("set kmonitor normal sample period [%d] seconds.", param.kmonitorNormalSamplePeriod); kmonitor::MetricLevelConfig config; config.period[kmonitor::NORMAL] = (unsigned int)param.kmonitorNormalSamplePeriod; kmonitor::MetricLevelManager::SetGlobalLevelConfig(config); } kmonitor::MetricsConfig metricsConfig; metricsConfig.set_tenant_name(param.kmonitorTenant); metricsConfig.set_service_name(param.kmonitorServiceName); std::string sink_address = param.kmonitorSinkAddress; if (!param.kmonitorPort.empty()) { sink_address += ":" + param.kmonitorPort; } metricsConfig.set_sink_address(sink_address.c_str()); metricsConfig.set_enable_log_file_sink(param.kmonitorEnableLogFileSink); //metricsConfig.set_enable_prometheus_sink(param.kmonitorEnablePrometheusSink); metricsConfig.set_manually_mode(param.kmonitorManuallyMode); metricsConfig.set_inited(true); metricsConfig.AddGlobalTag("hippo_slave_ip", param.hippoSlaveIp); for (auto &pair : param.kmonitorTags) { metricsConfig.AddGlobalTag(pair.first, pair.second); } if (!kmonitor::KMonitorFactory::Init(metricsConfig)) { RTP_LLM_LOG_ERROR("init kmonitor factory failed with"); return false; } // registerBuildInMetrics to refresh sg_buildin_kmonitor for KMonitorWorker::Start kmonitor::KMonitorFactory::registerBuildInMetrics(nullptr, param.kmonitorMetricsPrefix); RTP_LLM_LOG_INFO("KMonitorFactory::registerBuildInMetrics() finished"); kmonitor::KMonitorFactory::Start(); RTP_LLM_LOG_INFO("KMonitorFactory::Start() finished"); return true; } void stopKmonitorFactory() { kmonitor::KMonitorFactory::Shutdown(); } kmonitor::MetricsTags getHippoTags() { auto hippo_tags = kmonitor::MetricsTags(); if (std::getenv("HIPPO_ROLE")) { auto host_ip = autil::EnvUtil::getEnv("HIPPO_SLAVE_IP", ""); hippo_tags.AddTag("host_ip", host_ip); hippo_tags.AddTag("container_ip", autil::EnvUtil::getEnv("RequestedIP", host_ip)); hippo_tags.AddTag("hippo_role", autil::EnvUtil::getEnv("HIPPO_ROLE", "")); hippo_tags.AddTag("hippo_app", autil::EnvUtil::getEnv("HIPPO_APP", "")); hippo_tags.AddTag("hippo_group", autil::EnvUtil::getEnv("HIPPO_SERVICE_NAME", "")); } return hippo_tags; } } // namespace rtp_llm