systemtest/benchtest/clients.go (132 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package benchtest import ( "context" "crypto/tls" "io/fs" "net/url" "path/filepath" "testing" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.uber.org/zap" "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "github.com/elastic/apm-perf/loadgen" loadgencfg "github.com/elastic/apm-perf/loadgen/config" "github.com/elastic/apm-perf/loadgen/eventhandler" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/transport" ) // NewTracer returns a new Elastic APM tracer, configured // to send to the target APM Server. func NewTracer(tb testing.TB) *apm.Tracer { httpTransport, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{ ServerURLs: []*url.URL{loadgencfg.Config.ServerURL}, SecretToken: loadgencfg.Config.SecretToken, }) if err != nil { tb.Fatal(err) } tracer, err := apm.NewTracerOptions(apm.TracerOptions{ Transport: httpTransport, }) if err != nil { tb.Fatal(err) } tb.Cleanup(tracer.Close) return tracer } // NewOTLPExporter returns a new OpenTelemetry Go exporter, configured // to export to the target APM Server. func NewOTLPExporter(tb testing.TB) *otlptrace.Exporter { serverURL := loadgencfg.Config.ServerURL secretToken := loadgencfg.Config.SecretToken endpoint := serverURL.Host if serverURL.Port() == "" { switch serverURL.Scheme { case "http": endpoint += ":80" case "https": endpoint += ":443" } } opts := []otlptracegrpc.Option{ otlptracegrpc.WithEndpoint(endpoint), otlptracegrpc.WithDialOption(grpc.WithBlock()), } if secretToken != "" { opts = append(opts, otlptracegrpc.WithHeaders(map[string]string{ "Authorization": "Bearer " + secretToken, })) } if serverURL.Scheme == "http" { opts = append(opts, otlptracegrpc.WithInsecure()) } else { tlsCredentials := credentials.NewTLS(&tls.Config{ InsecureSkipVerify: true, }) opts = append(opts, otlptracegrpc.WithTLSCredentials(tlsCredentials)) } exporter, err := otlptracegrpc.New(context.Background(), opts...) if err != nil { tb.Fatal(err) } tb.Cleanup(func() { exporter.Shutdown(context.Background()) }) return exporter } // NewEventHandler creates a eventhandler which loads the files matching the // passed regex. // // It has to use loadgen.NewEventHandler as it has access to the private `events` FS Storage. func NewEventHandler(tb testing.TB, p string, l *rate.Limiter) *eventhandler.Handler { serverCfg := loadgencfg.Config h, err := loadgen.NewEventHandler(loadgen.EventHandlerParams{ Logger: zap.NewNop(), Protocol: "apm/http", Path: p, URL: serverCfg.ServerURL.String(), Token: serverCfg.SecretToken, Limiter: l, RewriteIDs: serverCfg.RewriteIDs, RewriteTimestamps: serverCfg.RewriteTimestamps, Headers: serverCfg.Headers, }) if err != nil { tb.Fatal(err) } return h } // NewFSEventHandler creates an eventhandler which loads the files matching the // passed regex in fs. func NewFSEventHandler(tb testing.TB, p string, l *rate.Limiter, fs fs.FS) *eventhandler.Handler { serverCfg := loadgencfg.Config h, err := newFSEventHandler(loadgen.EventHandlerParams{ Logger: zap.NewNop(), Protocol: "apm/http", Path: p, URL: serverCfg.ServerURL.String(), Token: serverCfg.SecretToken, Limiter: l, RewriteIDs: serverCfg.RewriteIDs, RewriteTimestamps: serverCfg.RewriteTimestamps, Headers: serverCfg.Headers, }, fs) if err != nil { tb.Fatal(err) } return h } func newFSEventHandler(p loadgen.EventHandlerParams, fs fs.FS) (*eventhandler.Handler, error) { t, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{}) if err != nil { return nil, err } transp := eventhandler.NewAPMTransport(p.Logger, t.Client, p.URL, p.Token, p.APIKey, p.Headers) cfg := eventhandler.Config{ Path: filepath.Join("events", p.Path), Transport: transp, Storage: fs, Limiter: p.Limiter, Rand: p.Rand, RewriteIDs: p.RewriteIDs, RewriteServiceNames: p.RewriteServiceNames, RewriteServiceNodeNames: p.RewriteServiceNodeNames, RewriteServiceTargetNames: p.RewriteServiceTargetNames, RewriteSpanNames: p.RewriteSpanNames, RewriteTransactionNames: p.RewriteTransactionNames, RewriteTransactionTypes: p.RewriteTransactionTypes, RewriteTimestamps: p.RewriteTimestamps, } return eventhandler.NewAPM(p.Logger, cfg) }