pkg/export/gcm/promtest/local_export.go (155 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promtest
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
	"testing"
	"time"

	gcm "cloud.google.com/go/monitoring/apiv3/v2"
	"github.com/efficientgo/e2e"
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/api"
	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/textparse"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/record"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"

	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
)
// localExportWithGCM is the Backend implementation returned by
// LocalExportWithGCM. It keeps the GCM service account JSON it was built with,
// the exporter created in start, and the series labels the exporter looks up
// by reference.
type localExportWithGCM struct {
	// e is the exporter; it is created in start and fed by injectScrapes.
	e *export.Exporter

	// gcmSA is the service account JSON used to create credentials.
	gcmSA []byte

	// labelsByRef maps a series reference to its label set.
	//
	// NOTE(bwplotka): There is no mutex around this map, so every access has
	// to be synchronised with Exporter.Export.
	labelsByRef map[storage.SeriesRef]labels.Labels
}
// LocalExportWithGCM returns a Backend that runs the export package locally,
// in-process, with GCM as the storage backend. It mimics our GMP collector
// while giving the quickest feedback loop for experiments and allowing the use
// of IDE debuggers.
//
// The data passes through the following data models, which is what our
// Prometheus fork does when scraping any scrape target:
//
//   - (Exposed to user) Go Prometheus SDK client types, e.g. prometheus.NewCounter.
//   - Go Prometheus SDK dto (Prometheus proto exposition format, see
//     https://github.com/prometheus/client_model/blob/master/io/prometheus/client/metrics.proto).
//   - Internal Prometheus parser (https://pkg.go.dev/github.com/prometheus/prometheus/model/textparse).
//   - Internal Prometheus TSDB (head block) dto, e.g. records
//     (https://pkg.go.dev/github.com/prometheus/prometheus@v0.47.2/tsdb/record).
//   - GCM monitoring API proto ingested by Monarch.
//
// gcmSA is the service account JSON used to create Google credentials.
func LocalExportWithGCM(gcmSA []byte) Backend {
	backend := new(localExportWithGCM)
	backend.gcmSA = gcmSA
	return backend
}
// Ref returns the constant name identifying this backend.
func (l *localExportWithGCM) Ref() string {
	const ref = "export-pkg-with-gcm"
	return ref
}
// signalCtxOnce guards the one-time creation of signalCtx.
//
// controller-runtime's signals.SetupSignalHandler may be called only once per
// process (a second call panics), whereas start can run for several tests in
// the same test binary. The handler is therefore installed lazily, exactly
// once, and its context is shared.
var (
	signalCtxOnce sync.Once
	signalCtx     context.Context
)

// sharedSignalContext returns the process-wide context that is cancelled on
// SIGTERM/SIGINT, installing the signal handler on first use.
func sharedSignalContext() context.Context {
	signalCtxOnce.Do(func() {
		signalCtx = signals.SetupSignalHandler()
	})
	return signalCtx
}

// start prepares the backend for a test run.
//
// It creates Google credentials from the service account JSON, builds a
// Prometheus API client pointing at the GCM Prometheus endpoint of the
// credentials' project, and creates, configures and runs the exporter in a
// background goroutine. The context handed to the exporter is cancelled on
// test cleanup or when the process receives a termination signal.
//
// Any setup failure fails the test immediately via t.Fatalf.
//
// It returns the query API client together with the cluster, location and
// project_id label values the exporter was configured with.
func (l *localExportWithGCM) start(t testing.TB, _ e2e.Environment) (v1.API, map[string]string) {
	t.Helper()

	// Derive a per-test context from the shared signal context, so start can
	// be invoked more than once within a single process.
	ctx, cancel := context.WithCancel(sharedSignalContext())
	t.Cleanup(cancel)

	creds, err := google.CredentialsFromJSON(ctx, l.gcmSA, gcm.DefaultAuthScopes()...)
	if err != nil {
		t.Fatalf("create credentials from JSON: %s", err)
	}

	l.labelsByRef = map[storage.SeriesRef]labels.Labels{}

	cluster := "pe-github-action"
	location := "europe-west3-a"

	// Client used for querying back what was exported.
	cl, err := api.NewClient(api.Config{
		Address: fmt.Sprintf("https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus", creds.ProjectID),
		Client:  oauth2.NewClient(ctx, creds.TokenSource),
	})
	if err != nil {
		t.Fatalf("create Prometheus client: %s", err)
	}

	exporterOpts := export.ExporterOpts{
		UserAgentEnv:        "pe-github-action-test",
		Cluster:             cluster,
		Location:            location,
		ProjectID:           creds.ProjectID,
		CredentialsFromJSON: l.gcmSA,
	}
	exporterOpts.DefaultUnsetFields()

	l.e, err = export.New(ctx, log.NewJSONLogger(os.Stderr), nil, exporterOpts, export.NopLease())
	if err != nil {
		t.Fatalf("create exporter: %v", err)
	}

	// Apply empty config, so resources labels are attached.
	if err := l.e.ApplyConfig(&config.DefaultConfig, nil); err != nil {
		t.Fatalf("apply config: %v", err)
	}

	// The exporter resolves series labels through the map filled by injectScrapes.
	l.e.SetLabelsByIDFunc(func(ref storage.SeriesRef) labels.Labels {
		return l.labelsByRef[ref]
	})

	// Run the exporter in the background; a run error is only logged.
	go func() {
		if err := l.e.Run(); err != nil {
			t.Logf("running exporter: %s", err)
		}
	}()

	return v1.NewAPI(cl), map[string]string{
		"cluster":    cluster,
		"location":   location,
		"project_id": creds.ProjectID,
	}
}
// injectScrapes pushes every recorded scrape through the exporter, one
// Exporter.Export call per scrape.
//
// Each scrape (a slice of metric families) is encoded into the delimited proto
// exposition format and parsed back with the Prometheus textparse parser, then
// converted into a batch of record.RefSample plus per-metric metadata.
// Samples without an explicit timestamp get the current time. The third
// (duration) argument is ignored.
//
// Any encoding or parsing error, as well as encountering a native histogram
// entry (not supported yet), fails the test via t.Fatal.
func (l *localExportWithGCM) injectScrapes(t testing.TB, scrapeRecordings [][]*dto.MetricFamily, _ time.Duration) {
	t.Helper()

	for _, mfs := range scrapeRecordings {
		// Encode gathered metric family as proto Prometheus exposition format, decode as internal
		// Prometheus textparse format to have metrics how Prometheus would have
		// before append. We don't use dto straight away due to quite complex code
		// for generating multi counter metrics like legacy histograms and summaries.
		b := bytes.Buffer{}
		enc := expfmt.NewEncoder(&b, expfmt.NewFormat(expfmt.TypeProtoDelim))
		for _, mf := range mfs {
			if err := enc.Encode(mf); err != nil {
				t.Fatal(err)
			}
		}
		if closer, ok := enc.(expfmt.Closer); ok {
			if err := closer.Close(); err != nil {
				t.Fatal(err)
			}
		}

		tp, err := textparse.New(b.Bytes(), string(expfmt.NewFormat(expfmt.TypeProtoDelim)), true)
		if err != nil {
			t.Fatal(err)
		}

		// Iterate over textparse parser results and mimic Prometheus scrape loop
		// with exporter.Export injection.
		// It's fine to start ref from 0 and reset labelsByRef for every Export
		// invocation, as the exporter does not need the labels any further once
		// the conversion done in Export has finished.
		l.labelsByRef = map[storage.SeriesRef]labels.Labels{}
		ref := uint64(0)
		var (
			currMeta export.MetricMetadata
			batch    []record.RefSample
			metadata = map[string]export.MetricMetadata{}
		)
		for {
			et, err := tp.Next()
			if err != nil {
				if errors.Is(err, io.EOF) {
					break
				}
				t.Fatal(err)
			}

			switch et {
			case textparse.EntryType:
				_, currMeta.Type = tp.Type()
				continue
			case textparse.EntryHelp:
				mName, mHelp := tp.Help()
				currMeta.Metric, currMeta.Help = string(mName), string(mHelp)
				continue
			case textparse.EntryUnit:
				// Proto format won't give us that anyway.
				continue
			case textparse.EntryComment:
				continue
			case textparse.EntryHistogram:
				// TODO(bwplotka): Sparse histogram would be here TBD.
				// Fail only this test instead of panicking the whole test binary.
				t.Fatal("sparse histograms: not implemented")
			default:
				// Anything else is handled below as a series sample.
			}

			// TODO(bwplotka): Support exemplars and created timestamp.
			// Default to "now"; an explicit timestamp from the parser wins.
			ts := timestamp.FromTime(time.Now())
			_, parsedTimestamp, val := tp.Series()
			if parsedTimestamp != nil {
				ts = *parsedTimestamp
			}

			// Metadata is keyed by the metric name reported by the last help entry.
			metadata[currMeta.Metric] = currMeta

			lset := labels.New()
			_ = tp.Metric(&lset) // Only the populated label set is needed here.
			l.labelsByRef[storage.SeriesRef(ref)] = lset

			batch = append(batch, record.RefSample{
				Ref: chunks.HeadSeriesRef(ref), V: val, T: ts,
			})
			ref++
		}

		l.e.Export(func(metric string) (export.MetricMetadata, bool) {
			m, ok := metadata[metric]
			return m, ok
		}, batch, nil)
	}
}