router/pkg/metric/prom_metric_store.go (175 lines of code) (raw):

package metric

import (
	"context"
	"errors"

	"go.opentelemetry.io/otel/attribute"
	otelmetric "go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.uber.org/zap"
)

const (
	cosmoRouterPrometheusMeterName    = "cosmo.router.prometheus"
	cosmoRouterPrometheusMeterVersion = "0.0.1"
)

// PromMetricStore records router metrics through instruments created on a
// dedicated meter obtained from the supplied meter provider.
type PromMetricStore struct {
	meter         otelmetric.Meter
	meterProvider *metric.MeterProvider
	logger        *zap.Logger

	measurements *Measurements
	// instrumentRegistrations holds the callback registrations created by this
	// store so that Shutdown can unregister them.
	instrumentRegistrations []otelmetric.Registration
	// circuitBreakerEnabled gates the circuit breaker measurements; when false
	// those methods return without recording anything.
	circuitBreakerEnabled bool
}

// NewPromMetricStore creates a PromMetricStore on top of meterProvider. It
// creates the instruments via createMeasures and registers the router info
// gauge callback, which is observed with routerInfoAttributes. An error from
// either step is returned and no store is produced.
func NewPromMetricStore(
	logger *zap.Logger,
	meterProvider *metric.MeterProvider,
	routerInfoAttributes otelmetric.ObserveOption,
	opts MetricOpts,
) (Provider, error) {
	meter := meterProvider.Meter(cosmoRouterPrometheusMeterName,
		otelmetric.WithInstrumentationVersion(cosmoRouterPrometheusMeterVersion),
	)

	m := &PromMetricStore{
		meter:                   meter,
		logger:                  logger,
		meterProvider:           meterProvider,
		instrumentRegistrations: make([]otelmetric.Registration, 0, 1),
		circuitBreakerEnabled:   opts.EnableCircuitBreaker,
	}

	measures, err := createMeasures(meter, opts)
	if err != nil {
		return nil, err
	}

	m.measurements = measures

	err = m.startInitMetrics(routerInfoAttributes)
	if err != nil {
		return nil, err
	}

	return m, nil
}

// startInitMetrics registers a callback that observes the RouterInfo gauge with
// the constant value 1 and the given attributes. The resulting registration is
// remembered so that Shutdown can unregister it.
func (h *PromMetricStore) startInitMetrics(initAttributes otelmetric.ObserveOption) error {
	gauge := h.measurements.observableGauges[RouterInfo]

	rc, err := h.meter.RegisterCallback(func(_ context.Context, o otelmetric.Observer) error {
		o.ObserveInt64(gauge, 1, initAttributes)
		return nil
	}, gauge)
	if err != nil {
		return err
	}

	h.instrumentRegistrations = append(h.instrumentRegistrations, rc)

	return nil
}

// MeasureInFlight increments the in-flight requests counter by one and returns
// a function that decrements it again with the same context and options. Both
// steps are skipped when the counter is not present in the measurements.
func (h *PromMetricStore) MeasureInFlight(ctx context.Context, opts ...otelmetric.AddOption) func() {
	if c, ok := h.measurements.upDownCounters[InFlightRequestsUpDownCounter]; ok {
		c.Add(ctx, 1, opts...)
	}

	return func() {
		if c, ok := h.measurements.upDownCounters[InFlightRequestsUpDownCounter]; ok {
			c.Add(ctx, -1, opts...)
		}
	}
}

// MeasureRequestCount adds one to the request counter, if it exists.
func (h *PromMetricStore) MeasureRequestCount(ctx context.Context, opts ...otelmetric.AddOption) {
	if c, ok := h.measurements.counters[RequestCounter]; ok {
		c.Add(ctx, 1, opts...)
	}
}

// MeasureCircuitBreakerShortCircuit adds one to the short-circuit counter. It
// does nothing when the circuit breaker metrics are disabled or the counter
// does not exist.
func (h *PromMetricStore) MeasureCircuitBreakerShortCircuit(ctx context.Context, opts ...otelmetric.AddOption) {
	if !h.circuitBreakerEnabled {
		return
	}

	if c, ok := h.measurements.counters[CircuitBreakerShortCircuitsCounter]; ok {
		c.Add(ctx, 1, opts...)
	}
}

// SetCircuitBreakerState records the circuit breaker state on its gauge as 1
// when state is true and 0 otherwise. It does nothing when the circuit breaker
// metrics are disabled or the gauge does not exist.
func (h *PromMetricStore) SetCircuitBreakerState(ctx context.Context, state bool, opts ...otelmetric.RecordOption) {
	if !h.circuitBreakerEnabled {
		return
	}

	if c, ok := h.measurements.gauges[CircuitBreakerStateGauge]; ok {
		// The value 0 here means it's not open, 1 means it's open
		var boolAsInt int64 = 0
		if state {
			boolAsInt = 1
		}
		c.Record(ctx, boolAsInt, opts...)
	}
}

// MeasureRequestSize adds contentLength to the request content length counter,
// if it exists.
func (h *PromMetricStore) MeasureRequestSize(ctx context.Context, contentLength int64, opts ...otelmetric.AddOption) {
	if c, ok := h.measurements.counters[RequestContentLengthCounter]; ok {
		c.Add(ctx, contentLength, opts...)
	}
}

// MeasureResponseSize adds size to the response content length counter, if it
// exists.
func (h *PromMetricStore) MeasureResponseSize(ctx context.Context, size int64, opts ...otelmetric.AddOption) {
	if c, ok := h.measurements.counters[ResponseContentLengthCounter]; ok {
		c.Add(ctx, size, opts...)
	}
}

// MeasureLatency records latency on the server latency histogram, if it exists.
func (h *PromMetricStore) MeasureLatency(ctx context.Context, latency float64, opts ...otelmetric.RecordOption) {
	if c, ok := h.measurements.histograms[ServerLatencyHistogram]; ok {
		c.Record(ctx, latency, opts...)
	}
}

// MeasureRequestError adds one to the request error counter, if it exists.
func (h *PromMetricStore) MeasureRequestError(ctx context.Context, opts ...otelmetric.AddOption) {
	if c, ok := h.measurements.counters[RequestError]; ok {
		c.Add(ctx, 1, opts...)
	}
}

// MeasureOperationPlanningTime records planningTime on the operation planning
// time histogram, if it exists.
func (h *PromMetricStore) MeasureOperationPlanningTime(ctx context.Context, planningTime float64, opts ...otelmetric.RecordOption) {
	if c, ok := h.measurements.histograms[OperationPlanningTime]; ok {
		c.Record(ctx, planningTime, opts...)
	}
}

// MeasureSchemaFieldUsage adds schemaUsage to the schema field usage counter,
// if it exists.
func (h *PromMetricStore) MeasureSchemaFieldUsage(ctx context.Context, schemaUsage int64, opts ...otelmetric.AddOption) {
	if c, ok := h.measurements.counters[SchemaFieldUsageCounter]; ok {
		c.Add(ctx, schemaUsage, opts...)
	}
}

// Flush forces the underlying meter provider to flush its pending telemetry.
func (h *PromMetricStore) Shutdown() error {
	var err error

	for _, reg := range h.instrumentRegistrations {
		if regErr := reg.Unregister(); regErr != nil {
			// Accumulate onto the running error; joining regErr alone would
			// discard failures from earlier registrations.
			err = errors.Join(err, regErr)
		}
	}

	return err
}

// Flush forces the underlying meter provider to flush its pending telemetry.
func (h *PromMetricStore) Flush(ctx context.Context) error {
	return h.meterProvider.ForceFlush(ctx)
}

// explodeAddInstrument explodes the metric into multiple metrics with different label values in Prometheus.
//
// For every attribute in sliceAttrs, collect is invoked once per string in the
// attribute's string-slice value, each time with an attribute set holding only
// that key and the single string value. When sliceAttrs itself is empty,
// collect is never invoked.
func explodeAddInstrument(ctx context.Context, sliceAttrs []attribute.KeyValue, collect func(ctx context.Context, opts ...otelmetric.AddOption)) {
	for _, attr := range sliceAttrs {
		s := attr.Value.AsStringSlice()

		// If the slice is empty, we should at least emit the metric without the attribute.
		// to not ignore the metric emission.
		if len(s) == 0 {
			collect(ctx)
			continue
		}

		for _, v := range s {
			kv := attribute.KeyValue{
				Key:   attr.Key,
				Value: attribute.StringValue(v),
			}
			o := []otelmetric.AddOption{
				otelmetric.WithAttributeSet(attribute.NewSet(kv)),
			}
			collect(ctx, o...)
		}
	}
}

// explodeRecordInstrument explodes the metric into multiple metrics with different label values in Prometheus.
//
// For every attribute in sliceAttrs, collect is invoked once per string in the
// attribute's string-slice value, each time with an attribute set holding only
// that key and the single string value. When sliceAttrs itself is empty,
// collect is never invoked.
func explodeRecordInstrument(ctx context.Context, sliceAttrs []attribute.KeyValue, collect func(ctx context.Context, opts ...otelmetric.RecordOption)) {
	for _, attr := range sliceAttrs {
		s := attr.Value.AsStringSlice()

		// If the slice is empty, we should at least emit the metric without the attribute.
		// to not ignore the metric emission.
		if len(s) == 0 {
			collect(ctx)
			continue
		}

		for _, v := range s {
			kv := attribute.KeyValue{
				Key:   attr.Key,
				Value: attribute.StringValue(v),
			}
			o := []otelmetric.RecordOption{
				otelmetric.WithAttributeSet(attribute.NewSet(kv)),
			}
			collect(ctx, o...)
		}
	}
}