pkg/export/gcm/promtest/promtest.go (218 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package promtest import ( "context" "errors" "fmt" "math/rand" "os" "strings" "testing" "time" "github.com/efficientgo/e2e" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/oklog/ulid" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/timestamp" "github.com/thanos-io/thanos/pkg/runutil" ) // GCMServiceAccountOrFail gets the Google SA JSON content from GCM_SECRET // environment variable or fails. func GCMServiceAccountOrFail(t testing.TB) []byte { // TODO(bwplotka): Move it to https://cloud.google.com/build CI. saJSON := []byte(os.Getenv("GCM_SECRET")) if len(saJSON) == 0 { t.Fatal("newExportGCM: no GCM_SECRET env var provided, can't run the test") } return saJSON } // ingestionTest represents tests that allows validating Prometheus "backend", so // scraper (agent/collector) and DB that serves PromQL e.g.: // * export pkg code in this repo and GCM. // * GMP collector (built export pkg attached to Prometheus fork) for ingestion and querying. // * GMP collector (built export pkg attached to Prometheus fork) and GCM for querying. // * Prometheus. // // This is essentially a simplified compliance tests focused on GCM case and with // quick feedback loop (compared to hours with https://github.com/prometheus/compliance/tree/main/promql). type ingestionTest struct { t testing.TB testID string backends map[string]backend expectationsPerBackend map[string]model.Matrix currTime time.Time } type backend struct { api v1.API extLset map[string]string b Backend } // NewIngestionTest takes testing T and returns test instance as well as registry // that can be used to register test metrics. // // NOTE(bwplotka): All test functionality fails tests on error (instead of returning // error). // //nolint:revive // Intentional unexported return func NewIngestionTest(t testing.TB, backends []Backend) *ingestionTest { t.Helper() // TODO(bwplotka): Consider lazy creation of docker env? e, err := e2e.New() t.Cleanup(e.Close) if err != nil { t.Fatal(err) } it := &ingestionTest{ t: t, // NOTE(bwplotka): Cardinality is obvious, but even for 100 test runs a day with // 100 cases, 10 samples each, that's "only" 10k series / 0.1 million samples a day. testID: fmt.Sprintf("%v: %v", t.Name(), ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))).String()), // 1h in the past as Monarch allows 24h, but Prometheus allows 2h (plus some buffer). // It should give buffer for a fair amount of samples from -1h to now. // TODO(bwplotka): This won't work with GMP collector that has 10m block size. // This means when injecting sample we have to wait a bit or move this -1h // to -8m rather. currTime: time.Now().Add(-1 * time.Hour), backends: map[string]backend{}, expectationsPerBackend: map[string]model.Matrix{}, } for _, b := range backends { api, extLset := b.start(t, e) it.backends[b.Ref()] = backend{b: b, api: api, extLset: extLset} } if len(it.backends) == 0 { t.Fatal("no backends specified, at least has to be passed e.g. promtest.Prometheus or promtest.LocalExportWithGCM") } return it } type Backend interface { Ref() string start(t testing.TB, env e2e.Environment) (api v1.API, extraLset map[string]string) injectScrapes(t testing.TB, scrapeRecordings [][]*dto.MetricFamily, timeout time.Duration) } type ExpectationsRecorder interface { Expect(val float64, metric prometheus.Metric, b Backend) ExpectationsRecorder // TODO(bwplotka): Add histogram support. } type Scrape func(after time.Duration) ExpectationsRecorder // RecordScrapes allows recording SDK and expected samples for interesting cases. func (it *ingestionTest) RecordScrapes(recordingFunc func(r prometheus.Registerer, scrape Scrape)) { fmt.Printf("%s Recording scrapes for test=%q\n", it.t.Name(), it.testID) r := prometheus.NewRegistry() var scrapeRecordings [][]*dto.MetricFamily recordingFunc(prometheus.WrapRegistererWith(map[string]string{"test": it.testID}, r), func(after time.Duration) ExpectationsRecorder { it.t.Helper() if after <= 0 { it.t.Fatal(errors.New("scrape 'after' parameter can't be negative or zero")) } if it.currTime.Add(after).After(time.Now()) { it.t.Fatal(errors.New("sum of all scrape 'after' parameters can be beyond 10 hours")) } it.currTime = it.currTime.Add(after) mfs, err := r.Gather() if err != nil { it.t.Fatal(err) } // Inject a fixed timestamps as a way to backfill/inject samples with // larger (e.g. 30 seconds) intervals without waiting. for _, mf := range mfs { for _, m := range mf.GetMetric() { m.TimestampMs = proto.Int64(timestamp.FromTime(it.currTime)) } } scrapeRecordings = append(scrapeRecordings, mfs) return &ingestionTestExpRecorder{it: it, currTime: it.currTime} }) // Once recorded, let's inject those to our backends. // This might take a while. // TODO(bwplotka): Potential for concurrency here. for _, b := range it.backends { fmt.Printf("%s Injecting samples to %v backend\n", it.t.Name(), b.b.Ref()) b.b.injectScrapes(it.t, scrapeRecordings, 2*time.Minute) } } type ingestionTestExpRecorder struct { it *ingestionTest currTime time.Time } func (ir *ingestionTestExpRecorder) Expect(val float64, metric prometheus.Metric, b Backend) ExpectationsRecorder { m := dto.Metric{} //nolint:errcheck metric.Write(&m) if m.GetSummary() != nil || m.GetHistogram() != nil { // TODO(bwplotka): Implement an alternative. ir.it.t.Fatal("It's not practical to use Expect against histograms and summaries.") } modelMetric := toModelMetric(metric) for _, ss := range ir.it.expectationsPerBackend[b.Ref()] { if ss.Metric.Equal(modelMetric) { ss.Values = append(ss.Values, model.SamplePair{ Timestamp: model.TimeFromUnixNano(ir.currTime.UnixNano()), Value: model.SampleValue(val), }) return ir } } // More labels, like external labels (cluster etc.) and test labels will be // injected during expectGCMResults call. The same with sorting. ir.it.expectationsPerBackend[b.Ref()] = append(ir.it.expectationsPerBackend[b.Ref()], &model.SampleStream{ Metric: modelMetric, Values: []model.SamplePair{{ Timestamp: model.TimeFromUnixNano(ir.currTime.UnixNano()), Value: model.SampleValue(val), }}, }) return ir } func (it *ingestionTest) preparedExpectedMatrix(exp model.Matrix, metric model.Metric, extLabels map[string]string) model.Matrix { m := metric.Clone() m["test"] = model.LabelValue(it.testID) for k, v := range extLabels { m[model.LabelName(k)] = model.LabelValue(v) } for _, e := range exp { if !e.Metric.Equal(metric) { continue } return model.Matrix{{ Metric: m, Values: e.Values, }} } return nil } // FatalOnUnexpectedPromQLResults fails the test if gathered expected samples for given non histogram, // non-summary metric does not match <metric>[2h] samples from given backend PromQL API for // instant query. func (it *ingestionTest) FatalOnUnexpectedPromQLResults(b Backend, metric prometheus.Metric, timeout time.Duration) { it.t.Helper() m := dto.Metric{} //nolint:errcheck metric.Write(&m) if m.GetSummary() != nil || m.GetHistogram() != nil { // TODO(bwplotka): Implement alternative. it.t.Fatal("It's not practical to use FatalOnUnexpectedPromQLResults against histograms and summaries.") } bMeta, ok := it.backends[b.Ref()] if !ok { it.t.Fatalf("%s backend not seen before? Did you pass it in NewIngestionTest?", b.Ref()) } modelMetric := toModelMetric(metric) exp := it.preparedExpectedMatrix(it.expectationsPerBackend[b.Ref()], modelMetric, bMeta.extLset) if exp == nil { it.t.Fatalf("expected metric %v, not found in expected Matrix. Did you use scrape(...).expect(...) method?", modelMetric.String()) } modelMetric["test"] = model.LabelValue(it.testID) query := fmt.Sprintf(`%s[10h]`, modelMetric.String()) fmt.Printf("%s Checking if PromQL instant query for %v matches expected samples for %v backend\n", it.t.Name(), query, b.Ref()) ctx, cancel := context.WithTimeout(context.Background(), timeout) it.t.Cleanup(cancel) var lastDiff string var sameDiffTimes int if err := runutil.RetryWithLog(log.NewJSONLogger(os.Stderr), 10*time.Second, ctx.Done(), func() error { value, warns, err := bMeta.api.Query(ctx, query, it.currTime.Add(1*time.Second)) if err != nil { return fmt.Errorf("instant query %s for %v %w", query, it.currTime.Add(1*time.Second), err) } if len(warns) > 0 { fmt.Println(it.t.Name(), "Warnings:", warns) } if value.Type() != model.ValMatrix { return fmt.Errorf("expected matrix, got %v", value.Type()) } if cmp.Equal(exp, value.(model.Matrix)) { return nil } diff := cmp.Diff(exp, value.(model.Matrix)) if lastDiff == diff { if sameDiffTimes > 3 { // Likely nothing will change, abort. fmt.Println(it.t.Name(), lastDiff) it.t.Error(errors.New("resulted Matrix is different than expected (see printed diff)")) return nil } sameDiffTimes++ } else { lastDiff = diff sameDiffTimes = 0 } return errors.New("resulted Matrix is different than expected (diff, if any, will be printed at the end)") }); err != nil { if lastDiff != "" { fmt.Println(it.t.Name(), lastDiff) } it.t.Error(err) } } func toModelMetric(metric prometheus.Metric) model.Metric { m := dto.Metric{} //nolint:errcheck metric.Write(&m) ret := model.Metric{} for _, p := range m.Label { ret[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue()) } // Write gives us only labels, not metric name. Since we use internal SDK methods // (although public) we have to hack Desc() to get full metric name. // TODO(bwplotka): Add public GetName() to Desc. name := strings.TrimPrefix(metric.Desc().String(), "Desc{fqName: \"") i := strings.Index(name, "\"") ret[model.MetricNameLabel] = model.LabelValue(name[:i]) return ret }