maga_transformer/metrics/kmonitor_metric_reporter.py (48 lines of code) (raw):

import logging
from enum import Enum
from typing import Any, Dict, Optional, Union

from maga_transformer.distribute.worker_info import g_parallel_info, g_frontend_server_info


class AccMetrics(Enum):
    """Accumulating (QPS-style) metrics, registered via ``register_acc_metric``.

    Each member's value is the metric name sent to kmonitor. Values are kept
    verbatim (including historical spellings such as ``cancal``) because
    external consumers may key on these exact names.
    """
    CANCEL_QPS_METRIC = "py_rtp_cancal_qps_metric"
    SUCCESS_QPS_METRIC = "py_rtp_success_qps_metric"
    QPS_METRIC = "py_rtp_framework_qps"
    ERROR_QPS_METRIC = "py_rtp_framework_error_qps"
    CONFLICT_QPS_METRIC = "py_rtp_framework_concurrency_exception_qps"
    ITER_QPS_METRIC = "py_rtp_response_iterate_qps"
    UPDATE_QPS_METRIC = "py_rtp_update_qps_metric"
    ERROR_UPDATE_QPS_METRIC = "py_rtp_error_update_target_qps"


class GaugeMetrics(Enum):
    """Gauge metrics, registered via ``register_gauge_metric``.

    Each member's value is the metric name sent to kmonitor. Member names
    (including the ``LANTENCY`` spelling) are public and kept as-is for
    compatibility with existing callers.
    """
    RESPONSE_FIRST_TOKEN_RT_METRIC = "py_rtp_response_first_token_rt"
    RESPONSE_ITER_RT_METRIC = "py_rtp_response_iterate_rt"
    RESPONSE_ITERATE_COUNT = "py_rtp_response_iterate_count"
    LANTENCY_METRIC = "py_rtp_framework_rt"
    FT_ITERATE_COUNT_METRIC = "ft_iterate_count"
    INPUT_TOKEN_SIZE_METRIC = "ft_input_token_length"
    OUTPUT_TOKEN_SIZE_METRIC = "ft_output_token_length"
    PRE_PIPELINE_RT_METRIC = "ft_pre_pipeline_rt"
    POST_PIPELINE_RT_METRIC = "ft_post_pipeline_rt"
    NUM_BEAMS_METRIC = "ft_num_beams"
    UPDATE_LANTENCY_METRIC = "py_rtp_update_framework_rt"


class MetricReporter(object):
    """Register the ``AccMetrics``/``GaugeMetrics`` members with a kmonitor
    client and report values to them.

    ``init()`` must be called before ``report()``. Until then, every
    ``report()`` call logs a warning and the value is dropped.
    """

    def __init__(self, kmonitor: Any):
        """
        Args:
            kmonitor: client object; this class calls its
                ``register_acc_metric``, ``register_gauge_metric`` and
                ``flush`` methods.
        """
        self._kmon = kmonitor
        # Maps metric name (the enum value) to the handle returned by the
        # kmonitor ``register_*`` call. The attribute name is kept for
        # compatibility.
        self._matic_map: Dict[str, Any] = {}
        self._inited = False

    def report(self, metric: Union[AccMetrics, GaugeMetrics], value: float = 1,
               tags: Optional[Dict[str, Any]] = None) -> None:
        """Report ``value`` for ``metric`` with the given tags.

        Args:
            metric: the metric to report to. It must have been registered by
                ``init()``; otherwise a warning is logged and nothing is
                reported.
            value: value passed to the metric handle. The default of 1
                presumably counts one event for the QPS-style metrics.
            tags: optional extra tags. The dict is copied and never mutated,
                so callers may safely reuse it.
        """
        # Work on a copy. The original code added the data-parallel tags
        # directly to the argument, which mutated either the caller's dict or
        # the shared ``{}`` default across calls.
        report_tags: Dict[str, Any] = dict(tags) if tags else {}
        if g_parallel_info.dp_size > 1:
            # With data parallelism, tag which rank and frontend server
            # emitted the value.
            report_tags['dp_rank'] = str(g_parallel_info.dp_rank)
            report_tags['frontend_server_id'] = str(g_frontend_server_info.frontend_server_id)
        kmon_metric = self._matic_map.get(metric.value)
        if kmon_metric is None:
            # Unregistered (e.g. init() not yet called): drop the value.
            logging.warning(f"no metric named {metric.name}")
            return
        kmon_metric.report(value, report_tags)

    def flush(self) -> None:
        """Delegate to the underlying kmonitor client's ``flush()``."""
        self._kmon.flush()

    def init(self) -> None:
        """Register every ``AccMetrics`` and ``GaugeMetrics`` member with kmonitor.

        Only the first call registers anything; later calls are no-ops. The
        flag is set before registering, so a failure part-way through is not
        retried by a later call.
        """
        if self._inited:
            return
        self._inited = True
        for metric in AccMetrics:
            self._matic_map[metric.value] = self._kmon.register_acc_metric(metric.value)
        for metric in GaugeMetrics:
            self._matic_map[metric.value] = self._kmon.register_gauge_metric(metric.value)