in internal/benchrunner/runners/pipeline/benchmark.go [79:191]
func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
	// Run benchmark
	bench, err := r.benchmarkIngest(ctx, b, entryPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed running benchmark: %w", err)
	}

	// Extract performance measurements
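	// Each processor is keyed by its type plus the file and line where it is
	// defined, so the same processor type used in different places is reported
	// separately.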
	processorKey := func(pipeline ingest.Pipeline, processor ingest.Processor) string {
		// Don't use the time of "pipeline" processors in the benchmark, as they
		// aggregate the time of all the processors in their pipeline.
		if processor.Type == "pipeline" {
			return ""
		}
		return fmt.Sprintf("%s @ %s:%d", processor.Type, pipeline.Filename(), processor.FirstLine)
	}
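	// byAbsoluteTime ranks a processor by the total time it spent during the
	// run; ingest stats report milliseconds, so convert them to nanoseconds to
	// compare as time.Duration values.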
	byAbsoluteTime := func(record ingest.StatsRecord) int64 {
		return record.TimeInMillis * int64(time.Millisecond)
	}
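	// byRelativeTime ranks a processor by the average time spent per processed
	// document, guarding against division by zero when a processor never ran.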
	byRelativeTime := func(record ingest.StatsRecord) int64 {
		if record.Count == 0 {
			return 0
		}
		return record.TimeInMillis * int64(time.Millisecond) / record.Count
	}
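	// asPercentageOfTotalTime reports an aggregated time as a percentage of
	// the total time elapsed in the pipeline.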
	asPercentageOfTotalTime := func(perf processorPerformance) BenchmarkValue {
		return BenchmarkValue{
			Name:        perf.key,
			Description: perf.key,
			Unit:        "%",
			Value:       time.Duration(perf.value).Seconds() * 100 / bench.elapsed.Seconds(),
		}
	}
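	// asDuration reports an aggregated value as a plain duration.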
	asDuration := func(perf processorPerformance) BenchmarkValue {
		return BenchmarkValue{
			Name:        perf.key,
			Description: perf.key,
			Value:       time.Duration(perf.value),
		}
	}
	nonZero := func(p processorPerformance) bool {
		// This removes pipeline processors (marked with key="") and zero values.
		return p.key != "" && p.value != 0
	}
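	// Top processors by total time spent, reported as a percentage of the
	// whole run.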
	topAbsProc, err := bench.
		aggregate(processorKey, byAbsoluteTime).
		filter(nonZero).
		sort(descending).
		top(r.options.NumTopProcs).
		collect(asPercentageOfTotalTime)
	if err != nil {
		return nil, err
	}
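	// Top processors by average time spent per document.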
	topRelProcs, err := bench.
		aggregate(processorKey, byRelativeTime).
		filter(nonZero).
		sort(descending).
		top(r.options.NumTopProcs).
		collect(asDuration)
	if err != nil {
		return nil, err
	}
	// Build result
	result := &BenchmarkResult{
		Type:        string(BenchType),
		Package:     r.options.Folder.Package,
		DataStream:  r.options.Folder.DataStream,
		Description: fmt.Sprintf("pipeline benchmark for %s/%s", r.options.Folder.Package, r.options.Folder.DataStream),
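		// Parameters record the benchmark input: the number of source events
		// provided and the number of documents processed by the pipeline.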
		Parameters: []BenchmarkValue{
			{
				Name:  "source_doc_count",
				Value: len(b.events),
			},
			{
				Name:  "doc_count",
				Value: bench.numDocs,
			},
		},
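		// The tests below report overall pipeline throughput plus the two
		// per-processor rankings computed above.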
		Tests: []BenchmarkTest{
			{
				Name: "pipeline_performance",
				Results: []BenchmarkValue{
					{
						Name:        "processing_time",
						Description: "time elapsed in pipeline processors",
						Value:       bench.elapsed.Seconds(),
						Unit:        "s",
					},
					{
						Name:        "eps",
						Description: "processed events per second",
						Value:       float64(bench.numDocs) / bench.elapsed.Seconds(),
					},
				},
			},
			{
				Name:        "procs_by_total_time",
				Description: fmt.Sprintf("top %d processors by time spent", r.options.NumTopProcs),
				Results:     topAbsProc,
			},
			{
				Name:        "procs_by_avg_time_per_doc",
				Description: fmt.Sprintf("top %d processors by average time per document", r.options.NumTopProcs),
				Results:     topRelProcs,
			},
		},
	}
	return result, nil
}