router/pkg/metric/meter.go (358 lines of code) (raw):
package metric
import (
"context"
"fmt"
"net/url"
"strings"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/wundergraph/cosmo/router/pkg/otel/otelconfig"
"go.opentelemetry.io/otel/attribute"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.uber.org/zap"
_ "google.golang.org/grpc/encoding/gzip" // Required for gzip support over grpc
)
var (
	// cloudOtelBytesBucketBounds holds the explicit histogram boundaries used for
	// byte-unit histograms on the OTLP pipeline. Range: 0 B up to 20 MB.
	//
	// Please version the used meters if you change the buckets.
	cloudOtelBytesBucketBounds = []float64{
		0, 50, 100, 300, 500,
		1000, 3000, 5000,
		10000, 15000, 30000, 50000, 70000, 90000,
		150000, 300000, 600000, 800000,
		1000000, 5000000,
		10000000, 20000000,
	}

	// cloudOtelMsBucketsBounds holds the explicit histogram boundaries used for
	// millisecond-unit histograms on the OTLP pipeline. Range: 0 ms up to 10 s.
	//
	// Please version the used meters if you change the buckets.
	cloudOtelMsBucketsBounds = []float64{
		0, 5, 7, 10, 15, 25, 50, 75,
		100, 125, 150, 175, 200, 225, 250, 275,
		300, 350, 400, 450, 500,
		600, 700, 800, 900, 1000,
		1250, 1500, 1750, 2000, 2250, 2500, 2750, 3000,
		3500, 4000, 5000, 10000,
	}

	// promBytesBuckets holds the byte-unit histogram boundaries for Prometheus.
	// Prometheus gets fewer buckets than the OTLP pipeline to reduce cardinality.
	promBytesBuckets = []float64{
		512,             // 512 B
		1 * 1024,        // 1 KB
		4 * 1024,        // 4 KB
		8 * 1024,        // 8 KB
		16 * 1024,       // 16 KB
		64 * 1024,       // 64 KB
		256 * 1024,      // 256 KB
		512 * 1024,      // 512 KB
		1 * 1024 * 1024, // 1 MB
		3 * 1024 * 1024, // 3 MB
	}

	// prometheusMsBuckets holds the millisecond-unit histogram boundaries for
	// Prometheus, spanning 10 ms to 10 s.
	prometheusMsBuckets = []float64{
		10, 25, 50, // 10 ms, 25 ms, 50 ms
		100, 250, 500, // 100 ms, 250 ms, 500 ms
		1000, 2500, // 1 s, 2.5 s
		5000, 10000, // 5 s, 10 s
	}
)
const (
	// defaultExportTimeout is handed to sdkmetric.WithTimeout for every
	// periodic OTLP reader created in this file.
	defaultExportTimeout time.Duration = 30 * time.Second

	// defaultExportInterval is handed to sdkmetric.WithInterval for every
	// periodic OTLP reader created in this file.
	defaultExportInterval time.Duration = 15 * time.Second
)
var (
	// defaultCloudTemporalitySelector selects the temporality for a given instrument kind
	// when exporting to the default cloud endpoint.
	//
	// Short story about when we choose delta and when we choose cumulative temporality:
	//
	// Delta temporalities are reported as completed intervals. They don't build upon each other.
	// This makes them easier to query and aggregate because we don't have to think about resets.
	// See here for more information: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#resets-and-gaps
	//
	// The downside is that missing data points result in data loss and can't be averaged from the previous value.
	// Delta temporality is more memory efficient for synchronous instruments because we don't have to store the last value.
	// On the other hand, delta temporality is more expensive for asynchronous instruments because we have to store the last
	// value of every permutation to calculate the delta.
	//
	// We choose delta temporality for synchronous instruments because we can easily sum the values over a time range.
	// We choose cumulative temporality for asynchronous instruments because we can query the last cumulative value without extra work.
	// See https://opentelemetry.io/docs/specs/otel/metrics/supplementary-guidelines/#aggregation-temporality for more information
	// and https://grafana.com/blog/2023/09/26/opentelemetry-metrics-a-guide-to-delta-vs.-cumulative-temporality-trade-offs/
	//
	// Instrument kinds that are not listed explicitly (for example kinds introduced by a
	// newer SDK release) fall back to cumulative temporality, which is also the fallback
	// used by getTemporalitySelector. The selector never panics, so registering an
	// instrument of an unforeseen kind cannot take the process down.
	defaultCloudTemporalitySelector = func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
		switch kind {
		case sdkmetric.InstrumentKindCounter,
			sdkmetric.InstrumentKindUpDownCounter,
			sdkmetric.InstrumentKindHistogram:
			// Synchronous instruments: report completed intervals.
			return metricdata.DeltaTemporality
		case sdkmetric.InstrumentKindObservableGauge,
			sdkmetric.InstrumentKindObservableCounter,
			sdkmetric.InstrumentKindObservableUpDownCounter:
			// Asynchronous instruments: report the running total.
			return metricdata.CumulativeTemporality
		default:
			// Unknown kind: use the same default as the rest of this file instead of panicking.
			return metricdata.CumulativeTemporality
		}
	}

	// cumulativeTemporalitySelector reports cumulative temporality for every instrument kind.
	cumulativeTemporalitySelector = func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
		return metricdata.CumulativeTemporality
	}
)
// NewPrometheusMeterProvider builds a MeterProvider whose only reader is an
// OpenTelemetry Prometheus exporter, and returns it together with the
// Prometheus registry the exporter writes into.
//
// The registry is c.Prometheus.TestRegistry when that is set, otherwise a new
// registry. The Go runtime collector and the process collector are registered
// on it before the exporter is created.
//
// An error is returned when a collector cannot be registered (for example
// because the supplied test registry already contains it), when the exporter
// cannot be created, or when the default metric options cannot be built.
// On error both returned pointers are nil.
func NewPrometheusMeterProvider(ctx context.Context, c *Config, serviceInstanceID string) (*sdkmetric.MeterProvider, *prometheus.Registry, error) {
	registry := c.Prometheus.TestRegistry
	if registry == nil {
		registry = prometheus.NewRegistry()
	}

	// Use Register instead of MustRegister: this function already has an error
	// return, so a registration failure is reported to the caller, not panicked.
	if err := registry.Register(collectors.NewGoCollector()); err != nil {
		return nil, nil, fmt.Errorf("registering Go runtime collector: %w", err)
	}
	// Only available on Linux and Windows systems
	if err := registry.Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})); err != nil {
		return nil, nil, fmt.Errorf("registering process collector: %w", err)
	}

	otelPromOpts := []otelprom.Option{
		otelprom.WithoutUnits(),
		otelprom.WithRegisterer(registry),
	}
	if c.Prometheus.ExcludeScopeInfo {
		otelPromOpts = append(otelPromOpts, otelprom.WithoutScopeInfo())
	}

	promExporter, err := otelprom.New(otelPromOpts...)
	if err != nil {
		return nil, nil, err
	}

	opts, err := defaultPrometheusMetricOptions(ctx, serviceInstanceID, c)
	if err != nil {
		return nil, nil, err
	}
	opts = append(opts, sdkmetric.WithReader(promExporter))

	return sdkmetric.NewMeterProvider(opts...), registry, nil
}
// getTemporalitySelector maps the configured exporter temporality to the
// selector function handed to the OTLP exporters.
//
// For selectors of the different temporalities see
// https://github.com/open-telemetry/opentelemetry-go/blob/main/internal/shared/otlp/otlpmetric/oconf/envconfig.go.tmpl#L166-L177
//
// Any value other than delta, cumulative or custom-cloud falls back to the
// cumulative selector and emits a debug log line via log.
func getTemporalitySelector(temporality otelconfig.ExporterTemporality, log *zap.Logger) func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
	switch temporality {
	case otelconfig.DeltaTemporality:
		// Counters, observable counters and histograms are reported as deltas;
		// every other instrument kind stays cumulative.
		return func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
			if kind == sdkmetric.InstrumentKindCounter ||
				kind == sdkmetric.InstrumentKindObservableCounter ||
				kind == sdkmetric.InstrumentKindHistogram {
				return metricdata.DeltaTemporality
			}
			return metricdata.CumulativeTemporality
		}
	case otelconfig.CumulativeTemporality:
		return cumulativeTemporalitySelector
	case otelconfig.CustomCloudTemporality:
		return defaultCloudTemporalitySelector
	default:
		// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader
		// If the temporality is not configured, we fall back to the default as per the OTEL SDK.
		log.Debug("The temporality selector falls back to the default.")
		return cumulativeTemporalitySelector
	}
}
// createOTELExporter builds the OTLP metrics exporter (HTTP or gRPC) described by exp.
//
// When the endpoint host equals the host of otelconfig.DefaultEndpoint(),
// exp.Temporality is overwritten with the custom cloud temporality before the
// exporter is created. Transport security is disabled unless the endpoint
// scheme is "https". Both exporter flavours use gzip compression.
//
// It returns an error for an unparsable endpoint, an unknown exporter type, or
// a failure reported by the exporter constructor.
func createOTELExporter(log *zap.Logger, exp *OpenTelemetryExporter) (sdkmetric.Exporter, error) {
	// The stdlib url.Parse does not parse "localhost" alone, so parseURL adds a
	// scheme when none is present; we only need host and port from the result.
	target, err := parseURL(exp.Endpoint)
	if err != nil {
		return nil, fmt.Errorf("invalid OpenTelemetry endpoint %q: %w", exp.Endpoint, err)
	}

	cloud, err := url.Parse(otelconfig.DefaultEndpoint())
	if err != nil {
		return nil, fmt.Errorf("invalid default OpenTelemetry endpoint %q: %w", otelconfig.DefaultEndpoint(), err)
	}

	// Exporters pointing at our cloud endpoint always use the custom cloud temporality selector.
	if target.Host == cloud.Host {
		exp.Temporality = otelconfig.CustomCloudTemporality
	}

	insecure := target.Scheme != "https"
	hasHeaders := len(exp.Headers) > 0

	var exporter sdkmetric.Exporter

	switch exp.Exporter {
	case otelconfig.ExporterOLTPHTTP:
		httpOpts := []otlpmetrichttp.Option{
			otlpmetrichttp.WithEndpoint(target.Host), // host and port
			otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
			otlpmetrichttp.WithTemporalitySelector(getTemporalitySelector(exp.Temporality, log)),
		}
		if insecure {
			httpOpts = append(httpOpts, otlpmetrichttp.WithInsecure())
		}
		if hasHeaders {
			httpOpts = append(httpOpts, otlpmetrichttp.WithHeaders(exp.Headers))
		}
		if exp.HTTPPath != "" {
			httpOpts = append(httpOpts, otlpmetrichttp.WithURLPath(exp.HTTPPath))
		}
		exporter, err = otlpmetrichttp.New(context.Background(), httpOpts...)

	case otelconfig.ExporterOLTPGRPC:
		grpcOpts := []otlpmetricgrpc.Option{
			otlpmetricgrpc.WithEndpoint(target.Host), // host and port
			otlpmetricgrpc.WithCompressor("gzip"),
			otlpmetricgrpc.WithTemporalitySelector(getTemporalitySelector(exp.Temporality, log)),
		}
		if insecure {
			grpcOpts = append(grpcOpts, otlpmetricgrpc.WithInsecure())
		}
		if hasHeaders {
			grpcOpts = append(grpcOpts, otlpmetricgrpc.WithHeaders(exp.Headers))
		}
		exporter, err = otlpmetricgrpc.New(context.Background(), grpcOpts...)

	default:
		return nil, fmt.Errorf("unknown metrics exporter %s", exp.Exporter)
	}

	if err != nil {
		return nil, err
	}

	log.Info(
		"Metrics enabled",
		zap.String("exporter", string(exp.Exporter)),
		zap.String("endpoint", exp.Endpoint),
		zap.String("path", exp.HTTPPath),
	)

	return exporter, nil
}
// NewOtlpMeterProvider builds the OTLP MeterProvider and installs it as the
// global OpenTelemetry meter provider.
//
// When c.OpenTelemetry.TestReader is set, it is the only reader attached and
// the configured exporters are ignored. Otherwise one periodic reader (using
// defaultExportTimeout and defaultExportInterval) is created per exporter that
// is not disabled.
//
// If creating an exporter fails, the error is logged and returned, and every
// periodic reader created for earlier exporters is shut down first. A periodic
// reader runs its export loop from the moment it is constructed, so without
// this cleanup the readers built so far (and their exporters) would leak.
func NewOtlpMeterProvider(ctx context.Context, log *zap.Logger, c *Config, serviceInstanceID string) (*sdkmetric.MeterProvider, error) {
	opts, err := defaultOtlpMetricOptions(ctx, serviceInstanceID, c)
	if err != nil {
		return nil, err
	}

	if c.OpenTelemetry.TestReader != nil {
		mp := sdkmetric.NewMeterProvider(append(opts, sdkmetric.WithReader(c.OpenTelemetry.TestReader))...)
		// Set the global MeterProvider to the SDK metric provider.
		otel.SetMeterProvider(mp)
		return mp, nil
	}

	// Readers created so far; needed only to release them on the error path.
	var readers []*sdkmetric.PeriodicReader

	for _, exp := range c.OpenTelemetry.Exporters {
		if exp.Disabled {
			continue
		}

		exporter, err := createOTELExporter(log, exp)
		if err != nil {
			log.Error("creating OTEL metrics exporter", zap.Error(err))
			// No provider will own the readers built so far: stop them here.
			for _, r := range readers {
				if shutdownErr := r.Shutdown(ctx); shutdownErr != nil {
					log.Warn("shutting down OTEL metrics reader", zap.Error(shutdownErr))
				}
			}
			return nil, err
		}

		reader := sdkmetric.NewPeriodicReader(exporter,
			sdkmetric.WithTimeout(defaultExportTimeout),
			sdkmetric.WithInterval(defaultExportInterval),
		)
		readers = append(readers, reader)
		opts = append(opts, sdkmetric.WithReader(reader))
	}

	mp := sdkmetric.NewMeterProvider(opts...)
	// Set the global MeterProvider to the SDK metric provider.
	otel.SetMeterProvider(mp)

	return mp, nil
}
// IsDefaultCloudExporterConfigured reports whether at least one of the given
// exporters targets the default cloud endpoint. An empty or nil slice yields false.
func IsDefaultCloudExporterConfigured(c []*OpenTelemetryExporter) bool {
	for i := 0; i < len(c); i++ {
		if isCloudExporter(c[i]) {
			return true
		}
	}
	return false
}
// isCloudExporter reports whether the endpoint of exp has the same host as
// otelconfig.DefaultEndpoint(). If either URL cannot be parsed the result is false.
func isCloudExporter(exp *OpenTelemetryExporter) bool {
	configured, parseErr := parseURL(exp.Endpoint)
	if parseErr != nil {
		return false
	}

	if cloud, cloudErr := url.Parse(otelconfig.DefaultEndpoint()); cloudErr == nil {
		return configured.Host == cloud.Host
	}
	return false
}
// getResource assembles the OpenTelemetry resource attached to all metrics:
// service name and version from c, the given service instance ID, the
// user-configured resource attributes, plus process PID, OS type, telemetry
// SDK and host information. If resource.New reports an error, a nil resource
// is returned together with that error.
func getResource(ctx context.Context, serviceInstanceID string, c *Config) (*resource.Resource, error) {
	resourceOpts := []resource.Option{
		resource.WithAttributes(semconv.ServiceNameKey.String(c.Name)),
		resource.WithAttributes(semconv.ServiceVersionKey.String(c.Version)),
		resource.WithAttributes(semconv.ServiceInstanceID(serviceInstanceID)),
		resource.WithAttributes(c.ResourceAttributes...),
		resource.WithProcessPID(),
		resource.WithOSType(),
		resource.WithTelemetrySDK(),
		resource.WithHost(),
	}

	res, err := resource.New(ctx, resourceOpts...)
	if err != nil {
		return nil, err
	}
	return res, nil
}
// defaultPrometheusMetricOptions returns the MeterProvider options shared by
// every Prometheus meter provider: a view followed by the resource.
//
// The view
//   - drops instruments whose sanitized name matches c.Prometheus.ExcludeMetrics,
//   - filters out attributes listed in defaultExcludedOtelKeys or whose
//     sanitized key matches c.Prometheus.ExcludeMetricLabels,
//   - applies the Prometheus-specific bucket boundaries to histograms measured
//     in bytes or milliseconds.
func defaultPrometheusMetricOptions(ctx context.Context, serviceInstanceID string, c *Config) ([]sdkmetric.Option, error) {
	res, err := getResource(ctx, serviceInstanceID, c)
	if err != nil {
		return nil, err
	}

	// keepAttribute returns false for attributes that must not be exported.
	keepAttribute := func(kv attribute.KeyValue) bool {
		if isKeyInSlice(kv.Key, defaultExcludedOtelKeys) {
			return false
		}
		label := sanitizeName(string(kv.Key))
		for _, pattern := range c.Prometheus.ExcludeMetricLabels {
			if pattern.MatchString(label) {
				return false
			}
		}
		return true
	}

	durationBuckets := sdkmetric.AggregationExplicitBucketHistogram{Boundaries: prometheusMsBuckets}
	sizeBuckets := sdkmetric.AggregationExplicitBucketHistogram{Boundaries: promBytesBuckets}

	view := sdkmetric.View(func(inst sdkmetric.Instrument) (sdkmetric.Stream, bool) {
		// A custom View function has to copy name, description and unit explicitly.
		stream := sdkmetric.Stream{
			Name:        inst.Name,
			Description: inst.Description,
			Unit:        inst.Unit,
		}

		// Drop the metric entirely when its name matches an exclude pattern.
		for _, pattern := range c.Prometheus.ExcludeMetrics {
			if pattern.MatchString(sanitizeName(inst.Name)) {
				stream.Aggregation = sdkmetric.AggregationDrop{}
				return stream, true
			}
		}

		// Strip excluded attributes from everything that is kept.
		stream.AttributeFilter = keepAttribute

		// Prometheus uses its own, smaller histogram bucket sets.
		switch {
		case inst.Kind != sdkmetric.InstrumentKindHistogram:
			// Not a histogram: leave the aggregation unset.
		case inst.Unit == unitBytes:
			stream.Aggregation = sizeBuckets
		case inst.Unit == unitMilliseconds:
			stream.Aggregation = durationBuckets
		}

		return stream, true
	})

	return []sdkmetric.Option{
		sdkmetric.WithView(view),
		// Record information about this application in a Resource.
		sdkmetric.WithResource(res),
	}, nil
}
// defaultOtlpMetricOptions returns the MeterProvider options shared by every
// OTLP meter provider: the resource followed by a view.
//
// The view
//   - drops instruments whose name matches c.OpenTelemetry.ExcludeMetrics,
//   - filters out attributes whose key matches c.OpenTelemetry.ExcludeMetricLabels,
//   - applies the cloud bucket boundaries to histograms measured in bytes or
//     milliseconds.
func defaultOtlpMetricOptions(ctx context.Context, serviceInstanceID string, c *Config) ([]sdkmetric.Option, error) {
	res, err := getResource(ctx, serviceInstanceID, c)
	if err != nil {
		return nil, err
	}

	durationBuckets := sdkmetric.AggregationExplicitBucketHistogram{Boundaries: cloudOtelMsBucketsBounds}
	sizeBuckets := sdkmetric.AggregationExplicitBucketHistogram{Boundaries: cloudOtelBytesBucketBounds}

	// keepAttribute returns false for attributes that must not be exported.
	keepAttribute := func(kv attribute.KeyValue) bool {
		key := string(kv.Key)
		for _, pattern := range c.OpenTelemetry.ExcludeMetricLabels {
			if pattern.MatchString(key) {
				return false
			}
		}
		return true
	}

	view := sdkmetric.View(func(inst sdkmetric.Instrument) (sdkmetric.Stream, bool) {
		// A custom View function has to copy name, description and unit explicitly.
		stream := sdkmetric.Stream{
			Name:        inst.Name,
			Description: inst.Description,
			Unit:        inst.Unit,
		}

		// Drop the metric entirely when its name matches an exclude pattern.
		for _, pattern := range c.OpenTelemetry.ExcludeMetrics {
			if pattern.MatchString(inst.Name) {
				stream.Aggregation = sdkmetric.AggregationDrop{}
				return stream, true
			}
		}

		// Strip excluded attributes from everything that is kept.
		stream.AttributeFilter = keepAttribute

		switch {
		case inst.Kind != sdkmetric.InstrumentKindHistogram:
			// Not a histogram: leave the aggregation unset.
		case inst.Unit == unitBytes:
			stream.Aggregation = sizeBuckets
		case inst.Unit == unitMilliseconds:
			stream.Aggregation = durationBuckets
		}

		return stream, true
	})

	// Info: There can be only a single view per instrument. A view with less restriction might override a view.
	return []sdkmetric.Option{
		// Record information about this application in a Resource.
		sdkmetric.WithResource(res),
		sdkmetric.WithView(view),
	}, nil
}
// isKeyInSlice reports whether key is equal to one of the entries of keys.
// A nil or empty keys slice yields false.
func isKeyInSlice(key attribute.Key, keys []attribute.Key) bool {
	for i := 0; i < len(keys); i++ {
		if keys[i] == key {
			return true
		}
	}
	return false
}
// parseURL parses input as a URL. When input contains no "://" separator,
// "http://" is prepended first so that bare "host:port" values end up in the
// Host field of the result. The error, if any, is the one from url.Parse.
func parseURL(input string) (*url.URL, error) {
	if strings.Contains(input, "://") {
		return url.Parse(input)
	}
	return url.Parse("http://" + input)
}