python/rocketmq/v5/client/metrics/client_metrics.py (131 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import threading import time from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import \ OTLPMetricExporter from opentelemetry.metrics import Histogram from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.metrics.view import (ExplicitBucketHistogramAggregation, View) from opentelemetry.sdk.resources import Resource from rocketmq.grpc_protocol import Metric from rocketmq.v5.client.connection import RpcEndpoints from rocketmq.v5.log import logger from rocketmq.v5.model import HistogramEnum, MessageMetricType, MetricContext class ClientMetrics: METRIC_EXPORTER_RPC_TIMEOUT = 5 METRIC_READER_INTERVAL = 60000 # 1 minute METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message" def __init__(self, client_id, configuration): self.__enabled = False self.__endpoints = None self.__client_id = client_id self.__client_configuration = configuration self.__send_success_cost_time_instrument = None self.__meter_provider = None self.__metric_lock = threading.Lock() def reset_metrics(self, metric: Metric): # if metrics endpoints changed or metric.on from False to True, start a new client metrics with self.__metric_lock: if self.__satisfy(metric): return # metric.on from True to False, shutdown client metrics if not metric.on: self.__meter_provider_shutdown() self.__enabled = False self.__endpoints = None self.__send_success_cost_time_instrument = None return self.__enabled = metric.on self.__endpoints = RpcEndpoints(metric.endpoints) self.__meter_provider_start() logger.info(f"client:{self.__client_id} start metric provider success.") """ send metric """ def send_before(self, topic): send_context = MetricContext(MessageMetricType.SEND) # record send message time start_timestamp = round(time.time() * 1000, 2) send_context.put_attr("send_stopwatch", start_timestamp) send_context.put_attr("topic", topic) send_context.put_attr("client_id", self.__client_id) return send_context def send_after(self, send_context: MetricContext, success: bool): if send_context is None: logger.warn( "metrics do send after exception. send_context must not be none." ) return if send_context.metric_type != MessageMetricType.SEND: logger.warn( f"metric type must be MessageMetricType.SEND. current send_context type is {send_context.metric_type}" ) return if send_context.get_attr("send_stopwatch") is None: logger.warn( "metrics do send after exception. send_stopwatch must not be none." ) return if send_context.get_attr("topic") is None: logger.warn("metrics do send after exception. topic must not be none.") return if send_context.get_attr("client_id") is None: send_context.put_attr("client_id", self.__client_id) # record send RT and result start_timestamp = send_context.get_attr("send_stopwatch") cost = round(time.time() * 1000, 2) - start_timestamp send_context.put_attr("invocation_status", "success" if success else "failure") send_context.remove_attr("send_stopwatch") self.__record_send_success_cost_time(send_context, cost) """ private """ def __satisfy(self, metric: Metric): if metric.endpoints is None: return True # if metrics endpoints changed, return False if ( self.__enabled and metric.on and self.__endpoints == RpcEndpoints(metric.endpoints) ): return True return not self.__enabled and not metric.on def __meter_provider_shutdown(self): if self.__meter_provider is not None: try: self.__meter_provider.shutdown() self.__meter_provider = None except Exception as e: logger.error(f"meter provider shutdown exception:{e}") def __meter_provider_start(self): if self.__endpoints is None: logger.warn( f"client:{self.__client_id} can't create meter provider, because endpoints is none." ) return try: # setup OTLP exporter exporter = OTLPMetricExporter( endpoint=self.__endpoints.__str__(), insecure=True, timeout=ClientMetrics.METRIC_EXPORTER_RPC_TIMEOUT, ) # create a metric reader and set the export interval reader = PeriodicExportingMetricReader( exporter, export_interval_millis=ClientMetrics.METRIC_READER_INTERVAL ) # create an empty resource resource = Resource.get_empty() # create view send_cost_time_view = View( instrument_type=Histogram, instrument_name=HistogramEnum.SEND_COST_TIME.histogram_name, aggregation=ExplicitBucketHistogramAggregation( HistogramEnum.SEND_COST_TIME.buckets ), ) # create MeterProvider self.__meter_provider = MeterProvider( metric_readers=[reader], resource=resource, views=[send_cost_time_view] ) # define the histogram instruments self.__send_success_cost_time_instrument = self.__meter_provider.get_meter( ClientMetrics.METRIC_INSTRUMENTATION_NAME ).create_histogram(HistogramEnum.SEND_COST_TIME.histogram_name) except Exception as e: logger.error( f"client:{self.__client_id} start meter provider exception: {e}" ) def __record_send_success_cost_time(self, context, amount): if self.__enabled: try: # record send message cost time and result self.__send_success_cost_time_instrument.record( amount, context.attributes ) except Exception as e: logger.error(f"record send message cost time exception, e:{e}")