otelcollector/fluent-bit/src/prometheus_collector_health.go (106 lines of code) (raw):

package main import ( "math" "os" "net/http" "sync" "time" "github.com/microsoft/ApplicationInsights-Go/appinsights" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) var ( // TimeseriesReceivedTotal adds up the number of timeseries received that is logged by ME in a minute TimeseriesReceivedTotal float64 = 0 // TimeseriesSentTotal adds up the number of timeseries sent that is logged by ME in a minute TimeseriesSentTotal float64 = 0 // BytesSentTotal adds up the number of timeseries sent that is logged by ME in a minute BytesSentTotal float64 = 0 // TimeseriesVolumeTicker tracks the minute-long period for adding up the number of timeseries and bytes logged by ME TimeseriesVolumeTicker *time.Ticker // TimeseriesVolumeMutex handles adding to the timeseries volume totals and setting these values as gauges for Prometheus metrics TimeseriesVolumeMutex = &sync.Mutex{} // ExportingFailedMutex handles if the otelcollector has logged that exporting failed ExportingFailedMutex = &sync.Mutex{} OtelCollectorExportingFailedCount = 0 // timeseriesReceivedMetric is the Prometheus metric measuring the number of timeseries scraped in a minute timeseriesReceivedMetric = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "timeseries_received_per_minute", Help: "Number of timeseries to be sent to storage", }, []string{"computer", "release", "controller_type"}, ) // timeseriesSentMetric is the Prometheus metric measuring the number of timeseries scraped in a minute timeseriesSentMetric = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "timeseries_sent_per_minute", Help: "Number of timeseries sent to storage", }, []string{"computer", "release", "controller_type"}, ) // bytesSentMetric is the Prometheus metric measuring the number of timeseries scraped in a minute bytesSentMetric = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "bytes_sent_per_minute", Help: "Number of bytes of timeseries sent to storage", }, []string{"computer", "release", "controller_type"}, ) // invalidCustomConfigMetric is true if the config provided failed validation and false otherwise invalidCustomConfigMetric = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "invalid_custom_prometheus_config", Help: "If an invalid custom prometheus config was given or not", }, []string{"computer", "release", "controller_type", "error"}, ) // exportingFailedMetric counts the number of times the otelcollector was unable to export to ME exportingFailedMetric = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "exporting_metrics_failed", Help: "If exporting metrics failed or not", }, []string{"computer", "release", "controller_type"}, ) ) const ( prometheusCollectorHealthInterval = 60 prometheusCollectorHealthPort = ":2234" ) // Expose Prometheus metrics about the health of the agent func ExposePrometheusCollectorHealthMetrics() { // A new registry excludes go_* and promhttp_* metrics for the endpoint r := prometheus.NewRegistry() r.MustRegister(timeseriesReceivedMetric) r.MustRegister(timeseriesSentMetric) r.MustRegister(bytesSentMetric) r.MustRegister(invalidCustomConfigMetric) r.MustRegister(exportingFailedMetric) handler := promhttp.HandlerFor(r, promhttp.HandlerOpts{}) http.Handle("/metrics", handler) go func() { TimeseriesVolumeTicker = time.NewTicker(time.Second * time.Duration(prometheusCollectorHealthInterval)) lastTickerStart := time.Now() for ; true; <-TimeseriesVolumeTicker.C { elapsed := time.Since(lastTickerStart) timePassedInMinutes := (float64(elapsed) / float64(time.Second)) / float64(prometheusCollectorHealthInterval) TimeseriesVolumeMutex.Lock() timeseriesReceivedRate := math.Round(TimeseriesReceivedTotal / timePassedInMinutes) timeseriesSentRate := math.Round(TimeseriesSentTotal / timePassedInMinutes) bytesSentRate := math.Round(BytesSentTotal / timePassedInMinutes) timeseriesReceivedMetric.With(prometheus.Labels{"computer":CommonProperties["computer"], "release":CommonProperties["helmreleasename"], "controller_type":CommonProperties["controllertype"]}).Set(timeseriesReceivedRate) timeseriesSentMetric.With(prometheus.Labels{"computer":CommonProperties["computer"], "release":CommonProperties["helmreleasename"], "controller_type":CommonProperties["controllertype"]}).Set(timeseriesSentRate) bytesSentMetric.With(prometheus.Labels{"computer":CommonProperties["computer"], "release":CommonProperties["helmreleasename"], "controller_type":CommonProperties["controllertype"]}).Set(bytesSentRate) TimeseriesReceivedTotal = 0.0 TimeseriesSentTotal = 0.0 BytesSentTotal = 0.0 TimeseriesVolumeMutex.Unlock() isInvalidCustomConfig := 0 invalidConfigErrorString := "" if os.Getenv("AZMON_INVALID_CUSTOM_PROMETHEUS_CONFIG") == "true" { isInvalidCustomConfig = 1 invalidConfigErrorString = os.Getenv("INVALID_CONFIG_FATAL_ERROR") } invalidCustomConfigMetric.With(prometheus.Labels{"computer":CommonProperties["computer"], "release":CommonProperties["helmreleasename"], "controller_type":CommonProperties["controllertype"], "error":invalidConfigErrorString}).Set(float64(isInvalidCustomConfig)) ExportingFailedMutex.Lock() exportingFailedMetric.With(prometheus.Labels{"computer":CommonProperties["computer"], "release":CommonProperties["helmreleasename"], "controller_type":CommonProperties["controllertype"]}).Add(float64(OtelCollectorExportingFailedCount)) OtelCollectorExportingFailedCount = 0 ExportingFailedMutex.Unlock() lastTickerStart = time.Now() } }() err := http.ListenAndServe(prometheusCollectorHealthPort, nil) if err != nil { Log("Error for Prometheus Collector Health endpoint: %s", err.Error()) exception := appinsights.NewExceptionTelemetry(err.Error()) TelemetryClient.Track(exception) } }