internal/benchrunner/runners/pipeline/benchmark.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipeline

import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
"github.com/elastic/elastic-package/internal/elasticsearch/ingest"
)
type BenchmarkResult struct {
// XMLName is a zero-length field used as an annotation for XML marshaling.
XMLName struct{} `xml:"group" json:"-"`
	// Type of the benchmark.
	Type string `xml:"type" json:"type"`
	// Package of the benchmark.
	Package string `xml:"package" json:"package"`
	// DataStream of the benchmark.
	DataStream string `xml:"data_stream" json:"data_stream"`
// Description of the benchmark run.
Description string `xml:"description,omitempty" json:"description,omitempty"`
// Parameters used for this benchmark.
Parameters []BenchmarkValue `xml:"parameters,omitempty" json:"parameters,omitempty"`
// Tests holds the results for the benchmark.
Tests []BenchmarkTest `xml:"test" json:"test"`
}

// BenchmarkTest models a particular test performed during a benchmark.
type BenchmarkTest struct {
// Name of this test.
Name string `xml:"name" json:"name"`
	// Detailed marks tests that are printed to the output but not
	// included in file reports.
	Detailed bool `xml:"-" json:"-"`
// Description of this test.
Description string `xml:"description,omitempty" json:"description,omitempty"`
// Parameters for this test.
Parameters []BenchmarkValue `xml:"parameters,omitempty" json:"parameters,omitempty"`
// Results of the test.
Results []BenchmarkValue `xml:"result" json:"result"`
}

// BenchmarkValue represents a value (result or parameter)
// with an optional associated unit.
type BenchmarkValue struct {
// Name of the value.
Name string `xml:"name" json:"name"`
// Description of the value.
Description string `xml:"description,omitempty" json:"description,omitempty"`
// Unit used for this value.
Unit string `xml:"unit,omitempty" json:"unit,omitempty"`
// Value is of any type, usually string or numeric.
Value interface{} `xml:"value,omitempty" json:"value,omitempty"`
}

// String returns a BenchmarkValue's value, nicely formatted.
func (p BenchmarkValue) String() (r string) {
if str, ok := p.Value.(fmt.Stringer); ok {
return str.String()
}
if float, ok := p.Value.(float64); ok {
r = fmt.Sprintf("%.02f", float)
} else {
r = fmt.Sprintf("%v", p.Value)
}
if p.Unit != "" {
r += p.Unit
}
return r
}
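
// For illustration (example values invented), String covers three cases:
//
//	BenchmarkValue{Value: 12.3456, Unit: "%"}.String()      // "12.35%"
//	BenchmarkValue{Value: 42, Unit: "docs"}.String()        // "42docs" (no separator is added)
//	BenchmarkValue{Value: 1500 * time.Millisecond}.String() // "1.5s" (fmt.Stringer wins; Unit is ignored)
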
func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
// Run benchmark
bench, err := r.benchmarkIngest(ctx, b, entryPipeline)
if err != nil {
return nil, fmt.Errorf("failed running benchmark: %w", err)
}

	// Extract performance measurements
processorKey := func(pipeline ingest.Pipeline, processor ingest.Processor) string {
		// We don't want to use the pipeline processor's time in the benchmark,
		// as it aggregates the time of all the processors in the pipeline it
		// invokes.
if processor.Type == "pipeline" {
return ""
}
return fmt.Sprintf("%s @ %s:%d", processor.Type, pipeline.Filename(), processor.FirstLine)
}
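
	// For example (file name and line invented): a `rename` processor declared
	// at line 42 of default.yml is keyed as "rename @ default.yml:42".
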
byAbsoluteTime := func(record ingest.StatsRecord) int64 {
return record.TimeInMillis * int64(time.Millisecond)
}

	byRelativeTime := func(record ingest.StatsRecord) int64 {
if record.Count == 0 {
return 0
}
return record.TimeInMillis * int64(time.Millisecond) / record.Count
}
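
	// For example (invented numbers): TimeInMillis=1500 over Count=3000 is
	// 1500*1e6/3000 = 500000ns, an average of 500µs per document.
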
asPercentageOfTotalTime := func(perf processorPerformance) BenchmarkValue {
return BenchmarkValue{
Name: perf.key,
Description: perf.key,
Unit: "%",
Value: time.Duration(perf.value).Seconds() * 100 / bench.elapsed.Seconds(),
}
}
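
	// For example (invented numbers): a processor that spent 250ms out of a
	// total elapsed time of 1s is reported as Value=25 with Unit="%".
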
asDuration := func(perf processorPerformance) BenchmarkValue {
return BenchmarkValue{
Name: perf.key,
Description: perf.key,
Value: time.Duration(perf.value),
}
}

	nonZero := func(p processorPerformance) bool {
// This removes pipeline processors (marked with key="") and zero values.
return p.key != "" && p.value != 0
}

	topAbsProc, err := bench.
aggregate(processorKey, byAbsoluteTime).
filter(nonZero).
sort(descending).
top(r.options.NumTopProcs).
collect(asPercentageOfTotalTime)
if err != nil {
return nil, err
}

	topRelProcs, err := bench.
aggregate(processorKey, byRelativeTime).
filter(nonZero).
sort(descending).
top(r.options.NumTopProcs).
collect(asDuration)
if err != nil {
return nil, err
}

	// Build result
result := &BenchmarkResult{
Type: string(BenchType),
Package: r.options.Folder.Package,
DataStream: r.options.Folder.DataStream,
Description: fmt.Sprintf("pipeline benchmark for %s/%s", r.options.Folder.Package, r.options.Folder.DataStream),
Parameters: []BenchmarkValue{
{
Name: "source_doc_count",
Value: len(b.events),
},
{
Name: "doc_count",
Value: bench.numDocs,
},
},
Tests: []BenchmarkTest{
{
Name: "pipeline_performance",
Results: []BenchmarkValue{
{
Name: "processing_time",
Description: "time elapsed in pipeline processors",
Value: bench.elapsed.Seconds(),
Unit: "s",
},
{
Name: "eps",
Description: "processed events per second",
Value: float64(bench.numDocs) / bench.elapsed.Seconds(),
},
},
},
{
Name: "procs_by_total_time",
Description: fmt.Sprintf("top %d processors by time spent", r.options.NumTopProcs),
Results: topAbsProc,
},
{
Name: "procs_by_avg_time_per_doc",
Description: fmt.Sprintf("top %d processors by average time per document", r.options.NumTopProcs),
Results: topRelProcs,
},
},
}
return result, nil
}
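
// For illustration (all field values invented), the result built above
// marshals to JSON along these lines:
//
//	{
//	  "type": "pipeline",
//	  "package": "nginx",
//	  "data_stream": "access",
//	  "description": "pipeline benchmark for nginx/access",
//	  "parameters": [{"name": "source_doc_count", "value": 100}, ...],
//	  "test": [{"name": "pipeline_performance", "result": [...]}, ...]
//	}
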
// ingestResult holds the pipelines exercised by a benchmark run, their
// ingest stats, the total elapsed time, and the number of documents ingested.
type ingestResult struct {
pipelines []ingest.Pipeline
stats ingest.PipelineStatsMap
elapsed time.Duration
numDocs int
}

func (r *runner) benchmarkIngest(ctx context.Context, b *benchmark, entryPipeline string) (ingestResult, error) {
baseDocs := resizeDocs(b.events, b.config.NumDocs)
return r.runSingleBenchmark(ctx, entryPipeline, baseDocs)
}

// processorPerformance is a single aggregated measurement for one processor.
type processorPerformance struct {
key string
value int64
}

// aggregation carries the accumulated measurements and any error encountered,
// so the steps below can be chained without intermediate error handling.
type aggregation struct {
result []processorPerformance
err error
}

type (
keyFn func(ingest.Pipeline, ingest.Processor) string
valueFn func(record ingest.StatsRecord) int64
mapFn func(processorPerformance) BenchmarkValue
compareFn func(a, b processorPerformance) bool
filterFn func(processorPerformance) bool
)
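
// The methods below form a small fluent API over aggregation: errors are
// carried through the chain and only surfaced by collect. An illustrative
// chain (hypothetical key/value/map functions, mirroring benchmarkPipeline
// above):
//
//	values, err := ir.
//		aggregate(key, value). // one entry per (pipeline, processor)
//		filter(nonZero).       // drop pipeline processors and zero values
//		sort(descending).
//		top(5).
//		collect(asDuration)
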
// aggregate produces one processorPerformance entry per (pipeline, processor)
// pair found in the stats, keyed and valued by the given functions.
func (ir ingestResult) aggregate(key keyFn, value valueFn) (agg aggregation) {
pipelines := make(map[string]ingest.Pipeline, len(ir.pipelines))
for _, p := range ir.pipelines {
pipelines[p.Name] = p
}
for pipelineName, pipelineStats := range ir.stats {
pipeline, ok := pipelines[pipelineName]
if !ok {
return aggregation{err: fmt.Errorf("unexpected pipeline '%s'", pipelineName)}
}
processors, err := pipeline.Processors()
if err != nil {
return aggregation{err: err}
}
if nSrc, nStats := len(processors), len(pipelineStats.Processors); nSrc != nStats {
return aggregation{err: fmt.Errorf("pipeline '%s' processor count mismatch. source=%d stats=%d", pipelineName, nSrc, nStats)}
}
for procId, procStats := range pipelineStats.Processors {
agg.result = append(agg.result, processorPerformance{
key: key(pipeline, processors[procId]),
value: value(procStats.Stats),
})
}
}
return agg
}
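
// For example (pipeline and processor positions invented): if default.yml
// declares a `set` processor on line 12 and a `rename` on line 20, aggregate
// above pairs them positionally with that pipeline's two stats entries and,
// with the processorKey function from benchmarkPipeline, emits the keys
// "set @ default.yml:12" and "rename @ default.yml:20".
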
func (agg aggregation) sort(compare compareFn) aggregation {
if agg.err != nil {
return agg
}
sort.Slice(agg.result, func(i, j int) bool {
return compare(agg.result[i], agg.result[j])
})
return agg
}

func ascending(a, b processorPerformance) bool {
return a.value < b.value
}

func descending(a, b processorPerformance) bool {
	// Not simply !ascending(a, b): that returns true for equal values,
	// which violates the strict ordering required by sort.Slice.
	return b.value < a.value
}

func (agg aggregation) top(n int) aggregation {
if n < len(agg.result) {
agg.result = agg.result[:n]
}
return agg
}

func (agg aggregation) filter(keep filterFn) aggregation {
if agg.err != nil {
return agg
}
o := 0
for _, entry := range agg.result {
if keep(entry) {
agg.result[o] = entry
o++
}
}
agg.result = agg.result[:o]
return agg
}
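
// filter above compacts in place, reusing the backing array. For example
// (invented values), [{a 1} {"" 5} {b 0}] with the nonZero predicate becomes
// [{a 1}].
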
func (agg aggregation) collect(fn mapFn) ([]BenchmarkValue, error) {
if agg.err != nil {
return nil, agg.err
}
r := make([]BenchmarkValue, len(agg.result))
for idx := range r {
r[idx] = fn(agg.result[idx])
}
return r, nil
}

// runSingleBenchmark simulates ingestion of docs through the entry pipeline,
// then collects ingest stats for the benchmarked pipelines and sums their
// time spent into elapsed.
func (r *runner) runSingleBenchmark(ctx context.Context, entryPipeline string, docs []json.RawMessage) (ingestResult, error) {
if len(docs) == 0 {
return ingestResult{}, errors.New("no docs supplied for benchmark")
}
if _, err := ingest.SimulatePipeline(ctx, r.options.API, entryPipeline, docs, "test-generic-default"); err != nil {
return ingestResult{}, fmt.Errorf("simulate failed: %w", err)
}
stats, err := ingest.GetPipelineStats(r.options.API, r.pipelines)
if err != nil {
return ingestResult{}, fmt.Errorf("error fetching pipeline stats: %w", err)
}
var took time.Duration
for _, pSt := range stats {
took += time.Millisecond * time.Duration(pSt.TimeInMillis)
}
return ingestResult{
pipelines: r.pipelines,
stats: stats,
elapsed: took,
numDocs: len(docs),
}, nil
}
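
// Illustrative call (hypothetical pipeline name, from within a runner method):
//
//	res, err := r.runSingleBenchmark(ctx, "logs-nginx.access-1.2.3", docs)
//	if err == nil {
//		fmt.Println(res.numDocs, res.elapsed)
//	}
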
// resizeDocs returns a slice of want documents, repeating inputDocs
// cyclically as needed. A zero want is treated as one.
func resizeDocs(inputDocs []json.RawMessage, want int) []json.RawMessage {
n := len(inputDocs)
if n == 0 {
return nil
}
if want == 0 {
want = 1
}
result := make([]json.RawMessage, want)
for i := 0; i < want; i++ {
result[i] = inputDocs[i%n]
}
return result
}
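
// For example: resizeDocs([A B], 5) repeats the input cyclically, yielding
// [A B A B A]; a want of zero still yields one document.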