// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
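
// example_app is a synthetic benchmark application that exposes example
// Prometheus metrics over HTTP and can optionally burn CPU and hold a memory
// ballast to simulate load, for instance:
//
//	go run . --cpu-burn-ops=10 --memory-ballast-mbs=256
//
// Metrics are served on /metrics; pprof handlers are registered on the
// default mux via the net/http/pprof import.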
package main

import (
"context"
"flag"
"io"
"log"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.")
	cpuBurnOps = flag.Int("cpu-burn-ops", 0, "Operations per second burning CPU. (Used to simulate high CPU utilization. Sensible values: 0-100.)")
memBallastMBs = flag.Int("memory-ballast-mbs", 0, "Megabytes of memory ballast to allocate. (Used to simulate high memory utilization.)")
maxCount = flag.Int("max-count", labelCombinations, "Maximum metric instance count for all metric types.")
histogramCount = flag.Int("histogram-count", -1, "Number of unique instances per histogram metric.")
gaugeCount = flag.Int("gauge-count", -1, "Number of unique instances per gauge metric.")
counterCount = flag.Int("counter-count", -1, "Number of unique instances per counter metric.")
summaryCount = flag.Int("summary-count", -1, "Number of unique instances per summary metric.")
)

var (
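	// availableLabels holds the label values from which metric instances are
	// generated; labelCombinations is the size of their cross product.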
availableLabels = map[string][]string{
"method": {
"POST",
"PUT",
"GET",
},
"status": {
"200",
"300",
"400",
"404",
"500",
},
"path": {
"/",
"/index",
"/topics",
"/topics:new",
"/topics/<id>",
"/topics/<id>/comment",
"/topics/<id>/comment:create",
"/topics/<id>/comment:edit",
"/imprint",
},
}
labelCombinations = len(availableLabels["method"]) * len(availableLabels["status"]) * len(availableLabels["path"])
)

var (
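	// Example metrics covering all four Prometheus metric types, each with the
	// same status/method/path label set.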
metricIncomingRequestsPending = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "example_incoming_requests_pending",
Help: "Number of incoming requests currently pending.",
},
[]string{"status", "method", "path"},
)
metricOutgoingRequestsPending = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "example_outgoing_requests_pending",
Help: "Number of outgoing requests currently pending.",
},
[]string{"status", "method", "path"},
)
metricIncomingRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "example_incoming_requests_total",
Help: "Total number of incoming requests.",
},
[]string{"status", "method", "path"},
)
metricOutgoingRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "example_outgoing_requests_total",
Help: "Total number of outgoing requests.",
},
[]string{"status", "method", "path"},
)
metricIncomingRequestErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "example_incoming_request_errors_total",
Help: "Total number of errors on incoming requests.",
},
[]string{"status", "method", "path"},
)
metricOutgoingRequestErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "example_outgoing_request_errors_total",
Help: "Total number of errors on outgoing requests.",
},
[]string{"status", "method", "path"},
)
metricIncomingRequestDurationHistogram = prometheus.NewHistogramVec(
//nolint:promlinter // Test metric.
prometheus.HistogramOpts{
Name: "example_histogram_incoming_request_duration",
Buckets: prometheus.LinearBuckets(0, 100, 8),
Help: "Distribution of incoming request latencies.",
},
[]string{"status", "method", "path"},
)
metricOutgoingRequestDurationHistogram = prometheus.NewHistogramVec(
//nolint:promlinter // Test metric.
prometheus.HistogramOpts{
Name: "example_histogram_outgoing_request_duration",
Buckets: prometheus.LinearBuckets(0, 100, 8),
Help: "Distribution of outgoing request latencies.",
},
[]string{"status", "method", "path"},
)
metricIncomingRequestDurationSummary = prometheus.NewSummaryVec(
//nolint:promlinter // Test metric.
prometheus.SummaryOpts{
Name: "example_summary_incoming_request_duration",
Help: "Summary of incoming request latencies.",
},
[]string{"status", "method", "path"},
)
metricOutgoingRequestDurationSummary = prometheus.NewSummaryVec(
//nolint:promlinter // Test metric.
prometheus.SummaryOpts{
Name: "example_summary_outgoing_request_duration",
Help: "Summary of outgoing request latencies.",
},
[]string{"status", "method", "path"},
)
)

func main() {
flag.Parse()
metrics := prometheus.NewRegistry()
metrics.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
metricIncomingRequestsPending,
metricOutgoingRequestsPending,
metricIncomingRequests,
metricOutgoingRequests,
metricIncomingRequestErrors,
metricOutgoingRequestErrors,
metricIncomingRequestDurationHistogram,
metricOutgoingRequestDurationHistogram,
metricIncomingRequestDurationSummary,
metricOutgoingRequestDurationSummary,
)

	var memoryBallast []byte
	allocateMemoryBallast(&memoryBallast, *memBallastMBs*1000*1000)

	var g run.Group
{
// Termination handler.
term := make(chan os.Signal, 1)
cancel := make(chan struct{})
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
g.Add(
func() error {
select {
case <-term:
log.Println("Received SIGTERM, exiting gracefully...")
case <-cancel:
}
return nil
},
func(error) {
close(cancel)
},
)
}
{
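		// HTTP server exposing the /metrics endpoint (and pprof, registered on
		// the default mux by the net/http/pprof import).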
server := &http.Server{Addr: *addr}
http.Handle("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{Registry: metrics}))
g.Add(func() error {
return server.ListenAndServe()
}, func(error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
if err := server.Shutdown(ctx); err != nil {
log.Printf("Server failed to shut down gracefully: %s", err)
}
cancel()
})
}
{
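		// Synthetic CPU load.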
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
return burnCPU(ctx, *cpuBurnOps)
},
func(error) {
cancel()
},
)
}
{
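		// Periodic metric updates.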
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
return updateMetrics(ctx)
},
func(error) {
cancel()
},
)
}

	if err := g.Run(); err != nil {
log.Println("Exit with error", err)
os.Exit(1)
}
}

// allocateMemoryBallast fills *buf with sz bytes of pseudo-random data so the
// ballast occupies actual resident memory rather than untouched zero pages.
func allocateMemoryBallast(buf *[]byte, sz int) {
*buf = make([]byte, sz)
_, err := io.ReadFull(rand.New(rand.NewSource(0)), *buf)
if err != nil {
panic(err)
}
}

// burnCPU burns the given percentage of CPU of a single core.
func burnCPU(ctx context.Context, ops int) error {
for {
// Burn some CPU proportional to the input ops.
		// This must be a fixed amount of work: if we instead spun for a fixed
		// duration, scheduling would greatly affect how many iterations we run,
		// even without high CPU utilization.
//nolint:revive // Intentionally empty block
for range ops * 20000000 {
}
		// Wait for some time inversely proportional to the input ops.
		// The constants are picked empirically. Spin and wait time must both depend
		// on the input ops for them to result in linearly scaling CPU usage.
select {
case <-ctx.Done():
return nil
case <-time.After(time.Duration(100-ops) * 5 * time.Millisecond):
}
}
}
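
// updateMetrics periodically (every 100ms) sets randomized values on all
// example metrics until ctx is canceled.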
func updateMetrics(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(100 * time.Millisecond):
forNumInstances(*gaugeCount, func(labels prometheus.Labels) {
metricIncomingRequestsPending.With(labels).Set(float64(rand.Intn(200)))
metricOutgoingRequestsPending.With(labels).Set(float64(rand.Intn(200)))
})
forNumInstances(*counterCount, func(labels prometheus.Labels) {
metricIncomingRequests.With(labels).Add(float64(rand.Intn(200)))
metricOutgoingRequests.With(labels).Add(float64(rand.Intn(100)))
metricIncomingRequestErrors.With(labels).Add(float64(rand.Intn(15)))
metricOutgoingRequestErrors.With(labels).Add(float64(rand.Intn(5)))
})
forNumInstances(*histogramCount, func(labels prometheus.Labels) {
metricIncomingRequestDurationHistogram.With(labels).Observe(rand.NormFloat64()*300 + 500)
metricOutgoingRequestDurationHistogram.With(labels).Observe(rand.NormFloat64()*200 + 300)
})
forNumInstances(*summaryCount, func(labels prometheus.Labels) {
metricIncomingRequestDurationSummary.With(labels).Observe(rand.NormFloat64()*300 + 500)
metricOutgoingRequestDurationSummary.With(labels).Observe(rand.NormFloat64()*200 + 300)
})
}
}
}
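
// forNumInstances invokes f with up to c distinct label combinations drawn
// from availableLabels. A negative c falls back to the --max-count flag value.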
func forNumInstances(c int, f func(prometheus.Labels)) {
if c < 0 {
c = *maxCount
}
for _, path := range availableLabels["path"] {
for _, status := range availableLabels["status"] {
for _, method := range availableLabels["method"] {
if c <= 0 {
return
}
f(prometheus.Labels{
"path": path,
"status": status,
"method": method,
})
c--
}
}
}
}