systemtest/benchtest/profiles.go (179 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package benchtest import ( "compress/gzip" "context" "fmt" "io" "math/rand/v2" "net/http" "os" "strconv" "time" "github.com/google/pprof/profile" loadgencfg "github.com/elastic/apm-perf/loadgen/config" ) func fetchProfile(urlPath string, duration time.Duration) (*profile.Profile, error) { serverURL := loadgencfg.Config.ServerURL.String() req, err := http.NewRequest("GET", serverURL+urlPath, nil) if err != nil { return nil, err } if duration > 0 { query := req.URL.Query() query.Set("seconds", strconv.Itoa(int(duration.Seconds()))) req.URL.RawQuery = query.Encode() timeout := time.Duration(float64(duration) * 1.5) ctx := req.Context() ctx, cancel := context.WithTimeout(req.Context(), timeout) defer cancel() req = req.WithContext(ctx) } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("failed to fetch profile (%s): %s", resp.Status, body) } return profile.Parse(resp.Body) } type profiles struct { benchmarkNames []string cpu []*profile.Profile heap []*profile.Profile mutex []*profile.Profile block []*profile.Profile } func (p *profiles) init() error { return p.recordCumulatives() } func (p *profiles) record(benchmarkName string) <-chan error { record := func() error { p.benchmarkNames = append(p.benchmarkNames, benchmarkName) if err := p.recordCPU(); err != nil { return err } return p.recordCumulatives() } ch := make(chan error, 1) go func() { ch <- record() }() return ch } func (p *profiles) recordCPU() error { if benchConfig.CPUProfile == "" { return nil } // Limit profiling time to random 5% of overall time. // This should not seriously affect the profile quality, // since we merge the final profile form multiple sources, // but prevent profile size from swelling. var done bool const tickets = 20 duration := benchConfig.Benchtime / tickets for i := range tickets { if done || (rand.N(tickets-i)+i+1) < tickets { time.Sleep(duration) continue } profile, err := fetchProfile("/debug/pprof/profile", duration) if err != nil { return fmt.Errorf("failed to fetch CPU profile: %w", err) } // We don't need the address in the profile, so discard it to reduce the size. if err := profile.Aggregate(true, true, true, true, false); err != nil { return fmt.Errorf("failed to fetch CPU profile: %w", err) } profile = profile.Compact() p.cpu = append(p.cpu, profile) done = true } return nil } func (p *profiles) recordCumulatives() error { if err := p.recordCumulative(benchConfig.MemProfile, "/debug/pprof/heap", &p.heap); err != nil { return err } if err := p.recordCumulative(benchConfig.MutexProfile, "/debug/pprof/mutex", &p.mutex); err != nil { return err } if err := p.recordCumulative(benchConfig.BlockProfile, "/debug/pprof/block", &p.block); err != nil { return err } return nil } func (p *profiles) recordCumulative(flag string, urlPath string, out *[]*profile.Profile) error { if flag == "" { return nil } profile, err := fetchProfile(urlPath, 0) if err != nil { return err } *out = append(*out, profile) return nil } func (p *profiles) writeProfiles() error { if err := p.writeCumulative(benchConfig.MemProfile, p.heap); err != nil { return err } if err := p.writeCumulative(benchConfig.MutexProfile, p.mutex); err != nil { return err } if err := p.writeCumulative(benchConfig.BlockProfile, p.block); err != nil { return err } if err := p.writeDeltas(benchConfig.CPUProfile, p.cpu); err != nil { return err } return nil } func (p *profiles) writeCumulative(filename string, cumulative []*profile.Profile) error { if len(cumulative) == 0 { return nil } p0 := cumulative[0] deltas := make([]*profile.Profile, len(cumulative)-1) for i, p1 := range cumulative[1:] { delta, err := computeDeltaProfile(p0, p1) if err != nil { return err } deltas[i] = delta p0 = p1 } return p.writeDeltas(filename, deltas) } func (p *profiles) writeDeltas(filename string, deltas []*profile.Profile) error { if len(deltas) == 0 { return nil } merged, err := p.mergeBenchmarkProfiles(deltas) if err != nil { return err } f, err := os.Create(filename) if err != nil { return err } defer f.Close() w, err := gzip.NewWriterLevel(f, gzip.BestCompression) if err != nil { return err } defer w.Close() return merged.WriteUncompressed(w) } func (p *profiles) mergeBenchmarkProfiles(profiles []*profile.Profile) (*profile.Profile, error) { merged, err := profile.Merge(profiles) if err != nil { return nil, fmt.Errorf("error merging profiles: %w", err) } return merged, nil } func computeDeltaProfile(p0, p1 *profile.Profile) (*profile.Profile, error) { p0.Scale(-1) defer p0.Scale(-1) // return to initial state merged, err := profile.Merge([]*profile.Profile{p0, p1}) if err != nil { return nil, fmt.Errorf("error computing delta profile: %w", err) } merged.TimeNanos = p1.TimeNanos merged.DurationNanos = p1.TimeNanos - p0.TimeNanos return merged, nil }