pkg/metricgen/otlp.go (124 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package metricgen import ( "context" "crypto/tls" "fmt" "net" "net/url" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc" "google.golang.org/grpc/credentials" grpcinsecure "google.golang.org/grpc/credentials/insecure" ) // SendOTLP sends specific metrics to the configured Elastic APM OTLP intake. // // Metrics are sent via the specified protocol. // // Metrics sent are: // - otlp(float64, value=1.0) func SendOTLP(ctx context.Context, opts ...ConfigOption) (EventStats, error) { cfg := newConfig(opts...) if err := cfg.Validate(); err != nil { return EventStats{}, fmt.Errorf("cannot validate OTLP Metrics configuration: %w", err) } var exporter sdkmetric.Exporter switch cfg.otlpProtocol { case grpcOTLPProtocol: e, cleanup, err := newOTLPMetricGRPCExporter(ctx, cfg) if err != nil { return EventStats{}, err } defer cleanup() exporter = e case httpOTLPProtocol: e, err := newOTLPMetricHTTPExporter(ctx, cfg) if err != nil { return EventStats{}, err } exporter = e } defer exporter.Shutdown(ctx) resource := resource.NewSchemaless( attribute.String("service.name", cfg.otlpServiceName), ) mp := sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter)), sdkmetric.WithResource(resource), ) stats := EventStats{} if err := generateMetrics(mp.Meter("metricgen"), &stats); err != nil { return stats, fmt.Errorf("cannot generate metrics: %w", err) } if err := mp.Shutdown(ctx); err != nil { return EventStats{}, fmt.Errorf("cannot shut down meter provider: %w", err) } return stats, nil } func generateMetrics(m metric.Meter, stats *EventStats) error { counter, _ := m.Float64Counter("otlp") counter.Add(context.Background(), 1) stats.Add(1) return nil } func newOTLPMetricHTTPExporter(ctx context.Context, cfg config) (*otlpmetrichttp.Exporter, error) { endpoint, err := otlpEndpoint(cfg.apmServerURL) if err != nil { return nil, err } tlsConfig := &tls.Config{InsecureSkipVerify: !cfg.verifyServerCert} opts := []otlpmetrichttp.Option{ otlpmetrichttp.WithEndpoint(endpoint.Host), otlpmetrichttp.WithTLSClientConfig(tlsConfig), } if endpoint.Scheme == "http" { opts = append(opts, otlpmetrichttp.WithInsecure()) } headers := map[string]string{"Authorization": "ApiKey " + cfg.apiKey} opts = append(opts, otlpmetrichttp.WithHeaders(headers)) return otlpmetrichttp.New(ctx, opts...) } func otlpEndpoint(s string) (*url.URL, error) { u, err := url.Parse(s) if err != nil { return &url.URL{}, fmt.Errorf("failed to parse endpoint: %w", err) } switch u.Scheme { case "http": if u.Port() == "" { u.Host = net.JoinHostPort(u.Host, "80") } case "https": if u.Port() == "" { u.Host = net.JoinHostPort(u.Host, "443") } default: return &url.URL{}, fmt.Errorf("endpoint must be prefixed with http:// or https://") } return u, nil } func newOTLPMetricGRPCExporter(ctx context.Context, cfg config) (*otlpmetricgrpc.Exporter, func(), error) { endpoint, err := otlpEndpoint(cfg.apmServerURL) if err != nil { return nil, func() {}, err } var transportCredentials credentials.TransportCredentials switch endpoint.Scheme { case "http": // If http:// is specified, then use insecure (plaintext). transportCredentials = grpcinsecure.NewCredentials() case "https": transportCredentials = credentials.NewTLS(&tls.Config{InsecureSkipVerify: !cfg.verifyServerCert}) } grpcConn, err := grpc.NewClient( endpoint.Host, grpc.WithTransportCredentials(transportCredentials), grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")), ) cleanup := func() { grpcConn.Close() } if err != nil { return nil, cleanup, fmt.Errorf("cannot create grpc dial context: %w", err) } opts := []otlpmetricgrpc.Option{otlpmetricgrpc.WithGRPCConn(grpcConn)} headers := map[string]string{"Authorization": "ApiKey " + cfg.apiKey} opts = append(opts, otlpmetricgrpc.WithHeaders(headers)) e, err := otlpmetricgrpc.New(ctx, opts...) return e, cleanup, err }