router/pkg/metric/engine_metrics.go (142 lines of code) (raw):
package metric
import (
"context"
"errors"
"github.com/wundergraph/cosmo/router/pkg/statistics"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"
"go.uber.org/zap"
)
// Meter identity and instrument names for the router engine metrics.
const (
// cosmoEngineMeterName and cosmoEngineMeterVersion are the name and
// instrumentation version used when the engine meter is obtained from the
// meter provider.
cosmoEngineMeterName = "cosmo.router.engine"
cosmoEngineMeterVersion = "0.0.1"
// engineMetricBaseKey is the common prefix of every engine instrument name
// declared below.
engineMetricBaseKey = "router.engine."
// Instrument names; each one is passed to the meter when the corresponding
// observable instrument is created.
engineConnectionCountKey = engineMetricBaseKey + "connections"
engineSubscriptionCountKey = engineMetricBaseKey + "subscriptions"
engineTriggerCountKey = engineMetricBaseKey + "triggers"
engineMessagesSentKey = engineMetricBaseKey + "messages.sent"
)
// engineInstruments groups the observable instruments that report engine
// statistics. A field is left nil when its instrument was not created (for
// example when subscription statistics are disabled in the stats config).
type engineInstruments struct {
// connectionCount reports the number of connections in the engine.
connectionCount otelmetric.Int64ObservableUpDownCounter
// subscriptionCount reports the number of subscriptions in the engine.
subscriptionCount otelmetric.Int64ObservableUpDownCounter
// triggerCount reports the number of triggers in the engine.
triggerCount otelmetric.Int64ObservableUpDownCounter
// messagesSent reports the number of subscription updates in the engine.
messagesSent otelmetric.Int64ObservableCounter
}
// toList returns the instruments that have actually been created, in the
// fixed order connections, subscriptions, triggers, messages sent. Instruments
// that are nil are left out, so the result can be empty but is never nil.
func (i *engineInstruments) toList() []otelmetric.Observable {
	candidates := [...]otelmetric.Observable{
		i.connectionCount,
		i.subscriptionCount,
		i.triggerCount,
		i.messagesSent,
	}

	observables := make([]otelmetric.Observable, 0, len(candidates))
	for _, candidate := range candidates {
		if candidate == nil {
			continue
		}
		observables = append(observables, candidate)
	}
	return observables
}
// EngineMetrics exposes engine statistics as OpenTelemetry observable
// instruments. It owns the instruments, the meter they were created on, and
// the callback registrations that must be released through Shutdown.
type EngineMetrics struct {
// instruments holds the observable instruments reported by this type.
instruments *engineInstruments
// meter is the meter the instruments and callbacks are registered on.
meter otelmetric.Meter
// baseAttributes are attached to every observation.
baseAttributes []attribute.KeyValue
// instrumentRegistrations collects the callback registrations so that
// Shutdown can unregister them.
instrumentRegistrations []otelmetric.Registration
logger *zap.Logger
}
// NewEngineMetrics builds an EngineMetrics that reports the given engine
// statistics through the supplied meter provider.
//
// When statConfig reports that engine statistics are disabled, it returns
// (nil, nil): no metrics object is created and no error is raised. Otherwise
// it creates the instruments, registers the observer callback, and returns the
// ready instance, or the first error hit while doing so.
func NewEngineMetrics(
	logger *zap.Logger,
	baseAttributes []attribute.KeyValue,
	provider *metric.MeterProvider,
	stats statistics.EngineStatistics,
	statConfig *EngineStatsConfig,
) (*EngineMetrics, error) {
	if enabled := statConfig.Enabled(); !enabled {
		return nil, nil
	}

	engineMeter := provider.Meter(
		cosmoEngineMeterName,
		otelmetric.WithInstrumentationVersion(cosmoEngineMeterVersion),
	)

	created, setupErr := setupInstruments(engineMeter, statConfig)
	if setupErr != nil {
		return nil, setupErr
	}

	metrics := new(EngineMetrics)
	metrics.instruments = created
	metrics.meter = engineMeter
	metrics.baseAttributes = baseAttributes
	metrics.logger = logger

	if registerErr := metrics.registerObservers(stats); registerErr != nil {
		return nil, registerErr
	}
	return metrics, nil
}
// setupInstruments creates the engine instruments on meter m.
//
// The four instruments are only created when statConfig.Subscription is set;
// otherwise the returned engineInstruments has every field nil. Creation stops
// at the first failure, in which case nil and that error are returned.
func setupInstruments(m otelmetric.Meter, statConfig *EngineStatsConfig) (*engineInstruments, error) {
	instruments := &engineInstruments{}
	if !statConfig.Subscription {
		return instruments, nil
	}

	var err error

	if instruments.connectionCount, err = m.Int64ObservableUpDownCounter(
		engineConnectionCountKey,
		otelmetric.WithDescription("Number of connections in the engine. Contains both websocket and http connections"),
	); err != nil {
		return nil, err
	}

	if instruments.subscriptionCount, err = m.Int64ObservableUpDownCounter(
		engineSubscriptionCountKey,
		otelmetric.WithDescription("Number of subscriptions in the engine."),
	); err != nil {
		return nil, err
	}

	if instruments.triggerCount, err = m.Int64ObservableUpDownCounter(
		engineTriggerCountKey,
		otelmetric.WithDescription("Number of triggers in the engine."),
	); err != nil {
		return nil, err
	}

	if instruments.messagesSent, err = m.Int64ObservableCounter(
		engineMessagesSentKey,
		otelmetric.WithDescription("Number of subscription updates in the engine."),
	); err != nil {
		return nil, err
	}

	return instruments, nil
}
// registerObservers registers a single callback on the meter that observes
// all created instruments from stats. The resulting registration is remembered
// so it can be unregistered later. If no instrument was created, nothing is
// registered and nil is returned.
func (e *EngineMetrics) registerObservers(stats statistics.EngineStatistics) error {
	observables := e.instruments.toList()
	if len(observables) == 0 {
		// No instruments exist, so there is nothing to attach a callback to.
		return nil
	}

	collect := func(_ context.Context, observer otelmetric.Observer) error {
		e.observeInstruments(observer, stats)
		return nil
	}

	registration, err := e.meter.RegisterCallback(collect, observables...)
	if err != nil {
		return err
	}

	e.instrumentRegistrations = append(e.instrumentRegistrations, registration)
	return nil
}
// observeInstruments reads the current report from stats and records one
// observation per created instrument on o, each tagged with the base
// attributes.
//
// Instruments that are nil are skipped, matching the nil filtering done in
// engineInstruments.toList, so an instrument that was never created is never
// handed to the observer.
func (e *EngineMetrics) observeInstruments(o otelmetric.Observer, stats statistics.EngineStatistics) {
	report := stats.GetReport()

	// Build the attribute option once per collection instead of once per
	// instrument; the same option is valid for every observation.
	attrs := otelmetric.WithAttributes(e.baseAttributes...)

	if inst := e.instruments.connectionCount; inst != nil {
		o.ObserveInt64(inst, int64(report.Connections), attrs)
	}
	if inst := e.instruments.subscriptionCount; inst != nil {
		o.ObserveInt64(inst, int64(report.Subscriptions), attrs)
	}
	if inst := e.instruments.triggerCount; inst != nil {
		o.ObserveInt64(inst, int64(report.Triggers), attrs)
	}
	if inst := e.instruments.messagesSent; inst != nil {
		o.ObserveInt64(inst, int64(report.MessagesSent), attrs)
	}
}
// Shutdown unregisters every callback registered by this EngineMetrics.
//
// All registrations are attempted even if some fail; the returned error joins
// every failure, or is nil when all succeeded. Calling Shutdown on a nil
// receiver is a no-op, because NewEngineMetrics returns a nil *EngineMetrics
// when engine statistics are disabled.
func (e *EngineMetrics) Shutdown() error {
	if e == nil {
		return nil
	}

	var err error
	for _, reg := range e.instrumentRegistrations {
		if regErr := reg.Unregister(); regErr != nil {
			// Accumulate rather than overwrite, so earlier failures are kept.
			err = errors.Join(err, regErr)
		}
	}
	return err
}