func()

in internal/benchrunner/runners/pipeline/benchmark.go [79:191]


// benchmarkPipeline runs the ingest benchmark for entryPipeline through
// r.benchmarkIngest and summarizes the outcome as a BenchmarkResult.
//
// The result carries two parameters, source_doc_count (len(b.events)) and
// doc_count (bench.numDocs), followed by three tests: the overall pipeline
// performance (processing time and events per second), the top
// r.options.NumTopProcs processors by share of the total elapsed time, and
// the top r.options.NumTopProcs processors by average time per document.
//
// Values computed as a ratio over the elapsed time are reported as 0 when the
// elapsed time is not positive, instead of +Inf or NaN.
//
// It returns an error if the benchmark run fails or if either processor
// ranking cannot be collected.
func (r *runner) benchmarkPipeline(ctx context.Context, b *benchmark, entryPipeline string) (*BenchmarkResult, error) {
	// Run benchmark
	bench, err := r.benchmarkIngest(ctx, b, entryPipeline)
	if err != nil {
		return nil, fmt.Errorf("failed running benchmark: %w", err)
	}

	elapsedSeconds := bench.elapsed.Seconds()
	// perElapsedSecond divides v by the elapsed time in seconds. A
	// non-positive elapsed time yields 0 rather than a non-finite float,
	// which encoders such as encoding/json refuse to marshal.
	perElapsedSecond := func(v float64) float64 {
		if elapsedSeconds <= 0 {
			return 0
		}
		return v / elapsedSeconds
	}

	// Extract performance measurements
	processorKey := func(pipeline ingest.Pipeline, processor ingest.Processor) string {
		// Don't want to use pipeline processors time in benchmark, as they
		// aggregate the time of all the processors in their pipeline.
		if processor.Type == "pipeline" {
			return ""
		}
		return fmt.Sprintf("%s @ %s:%d", processor.Type, pipeline.Filename(), processor.FirstLine)
	}
	// byAbsoluteTime scales the record's time in milliseconds to nanoseconds
	// so the value can later be converted with time.Duration.
	byAbsoluteTime := func(record ingest.StatsRecord) int64 {
		return record.TimeInMillis * int64(time.Millisecond)
	}
	// byRelativeTime is the record's time in nanoseconds divided by its
	// count (integer division); records with a zero count yield 0.
	byRelativeTime := func(record ingest.StatsRecord) int64 {
		if record.Count == 0 {
			return 0
		}
		return record.TimeInMillis * int64(time.Millisecond) / record.Count
	}
	// asPercentageOfTotalTime expresses the processor time as a percentage
	// of the benchmark's elapsed time.
	asPercentageOfTotalTime := func(perf processorPerformance) BenchmarkValue {
		return BenchmarkValue{
			Name:        perf.key,
			Description: perf.key,
			Unit:        "%",
			Value:       perElapsedSecond(time.Duration(perf.value).Seconds() * 100),
		}
	}
	// asDuration reports the processor time as a time.Duration.
	asDuration := func(perf processorPerformance) BenchmarkValue {
		return BenchmarkValue{
			Name:        perf.key,
			Description: perf.key,
			Value:       time.Duration(perf.value),
		}
	}
	nonZero := func(p processorPerformance) bool {
		// This removes pipeline processors (marked with key="") and zero values.
		return p.key != "" && p.value != 0
	}

	topAbsProc, err := bench.
		aggregate(processorKey, byAbsoluteTime).
		filter(nonZero).
		sort(descending).
		top(r.options.NumTopProcs).
		collect(asPercentageOfTotalTime)
	if err != nil {
		return nil, fmt.Errorf("failed collecting processors by total time: %w", err)
	}

	topRelProcs, err := bench.
		aggregate(processorKey, byRelativeTime).
		filter(nonZero).
		sort(descending).
		top(r.options.NumTopProcs).
		collect(asDuration)
	if err != nil {
		return nil, fmt.Errorf("failed collecting processors by average time per document: %w", err)
	}

	// Build result
	result := &BenchmarkResult{
		Type:        string(BenchType),
		Package:     r.options.Folder.Package,
		DataStream:  r.options.Folder.DataStream,
		Description: fmt.Sprintf("pipeline benchmark for %s/%s", r.options.Folder.Package, r.options.Folder.DataStream),
		Parameters: []BenchmarkValue{
			{
				Name:  "source_doc_count",
				Value: len(b.events),
			},
			{
				Name:  "doc_count",
				Value: bench.numDocs,
			},
		},
		Tests: []BenchmarkTest{
			{
				Name: "pipeline_performance",
				Results: []BenchmarkValue{
					{
						Name:        "processing_time",
						Description: "time elapsed in pipeline processors",
						Value:       elapsedSeconds,
						Unit:        "s",
					},
					{
						Name:        "eps",
						Description: "processed events per second",
						Value:       perElapsedSecond(float64(bench.numDocs)),
					},
				},
			},
			{
				Name:        "procs_by_total_time",
				Description: fmt.Sprintf("top %d processors by time spent", r.options.NumTopProcs),
				Results:     topAbsProc,
			},
			{
				Name:        "procs_by_avg_time_per_doc",
				Description: fmt.Sprintf("top %d processors by average time per document", r.options.NumTopProcs),
				Results:     topRelProcs,
			},
		},
	}

	return result, nil
}