internal/monitor/otelexporters.go (135 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package monitor import ( "context" "fmt" "net/http" "strings" "time" cloudmetric "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" "github.com/googlecloudplatform/gcsfuse/v2/cfg" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/contrib/detectors/gcp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" ) const serviceName = "gcsfuse" const cloudMonitoringMetricPrefix = "custom.googleapis.com/gcsfuse/" var allowedMetricPrefixes = []string{"fs/", "gcs/", "file_cache/"} // SetupOTelMetricExporters sets up the metrics exporters func SetupOTelMetricExporters(ctx context.Context, c *cfg.Config) (shutdownFn common.ShutdownFn) { shutdownFns := make([]common.ShutdownFn, 0) options := make([]metric.Option, 0) opts, shutdownFn := setupPrometheus(c.Metrics.PrometheusPort) options = append(options, opts...) shutdownFns = append(shutdownFns, shutdownFn) opts, shutdownFn = setupCloudMonitoring(c.Metrics.CloudMetricsExportIntervalSecs) options = append(options, opts...) shutdownFns = append(shutdownFns, shutdownFn) res, err := getResource(ctx) if err != nil { logger.Errorf("Error while fetching resource: %v", err) } else { options = append(options, metric.WithResource(res)) } options = append(options, metric.WithView(dropDisallowedMetricsView)) meterProvider := metric.NewMeterProvider(options...) shutdownFns = append(shutdownFns, meterProvider.Shutdown) otel.SetMeterProvider(meterProvider) return common.JoinShutdownFunc(shutdownFns...) } // dropUnwantedMetricsView is an OTel View that drops the metrics that don't match the allowed prefixes. func dropDisallowedMetricsView(i metric.Instrument) (metric.Stream, bool) { s := metric.Stream{Name: i.Name, Description: i.Description, Unit: i.Unit} for _, prefix := range allowedMetricPrefixes { if strings.HasPrefix(i.Name, prefix) { return s, true } } s.Aggregation = metric.AggregationDrop{} return s, true } func setupCloudMonitoring(secs int64) ([]metric.Option, common.ShutdownFn) { if secs <= 0 { return nil, nil } options := []cloudmetric.Option{ cloudmetric.WithMetricDescriptorTypeFormatter(metricFormatter), cloudmetric.WithFilteredResourceAttributes(func(kv attribute.KeyValue) bool { // Ensure that PID is available as a metric label on metrics explorer as it'll help distinguish between different mounts on the same node. return cloudmetric.DefaultResourceAttributesFilter(kv) || kv.Key == semconv.ProcessPIDKey }), } exporter, err := cloudmetric.New(options...) if err != nil { logger.Errorf("Error while creating Google Cloud exporter:%v", err) return nil, nil } r := metric.NewPeriodicReader(exporter, metric.WithInterval(time.Duration(secs)*time.Second)) return []metric.Option{metric.WithReader(r)}, r.Shutdown } func metricFormatter(m metricdata.Metrics) string { return cloudMonitoringMetricPrefix + strings.ReplaceAll(m.Name, ".", "/") } func setupPrometheus(port int64) ([]metric.Option, common.ShutdownFn) { if port <= 0 { return nil, nil } exporter, err := prometheus.New(prometheus.WithoutUnits(), prometheus.WithoutCounterSuffixes(), prometheus.WithoutScopeInfo(), prometheus.WithoutTargetInfo()) if err != nil { logger.Errorf("Error while creating prometheus exporter:%v", err) return nil, nil } shutdownCh := make(chan context.Context) done := make(chan interface{}) go serveMetrics(port, shutdownCh, done) return []metric.Option{metric.WithReader(exporter)}, func(ctx context.Context) error { shutdownCh <- ctx close(shutdownCh) <-done close(done) return nil } } func serveMetrics(port int64, shutdownCh <-chan context.Context, done chan<- interface{}) { logger.Infof("Serving metrics at localhost:%d/metrics", port) mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) prometheusServer := &http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: mux, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } go func() { if err := prometheusServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Errorf("Failed to start Prometheus server: %v", err) } }() go func() { ctx := <-shutdownCh defer func() { done <- true }() logger.Info("Shutting down Prometheus exporter.") if err := prometheusServer.Shutdown(ctx); err != nil { logger.Errorf("Error while shutting down Prometheus exporter:%v", err) return } logger.Info("Prometheus exporter shutdown") }() logger.Info("Prometheus collector exporter started") } func getResource(ctx context.Context) (*resource.Resource, error) { return resource.New(ctx, // Use the GCP resource detector to detect information about the GCP platform resource.WithDetectors(gcp.NewDetector()), resource.WithTelemetrySDK(), resource.WithAttributes( semconv.ServiceName(serviceName), semconv.ServiceVersion(common.GetVersion()), ), ) }