pkg/export/bench/app/example_app.go (273 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "flag" "io" "log" "math/rand" "net/http" _ "net/http/pprof" "os" "os/signal" "syscall" "time" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" ) var ( addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.") cpuBurnOps = flag.Int("cpu-burn-ops", 0, "Operatins per second burning CPU. (Used to simulate high CPU utilization. Sensible values: 0-100.)") memBallastMBs = flag.Int("memory-ballast-mbs", 0, "Megabytes of memory ballast to allocate. (Used to simulate high memory utilization.)") maxCount = flag.Int("max-count", labelCombinations, "Maximum metric instance count for all metric types.") histogramCount = flag.Int("histogram-count", -1, "Number of unique instances per histogram metric.") gaugeCount = flag.Int("gauge-count", -1, "Number of unique instances per gauge metric.") counterCount = flag.Int("counter-count", -1, "Number of unique instances per counter metric.") summaryCount = flag.Int("summary-count", -1, "Number of unique instances per summary metric.") ) var ( availableLabels = map[string][]string{ "method": { "POST", "PUT", "GET", }, "status": { "200", "300", "400", "404", "500", }, "path": { "/", "/index", "/topics", "/topics:new", "/topics/<id>", "/topics/<id>/comment", "/topics/<id>/comment:create", "/topics/<id>/comment:edit", "/imprint", }, } labelCombinations = len(availableLabels["method"]) * len(availableLabels["status"]) * len(availableLabels["path"]) ) var ( metricIncomingRequestsPending = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "example_incoming_requests_pending", Help: "Number of incoming requests currently pending.", }, []string{"status", "method", "path"}, ) metricOutgoingRequestsPending = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "example_outgoing_requests_pending", Help: "Number of outgoing requests currently pending.", }, []string{"status", "method", "path"}, ) metricIncomingRequests = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "example_incoming_requests_total", Help: "Total number of incoming requests.", }, []string{"status", "method", "path"}, ) metricOutgoingRequests = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "example_outgoing_requests_total", Help: "Total number of outgoing requests.", }, []string{"status", "method", "path"}, ) metricIncomingRequestErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "example_incoming_request_errors_total", Help: "Total number of errors on incoming requests.", }, []string{"status", "method", "path"}, ) metricOutgoingRequestErrors = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "example_outgoing_request_errors_total", Help: "Total number of errors on outgoing requests.", }, []string{"status", "method", "path"}, ) metricIncomingRequestDurationHistogram = prometheus.NewHistogramVec( //nolint:promlinter // Test metric. prometheus.HistogramOpts{ Name: "example_histogram_incoming_request_duration", Buckets: prometheus.LinearBuckets(0, 100, 8), Help: "Distribution of incoming request latencies.", }, []string{"status", "method", "path"}, ) metricOutgoingRequestDurationHistogram = prometheus.NewHistogramVec( //nolint:promlinter // Test metric. prometheus.HistogramOpts{ Name: "example_histogram_outgoing_request_duration", Buckets: prometheus.LinearBuckets(0, 100, 8), Help: "Distribution of outgoing request latencies.", }, []string{"status", "method", "path"}, ) metricIncomingRequestDurationSummary = prometheus.NewSummaryVec( //nolint:promlinter // Test metric. prometheus.SummaryOpts{ Name: "example_summary_incoming_request_duration", Help: "Summary of incoming request latencies.", }, []string{"status", "method", "path"}, ) metricOutgoingRequestDurationSummary = prometheus.NewSummaryVec( //nolint:promlinter // Test metric. prometheus.SummaryOpts{ Name: "example_summary_outgoing_request_duration", Help: "Summary of outgoing request latencies.", }, []string{"status", "method", "path"}, ) ) func main() { flag.Parse() metrics := prometheus.NewRegistry() metrics.MustRegister( collectors.NewGoCollector(), collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), metricIncomingRequestsPending, metricOutgoingRequestsPending, metricIncomingRequests, metricOutgoingRequests, metricIncomingRequestErrors, metricOutgoingRequestErrors, metricIncomingRequestDurationHistogram, metricOutgoingRequestDurationHistogram, metricIncomingRequestDurationSummary, metricOutgoingRequestDurationSummary, ) var memoryBallast []byte allocateMemoryBallast(&memoryBallast, *memBallastMBs*1000*1000) var g run.Group { // Termination handler. term := make(chan os.Signal, 1) cancel := make(chan struct{}) signal.Notify(term, os.Interrupt, syscall.SIGTERM) g.Add( func() error { select { case <-term: log.Println("Received SIGTERM, exiting gracefully...") case <-cancel: } return nil }, func(error) { close(cancel) }, ) } { server := &http.Server{Addr: *addr} http.Handle("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{Registry: metrics})) g.Add(func() error { return server.ListenAndServe() }, func(error) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) if err := server.Shutdown(ctx); err != nil { log.Printf("Server failed to shut down gracefully: %s", err) } cancel() }) } { ctx, cancel := context.WithCancel(context.Background()) g.Add( func() error { return burnCPU(ctx, *cpuBurnOps) }, func(error) { cancel() }, ) } { ctx, cancel := context.WithCancel(context.Background()) g.Add( func() error { return updateMetrics(ctx) }, func(error) { cancel() }, ) } if err := g.Run(); err != nil { log.Println("Exit with error", err) os.Exit(1) } } func allocateMemoryBallast(buf *[]byte, sz int) { // Fill memory ballast. Fill it with random values so it results in actual memory usage. *buf = make([]byte, sz) _, err := io.ReadFull(rand.New(rand.NewSource(0)), *buf) if err != nil { panic(err) } } // burnCPU burns the given percentage of CPU of a single core. func burnCPU(ctx context.Context, ops int) error { for { // Burn some CPU proportional to the input ops. // This must be fixed work, i.e. we cannot spin for a fraction of scheduling will // greatly affect how many times we spin, even without high CPU utilization. //nolint:revive // Intentionally empty block for range ops * 20000000 { } // Wait for some time inversely proportional to the input opts. // The constants are picked empirically. Spin and wait time must both depend // on the input ops for them to result in linearly scaleing CPU usage. select { case <-ctx.Done(): return nil case <-time.After(time.Duration(100-ops) * 5 * time.Millisecond): // default: } } } func updateMetrics(ctx context.Context) error { for { select { case <-ctx.Done(): return nil case <-time.After(100 * time.Millisecond): forNumInstances(*gaugeCount, func(labels prometheus.Labels) { metricIncomingRequestsPending.With(labels).Set(float64(rand.Intn(200))) metricOutgoingRequestsPending.With(labels).Set(float64(rand.Intn(200))) }) forNumInstances(*counterCount, func(labels prometheus.Labels) { metricIncomingRequests.With(labels).Add(float64(rand.Intn(200))) metricOutgoingRequests.With(labels).Add(float64(rand.Intn(100))) metricIncomingRequestErrors.With(labels).Add(float64(rand.Intn(15))) metricOutgoingRequestErrors.With(labels).Add(float64(rand.Intn(5))) }) forNumInstances(*histogramCount, func(labels prometheus.Labels) { metricIncomingRequestDurationHistogram.With(labels).Observe(rand.NormFloat64()*300 + 500) metricOutgoingRequestDurationHistogram.With(labels).Observe(rand.NormFloat64()*200 + 300) }) forNumInstances(*summaryCount, func(labels prometheus.Labels) { metricIncomingRequestDurationSummary.With(labels).Observe(rand.NormFloat64()*300 + 500) metricOutgoingRequestDurationSummary.With(labels).Observe(rand.NormFloat64()*200 + 300) }) } } } func forNumInstances(c int, f func(prometheus.Labels)) { if c < 0 { c = *maxCount } for _, path := range availableLabels["path"] { for _, status := range availableLabels["status"] { for _, method := range availableLabels["method"] { if c <= 0 { return } f(prometheus.Labels{ "path": path, "status": status, "method": method, }) c-- } } } }