in java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterManager.java [81:173]
/**
 * Rebuilds the client meter so that it reflects the given metric settings.
 *
 * <p>Nothing happens when the current meter already satisfies {@code metric}. When the metric is switched
 * off, the current meter is shut down and replaced by a disabled instance. Otherwise a new gRPC channel,
 * OTLP exporter and meter provider are created for the metric's endpoints, the new meter is installed,
 * the previous one is shut down, and one callback gauge is registered for every gauge reported by the
 * gauge observer.
 *
 * <p>This method never propagates a failure: any {@link Throwable} is logged, and the channel/provider that
 * were created but not yet handed over to a new client meter are released so they do not leak.
 *
 * @param metric the metric settings to apply.
 */
public synchronized void reset(Metric metric) {
    // Resources created below stay "pending" until a new ClientMeter takes them over; if setup fails
    // before that point, nobody else holds a reference and they must be released here.
    ManagedChannel pendingChannel = null;
    SdkMeterProvider pendingProvider = null;
    try {
        if (clientMeter.satisfy(metric)) {
            log.info("Metric settings is satisfied by the current message meter, metric={}, clientId={}",
                metric, clientId);
            return;
        }
        if (!metric.isOn()) {
            log.info("Metric is off, clientId={}", clientId);
            clientMeter.shutdown();
            clientMeter = ClientMeter.disabledInstance(clientId);
            return;
        }
        final Endpoints endpoints = metric.getEndpoints();
        final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
            .intercept(new AuthInterceptor(clientConfiguration, clientId));
        if (clientConfiguration.isSslEnabled()) {
            // NOTE(review): InsecureTrustManagerFactory accepts any server certificate, so TLS here gives
            // encryption without server authentication - confirm this is intended for the metric endpoint.
            final SslContextBuilder builder = GrpcSslContexts.forClient();
            builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            SslContext sslContext = builder.build();
            channelBuilder.sslContext(sslContext);
        } else {
            channelBuilder.usePlaintext();
        }
        // Endpoints that expose concrete socket addresses bypass name resolution via a fixed resolver.
        final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses();
        if (null != socketAddresses) {
            IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses);
            channelBuilder.nameResolverFactory(metricResolverFactory);
        }
        pendingChannel = channelBuilder.build();
        OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(pendingChannel)
            .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT)
            .build();
        PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter)
            .setInterval(METRIC_READER_INTERVAL).build();
        // Each histogram gets a view that applies its own bucket aggregation.
        pendingProvider = SdkMeterProvider.builder()
            .setResource(Resource.empty())
            .registerMetricReader(reader)
            .registerView(histogramSelector(HistogramEnum.SEND_COST_TIME),
                histogramView(HistogramEnum.SEND_COST_TIME))
            .registerView(histogramSelector(HistogramEnum.DELIVERY_LATENCY),
                histogramView(HistogramEnum.DELIVERY_LATENCY))
            .registerView(histogramSelector(HistogramEnum.AWAIT_TIME),
                histogramView(HistogramEnum.AWAIT_TIME))
            .registerView(histogramSelector(HistogramEnum.PROCESS_TIME),
                histogramView(HistogramEnum.PROCESS_TIME))
            .build();
        final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder()
            .setMeterProvider(pendingProvider).build();
        Meter meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
        // Reset message meter: install the new one first, then shut the previous one down.
        ClientMeter existedClientMeter = clientMeter;
        clientMeter = new ClientMeter(meter, endpoints, pendingProvider, clientId);
        // The new client meter now holds the provider (and, through it, the exporter built on the channel),
        // so a later failure in this method must not tear them down.
        pendingProvider = null;
        pendingChannel = null;
        existedClientMeter.shutdown();
        log.info("Metrics is on, endpoints={}, clientId={}", endpoints, clientId);
        final List<GaugeEnum> gauges = gaugeObserver.getGauges();
        for (GaugeEnum gauge : gauges) {
            // The callback reports every (attributes, value) pair the observer currently holds for the gauge.
            meter.gaugeBuilder(gauge.getName()).buildWithCallback(measurement -> {
                final Map<Attributes, Double> map = gaugeObserver.getValues(gauge);
                for (Map.Entry<Attributes, Double> entry : map.entrySet()) {
                    measurement.record(entry.getValue(), entry.getKey());
                }
            });
        }
    } catch (Throwable t) {
        log.error("Exception raised when resetting message meter, clientId={}", clientId, t);
        releasePendingResources(pendingProvider, pendingChannel);
    }
}

/**
 * Builds the selector that matches the histogram instrument with the name of the given histogram.
 */
private static InstrumentSelector histogramSelector(HistogramEnum histogram) {
    return InstrumentSelector.builder()
        .setType(InstrumentType.HISTOGRAM).setName(histogram.getName()).build();
}

/**
 * Builds the view that applies the bucket aggregation declared by the given histogram.
 */
private static View histogramView(HistogramEnum histogram) {
    return View.builder().setAggregation(histogram.getBucket()).build();
}

/**
 * Best-effort release of a meter provider and gRPC channel that were created during a failed reset and
 * never handed over to a client meter. Failures are logged rather than propagated.
 */
private void releasePendingResources(SdkMeterProvider provider, ManagedChannel channel) {
    if (null != provider) {
        try {
            provider.shutdown();
        } catch (Throwable t) {
            log.error("Failed to shutdown the pending meter provider, clientId={}", clientId, t);
        }
    }
    if (null != channel) {
        try {
            // Safe even if the provider shutdown already closed the channel: shutdownNow is idempotent.
            channel.shutdownNow();
        } catch (Throwable t) {
            log.error("Failed to shutdown the pending metric channel, clientId={}", clientId, t);
        }
    }
}