pkg/export/gcm/promtest/prometheus.go (143 lines of code) (raw):

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package promtest

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/efficientgo/e2e"
	e2emon "github.com/efficientgo/e2e/monitoring"
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	dto "github.com/prometheus/client_model/go"
	"github.com/thanos-io/thanos/pkg/runutil"
)

// promBackend is the Backend implementation that runs a Prometheus container
// scraping an in-process HTTP target.
type promBackend struct {
	// image is the container image passed to the Prometheus runnable in start.
	image string
	// g is the gatherer served on the scrape target's /metrics endpoint; set by start.
	g *recordGatherer
	// p is the Prometheus runnable; set by start.
	p *e2emon.Prometheus
}

// Prometheus returns a Backend that uses the given container image to run Prometheus.
func Prometheus(image string) Backend {
	return &promBackend{image: image}
}

// Ref returns the fixed reference name of this backend, "prometheus".
func (p *promBackend) Ref() string { return "prometheus" }

// recordGatherer is a prometheus.Gatherer capable of "playing" the recorded metric state
// with fixed timestamps to backfill data into a Prometheus-compatible system.
type recordGatherer struct { i int plannedScrapes [][]*dto.MetricFamily mu sync.Mutex } func (g *recordGatherer) Gather() (ret []*dto.MetricFamily, _ error) { g.mu.Lock() ret = nil if g.i > -1 && g.i < len(g.plannedScrapes) { ret = g.plannedScrapes[g.i] g.i++ } g.mu.Unlock() return ret, nil } func (p *promBackend) start(t testing.TB, env e2e.Environment) (v1.API, map[string]string) { t.Helper() const name = "prometheus" m := http.NewServeMux() p.g = &recordGatherer{i: -1} m.Handle("/metrics", promhttp.HandlerFor(p.g, promhttp.HandlerOpts{ EnableOpenMetrics: true, })) // Listen on all addresses, since we need to connect to it from docker container. list, err := net.Listen("tcp", "0.0.0.0:0") if err != nil { t.Fatal(err) } _, port, err := net.SplitHostPort(list.Addr().String()) if err != nil { t.Fatal(err) } s := http.Server{Handler: m} go func() { _ = s.Serve(list) }() env.AddCloser(func() { _ = s.Close() }) p.p = newPrometheus(env, name, p.image, net.JoinHostPort(env.HostAddr(), port), nil) if err := e2e.StartAndWaitReady(p.p); err != nil { t.Fatalf("can't start Prometheus: %v", err) } cl, err := api.NewClient(api.Config{Address: "http://" + p.p.Endpoint("http")}) if err != nil { t.Fatalf("create Prometheus client: %s", err) } return v1.NewAPI(cl), map[string]string{"job": "test"} } func newPrometheus(env e2e.Environment, name string, image string, scrapeTargetAddress string, flagOverride map[string]string) *e2emon.Prometheus { ports := map[string]int{"http": 9090} f := env.Runnable(name).WithPorts(ports).Future() config := fmt.Sprintf(` global: external_labels: collector: %v scrape_configs: - job_name: 'test' scrape_interval: 5s scrape_timeout: 5s static_configs: - targets: [%s] metric_relabel_configs: - regex: instance action: labeldrop `, name, scrapeTargetAddress) if err := os.WriteFile(filepath.Join(f.Dir(), "prometheus.yml"), []byte(config), 0600); err != nil { return &e2emon.Prometheus{Runnable: e2e.NewFailedRunnable(name, fmt.Errorf("create prometheus 
config failed: %w", err))} } args := map[string]string{ "--web.listen-address": fmt.Sprintf(":%d", ports["http"]), "--config.file": filepath.Join(f.Dir(), "prometheus.yml"), "--storage.tsdb.path": f.Dir(), "--enable-feature=exemplar-storage": "", "--enable-feature=native-histograms": "", "--storage.tsdb.no-lockfile": "", "--storage.tsdb.retention.time": "1d", "--storage.tsdb.wal-compression": "", "--storage.tsdb.min-block-duration": "2h", "--storage.tsdb.max-block-duration": "2h", "--web.enable-lifecycle": "", "--log.format": "json", "--log.level": "info", } if flagOverride != nil { args = e2e.MergeFlagsWithoutRemovingEmpty(args, flagOverride) } p := e2emon.AsInstrumented(f.Init(e2e.StartOptions{ Image: image, Command: e2e.NewCommandWithoutEntrypoint("prometheus", e2e.BuildArgs(args)...), Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), User: strconv.Itoa(os.Getuid()), }), "http") return &e2emon.Prometheus{ Runnable: p, Instrumented: p, } } func (p *promBackend) injectScrapes(t testing.TB, scrapeRecordings [][]*dto.MetricFamily, timeout time.Duration) { t.Helper() p.g.mu.Lock() p.g.i = 0 p.g.plannedScrapes = scrapeRecordings p.g.mu.Unlock() ctx, cancel := context.WithTimeout(t.Context(), timeout) t.Cleanup(cancel) if err := runutil.RetryWithLog(log.NewJSONLogger(os.Stderr), 10*time.Second, ctx.Done(), func() error { p.g.mu.Lock() iter := p.g.i p.g.mu.Unlock() if iter < len(p.g.plannedScrapes) { return fmt.Errorf("backend didn't scrape the target enough number of times, got %v, expected %v", iter, len(p.g.plannedScrapes)) } return nil }); err != nil { t.Fatal(t.Name(), err, "within expected time", timeout) } }